Skip to content

Commit

Permalink
[yugabyte#15136] CDCSDK: Fix for crash when running cdcsdk with befor…
Browse files Browse the repository at this point in the history
…e image

Summary:
We observed a crash while running TPCC workload with CDCSDK enabled.
The stack trace is:

```
(gdb) bt
#0  0x0000557f25b11910 in yb::DatumMessagePB::MergeFrom(yb::DatumMessagePB const&) ()
#1  0x0000557f258a41ef in yb::cdc::PopulateBeforeImage(std::__1::shared_ptr<yb::tablet::TabletPeer> const&, yb::ReadHybridTime const&, yb::cdc::RowMessage*, std::__1::unordered_map<unsigned int, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::hash<unsigned int>, std::__1::equal_to<unsigned int>, std::__1::allocator<std::__1::pair<unsigned int const, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > > const&, std::__1::unordered_map<unsigned int, std::__1::vector<yb::master::PgAttributePB, std::__1::allocator<yb::master::PgAttributePB> >, std::__1::hash<unsigned int>, std::__1::equal_to<unsigned int>, std::__1::allocator<std::__1::pair<unsigned int const, std::__1::vector<yb::master::PgAttributePB, std::__1::allocator<yb::master::PgAttributePB> > > > > const&, yb::docdb::SubDocKey const&, yb::Schema const&, unsigned int) ()
#2  0x0000557f258a7304 in yb::cdc::PopulateCDCSDKIntentRecord(yb::OpId const&, yb::StronglyTypedUuid<yb::TransactionId_Tag> const&, std::__1::vector<yb::docdb::IntentKeyValueForCDC, std::__1::allocator<yb::docdb::IntentKeyValueForCDC> > const&, yb::cdc::StreamMetadata const&, std::__1::shared_ptr<yb::tablet::TabletPeer> const&, std::__1::unordered_map<unsigned int, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::hash<unsigned int>, std::__1::equal_to<unsigned int>, std::__1::allocator<std::__1::pair<unsigned int const, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > > const&, std::__1::unordered_map<unsigned int, std::__1::vector<yb::master::PgAttributePB, std::__1::allocator<yb::master::PgAttributePB> >, std::__1::hash<unsigned int>, std::__1::equal_to<unsigned int>, std::__1::allocator<std::__1::pair<unsigned int const, std::__1::vector<yb::master::PgAttributePB, std::__1::allocator<yb::master::PgAttributePB> > > > > const&, yb::cdc::GetChangesResponsePB*, yb::ScopedTrackedConsumption*, unsigned int*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, yb::Schema*, unsigned int, unsigned long const&) ()
#3  0x0000557f258aaa27 in yb::cdc::ProcessIntents(yb::OpId const&, yb::StronglyTypedUuid<yb::TransactionId_Tag> const&, yb::cdc::StreamMetadata const&, std::__1::unordered_map<unsigned int, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::hash<unsigned int>, std::__1::equal_to<unsigned int>, std::__1::allocator<std::__1::pair<unsigned int const, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > > const&, std::__1::unordered_map<unsigned int, std::__1::vector<yb::master::PgAttributePB, std::__1::allocator<yb::master::PgAttributePB> >, std::__1::hash<unsigned int>, std::__1::equal_to<unsigned int>, std::__1::allocator<std::__1::pair<unsigned int const, std::__1::vector<yb::master::PgAttributePB, std::__1::allocator<yb::master::PgAttributePB> > > > > const&, yb::cdc::GetChangesResponsePB*, yb::ScopedTrackedConsumption*, yb::cdc::CDCSDKCheckpointPB*, std::__1::shared_ptr<yb::tablet::TabletPeer> const&, std::__1::vector<yb::docdb::IntentKeyValueForCDC, std::__1::allocator<yb::docdb::IntentKeyValueForCDC> >*, yb::docdb::ApplyTransactionState*, yb::client::YBClient*, std::__1::shared_ptr<yb::Schema>*, unsigned int*, unsigned long const&) ()
#4  0x0000557f258b00c1 in yb::cdc::GetChangesForCDCSDK(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::cdc::CDCSDKCheckpointPB const&, yb::cdc::StreamMetadata const&, std::__1::shared_ptr<yb::tablet::TabletPeer> const&, std::__1::shared_ptr<yb::MemTracker> const&, std::__1::unordered_map<unsigned int, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::hash<unsigned int>, std::__1::equal_to<unsigned int>, std::__1::allocator<std::__1::pair<unsigned int const, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > > const&, std::__1::unordered_map<unsigned int, std::__1::vector<yb::master::PgAttributePB, std::__1::allocator<yb::master::PgAttributePB> >, std::__1::hash<unsigned int>, std::__1::equal_to<unsigned int>, std::__1::allocator<std::__1::pair<unsigned int const, std::__1::vector<yb::master::PgAttributePB, std::__1::allocator<yb::master::PgAttributePB> > > > > const&, yb::client::YBClient*, yb::consensus::ReplicateMsgsHolder*, yb::cdc::GetChangesResponsePB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, std::__1::shared_ptr<yb::Schema>*, unsigned int*, yb::OpId*, long*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >) ()
#5  0x0000557f2586c448 in yb::cdc::CDCServiceImpl::GetChanges(yb::cdc::GetChangesRequestPB const*, yb::cdc::GetChangesResponsePB*, yb::rpc::RpcContext) ()
#6  0x0000557f25908246 in std::__1::__function::__func<yb::cdc::CDCServiceIf::InitMethods(scoped_refptr<yb::MetricEntity> const&)::$_3, std::__1::allocator<yb::cdc::CDCServiceIf::InitMethods(scoped_refptr<yb::MetricEntity> const&)::$_3>, void (std::__1::shared_ptr<yb::rpc::InboundCall>)>::operator()(std::__1::shared_ptr<yb::rpc::InboundCall>&&) ()
#7  0x0000557f2590a6af in yb::cdc::CDCServiceIf::Handle(std::__1::shared_ptr<yb::rpc::InboundCall>) ()
#8  0x0000557f26227a1e in yb::rpc::ServicePoolImpl::Handle(std::__1::shared_ptr<yb::rpc::InboundCall>) ()
#9  0x0000557f2616db2f in yb::rpc::InboundCall::InboundCallTask::Run() ()
#10 0x0000557f26236583 in yb::rpc::(anonymous namespace)::Worker::Execute() ()
#11 0x0000557f268698cf in yb::Thread::SuperviseThread(void*) ()
#12 0x00007fa6fce89694 in ?? ()
#13 0x0000000000000000 in ?? ()
```

The problem is in the method: PopulateBeforeImage
When we drop a column, the the row won't have data for the dropped column, and hence will not be added to the "old_tuple" member of RowMessage. This will mean the size of "old_tuple" does not match the number of columns in the schema.
Which means this line: "row_message->old_tuple(static_cast<int>(index))" could lead to an out of bounds exception.
Instead,  now we are keeping track of the found columns in the row.

Test Plan: Running existing ctests

Reviewers: srangavajjula, sdash, skumar

Reviewed By: sdash, skumar

Differential Revision: https://phabricator.dev.yugabyte.com/D21338
  • Loading branch information
Adithya Bharadwaj committed Dec 1, 2022
1 parent 524fd98 commit 136e713
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 7 deletions.
17 changes: 10 additions & 7 deletions ent/src/yb/cdc/cdcsdk_producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ Status PopulateBeforeImage(

std::vector<ColumnSchema> columns(schema.columns());

size_t found_columns = 0;
if (row.ColumnCount() == columns.size()) {
for (size_t index = 0; index < row.ColumnCount(); ++index) {
bool column_updated = false;
Expand All @@ -236,25 +237,27 @@ Status PopulateBeforeImage(
tablet_peer, columns[index], PrimitiveValue(), enum_oid_label_map, composite_atts_map,
row_message->add_old_tuple(), &ql_value.value()));
if (row_message->op() == RowMessage_Op_UPDATE) {
const auto& old_tuple_column_name =
row_message->old_tuple(static_cast<int>(found_columns)).column_name();
for (int new_tuple_index = 0; new_tuple_index < row_message->new_tuple_size();
++new_tuple_index) {
if (row_message->new_tuple(static_cast<int>(new_tuple_index)).column_name() ==
columns[index].name()) {
old_tuple_column_name) {
column_updated = true;
break;
}
}
if (!column_updated) {
*(row_message->add_new_tuple()) = row_message->old_tuple(static_cast<int>(index));
auto new_tuple_pb = row_message->mutable_new_tuple()->Add();
new_tuple_pb->CopyFrom(row_message->old_tuple(static_cast<int>(found_columns)));
}
}
found_columns += 1;
}
}
} else {
if (row_message->op() != RowMessage_Op_DELETE) {
for (size_t index = 0; index < schema.num_columns(); ++index) {
row_message->add_old_tuple();
}
} else if (row_message->op() != RowMessage_Op_DELETE) {
for (size_t index = 0; index < schema.num_columns(); ++index) {
row_message->add_old_tuple();
}
}
return Status::OK();
Expand Down
42 changes: 42 additions & 0 deletions ent/src/yb/integration-tests/cdcsdk_ysql-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9778,6 +9778,48 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestExpiredStreamWithCompaction))
ASSERT_LE(count_compaction_after_expired, count_after_compaction);
}

TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestColumnDropBeforeImage)) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_timestamp_history_retention_interval_sec) = 0;
ASSERT_OK(SetUpWithParams(3, 1, false));
auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName));
google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, nullptr));
ASSERT_EQ(tablets.size(), 1);
CDCStreamId stream_id =
ASSERT_RESULT(CreateDBStream(CDCCheckpointType::IMPLICIT, CDCRecordType::ALL));
auto set_resp = ASSERT_RESULT(SetCDCCheckpoint(stream_id, tablets));
ASSERT_FALSE(set_resp.has_error());

auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));

ASSERT_OK(conn.Execute("INSERT INTO test_table VALUES (1, 2)"));
ASSERT_OK(conn.Execute("UPDATE test_table SET value_1 = 3 WHERE key = 1"));
ASSERT_OK(conn.Execute("ALTER TABLE test_table ADD COLUMN value_2 INT"));
ASSERT_OK(conn.Execute("UPDATE test_table SET value_2 = 4 WHERE key = 1"));
ASSERT_OK(conn.Execute("ALTER TABLE test_table DROP COLUMN value_2"));

// The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE in that order.
const uint32_t expected_count[] = {3, 1, 2, 0, 0, 0};
uint32_t count[] = {0, 0, 0, 0, 0, 0};

ExpectedRecordWithThreeColumns expected_records[] = {
{0, 0, 0}, {1, 2, INT_MAX}, {1, 3, INT_MAX}, {0, 0, INT_MAX}, {1, 3, 4}, {}};
ExpectedRecordWithThreeColumns expected_before_image_records[] = {
{}, {}, {1, 2, INT_MAX}, {}, {1, 3, INT_MAX}, {}};

GetChangesResponsePB change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets));

uint32_t record_size = change_resp.cdc_sdk_proto_records_size();
for (uint32_t i = 0; i < record_size; ++i) {
const CDCSDKProtoRecordPB record = change_resp.cdc_sdk_proto_records(i);
CheckRecordWithThreeColumns(
record, expected_records[i], count, true, expected_before_image_records[i]);
}
LOG(INFO) << "Got " << count[1] << " insert record and " << count[2] << " update record";

CheckCount(expected_count, count);
}

} // namespace enterprise
} // namespace cdc
} // namespace yb

0 comments on commit 136e713

Please sign in to comment.