Skip to content

Commit

Permalink
[#11779][CDCSDK] Add option to send a DDL record based on a flag valu…
Browse files Browse the repository at this point in the history
…e in GetChangesRequest

Summary:
Before this, the issue was that if for a stream ID, some data was consumed and a client comes up with the same stream ID and requests for changes, it will only receive the changes.

Now the issue with this was with `Debezium` that when the connector was restarted, it directly received the changes without any DDL record, this DDL record was essential for Debezium since it was used to process the schema info for the columns in Debezium and in case it was not there, it lead to a `NullPointerException` on the client side, thus causing a connector crash effectively.

Test Plan:
Tested the complete CDC with Debezium pipeline with the specified change.

Command to run test:
`ybd --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestNeedSchemaInfoFlag`

Reviewers: rahuldesirazu, jhe, skumar, sergei

Reviewed By: sergei

Subscribers: iamoncar, sdash, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D16057
  • Loading branch information
vaibhav-yb committed Mar 25, 2022
1 parent 7898813 commit cf5fead
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 13 deletions.
8 changes: 6 additions & 2 deletions ent/src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,15 @@ class CDCServiceImpl::Impl {
it->last_streamed_op_id = op_id;
}

std::shared_ptr<Schema> GetOrAddSchema(const ProducerTabletInfo& producer_tablet) {
std::shared_ptr<Schema> GetOrAddSchema(const ProducerTabletInfo &producer_tablet,
const bool need_schema_info) {
std::lock_guard<decltype(mutex_)> l(mutex_);
auto it = cdc_state_metadata_.find(producer_tablet);

if (it != cdc_state_metadata_.end()) {
if (need_schema_info) {
it->current_schema = std::make_shared<Schema>();
}
return it->current_schema;
}
CDCStateMetadataInfo info = CDCStateMetadataInfo {
Expand Down Expand Up @@ -1116,7 +1120,7 @@ void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req,
std::string commit_timestamp;
OpId last_streamed_op_id;

auto cached_schema = impl_->GetOrAddSchema(producer_tablet);
auto cached_schema = impl_->GetOrAddSchema(producer_tablet, req->need_schema_info());
s = cdc::GetChangesForCDCSDK(
req->stream_id(), req->tablet_id(), cdc_sdk_op_id, record, tablet_peer, mem_tracker,
&msgs_holder, resp, &commit_timestamp, &cached_schema,
Expand Down
89 changes: 88 additions & 1 deletion ent/src/yb/integration-tests/cdcsdk_ysql-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class CDCSDKYsqlTest : public CDCSDKTestBase {
return resp.cluster_config().cluster_uuid();
}

// the range is exclusive of end i.e. [start, end)
// The range is exclusive of end i.e. [start, end)
void WriteRows(uint32_t start, uint32_t end, Cluster* cluster) {
auto conn = EXPECT_RESULT(cluster->ConnectToDB(kNamespaceName));
LOG(INFO) << "Writing " << end - start << " row(s)";
Expand Down Expand Up @@ -159,6 +159,18 @@ class CDCSDKYsqlTest : public CDCSDKTestBase {
change_req->mutable_from_cdc_sdk_checkpoint()->set_write_id(0);
}

void PrepareChangeRequest(
GetChangesRequestPB* change_req, const CDCStreamId& stream_id,
const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& tablets,
const CDCSDKCheckpointPB& cp) {
change_req->set_stream_id(stream_id);
change_req->set_tablet_id(tablets.Get(0).tablet_id());
change_req->mutable_from_cdc_sdk_checkpoint()->set_index(cp.index());
change_req->mutable_from_cdc_sdk_checkpoint()->set_term(cp.term());
change_req->mutable_from_cdc_sdk_checkpoint()->set_key(cp.key());
change_req->mutable_from_cdc_sdk_checkpoint()->set_write_id(cp.write_id());
}

void PrepareSetCheckpointRequest(
SetCDCCheckpointRequestPB* set_checkpoint_req,
const CDCStreamId stream_id,
Expand Down Expand Up @@ -253,6 +265,44 @@ class CDCSDKYsqlTest : public CDCSDKTestBase {
LOG(INFO) << "Got " << ins_count << " insert records";
ASSERT_EQ(expected_records_size, ins_count);
}

Result<GetChangesResponsePB> VerifyIfDDLRecordPresent(
const CDCStreamId& stream_id,
const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& tablets,
bool expect_ddl_record, bool is_first_call, const CDCSDKCheckpointPB* cp = nullptr) {
GetChangesRequestPB req;
GetChangesResponsePB resp;

if (cp == nullptr) {
PrepareChangeRequest(&req, stream_id, tablets);
} else {
PrepareChangeRequest(&req, stream_id, tablets, *cp);
}

// The default value for need_schema_info is false.
if (expect_ddl_record) {
req.set_need_schema_info(true);
}

RpcController get_changes_rpc;
RETURN_NOT_OK(cdc_proxy_->GetChanges(req, &resp, &get_changes_rpc));

if (resp.has_error()) {
return StatusFromPB(resp.error().status());
}

auto record = resp.cdc_sdk_proto_records(0);

// If it's the first call to GetChanges, we will get a DDL record irrespective of the
// value of need_schema_info.
if (is_first_call || expect_ddl_record) {
EXPECT_EQ(record.row_message().op(), RowMessage::DDL);
} else {
EXPECT_NE(record.row_message().op(), RowMessage::DDL);
}

return resp;
}
};

TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestBaseFunctions)) {
Expand Down Expand Up @@ -328,6 +378,43 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(MultiRowInsertion)) {
LOG(INFO) << "Got " << ins_count << " insert records";
ASSERT_EQ(expected_records_size, ins_count);
}

TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestNeedSchemaInfoFlag)) {
ASSERT_OK(SetUpWithParams(3, 1, false));

auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName));

google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
ASSERT_OK(test_client()->GetTablets(
table, 0, &tablets, /* partition_list_version = */ nullptr));

std::string table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName));
CDCStreamId stream_id = ASSERT_RESULT(CreateDBStream());

