Skip to content

Commit

Permalink
[BACKPORT 2.12][CDCSDK] [#9019] CDC SDK Tx/xCluster layer changes
Browse files Browse the repository at this point in the history
Summary:
Original commit: 1371944 / D13838
Github Master Ticket: #9019
Design DocumentL https://docs.google.com/document/d/1_xZqU5UgzCu1W--kci3ajU7_iYXXHMQvudmybDI-Xsk/edit
Functional Spec: https://docs.google.com/document/u/2/d/1nHuzHQ-qYVPbKi2dqo_drzSXMq00h7w5oi0JDf0GD1U/edit#heading=h.jmqfs7jgvvg8

  - Added a new CDC Type EXTERNAL
  - Have added CDCSDK naming conventions to avoid confusion with common codes with cluster
  - Read the changes from IntentDB for UPDATE_TRANSACTION_OP op type
  - Batch the changes from IntentDB depending on the maximum batch size defined by cdc_max_stream_intent_records
  - Send CDCSDKCheckpoint with every record
  - CDCSDKCheckpoint will have term, index, reverse_index_key, and write_id
  - Mark the record as INSERT/UPDATE/DELETE depending on the type of operations that were performed.
  - An update of the primary key will generate two events DELETE and INSERT
  - UPDATE of multiple columns is 'broken' into multiple records of single column UPDATE record in case of multi-shard transactions
  - Send the DDL events found in the WAL to the subscriber

Fix compilation error

Test Plan:
We have unit tests as well as tests using the ConsoleSubscriber.

  - Added CPP Unit tests to verify op types INSERT/UPDATE/DELETE types
  - Verify the ordering of events of records
  - Added Java Unit tests using CDC Java Console Client, verifying
  - Multiple data types (To be enhanced)
  - Large SQL scripts with varying DMLs with the expected output
  - Correctness in case of the composite primary key

We also have run long-running tests with failover to verify if the number of streamed are expected.

Reviewers: bogdan, nicolas, rahuldesirazu, sergei

Reviewed By: sergei

Differential Revision: https://phabricator.dev.yugabyte.com/D16235
  • Loading branch information
suranjan committed Mar 29, 2022
1 parent 3ad4697 commit 361db73
Show file tree
Hide file tree
Showing 83 changed files with 5,190 additions and 508 deletions.
4 changes: 2 additions & 2 deletions ent/src/yb/cdc/CMakeLists-include.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ set(CDC_SRCS_EXTENSIONS
${YB_ENT_CURRENT_SOURCE_DIR}/cdc_service.cc
${YB_ENT_CURRENT_SOURCE_DIR}/cdc_metrics.cc
${YB_ENT_CURRENT_SOURCE_DIR}/cdc_producer.cc
${YB_ENT_CURRENT_SOURCE_DIR}/cdc_rpc.cc
${YB_ENT_CURRENT_SOURCE_DIR}/cdc_error.cc)
${YB_ENT_CURRENT_SOURCE_DIR}/cdcsdk_producer.cc
${YB_ENT_CURRENT_SOURCE_DIR}/cdc_rpc.cc)

