From cf5fead12edfa5572304153a4c061e294e8f698a Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Fri, 25 Mar 2022 05:05:24 +0000 Subject: [PATCH] [#11779][CDCSDK] Add option to send a DDL record based on a flag value in GetChangesRequest Summary: Before this, the issue was that if for a stream ID, some data was consumed and a client comes up with the same stream ID and requests for changes, it will only receive the changes. Now the issue with this was with `Debezium` that when the connector was restarted, it directly received the changes without any DDL record, this DDL record was essential for Debezium since it was used to process the schema info for the columns in Debezium and in case it was not there, it lead to a `NullPointerException` on the client side, thus causing a connector crash effectively. Test Plan: Tested the complete CDC with Debezium pipeline with the specified change. Command to run test: `ybd --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestNeedSchemaInfoFlag` Reviewers: rahuldesirazu, jhe, skumar, sergei Reviewed By: sergei Subscribers: iamoncar, sdash, ybase Differential Revision: https://phabricator.dev.yugabyte.com/D16057 --- ent/src/yb/cdc/cdc_service.cc | 8 +- .../yb/integration-tests/cdcsdk_ysql-test.cc | 89 ++++++++++++++++++- .../java/org/yb/cdc/ConcurrentPoller.java | 11 ++- .../test/java/org/yb/cdc/TestGetChanges.java | 3 +- .../java/org/yb/cdc/util/CDCSubscriber.java | 16 +++- .../java/org/yb/client/AsyncYBClient.java | 5 +- .../java/org/yb/client/GetChangesRequest.java | 5 +- .../src/main/java/org/yb/client/YBClient.java | 7 +- src/yb/cdc/cdc_service.proto | 2 + 9 files changed, 133 insertions(+), 13 deletions(-) diff --git a/ent/src/yb/cdc/cdc_service.cc b/ent/src/yb/cdc/cdc_service.cc index 4d3b8f485f8c..68941476d3c4 100644 --- a/ent/src/yb/cdc/cdc_service.cc +++ b/ent/src/yb/cdc/cdc_service.cc @@ -257,11 +257,15 @@ class CDCServiceImpl::Impl { it->last_streamed_op_id = op_id; } - std::shared_ptr GetOrAddSchema(const ProducerTabletInfo& producer_tablet) { + std::shared_ptr GetOrAddSchema(const ProducerTabletInfo &producer_tablet, + const bool need_schema_info) { std::lock_guard l(mutex_); auto it = cdc_state_metadata_.find(producer_tablet); if (it != cdc_state_metadata_.end()) { + if (need_schema_info) { + it->current_schema = std::make_shared(); + } return it->current_schema; } CDCStateMetadataInfo info = CDCStateMetadataInfo { @@ -1116,7 +1120,7 @@ void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req, std::string commit_timestamp; OpId last_streamed_op_id; - auto cached_schema = impl_->GetOrAddSchema(producer_tablet); + auto cached_schema = impl_->GetOrAddSchema(producer_tablet, req->need_schema_info()); s = cdc::GetChangesForCDCSDK( req->stream_id(), req->tablet_id(), cdc_sdk_op_id, record, tablet_peer, mem_tracker, &msgs_holder, resp, &commit_timestamp, &cached_schema, diff --git a/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc b/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc index f2946ee5458a..8f12e7c1d7e8 100644 --- a/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/ent/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -123,7 +123,7 @@ class CDCSDKYsqlTest : public CDCSDKTestBase { return resp.cluster_config().cluster_uuid(); } - // the range is exclusive of end i.e. [start, end) + // The range is exclusive of end i.e. [start, end) void WriteRows(uint32_t start, uint32_t end, Cluster* cluster) { auto conn = EXPECT_RESULT(cluster->ConnectToDB(kNamespaceName)); LOG(INFO) << "Writing " << end - start << " row(s)"; @@ -159,6 +159,18 @@ class CDCSDKYsqlTest : public CDCSDKTestBase { change_req->mutable_from_cdc_sdk_checkpoint()->set_write_id(0); } + void PrepareChangeRequest( + GetChangesRequestPB* change_req, const CDCStreamId& stream_id, + const google::protobuf::RepeatedPtrField& tablets, + const CDCSDKCheckpointPB& cp) { + change_req->set_stream_id(stream_id); + change_req->set_tablet_id(tablets.Get(0).tablet_id()); + change_req->mutable_from_cdc_sdk_checkpoint()->set_index(cp.index()); + change_req->mutable_from_cdc_sdk_checkpoint()->set_term(cp.term()); + change_req->mutable_from_cdc_sdk_checkpoint()->set_key(cp.key()); + change_req->mutable_from_cdc_sdk_checkpoint()->set_write_id(cp.write_id()); + } + void PrepareSetCheckpointRequest( SetCDCCheckpointRequestPB* set_checkpoint_req, const CDCStreamId stream_id, @@ -253,6 +265,44 @@ class CDCSDKYsqlTest : public CDCSDKTestBase { LOG(INFO) << "Got " << ins_count << " insert records"; ASSERT_EQ(expected_records_size, ins_count); } + + Result VerifyIfDDLRecordPresent( + const CDCStreamId& stream_id, + const google::protobuf::RepeatedPtrField& tablets, + bool expect_ddl_record, bool is_first_call, const CDCSDKCheckpointPB* cp = nullptr) { + GetChangesRequestPB req; + GetChangesResponsePB resp; + + if (cp == nullptr) { + PrepareChangeRequest(&req, stream_id, tablets); + } else { + PrepareChangeRequest(&req, stream_id, tablets, *cp); + } + + // The default value for need_schema_info is false. + if (expect_ddl_record) { + req.set_need_schema_info(true); + } + + RpcController get_changes_rpc; + RETURN_NOT_OK(cdc_proxy_->GetChanges(req, &resp, &get_changes_rpc)); + + if (resp.has_error()) { + return StatusFromPB(resp.error().status()); + } + + auto record = resp.cdc_sdk_proto_records(0); + + // If it's the first call to GetChanges, we will get a DDL record irrespective of the + // value of need_schema_info. + if (is_first_call || expect_ddl_record) { + EXPECT_EQ(record.row_message().op(), RowMessage::DDL); + } else { + EXPECT_NE(record.row_message().op(), RowMessage::DDL); + } + + return resp; + } }; TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestBaseFunctions)) { @@ -328,6 +378,43 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(MultiRowInsertion)) { LOG(INFO) << "Got " << ins_count << " insert records"; ASSERT_EQ(expected_records_size, ins_count); } + +TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestNeedSchemaInfoFlag)) { + ASSERT_OK(SetUpWithParams(3, 1, false)); + + auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName)); + + google::protobuf::RepeatedPtrField tablets; + ASSERT_OK(test_client()->GetTablets( + table, 0, &tablets, /* partition_list_version = */ nullptr)); + + std::string table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); + CDCStreamId stream_id = ASSERT_RESULT(CreateDBStream()); + + ASSERT_OK(SetInitialCheckpoint(stream_id, tablets)); + + // This will write one row with PK = 0. + WriteRows(0 /* start */, 1 /* end */, &test_cluster_); + + // This is the first call to GetChanges, we will get a DDL record. + GetChangesResponsePB resp = ASSERT_RESULT(VerifyIfDDLRecordPresent(stream_id, tablets, false, + true)); + + // Write another row to the database with PK = 1. + WriteRows(1 /* start */, 2 /* end */, &test_cluster_); + + // We will not get any DDL record here since this is not the first call and the flag + // need_schema_info is also unset. + resp = ASSERT_RESULT(VerifyIfDDLRecordPresent(stream_id, tablets, false, false, + &resp.cdc_sdk_checkpoint())); + + // Write another row to the database with PK = 2. + WriteRows(2 /* start */, 3 /* end */, &test_cluster_); + + // We will get a DDL record since we have enabled the need_schema_info flag. + resp = ASSERT_RESULT(VerifyIfDDLRecordPresent(stream_id, tablets, true, false, + &resp.cdc_sdk_checkpoint())); +} } // namespace enterprise } // namespace cdc } // namespace yb diff --git a/java/yb-cdc/src/main/java/org/yb/cdc/ConcurrentPoller.java b/java/yb-cdc/src/main/java/org/yb/cdc/ConcurrentPoller.java index d43bf62bb3b5..273bf5aa6e88 100644 --- a/java/yb-cdc/src/main/java/org/yb/cdc/ConcurrentPoller.java +++ b/java/yb-cdc/src/main/java/org/yb/cdc/ConcurrentPoller.java @@ -49,6 +49,9 @@ public class ConcurrentPoller { YBClient synClient; + // We need the schema information in a DDL the very first time we send a getChanges request. + boolean needSchemaInfo = true; + public ConcurrentPoller(YBClient synClient, AsyncYBClient client, OutputClient outputClient, @@ -111,6 +114,7 @@ public void poll() throws Exception { final List result = new ArrayList(); queue.addAll(listTabletIdTableIdPair); queue.add(END_PAIR); + while (true) { if (stopExecution) { // This signals the CDCConsoleSubscriber to stop polling further and exit. @@ -134,12 +138,17 @@ public void poll() throws Exception { Deferred response = asyncYBClient.getChangesCDCSDK( table, streamId, entry.getKey() /*tabletId*/, - cp.getTerm(), cp.getIndex(), cp.getKey(), cp.getWriteId(), cp.getSnapshotTime()); + cp.getTerm(), cp.getIndex(), cp.getKey(), cp.getWriteId(), cp.getSnapshotTime(), + needSchemaInfo); + + // Once we got the response, we do not need the schema in further calls so unset the flag. + needSchemaInfo = false; response.addCallback(resCallback); response.addErrback(errCallback); deferredList.add(response); + } AtomicInteger totalException = new AtomicInteger(); diff --git a/java/yb-cdc/src/test/java/org/yb/cdc/TestGetChanges.java b/java/yb-cdc/src/test/java/org/yb/cdc/TestGetChanges.java index 9983ee8ab9e8..3f7679fa7bd6 100644 --- a/java/yb-cdc/src/test/java/org/yb/cdc/TestGetChanges.java +++ b/java/yb-cdc/src/test/java/org/yb/cdc/TestGetChanges.java @@ -77,7 +77,8 @@ public void testGettingChangesWithNegativeIndex() { // An exception would be thrown for an index less than 0. try { GetChangesResponse changesResponse = - myClient.getChangesCDCSDK(table, dbStreamId, tabletId, 0, -1, new byte[]{}, 0, 0L); + myClient.getChangesCDCSDK( + table, dbStreamId, tabletId, 0, -1, new byte[]{}, 0, 0L, false); } catch (Exception e) { exceptionThrown = true; break; diff --git a/java/yb-cdc/src/test/java/org/yb/cdc/util/CDCSubscriber.java b/java/yb-cdc/src/test/java/org/yb/cdc/util/CDCSubscriber.java index fea19f9be1d0..9640df157f48 100644 --- a/java/yb-cdc/src/test/java/org/yb/cdc/util/CDCSubscriber.java +++ b/java/yb-cdc/src/test/java/org/yb/cdc/util/CDCSubscriber.java @@ -45,6 +45,8 @@ public class CDCSubscriber { private Checkpoint checkpoint; + private boolean needSchemaInfo = false; + /** * This is the default number of tablets as specified in AsyncYBClient * @see AsyncYBClient @@ -88,6 +90,14 @@ public void setTableName(String tableName) { this.tableName = tableName; } + public boolean shouldSendSchema() { + return needSchemaInfo; + } + + public void setNeedSchemaInfo(boolean needSchemaInfo) { + this.needSchemaInfo = needSchemaInfo; + } + public void setNumberOfTablets(int numberOfTablets) { this.numberOfTablets = numberOfTablets; } @@ -461,8 +471,10 @@ public void getResponseFromCDC(List records, Checkpoint cp) throws Exception { for (String tabletId : tabletIds) { GetChangesResponse changesResponse = - syncClient.getChangesCDCSDK(table, dbStreamId, tabletId, - cp.getTerm(), cp.getIndex(), cp.getKey(), cp.getWriteId(), cp.getSnapshotTime()); + syncClient.getChangesCDCSDK( + table, dbStreamId, tabletId, + cp.getTerm(), cp.getIndex(), cp.getKey(), cp.getWriteId(), cp.getSnapshotTime(), + shouldSendSchema()); if (FORMAT.equalsIgnoreCase("PROTO")) { // Add records in proto. diff --git a/java/yb-client/src/main/java/org/yb/client/AsyncYBClient.java b/java/yb-client/src/main/java/org/yb/client/AsyncYBClient.java index c422a9f820a5..b36500743166 100644 --- a/java/yb-client/src/main/java/org/yb/client/AsyncYBClient.java +++ b/java/yb-client/src/main/java/org/yb/client/AsyncYBClient.java @@ -432,10 +432,11 @@ public Object call(Object o) throws Exception { public Deferred getChangesCDCSDK(YBTable table, String streamId, String tabletId, long term, long index, byte[] key, - int write_id, long time) { + int write_id, long time, + boolean needSchemaInfo) { checkIsClosed(); GetChangesRequest rpc = new GetChangesRequest(table, streamId, tabletId, term, - index, key, write_id, time); + index, key, write_id, time, needSchemaInfo); Deferred d = rpc.getDeferred(); d.addErrback(new Callback() { @Override diff --git a/java/yb-client/src/main/java/org/yb/client/GetChangesRequest.java b/java/yb-client/src/main/java/org/yb/client/GetChangesRequest.java index 8a11c16724b4..74cbd0f45afe 100644 --- a/java/yb-client/src/main/java/org/yb/client/GetChangesRequest.java +++ b/java/yb-client/src/main/java/org/yb/client/GetChangesRequest.java @@ -35,9 +35,10 @@ public String getTabletId() { private final byte[] key; private final int write_id; private final long time; + private final boolean needSchemaInfo; public GetChangesRequest(YBTable table, String streamId, String tabletId, - long term, long index, byte[] key, int write_id, long time) { + long term, long index, byte[] key, int write_id, long time, boolean needSchemaInfo) { super(table); this.streamId = streamId; this.tabletId = tabletId; @@ -46,6 +47,7 @@ public GetChangesRequest(YBTable table, String streamId, String tabletId, this.key = key; this.write_id = write_id; this.time = time; + this.needSchemaInfo = needSchemaInfo; } @Override @@ -54,6 +56,7 @@ ChannelBuffer serialize(Message header) { final GetChangesRequestPB.Builder builder = GetChangesRequestPB.newBuilder(); builder.setDbStreamId(ByteString.copyFromUtf8(this.streamId)); builder.setTabletId(ByteString.copyFromUtf8(this.tabletId)); + builder.setNeedSchemaInfo(this.needSchemaInfo); if (term != 0 || index != 0) { CdcService.CDCSDKCheckpointPB.Builder checkpointBuilder = CdcService.CDCSDKCheckpointPB.newBuilder(); diff --git a/java/yb-client/src/main/java/org/yb/client/YBClient.java b/java/yb-client/src/main/java/org/yb/client/YBClient.java index 00f90e3d7545..0754d1ef8d93 100644 --- a/java/yb-client/src/main/java/org/yb/client/YBClient.java +++ b/java/yb-client/src/main/java/org/yb/client/YBClient.java @@ -1430,9 +1430,10 @@ public IsSetupUniverseReplicationDoneResponse isAlterUniverseReplicationDone( public GetChangesResponse getChangesCDCSDK(YBTable table, String streamId, String tabletId, long term, long index, byte[] key, - int write_id, long time) throws Exception { - Deferred d = asyncClient - .getChangesCDCSDK(table, streamId, tabletId, term, index, key, write_id, time); + int write_id, long time, + boolean needSchemaInfo) throws Exception { + Deferred d = asyncClient.getChangesCDCSDK( + table, streamId, tabletId, term, index, key, write_id, time, needSchemaInfo); return d.join(2*getDefaultAdminOperationTimeoutMs()); } diff --git a/src/yb/cdc/cdc_service.proto b/src/yb/cdc/cdc_service.proto index 6efe0b7a523c..97b532da240c 100644 --- a/src/yb/cdc/cdc_service.proto +++ b/src/yb/cdc/cdc_service.proto @@ -195,6 +195,8 @@ message GetChangesRequestPB { optional bytes table_id = 7; optional CDCSDKCheckpointPB from_cdc_sdk_checkpoint = 8; + + optional bool need_schema_info = 9 [default = false]; } message KeyValuePairPB {