ASSERT_OK(SetInitialCheckpoint(stream_id, tablets));

// This will write one row with PK = 0.
WriteRows(0 /* start */, 1 /* end */, &test_cluster_);

// This is the first call to GetChanges, we will get a DDL record.
GetChangesResponsePB resp = ASSERT_RESULT(VerifyIfDDLRecordPresent(stream_id, tablets, false,
true));

// Write another row to the database with PK = 1.
WriteRows(1 /* start */, 2 /* end */, &test_cluster_);

// We will not get any DDL record here since this is not the first call and the flag
// need_schema_info is also unset.
resp = ASSERT_RESULT(VerifyIfDDLRecordPresent(stream_id, tablets, false, false,
&resp.cdc_sdk_checkpoint()));

// Write another row to the database with PK = 2.
WriteRows(2 /* start */, 3 /* end */, &test_cluster_);

// We will get a DDL record since we have enabled the need_schema_info flag.
resp = ASSERT_RESULT(VerifyIfDDLRecordPresent(stream_id, tablets, true, false,
&resp.cdc_sdk_checkpoint()));
}
} // namespace enterprise
} // namespace cdc
} // namespace yb
11 changes: 10 additions & 1 deletion java/yb-cdc/src/main/java/org/yb/cdc/ConcurrentPoller.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public class ConcurrentPoller {

YBClient synClient;

// We need the schema information in a DDL the very first time we send a getChanges request.
boolean needSchemaInfo = true;

public ConcurrentPoller(YBClient synClient,
AsyncYBClient client,
OutputClient outputClient,
Expand Down Expand Up @@ -111,6 +114,7 @@ public void poll() throws Exception {
final List result = new ArrayList();
queue.addAll(listTabletIdTableIdPair);
queue.add(END_PAIR);

while (true) {
if (stopExecution) {
// This signals the CDCConsoleSubscriber to stop polling further and exit.
Expand All @@ -134,12 +138,17 @@ public void poll() throws Exception {

Deferred<GetChangesResponse> response = asyncYBClient.getChangesCDCSDK(
table, streamId, entry.getKey() /*tabletId*/,
cp.getTerm(), cp.getIndex(), cp.getKey(), cp.getWriteId(), cp.getSnapshotTime());
cp.getTerm(), cp.getIndex(), cp.getKey(), cp.getWriteId(), cp.getSnapshotTime(),
needSchemaInfo);

// Once we got the response, we do not need the schema in further calls so unset the flag.
needSchemaInfo = false;

response.addCallback(resCallback);
response.addErrback(errCallback);

deferredList.add(response);

}

AtomicInteger totalException = new AtomicInteger();
Expand Down
3 changes: 2 additions & 1 deletion java/yb-cdc/src/test/java/org/yb/cdc/TestGetChanges.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public void testGettingChangesWithNegativeIndex() {
// An exception would be thrown for an index less than 0.
try {
GetChangesResponse changesResponse =
myClient.getChangesCDCSDK(table, dbStreamId, tabletId, 0, -1, new byte[]{}, 0, 0L);
myClient.getChangesCDCSDK(
table, dbStreamId, tabletId, 0, -1, new byte[]{}, 0, 0L, false);
} catch (Exception e) {
exceptionThrown = true;
break;
Expand Down
16 changes: 14 additions & 2 deletions java/yb-cdc/src/test/java/org/yb/cdc/util/CDCSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class CDCSubscriber {

private Checkpoint checkpoint;

private boolean needSchemaInfo = false;

/**
* This is the default number of tablets as specified in AsyncYBClient
* @see AsyncYBClient
Expand Down Expand Up @@ -88,6 +90,14 @@ public void setTableName(String tableName) {
this.tableName = tableName;
}

public boolean shouldSendSchema() {
return needSchemaInfo;
}

public void setNeedSchemaInfo(boolean needSchemaInfo) {
this.needSchemaInfo = needSchemaInfo;
}

public void setNumberOfTablets(int numberOfTablets) {
this.numberOfTablets = numberOfTablets;
}
Expand Down Expand Up @@ -461,8 +471,10 @@ public void getResponseFromCDC(List records, Checkpoint cp) throws Exception {

for (String tabletId : tabletIds) {
GetChangesResponse changesResponse =
syncClient.getChangesCDCSDK(table, dbStreamId, tabletId,
cp.getTerm(), cp.getIndex(), cp.getKey(), cp.getWriteId(), cp.getSnapshotTime());
syncClient.getChangesCDCSDK(
table, dbStreamId, tabletId,
cp.getTerm(), cp.getIndex(), cp.getKey(), cp.getWriteId(), cp.getSnapshotTime(),
shouldSendSchema());

if (FORMAT.equalsIgnoreCase("PROTO")) {
// Add records in proto.
Expand Down
5 changes: 3 additions & 2 deletions java/yb-client/src/main/java/org/yb/client/AsyncYBClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -432,10 +432,11 @@ public Object call(Object o) throws Exception {
public Deferred<GetChangesResponse> getChangesCDCSDK(YBTable table, String streamId,
String tabletId, long term,
long index, byte[] key,
int write_id, long time) {
int write_id, long time,
boolean needSchemaInfo) {
checkIsClosed();
GetChangesRequest rpc = new GetChangesRequest(table, streamId, tabletId, term,
index, key, write_id, time);
index, key, write_id, time, needSchemaInfo);
Deferred d = rpc.getDeferred();
d.addErrback(new Callback<Exception, Exception>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ public String getTabletId() {
private final byte[] key;
private final int write_id;
private final long time;
private final boolean needSchemaInfo;

public GetChangesRequest(YBTable table, String streamId, String tabletId,
long term, long index, byte[] key, int write_id, long time) {
long term, long index, byte[] key, int write_id, long time, boolean needSchemaInfo) {
super(table);
this.streamId = streamId;
this.tabletId = tabletId;
Expand All @@ -46,6 +47,7 @@ public GetChangesRequest(YBTable table, String streamId, String tabletId,
this.key = key;
this.write_id = write_id;
this.time = time;
this.needSchemaInfo = needSchemaInfo;
}

@Override
Expand All @@ -54,6 +56,7 @@ ChannelBuffer serialize(Message header) {
final GetChangesRequestPB.Builder builder = GetChangesRequestPB.newBuilder();
builder.setDbStreamId(ByteString.copyFromUtf8(this.streamId));
builder.setTabletId(ByteString.copyFromUtf8(this.tabletId));
builder.setNeedSchemaInfo(this.needSchemaInfo);
if (term != 0 || index != 0) {
CdcService.CDCSDKCheckpointPB.Builder checkpointBuilder =
CdcService.CDCSDKCheckpointPB.newBuilder();
Expand Down
7 changes: 4 additions & 3 deletions java/yb-client/src/main/java/org/yb/client/YBClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1430,9 +1430,10 @@ public IsSetupUniverseReplicationDoneResponse isAlterUniverseReplicationDone(
public GetChangesResponse getChangesCDCSDK(YBTable table, String streamId,
String tabletId, long term,
long index, byte[] key,
int write_id, long time) throws Exception {
Deferred<GetChangesResponse> d = asyncClient
.getChangesCDCSDK(table, streamId, tabletId, term, index, key, write_id, time);
int write_id, long time,
boolean needSchemaInfo) throws Exception {
Deferred<GetChangesResponse> d = asyncClient.getChangesCDCSDK(
table, streamId, tabletId, term, index, key, write_id, time, needSchemaInfo);
return d.join(2*getDefaultAdminOperationTimeoutMs());
}

Expand Down
2 changes: 2 additions & 0 deletions src/yb/cdc/cdc_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ message GetChangesRequestPB {
optional bytes table_id = 7;

optional CDCSDKCheckpointPB from_cdc_sdk_checkpoint = 8;

optional bool need_schema_info = 9 [default = false];
}

message KeyValuePairPB {
Expand Down

0 comments on commit cf5fead

Please sign in to comment.