ADD_YB_LIBRARY(
cdc
Expand Down
101 changes: 101 additions & 0 deletions ent/src/yb/cdc/cdc_common_util.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#ifndef ENT_SRC_YB_CDC_CDC_COMMON_UTIL_H
#define ENT_SRC_YB_CDC_CDC_COMMON_UTIL_H

#include <string>
#include <boost/functional/hash.hpp>

#include "yb/cdc/cdc_service.pb.h"

#include "yb/common/common_fwd.h"
#include "yb/common/wire_protocol.h"

#include "yb/consensus/raft_consensus.h"
#include "yb/consensus/replicate_msgs_holder.h"

#include "yb/docdb/docdb.h"
#include "yb/docdb/primitive_value.h"
#include "yb/docdb/ql_rowwise_iterator_interface.h"
#include "yb/docdb/value_type.h"

#include "yb/tablet/tablet.h"
#include "yb/tablet/tablet_metadata.h"
#include "yb/tablet/tablet_peer.h"
#include "yb/tablet/transaction_participant.h"

#include "yb/tserver/tablet_server.h"
#include "yb/tserver/ts_tablet_manager.h"

#include "yb/util/format.h"

namespace yb {
namespace cdc {

YB_STRONGLY_TYPED_BOOL(ReplicateIntents);

// Use boost::unordered_map instead of std::unordered_map because gcc release build
// fails to compile correctly when TxnStatusMap is used with Result<> (due to what seems like
// a bug in gcc where it tries to incorrectly destroy Status part of Result).
typedef boost::unordered_map<TransactionId,
TransactionStatusResult,
TransactionIdHash> TxnStatusMap;
typedef std::pair<uint64_t, size_t> RecordTimeIndex;

void AddColumnToMap(
const ColumnSchema &col_schema,
const docdb::PrimitiveValue &col,
cdc::KeyValuePairPB *kv_pair);

void AddProtoRecordColumnToMap(
const ColumnSchema &col_schema,
const docdb::PrimitiveValue &col,
cdc::KeyValuePairPB *kv_pair,
bool is_proto_record,
DatumMessagePB *cdc_datum_message = nullptr);

Result<bool> SetCommittedRecordIndexForReplicateMsg(
const consensus::ReplicateMsgPtr &msg,
size_t index,
const TxnStatusMap &txn_map,
ReplicateIntents replicate_intents,
std::vector<RecordTimeIndex> *records);

Result<std::vector<RecordTimeIndex>> GetCommittedRecordIndexes(
const consensus::ReplicateMsgs &msgs,
const TxnStatusMap &txn_map,
ReplicateIntents replicate_intents,
OpId *checkpoint);

Result<consensus::ReplicateMsgs> FilterAndSortWrites(
const consensus::ReplicateMsgs &msgs,
const TxnStatusMap &txn_map,
ReplicateIntents replicate_intents,
OpId *checkpoint);

Result<TransactionStatusResult> GetTransactionStatus(
const TransactionId &txn_id,
const HybridTime &hybrid_time,
tablet::TransactionParticipant *txn_participant);

Result<TxnStatusMap> BuildTxnStatusMap(
const consensus::ReplicateMsgs &messages,
bool more_replicate_msgs,
const std::shared_ptr<tablet::TabletPeer> &tablet_peer,
tablet::TransactionParticipant *txn_participant);

} // namespace cdc
} // namespace yb

#endif // ENT_SRC_YB_CDC_CDC_COMMON_UTIL_H
34 changes: 11 additions & 23 deletions ent/src/yb/cdc/cdc_producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// under the License.

#include "yb/cdc/cdc_producer.h"
#include "yb/cdc/cdc_common_util.h"

#include "yb/cdc/cdc_service.pb.h"
#include "yb/common/schema.h"
Expand Down Expand Up @@ -49,17 +50,6 @@ using consensus::ReplicateMsgs;
using docdb::PrimitiveValue;
using tablet::TransactionParticipant;

YB_STRONGLY_TYPED_BOOL(ReplicateIntents);

namespace {

// Use boost::unordered_map instead of std::unordered_map because gcc release build
// fails to compile correctly when TxnStatusMap is used with Result<> (due to what seems like
// a bug in gcc where it tries to incorrectly destroy Status part of Result).
typedef boost::unordered_map<
TransactionId, TransactionStatusResult, TransactionIdHash> TxnStatusMap;
typedef std::pair<uint64_t, size_t> RecordTimeIndex;

void AddColumnToMap(const ColumnSchema& col_schema,
const docdb::PrimitiveValue& col,
cdc::KeyValuePairPB* kv_pair) {
Expand Down Expand Up @@ -402,18 +392,16 @@ CHECKED_STATUS PopulateTransactionRecord(const ReplicateMsgPtr& msg,
return Status::OK();
}

} // namespace

Status GetChanges(const std::string& stream_id,
const std::string& tablet_id,
const OpId& from_op_id,
const StreamMetadata& stream_metadata,
const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
const MemTrackerPtr& mem_tracker,
consensus::ReplicateMsgsHolder* msgs_holder,
GetChangesResponsePB* resp,
int64_t* last_readable_opid_index,
const CoarseTimePoint deadline) {
Status GetChangesForXCluster(const std::string& stream_id,
const std::string& tablet_id,
const OpId& from_op_id,
const StreamMetadata& stream_metadata,
const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
const MemTrackerPtr& mem_tracker,
consensus::ReplicateMsgsHolder* msgs_holder,
GetChangesResponsePB* resp,
int64_t* last_readable_opid_index,
const CoarseTimePoint deadline) {
auto replicate_intents = ReplicateIntents(GetAtomicFlag(&FLAGS_cdc_enable_replicate_intents));
// Request scope on transaction participant so that transactions are not removed from participant
// while RequestScope is active.
Expand Down
33 changes: 23 additions & 10 deletions ent/src/yb/cdc/cdc_producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,30 @@ struct StreamMetadata {
}
};

CHECKED_STATUS GetChanges(const std::string& stream_id,
const std::string& tablet_id,
const OpId& op_id,
const StreamMetadata& record,
const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
const std::shared_ptr<MemTracker>& mem_tracker,
consensus::ReplicateMsgsHolder* msgs_holder,
GetChangesResponsePB* resp,
int64_t* last_readable_opid_index = nullptr,
const CoarseTimePoint deadline = CoarseTimePoint::max());
CHECKED_STATUS GetChangesForCDCSDK(const std::string& stream_id,
const std::string& tablet_id,
const CDCSDKCheckpointPB& op_id,
const StreamMetadata& record,
const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
const std::shared_ptr<MemTracker>& mem_tracker,
consensus::ReplicateMsgsHolder* msgs_holder,
GetChangesResponsePB* resp,
std::string* commit_timestamp,
std::shared_ptr<Schema>* cached_schema,
OpId* last_streamed_op_id,
int64_t* last_readable_opid_index = nullptr,
const CoarseTimePoint deadline = CoarseTimePoint::max());

CHECKED_STATUS GetChangesForXCluster(const std::string& stream_id,
const std::string& tablet_id,
const OpId& op_id,
const StreamMetadata& record,
const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
const std::shared_ptr<MemTracker>& mem_tracker,
consensus::ReplicateMsgsHolder* msgs_holder,
GetChangesResponsePB* resp,
int64_t* last_readable_opid_index = nullptr,
const CoarseTimePoint deadline = CoarseTimePoint::max());
} // namespace cdc
} // namespace yb

Expand Down
Loading

0 comments on commit 361db73

Please sign in to comment.