From da54a59526e1390b65bd4fc0dbd4ab8e394c9e75 Mon Sep 17 00:00:00 2001 From: Sergei Politov Date: Wed, 16 Feb 2022 08:49:02 +0300 Subject: [PATCH] [#11409] DocDB: Skip creating write batch while writing to rocksdb Summary: RocksDB WAL is disabled in our system. So we could avoid creating write batch while writing to RocksDB. This diff introduces DirectWriter,that allows us to avoid serializing key and values before writing them to RocksDB. DirectWriter is used when applying WriteOperation. In a follow-up diff we could use DirectWriter to also apply transaction intents. Test Plan: Jenkins Reviewers: timur, mbautin Reviewed By: timur, mbautin Subscribers: mbautin, ybase, bogdan Differential Revision: https://phabricator.dev.yugabyte.com/D15368 --- .../yb/master/restore_sys_catalog_state.cc | 5 +- src/yb/client/ql-dml-test.cc | 2 + src/yb/docdb/CMakeLists.txt | 21 +- src/yb/docdb/doc_kv_util.cc | 19 +- src/yb/docdb/doc_kv_util.h | 8 + src/yb/docdb/docdb-internal.h | 3 - src/yb/docdb/docdb.cc | 338 ++---------------- src/yb/docdb/docdb.h | 24 +- src/yb/docdb/docdb_debug.cc | 3 +- src/yb/docdb/docdb_util.cc | 35 +- src/yb/docdb/rocksdb_writer.cc | 332 +++++++++++++++++ src/yb/docdb/rocksdb_writer.h | 117 ++++++ src/yb/rocksdb/db/db_impl.cc | 3 + src/yb/rocksdb/db/db_log_iter_test.cc | 8 +- src/yb/rocksdb/db/dbformat.cc | 16 +- src/yb/rocksdb/db/dbformat.h | 9 +- src/yb/rocksdb/db/flush_job_test.cc | 10 +- src/yb/rocksdb/db/memtable.cc | 124 ++++--- src/yb/rocksdb/db/memtable.h | 18 +- src/yb/rocksdb/db/user_op_id_test.cc | 8 +- src/yb/rocksdb/db/write_batch.cc | 84 ++++- src/yb/rocksdb/db/write_batch_test.cc | 45 +-- src/yb/rocksdb/table/table_test.cc | 4 +- .../transactions/transaction_impl.cc | 6 +- src/yb/rocksdb/utilities/ttl/db_ttl_impl.cc | 9 +- .../write_batch_with_index_test.cc | 6 +- src/yb/rocksdb/write_batch.h | 37 +- src/yb/rocksutil/write_batch_formatter.cc | 8 +- src/yb/rocksutil/write_batch_formatter.h | 4 +- src/yb/tablet/tablet.cc | 58 +-- src/yb/tablet/tablet.h | 4 +- src/yb/tablet/transaction_participant.cc | 55 ++- src/yb/tablet/transaction_participant.h | 4 +- src/yb/util/slice.cc | 13 +- src/yb/util/slice.h | 8 +- 35 files changed, 892 insertions(+), 556 deletions(-) create mode 100644 src/yb/docdb/rocksdb_writer.cc create mode 100644 src/yb/docdb/rocksdb_writer.h diff --git a/ent/src/yb/master/restore_sys_catalog_state.cc b/ent/src/yb/master/restore_sys_catalog_state.cc index 52b4b54e854b..067cf0979a0c 100644 --- a/ent/src/yb/master/restore_sys_catalog_state.cc +++ b/ent/src/yb/master/restore_sys_catalog_state.cc @@ -26,6 +26,7 @@ #include "yb/docdb/doc_write_batch.h" #include "yb/docdb/docdb.h" #include "yb/docdb/pgsql_operation.h" +#include "yb/docdb/rocksdb_writer.h" #include "yb/master/catalog_loaders.h" #include "yb/master/master_backup.pb.h" @@ -445,9 +446,9 @@ void RestoreSysCatalogState::WriteToRocksDB( docdb::KeyValueWriteBatchPB kv_write_batch; write_batch->MoveToWriteBatchPB(&kv_write_batch); + docdb::NonTransactionalWriter writer(kv_write_batch, write_time); rocksdb::WriteBatch rocksdb_write_batch; - PrepareNonTransactionWriteBatch( - kv_write_batch, write_time, nullptr, &rocksdb_write_batch, nullptr); + rocksdb_write_batch.SetDirectWriter(&writer); docdb::ConsensusFrontiers frontiers; set_op_id(op_id, &frontiers); set_hybrid_time(write_time, &frontiers); diff --git a/src/yb/client/ql-dml-test.cc b/src/yb/client/ql-dml-test.cc index 99d52acb4a4d..4b71ff2e11d7 100644 --- a/src/yb/client/ql-dml-test.cc +++ b/src/yb/client/ql-dml-test.cc @@ -26,6 +26,8 @@ 
#include "yb/master/master_util.h" +#include "yb/rocksdb/db.h" + #include "yb/tablet/tablet.h" #include "yb/tablet/tablet_peer.h" diff --git a/src/yb/docdb/CMakeLists.txt b/src/yb/docdb/CMakeLists.txt index a4a2e4e9800e..da7f3f1c071d 100644 --- a/src/yb/docdb/CMakeLists.txt +++ b/src/yb/docdb/CMakeLists.txt @@ -28,16 +28,16 @@ ADD_YB_LIBRARY(docdb_proto NONLINK_DEPS ${DOCDB_PROTO_TGTS}) set(DOCDB_ENCODING_SRCS - doc_key.cc - doc_kv_util.cc - doc_path.cc - key_bounds.cc - key_bytes.cc - primitive_value.cc - primitive_value_util.cc - intent.cc - doc_scanspec_util.cc - ) + doc_key.cc + doc_kv_util.cc + doc_path.cc + key_bounds.cc + key_bytes.cc + primitive_value.cc + primitive_value_util.cc + intent.cc + doc_scanspec_util.cc + ) set(DOCDB_ENCODING_DEPS docdb_proto @@ -87,6 +87,7 @@ set(DOCDB_SRCS ql_rocksdb_storage.cc ql_rowwise_iterator_interface.cc redis_operation.cc + rocksdb_writer.cc shared_lock_manager.cc subdocument.cc subdoc_reader.cc diff --git a/src/yb/docdb/doc_kv_util.cc b/src/yb/docdb/doc_kv_util.cc index a5ed3414cf0b..ecbd66c82f21 100644 --- a/src/yb/docdb/doc_kv_util.cc +++ b/src/yb/docdb/doc_kv_util.cc @@ -14,7 +14,7 @@ #include "yb/docdb/doc_kv_util.h" #include "yb/docdb/docdb_fwd.h" -#include "yb/docdb/docdb-internal.h" +#include "yb/docdb/docdb.h" #include "yb/docdb/value_type.h" #include "yb/util/bytes_formatter.h" @@ -196,12 +196,9 @@ Result DecodeInvertedDocHt(Slice key_slice) { "Invalid doc hybrid time in reverse intent record suffix: $0", key_slice.ToDebugHexString()); } - size_t doc_ht_buffer[kMaxWordsPerEncodedHybridTimeWithValueType]; - memcpy(doc_ht_buffer, key_slice.data(), key_slice.size()); - for (size_t i = 0; i != kMaxWordsPerEncodedHybridTimeWithValueType; ++i) { - doc_ht_buffer[i] = ~doc_ht_buffer[i]; - } - key_slice = Slice(pointer_cast(doc_ht_buffer), key_slice.size()); + + DocHybridTimeWordBuffer doc_ht_buffer; + key_slice = InvertEncodedDocHT(key_slice, &doc_ht_buffer); if (static_cast(key_slice[0]) != ValueType::kHybridTime) { return STATUS_FORMAT( @@ -215,5 +212,13 @@ Result DecodeInvertedDocHt(Slice key_slice) { return doc_ht; } +Slice InvertEncodedDocHT(const Slice& input, DocHybridTimeWordBuffer* buffer) { + memcpy(buffer->data(), input.data(), input.size()); + for (size_t i = 0; i != kMaxWordsPerEncodedHybridTimeWithValueType; ++i) { + (*buffer)[i] = ~(*buffer)[i]; + } + return {pointer_cast(buffer->data()), input.size()}; +} + } // namespace docdb } // namespace yb diff --git a/src/yb/docdb/doc_kv_util.h b/src/yb/docdb/doc_kv_util.h index f82abedde2e1..493ebc22755f 100644 --- a/src/yb/docdb/doc_kv_util.h +++ b/src/yb/docdb/doc_kv_util.h @@ -146,6 +146,14 @@ inline std::string ToShortDebugStr(const std::string& raw_str) { Result DecodeInvertedDocHt(Slice key_slice); +constexpr size_t kMaxWordsPerEncodedHybridTimeWithValueType = + ((kMaxBytesPerEncodedHybridTime + 1) + sizeof(size_t) - 1) / sizeof(size_t); + +// Puts inverted encoded doc hybrid time specified by input to buffer. +// And returns slice to it. +using DocHybridTimeWordBuffer = std::array; +Slice InvertEncodedDocHT(const Slice& input, DocHybridTimeWordBuffer* buffer); + } // namespace docdb } // namespace yb diff --git a/src/yb/docdb/docdb-internal.h b/src/yb/docdb/docdb-internal.h index 001764cb653b..826db1cbd46b 100644 --- a/src/yb/docdb/docdb-internal.h +++ b/src/yb/docdb/docdb-internal.h @@ -74,9 +74,6 @@ namespace docdb { // Infer the key type from the given slice, given whether this is regular or intents RocksDB. 
KeyType GetKeyType(const Slice& slice, StorageDbType db_type); -constexpr size_t kMaxWordsPerEncodedHybridTimeWithValueType = - ((kMaxBytesPerEncodedHybridTime + 1) + sizeof(size_t) - 1) / sizeof(size_t); - } // namespace docdb } // namespace yb diff --git a/src/yb/docdb/docdb.cc b/src/yb/docdb/docdb.cc index 35afc15dc4ce..9750caefdfca 100644 --- a/src/yb/docdb/docdb.cc +++ b/src/yb/docdb/docdb.cc @@ -30,11 +30,13 @@ #include "yb/docdb/docdb-internal.h" #include "yb/docdb/docdb.pb.h" #include "yb/docdb/docdb_debug.h" +#include "yb/docdb/doc_kv_util.h" #include "yb/docdb/docdb_rocksdb_util.h" #include "yb/docdb/docdb_types.h" #include "yb/docdb/intent.h" #include "yb/docdb/intent_aware_iterator.h" #include "yb/docdb/pgsql_operation.h" +#include "yb/docdb/rocksdb_writer.h" #include "yb/docdb/subdocument.h" #include "yb/docdb/transaction_dump.h" #include "yb/docdb/value.h" @@ -77,10 +79,6 @@ using strings::Substitute; using namespace std::placeholders; -DEFINE_test_flag(bool, docdb_sort_weak_intents, false, - "Sort weak intents to make their order deterministic."); -DEFINE_bool(enable_transaction_sealing, false, - "Whether transaction sealing is enabled."); DEFINE_test_flag(bool, fail_on_replicated_batch_idx_set_in_txn_record, false, "Fail when a set of replicated batch indexes is found in txn record."); DEFINE_int32(txn_max_apply_batch_records, 100000, @@ -93,60 +91,6 @@ namespace docdb { namespace { -// Slice parts with the number of slices fixed at compile time. -template -struct FixedSliceParts { - FixedSliceParts(const std::array& input) : parts(input.data()) { // NOLINT - } - - operator SliceParts() const { - return SliceParts(parts, N); - } - - const Slice* parts; -}; - -Slice InvertedDocHt(const Slice& input, size_t* buffer) { - memcpy(buffer, input.data(), input.size()); - for (size_t i = 0; i != kMaxWordsPerEncodedHybridTimeWithValueType; ++i) { - buffer[i] = ~buffer[i]; - } - return {pointer_cast(buffer), input.size()}; -} - -// Main intent data:: -// Prefix + DocPath + IntentType + DocHybridTime -> TxnId + value of the intent -// Reverse index by txn id: -// Prefix + TxnId + DocHybridTime -> Main intent data key -// -// Expects that last entry of key is DocHybridTime. -template -void AddIntent( - const TransactionId& transaction_id, - const FixedSliceParts& key, - const SliceParts& value, - rocksdb::WriteBatch* rocksdb_write_batch, - Slice reverse_value_prefix = Slice()) { - char reverse_key_prefix[1] = { ValueTypeAsChar::kTransactionId }; - size_t doc_ht_buffer[kMaxWordsPerEncodedHybridTimeWithValueType]; - auto doc_ht_slice = InvertedDocHt(key.parts[N - 1], doc_ht_buffer); - - std::array reverse_key = {{ - Slice(reverse_key_prefix, sizeof(reverse_key_prefix)), - transaction_id.AsSlice(), - doc_ht_slice, - }}; - rocksdb_write_batch->Put(key, value); - if (reverse_value_prefix.empty()) { - rocksdb_write_batch->Put(reverse_key, key); - } else { - std::array reverse_value; - reverse_value[0] = reverse_value_prefix; - memcpy(&reverse_value[1], key.parts, sizeof(*key.parts) * N); - rocksdb_write_batch->Put(reverse_key, reverse_value); - } -} - // key should be valid prefix of doc key, ending with some complete pritimive value or group end. 
CHECKED_STATUS ApplyIntent(RefCntPrefix key, const IntentTypeSet intent_types, @@ -295,25 +239,6 @@ void FilterKeysToLock(LockBatchEntries *keys_locked) { keys_locked->erase(w, keys_locked->end()); } -// Buffer for encoding DocHybridTime -class DocHybridTimeBuffer { - public: - DocHybridTimeBuffer() { - buffer_[0] = ValueTypeAsChar::kHybridTime; - } - - Slice EncodeWithValueType(const DocHybridTime& doc_ht) { - auto end = doc_ht.EncodedInDocDbFormat(buffer_.data() + 1); - return Slice(buffer_.data(), end); - } - - Slice EncodeWithValueType(HybridTime ht, IntraTxnWriteId write_id) { - return EncodeWithValueType(DocHybridTime(ht, write_id)); - } - private: - std::array buffer_; -}; - } // namespace Result PrepareDocWriteOperation( @@ -534,7 +459,7 @@ ExternalTxnApplyState ProcessApplyExternalTransactions(const KeyValueWriteBatchP } // namespace -void AddPairToWriteBatch( +bool AddExternalPairToWriteBatch( const KeyValuePairPB& kv_pair, HybridTime hybrid_time, int write_id, @@ -542,24 +467,14 @@ void AddPairToWriteBatch( rocksdb::WriteBatch* regular_write_batch, rocksdb::WriteBatch* intents_write_batch) { DocHybridTimeBuffer doc_ht_buffer; - size_t inverted_doc_ht_buffer[kMaxWordsPerEncodedHybridTimeWithValueType]; + DocHybridTimeWordBuffer inverted_doc_ht_buffer; CHECK(!kv_pair.key().empty()); CHECK(!kv_pair.value().empty()); - bool regular_entry = kv_pair.key()[0] != ValueTypeAsChar::kExternalTransactionId; - -#ifndef NDEBUG - // Debug-only: ensure all keys we get in Raft replication can be decoded. - if (regular_entry) { - SubDocKey subdoc_key; - Status s = subdoc_key.FullyDecodeFromKeyWithOptionalHybridTime(kv_pair.key()); - CHECK(s.ok()) - << "Failed decoding key: " << s.ToString() << "; " - << "Problematic key: " << BestEffortDocDBKeyToStr(KeyBytes(kv_pair.key())) << "\n" - << "value: " << FormatBytesAsStr(kv_pair.value()); + if (kv_pair.key()[0] != ValueTypeAsChar::kExternalTransactionId) { + return true; } -#endif // We replicate encoded SubDocKeys without a HybridTime at the end, and only append it here. // The reason for this is that the HybridTime timestamp is only picked at the time of @@ -578,26 +493,22 @@ void AddPairToWriteBatch( doc_ht_buffer.EncodeWithValueType(hybrid_time, write_id), }}; Slice key_value = kv_pair.value(); - rocksdb::WriteBatch* batch; - if (regular_entry) { - batch = regular_write_batch; - } else { - // This entry contains external intents. - Slice key = kv_pair.key(); - key.consume_byte(); - auto txn_id = CHECK_RESULT(DecodeTransactionId(&key)); - auto it = apply_external_transactions->find(txn_id); - if (it != apply_external_transactions->end()) { - // The same write operation could contain external intents and instruct us to apply them. - CHECK_OK(PrepareApplyExternalIntentsBatch( - it->second.commit_ht, key_value, regular_write_batch, &it->second.write_id)); - return; - } - batch = intents_write_batch; - key_parts[1] = InvertedDocHt(key_parts[1], inverted_doc_ht_buffer); + // This entry contains external intents. + Slice key = kv_pair.key(); + key.consume_byte(); + auto txn_id = CHECK_RESULT(DecodeTransactionId(&key)); + auto it = apply_external_transactions->find(txn_id); + if (it != apply_external_transactions->end()) { + // The same write operation could contain external intents and instruct us to apply them. 
+ CHECK_OK(PrepareApplyExternalIntentsBatch( + it->second.commit_ht, key_value, regular_write_batch, &it->second.write_id)); + return false; } + key_parts[1] = InvertEncodedDocHT(key_parts[1], &inverted_doc_ht_buffer); constexpr size_t kNumValueParts = 1; - batch->Put(key_parts, { &key_value, kNumValueParts }); + intents_write_batch->Put(key_parts, { &key_value, kNumValueParts }); + + return false; } // Usually put_batch contains only records that should be applied to regular DB. @@ -617,7 +528,7 @@ void AddPairToWriteBatch( // But if apply_external_transactions contains transaction for those external intents, then // those intents will be applied directly to regular DB, avoiding unnecessary write to intents DB. // This case is very common for short running transactions. -void PrepareNonTransactionWriteBatch( +bool PrepareExternalWriteBatch( const KeyValueWriteBatchPB& put_batch, HybridTime hybrid_time, rocksdb::DB* intents_db, @@ -630,11 +541,13 @@ void PrepareNonTransactionWriteBatch( CHECK_OK(PrepareApplyExternalIntents( &apply_external_transactions, regular_write_batch, intents_db, intents_write_batch)); + bool has_non_external_kvs = false; for (int write_id = 0; write_id < put_batch.write_pairs_size(); ++write_id) { - AddPairToWriteBatch( + has_non_external_kvs = AddExternalPairToWriteBatch( put_batch.write_pairs(write_id), hybrid_time, write_id, &apply_external_transactions, - regular_write_batch, intents_write_batch); + regular_write_batch, intents_write_batch) || has_non_external_kvs; } + return has_non_external_kvs; } namespace { @@ -831,207 +744,6 @@ Status EnumerateIntents( return Status::OK(); } -class PrepareTransactionWriteBatchHelper { - public: - PrepareTransactionWriteBatchHelper(const PrepareTransactionWriteBatchHelper&) = delete; - void operator=(const PrepareTransactionWriteBatchHelper&) = delete; - - // `rocksdb_write_batch` - in-out parameter is filled by this prepare. - PrepareTransactionWriteBatchHelper(HybridTime hybrid_time, - rocksdb::WriteBatch* rocksdb_write_batch, - const TransactionId& transaction_id, - const SubTransactionId subtransaction_id, - const Slice& replicated_batches_state, - IntraTxnWriteId* intra_txn_write_id) - : hybrid_time_(hybrid_time), - rocksdb_write_batch_(rocksdb_write_batch), - transaction_id_(transaction_id), - subtransaction_id_(subtransaction_id), - replicated_batches_state_(replicated_batches_state), - intra_txn_write_id_(intra_txn_write_id) { - } - - void Setup( - IsolationLevel isolation_level, - OperationKind kind, - RowMarkType row_mark) { - row_mark_ = row_mark; - strong_intent_types_ = GetStrongIntentTypeSet(isolation_level, kind, row_mark); - } - - // Using operator() to pass this object conveniently to EnumerateIntents. 
- CHECKED_STATUS operator()(IntentStrength intent_strength, FullDocKey, Slice value_slice, - KeyBytes* key, LastKey last_key) { - if (intent_strength == IntentStrength::kWeak) { - weak_intents_[key->data()] |= StrongToWeak(strong_intent_types_); - return Status::OK(); - } - - const auto transaction_value_type = ValueTypeAsChar::kTransactionId; - const auto write_id_value_type = ValueTypeAsChar::kWriteId; - const auto row_lock_value_type = ValueTypeAsChar::kRowLock; - IntraTxnWriteId big_endian_write_id = BigEndian::FromHost32(*intra_txn_write_id_); - - const auto subtransaction_value_type = ValueTypeAsChar::kSubTransactionId; - SubTransactionId big_endian_subtxn_id; - Slice subtransaction_marker; - Slice subtransaction_id; - if (subtransaction_id_ > kMinSubTransactionId) { - subtransaction_marker = Slice(&subtransaction_value_type, 1); - big_endian_subtxn_id = BigEndian::FromHost32(subtransaction_id_); - subtransaction_id = Slice::FromPod(&big_endian_subtxn_id); - } else { - DCHECK_EQ(subtransaction_id_, kMinSubTransactionId); - } - - std::array value = {{ - Slice(&transaction_value_type, 1), - transaction_id_.AsSlice(), - subtransaction_marker, - subtransaction_id, - Slice(&write_id_value_type, 1), - Slice::FromPod(&big_endian_write_id), - value_slice, - }}; - // Store a row lock indicator rather than data (in value_slice) for row lock intents. - if (IsValidRowMarkType(row_mark_)) { - value.back() = Slice(&row_lock_value_type, 1); - } - - ++*intra_txn_write_id_; - - char intent_type[2] = { ValueTypeAsChar::kIntentTypeSet, - static_cast(strong_intent_types_.ToUIntPtr()) }; - - DocHybridTimeBuffer doc_ht_buffer; - - constexpr size_t kNumKeyParts = 3; - std::array key_parts = {{ - key->AsSlice(), - Slice(intent_type, 2), - doc_ht_buffer.EncodeWithValueType(hybrid_time_, write_id_++), - }}; - - Slice reverse_value_prefix; - if (last_key && FLAGS_enable_transaction_sealing) { - reverse_value_prefix = replicated_batches_state_; - } - AddIntent( - transaction_id_, key_parts, value, rocksdb_write_batch_, reverse_value_prefix); - - return Status::OK(); - } - - void Finish() { - char transaction_id_value_type = ValueTypeAsChar::kTransactionId; - - DocHybridTimeBuffer doc_ht_buffer; - - std::array value = {{ - Slice(&transaction_id_value_type, 1), - transaction_id_.AsSlice(), - }}; - - if (PREDICT_FALSE(FLAGS_TEST_docdb_sort_weak_intents)) { - // This is done in tests when deterministic DocDB state is required. - std::vector> intents_and_types( - weak_intents_.begin(), weak_intents_.end()); - sort(intents_and_types.begin(), intents_and_types.end()); - for (const auto& intent_and_types : intents_and_types) { - AddWeakIntent(intent_and_types, value, &doc_ht_buffer); - } - return; - } - - for (const auto& intent_and_types : weak_intents_) { - AddWeakIntent(intent_and_types, value, &doc_ht_buffer); - } - } - - private: - void AddWeakIntent( - const std::pair& intent_and_types, - const std::array& value, - DocHybridTimeBuffer* doc_ht_buffer) { - char intent_type[2] = { ValueTypeAsChar::kIntentTypeSet, - static_cast(intent_and_types.second.ToUIntPtr()) }; - constexpr size_t kNumKeyParts = 3; - std::array key = {{ - intent_and_types.first.AsSlice(), - Slice(intent_type, 2), - doc_ht_buffer->EncodeWithValueType(hybrid_time_, write_id_++), - }}; - - AddIntent(transaction_id_, key, value, rocksdb_write_batch_); - } - - // TODO(dtxn) weak & strong intent in one batch. - // TODO(dtxn) extract part of code knowning about intents structure to lower level. 
- RowMarkType row_mark_; - HybridTime hybrid_time_; - rocksdb::WriteBatch* rocksdb_write_batch_; - const TransactionId& transaction_id_; - const SubTransactionId subtransaction_id_; - Slice replicated_batches_state_; - IntentTypeSet strong_intent_types_; - std::unordered_map weak_intents_; - IntraTxnWriteId write_id_ = 0; - IntraTxnWriteId* intra_txn_write_id_; -}; - -// We have the following distinct types of data in this "intent store": -// Main intent data: -// Prefix + SubDocKey (no HybridTime) + IntentType + HybridTime -> TxnId + value of the intent -// Transaction metadata -// TxnId -> status tablet id + isolation level -// Reverse index by txn id -// TxnId + HybridTime -> Main intent data key -// -// Where prefix is just a single byte prefix. TxnId, IntentType, HybridTime all prefixed with -// appropriate value type. -void PrepareTransactionWriteBatch( - const KeyValueWriteBatchPB& put_batch, - HybridTime hybrid_time, - rocksdb::WriteBatch* rocksdb_write_batch, - const TransactionId& transaction_id, - IsolationLevel isolation_level, - PartialRangeKeyIntents partial_range_key_intents, - const Slice& replicated_batches_state, - IntraTxnWriteId* write_id) { - VLOG(4) << "PrepareTransactionWriteBatch(), write_id = " << *write_id; - - RowMarkType row_mark = GetRowMarkTypeFromPB(put_batch); - - auto subtransaction_id = put_batch.has_subtransaction() - ? put_batch.subtransaction().subtransaction_id() - : kMinSubTransactionId; - - PrepareTransactionWriteBatchHelper helper( - hybrid_time, rocksdb_write_batch, transaction_id, subtransaction_id, replicated_batches_state, - write_id); - - if (!put_batch.write_pairs().empty()) { - if (IsValidRowMarkType(row_mark)) { - LOG(WARNING) << "Performing a write with row lock " - << RowMarkType_Name(row_mark) - << " when only reads are expected"; - } - helper.Setup(isolation_level, OperationKind::kWrite, row_mark); - - // We cannot recover from failures here, because it means that we cannot apply replicated - // operation. - CHECK_OK(EnumerateIntents(put_batch.write_pairs(), std::ref(helper), - partial_range_key_intents)); - } - - if (!put_batch.read_pairs().empty()) { - helper.Setup(isolation_level, OperationKind::kRead, row_mark); - CHECK_OK(EnumerateIntents(put_batch.read_pairs(), std::ref(helper), partial_range_key_intents)); - } - - helper.Finish(); -} - // ------------------------------------------------------------------------------------------------ // Standalone functions // ------------------------------------------------------------------------------------------------ diff --git a/src/yb/docdb/docdb.h b/src/yb/docdb/docdb.h index cf0643f6a912..a23f67f4c341 100644 --- a/src/yb/docdb/docdb.h +++ b/src/yb/docdb/docdb.h @@ -36,7 +36,7 @@ #include "yb/docdb/subdocument.h" #include "yb/docdb/value.h" -#include "yb/rocksdb/db.h" +#include "yb/rocksdb/rocksdb_fwd.h" #include "yb/util/result.h" #include "yb/util/strongly_typed_bool.h" @@ -146,7 +146,9 @@ struct ExternalTxnApplyStateData { using ExternalTxnApplyState = std::map; -void AddPairToWriteBatch( +// Adds external pair to write batch. +// Returns true if add was skipped because pair is a regular (non external) record. +bool AddExternalPairToWriteBatch( const KeyValuePairPB& kv_pair, HybridTime hybrid_time, int write_id, @@ -154,10 +156,12 @@ void AddPairToWriteBatch( rocksdb::WriteBatch* regular_write_batch, rocksdb::WriteBatch* intents_write_batch); -// Prepares non transaction write batch. +// Prepares external part of non transaction write batch. 
// Batch could contain intents for external transactions, in this case those intents // will be added to intents_write_batch. -void PrepareNonTransactionWriteBatch( +// +// Returns true if batch contains regular entries. +bool PrepareExternalWriteBatch( const docdb::KeyValueWriteBatchPB& put_batch, HybridTime hybrid_time, rocksdb::DB* intents_db, @@ -198,18 +202,6 @@ CHECKED_STATUS EnumerateIntents( KeyBytes* encoded_key_buffer, PartialRangeKeyIntents partial_range_key_intents, LastKey last_key = LastKey::kFalse); -// replicated_batches_state format does not matter at this point, because it is just -// appended to appropriate value. -void PrepareTransactionWriteBatch( - const docdb::KeyValueWriteBatchPB& put_batch, - HybridTime hybrid_time, - rocksdb::WriteBatch* rocksdb_write_batch, - const TransactionId& transaction_id, - IsolationLevel isolation_level, - PartialRangeKeyIntents partial_range_key_intents, - const Slice& replicated_batches_state, - IntraTxnWriteId* write_id); - // See ApplyTransactionStatePB for details. struct ApplyTransactionState { std::string key; diff --git a/src/yb/docdb/docdb_debug.cc b/src/yb/docdb/docdb_debug.cc index 3a3dbc66a155..d095fb6e2774 100644 --- a/src/yb/docdb/docdb_debug.cc +++ b/src/yb/docdb/docdb_debug.cc @@ -84,8 +84,7 @@ void DocDBDebugDump(rocksdb::DB* rocksdb, ostream& out, StorageDbType db_type, DocDBDebugDump(rocksdb, db_type, include_binary, std::bind(&AppendLineToStream, _1, &out)); } -std::string DocDBDebugDumpToStr( - DocDB docdb, IncludeBinary include_binary) { +std::string DocDBDebugDumpToStr(DocDB docdb, IncludeBinary include_binary) { std::stringstream ss; DocDBDebugDump(docdb.regular, ss, StorageDbType::kRegular, include_binary); if (docdb.intents) { diff --git a/src/yb/docdb/docdb_util.cc b/src/yb/docdb/docdb_util.cc index 3e424e9cf990..ee8864de0e44 100644 --- a/src/yb/docdb/docdb_util.cc +++ b/src/yb/docdb/docdb_util.cc @@ -18,6 +18,7 @@ #include "yb/docdb/docdb.h" #include "yb/docdb/docdb_debug.h" #include "yb/docdb/docdb_rocksdb_util.h" +#include "yb/docdb/rocksdb_writer.h" #include "yb/rocksutil/write_batch_formatter.h" #include "yb/rocksutil/yb_rocksdb.h" @@ -91,9 +92,26 @@ void DocDBRocksDBUtil::ResetMonotonicCounter() { monotonic_counter_.store(0); } +namespace { + +class DirectWriteToWriteBatchHandler : public rocksdb::DirectWriteHandler { + public: + explicit DirectWriteToWriteBatchHandler(rocksdb::WriteBatch *write_batch) + : write_batch_(write_batch) {} + + void Put(const SliceParts& key, const SliceParts& value) override { + write_batch_->Put(key, value); + } + + private: + rocksdb::WriteBatch *write_batch_; +}; + +} // namespace + Status DocDBRocksDBUtil::PopulateRocksDBWriteBatch( const DocWriteBatch& dwb, - rocksdb::WriteBatch *rocksdb_write_batch, + rocksdb::WriteBatch* rocksdb_write_batch, HybridTime hybrid_time, bool decode_dockey, bool increment_write_id, @@ -119,11 +137,14 @@ Status DocDBRocksDBUtil::PopulateRocksDBWriteBatch( } KeyValueWriteBatchPB kv_write_batch; dwb.TEST_CopyToWriteBatchPB(&kv_write_batch); - PrepareTransactionWriteBatch( - kv_write_batch, hybrid_time, rocksdb_write_batch, *current_txn_id_, txn_isolation_level_, - partial_range_key_intents, /* replicated_batches_state= */ Slice(), &intra_txn_write_id_); + TransactionalWriter writer( + kv_write_batch, hybrid_time, *current_txn_id_, txn_isolation_level_, + partial_range_key_intents, /* replicated_batches_state= */ Slice(), intra_txn_write_id_); + DirectWriteToWriteBatchHandler handler(rocksdb_write_batch); + RETURN_NOT_OK(writer.Apply(&handler)); + 
intra_txn_write_id_ = writer.intra_txn_write_id(); } else { - // TODO: this block has common code with docdb::PrepareNonTransactionWriteBatch and probably + // TODO: this block has common code with docdb::PrepareExternalWriteBatch and probably // can be refactored, so common code is reused. IntraTxnWriteId write_id = 0; for (const auto& entry : dwb.key_value_pairs()) { @@ -290,7 +311,9 @@ Status DocDBRocksDBUtil::AddExternalIntents( kv_pair.set_key(key_.ToStringBuffer()); kv_pair.set_value(value_.ToStringBuffer()); ExternalTxnApplyState external_txn_apply_state; - AddPairToWriteBatch(kv_pair, hybrid_time_, 0, &external_txn_apply_state, nullptr, batch); + AddExternalPairToWriteBatch( + kv_pair, hybrid_time_, /* write_id= */ 0, &external_txn_apply_state, + /* regular_write_batch= */ nullptr, batch); } boost::optional> Next() override { diff --git a/src/yb/docdb/rocksdb_writer.cc b/src/yb/docdb/rocksdb_writer.cc new file mode 100644 index 000000000000..148d8364dc46 --- /dev/null +++ b/src/yb/docdb/rocksdb_writer.cc @@ -0,0 +1,332 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#include "yb/docdb/rocksdb_writer.h" + +#include "yb/common/row_mark.h" + +#include "yb/docdb/docdb.pb.h" +#include "yb/docdb/docdb-internal.h" +#include "yb/docdb/doc_key.h" +#include "yb/docdb/doc_kv_util.h" +#include "yb/docdb/value_type.h" + +#include "yb/gutil/walltime.h" + +#include "yb/util/flag_tags.h" + +DEFINE_bool(enable_transaction_sealing, false, + "Whether transaction sealing is enabled."); +DEFINE_test_flag(bool, docdb_sort_weak_intents, false, + "Sort weak intents to make their order deterministic."); + +namespace yb { +namespace docdb { + +namespace { + +// Slice parts with the number of slices fixed at compile time. +template +struct FixedSliceParts { + FixedSliceParts(const std::array& input) : parts(input.data()) { // NOLINT + } + + operator SliceParts() const { + return SliceParts(parts, N); + } + + const Slice* parts; +}; + +// Main intent data:: +// Prefix + DocPath + IntentType + DocHybridTime -> TxnId + value of the intent +// Reverse index by txn id: +// Prefix + TxnId + DocHybridTime -> Main intent data key +// +// Expects that last entry of key is DocHybridTime. 
+template +void AddIntent( + const TransactionId& transaction_id, + const FixedSliceParts& key, + const SliceParts& value, + rocksdb::DirectWriteHandler* handler, + Slice reverse_value_prefix = Slice()) { + char reverse_key_prefix[1] = { ValueTypeAsChar::kTransactionId }; + DocHybridTimeWordBuffer doc_ht_buffer; + auto doc_ht_slice = InvertEncodedDocHT(key.parts[N - 1], &doc_ht_buffer); + + std::array reverse_key = {{ + Slice(reverse_key_prefix, sizeof(reverse_key_prefix)), + transaction_id.AsSlice(), + doc_ht_slice, + }}; + handler->Put(key, value); + if (reverse_value_prefix.empty()) { + handler->Put(reverse_key, key); + } else { + std::array reverse_value; + reverse_value[0] = reverse_value_prefix; + memcpy(&reverse_value[1], key.parts, sizeof(*key.parts) * N); + handler->Put(reverse_key, reverse_value); + } +} + +} // namespace + +NonTransactionalWriter::NonTransactionalWriter( + std::reference_wrapper put_batch, HybridTime hybrid_time) + : put_batch_(put_batch), hybrid_time_(hybrid_time) { +} + +bool NonTransactionalWriter::Empty() const { + return put_batch_.write_pairs().empty(); +} + +Status NonTransactionalWriter::Apply(rocksdb::DirectWriteHandler* handler) { + DocHybridTimeBuffer doc_ht_buffer; + + int write_id = 0; + for (const auto& kv_pair : put_batch_.write_pairs()) { + + CHECK(!kv_pair.key().empty()); + CHECK(!kv_pair.value().empty()); + + if (kv_pair.key()[0] == ValueTypeAsChar::kExternalTransactionId) { + continue; + } + +#ifndef NDEBUG + // Debug-only: ensure all keys we get in Raft replication can be decoded. + SubDocKey subdoc_key; + Status s = subdoc_key.FullyDecodeFromKeyWithOptionalHybridTime(kv_pair.key()); + CHECK(s.ok()) + << "Failed decoding key: " << s.ToString() << "; " + << "Problematic key: " << BestEffortDocDBKeyToStr(KeyBytes(kv_pair.key())) << "\n" + << "value: " << FormatBytesAsStr(kv_pair.value()); +#endif + + // We replicate encoded SubDocKeys without a HybridTime at the end, and only append it here. + // The reason for this is that the HybridTime timestamp is only picked at the time of + // appending an entry to the tablet's Raft log. Also this is a good way to save network + // bandwidth. + // + // "Write id" is the final component of our HybridTime encoding (or, to be more precise, + // DocHybridTime encoding) that helps disambiguate between different updates to the + // same key (row/column) within a transaction. We set it based on the position of the write + // operation in its write batch. + + auto hybrid_time = kv_pair.has_external_hybrid_time() ? 
+ HybridTime(kv_pair.external_hybrid_time()) : hybrid_time_; + std::array key_parts = {{ + Slice(kv_pair.key()), + doc_ht_buffer.EncodeWithValueType(hybrid_time, write_id), + }}; + Slice key_value = kv_pair.value(); + handler->Put(key_parts, SliceParts(&key_value, 1)); + + ++write_id; + } + + return Status::OK(); +} + +TransactionalWriter::TransactionalWriter( + std::reference_wrapper put_batch, + HybridTime hybrid_time, + const TransactionId& transaction_id, + IsolationLevel isolation_level, + PartialRangeKeyIntents partial_range_key_intents, + const Slice& replicated_batches_state, + IntraTxnWriteId intra_txn_write_id) + : put_batch_(put_batch), + hybrid_time_(hybrid_time), + transaction_id_(transaction_id), + isolation_level_(isolation_level), + partial_range_key_intents_(partial_range_key_intents), + replicated_batches_state_(replicated_batches_state), + intra_txn_write_id_(intra_txn_write_id) { +} + +// We have the following distinct types of data in this "intent store": +// Main intent data: +// Prefix + SubDocKey (no HybridTime) + IntentType + HybridTime -> TxnId + value of the intent +// Transaction metadata +// TxnId -> status tablet id + isolation level +// Reverse index by txn id +// TxnId + HybridTime -> Main intent data key +// +// Where prefix is just a single byte prefix. TxnId, IntentType, HybridTime all prefixed with +// appropriate value type. +CHECKED_STATUS TransactionalWriter::Apply(rocksdb::DirectWriteHandler* handler) { + VLOG(4) << "PrepareTransactionWriteBatch(), write_id = " << write_id_; + + row_mark_ = GetRowMarkTypeFromPB(put_batch_); + handler_ = handler; + + if (metadata_to_store_) { + auto txn_value_type = ValueTypeAsChar::kTransactionId; + std::array key = { + Slice(&txn_value_type, 1), + transaction_id_.AsSlice(), + }; + auto data_copy = *metadata_to_store_; + // We use hybrid time only for backward compatibility, actually wall time is required. + data_copy.set_metadata_write_time(GetCurrentTimeMicros()); + auto value = data_copy.SerializeAsString(); + Slice value_slice(value); + handler->Put(key, SliceParts(&value_slice, 1)); + } + + subtransaction_id_ = put_batch_.has_subtransaction() + ? put_batch_.subtransaction().subtransaction_id() + : kMinSubTransactionId; + + if (!put_batch_.write_pairs().empty()) { + if (IsValidRowMarkType(row_mark_)) { + LOG(WARNING) << "Performing a write with row lock " << RowMarkType_Name(row_mark_) + << " when only reads are expected"; + } + strong_intent_types_ = GetStrongIntentTypeSet( + isolation_level_, OperationKind::kWrite, row_mark_); + + // We cannot recover from failures here, because it means that we cannot apply replicated + // operation. + RETURN_NOT_OK(EnumerateIntents( + put_batch_.write_pairs(), std::ref(*this), partial_range_key_intents_)); + } + + if (!put_batch_.read_pairs().empty()) { + strong_intent_types_ = GetStrongIntentTypeSet( + isolation_level_, OperationKind::kRead, row_mark_); + RETURN_NOT_OK(EnumerateIntents( + put_batch_.read_pairs(), std::ref(*this), partial_range_key_intents_)); + } + + return Finish(); +} + +// Using operator() to pass this object conveniently to EnumerateIntents. 
+CHECKED_STATUS TransactionalWriter::operator()( + IntentStrength intent_strength, FullDocKey, Slice value_slice, KeyBytes* key, + LastKey last_key) { + if (intent_strength == IntentStrength::kWeak) { + weak_intents_[key->data()] |= StrongToWeak(strong_intent_types_); + return Status::OK(); + } + + const auto transaction_value_type = ValueTypeAsChar::kTransactionId; + const auto write_id_value_type = ValueTypeAsChar::kWriteId; + const auto row_lock_value_type = ValueTypeAsChar::kRowLock; + IntraTxnWriteId big_endian_write_id = BigEndian::FromHost32(intra_txn_write_id_); + + const auto subtransaction_value_type = ValueTypeAsChar::kSubTransactionId; + SubTransactionId big_endian_subtxn_id; + Slice subtransaction_marker; + Slice subtransaction_id; + if (subtransaction_id_ > kMinSubTransactionId) { + subtransaction_marker = Slice(&subtransaction_value_type, 1); + big_endian_subtxn_id = BigEndian::FromHost32(subtransaction_id_); + subtransaction_id = Slice::FromPod(&big_endian_subtxn_id); + } else { + DCHECK_EQ(subtransaction_id_, kMinSubTransactionId); + } + + std::array value = {{ + Slice(&transaction_value_type, 1), + transaction_id_.AsSlice(), + subtransaction_marker, + subtransaction_id, + Slice(&write_id_value_type, 1), + Slice::FromPod(&big_endian_write_id), + value_slice, + }}; + // Store a row lock indicator rather than data (in value_slice) for row lock intents. + if (IsValidRowMarkType(row_mark_)) { + value.back() = Slice(&row_lock_value_type, 1); + } + + ++intra_txn_write_id_; + + char intent_type[2] = { ValueTypeAsChar::kIntentTypeSet, + static_cast(strong_intent_types_.ToUIntPtr()) }; + + DocHybridTimeBuffer doc_ht_buffer; + + constexpr size_t kNumKeyParts = 3; + std::array key_parts = {{ + key->AsSlice(), + Slice(intent_type, 2), + doc_ht_buffer.EncodeWithValueType(hybrid_time_, write_id_++), + }}; + + Slice reverse_value_prefix; + if (last_key && FLAGS_enable_transaction_sealing) { + reverse_value_prefix = replicated_batches_state_; + } + AddIntent(transaction_id_, key_parts, value, handler_, reverse_value_prefix); + + return Status::OK(); +} + +CHECKED_STATUS TransactionalWriter::Finish() { + char transaction_id_value_type = ValueTypeAsChar::kTransactionId; + + DocHybridTimeBuffer doc_ht_buffer; + + std::array value = {{ + Slice(&transaction_id_value_type, 1), + transaction_id_.AsSlice(), + }}; + + if (PREDICT_FALSE(FLAGS_TEST_docdb_sort_weak_intents)) { + // This is done in tests when deterministic DocDB state is required. 
+ std::vector> intents_and_types( + weak_intents_.begin(), weak_intents_.end()); + sort(intents_and_types.begin(), intents_and_types.end()); + for (const auto& intent_and_types : intents_and_types) { + RETURN_NOT_OK(AddWeakIntent(intent_and_types, value, &doc_ht_buffer)); + } + return Status::OK(); + } + + for (const auto& intent_and_types : weak_intents_) { + RETURN_NOT_OK(AddWeakIntent(intent_and_types, value, &doc_ht_buffer)); + } + + return Status::OK(); +} + +CHECKED_STATUS TransactionalWriter::AddWeakIntent( + const std::pair& intent_and_types, + const std::array& value, + DocHybridTimeBuffer* doc_ht_buffer) { + char intent_type[2] = { ValueTypeAsChar::kIntentTypeSet, + static_cast(intent_and_types.second.ToUIntPtr()) }; + constexpr size_t kNumKeyParts = 3; + std::array key = {{ + intent_and_types.first.AsSlice(), + Slice(intent_type, 2), + doc_ht_buffer->EncodeWithValueType(hybrid_time_, write_id_++), + }}; + + AddIntent(transaction_id_, key, value, handler_); + + return Status::OK(); +} + +DocHybridTimeBuffer::DocHybridTimeBuffer() { + buffer_[0] = ValueTypeAsChar::kHybridTime; +} + +} // namespace docdb +} // namespace yb diff --git a/src/yb/docdb/rocksdb_writer.h b/src/yb/docdb/rocksdb_writer.h new file mode 100644 index 000000000000..c8b2c5e68499 --- /dev/null +++ b/src/yb/docdb/rocksdb_writer.h @@ -0,0 +1,117 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. 
+// + +#ifndef YB_DOCDB_ROCKSDB_WRITER_H +#define YB_DOCDB_ROCKSDB_WRITER_H + +#include "yb/common/doc_hybrid_time.h" +#include "yb/common/hybrid_time.h" +#include "yb/common/transaction.h" + +#include "yb/docdb/docdb.h" +#include "yb/docdb/docdb.fwd.h" +#include "yb/docdb/docdb_fwd.h" +#include "yb/docdb/intent.h" + +#include "yb/rocksdb/write_batch.h" + +namespace yb { +namespace docdb { + +class NonTransactionalWriter : public rocksdb::DirectWriter { + public: + NonTransactionalWriter( + std::reference_wrapper put_batch, HybridTime hybrid_time); + + bool Empty() const; + + CHECKED_STATUS Apply(rocksdb::DirectWriteHandler* handler) override; + + private: + const docdb::KeyValueWriteBatchPB& put_batch_; + HybridTime hybrid_time_; +}; + +// Buffer for encoding DocHybridTime +class DocHybridTimeBuffer { + public: + DocHybridTimeBuffer(); + + Slice EncodeWithValueType(const DocHybridTime& doc_ht) { + auto end = doc_ht.EncodedInDocDbFormat(buffer_.data() + 1); + return Slice(buffer_.data(), end); + } + + Slice EncodeWithValueType(HybridTime ht, IntraTxnWriteId write_id) { + return EncodeWithValueType(DocHybridTime(ht, write_id)); + } + private: + std::array buffer_; +}; + +class TransactionalWriter : public rocksdb::DirectWriter { + public: + TransactionalWriter( + std::reference_wrapper put_batch, + HybridTime hybrid_time, + const TransactionId& transaction_id, + IsolationLevel isolation_level, + PartialRangeKeyIntents partial_range_key_intents, + const Slice& replicated_batches_state, + IntraTxnWriteId intra_txn_write_id); + + CHECKED_STATUS Apply(rocksdb::DirectWriteHandler* handler) override; + + IntraTxnWriteId intra_txn_write_id() const { + return intra_txn_write_id_; + } + + void SetMetadataToStore(const TransactionMetadataPB* value) { + metadata_to_store_ = value; + } + + CHECKED_STATUS operator()( + IntentStrength intent_strength, FullDocKey, Slice value_slice, KeyBytes* key, + LastKey last_key); + + private: + CHECKED_STATUS Finish(); + CHECKED_STATUS AddWeakIntent( + const std::pair& intent_and_types, + const std::array& value, + DocHybridTimeBuffer* doc_ht_buffer); + + const docdb::KeyValueWriteBatchPB& put_batch_; + HybridTime hybrid_time_; + TransactionId transaction_id_; + IsolationLevel isolation_level_; + PartialRangeKeyIntents partial_range_key_intents_; + Slice replicated_batches_state_; + IntraTxnWriteId intra_txn_write_id_; + IntraTxnWriteId write_id_ = 0; + const TransactionMetadataPB* metadata_to_store_ = nullptr; + + // TODO(dtxn) weak & strong intent in one batch. + // TODO(dtxn) extract part of code knowing about intents structure to lower level. + // Handler is initialized in Apply method, and not used after apply returns. + rocksdb::DirectWriteHandler* handler_; + RowMarkType row_mark_; + SubTransactionId subtransaction_id_; + IntentTypeSet strong_intent_types_; + std::unordered_map weak_intents_; +}; + +} // namespace docdb +} // namespace yb + +#endif // YB_DOCDB_ROCKSDB_WRITER_H diff --git a/src/yb/rocksdb/db/db_impl.cc b/src/yb/rocksdb/db/db_impl.cc index a667c9bbf250..f493be2be698 100644 --- a/src/yb/rocksdb/db/db_impl.cc +++ b/src/yb/rocksdb/db/db_impl.cc @@ -5207,6 +5207,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // in case the write callback returned a non-ok status. 
status = w.FinalStatus(); } + for (const auto& writer : write_group) { + last_sequence += writer->batch->DirectEntries(); + } } else { WriteThread::ParallelGroup pg; diff --git a/src/yb/rocksdb/db/db_log_iter_test.cc b/src/yb/rocksdb/db/db_log_iter_test.cc index 7096495e895f..a968c2874e48 100644 --- a/src/yb/rocksdb/db/db_log_iter_test.cc +++ b/src/yb/rocksdb/db/db_log_iter_test.cc @@ -261,10 +261,10 @@ TEST_F(DBTestXactLogIterator, TransactionLogIteratorBlobs) { auto res = OpenTransactionLogIter(0)->GetBatch(); struct Handler : public WriteBatch::Handler { std::string seen; - virtual Status PutCF(uint32_t cf, const Slice& key, - const Slice& value) override { - seen += "Put(" + ToString(cf) + ", " + key.ToString() + ", " + - ToString(value.size()) + ")"; + virtual Status PutCF(uint32_t cf, const SliceParts& key, + const SliceParts& value) override { + seen += "Put(" + ToString(cf) + ", " + key.TheOnlyPart().ToString() + ", " + + ToString(value.TheOnlyPart().size()) + ")"; return Status::OK(); } virtual Status MergeCF(uint32_t cf, const Slice& key, diff --git a/src/yb/rocksdb/db/dbformat.cc b/src/yb/rocksdb/db/dbformat.cc index 55e8186054bd..9dff3479ad6c 100644 --- a/src/yb/rocksdb/db/dbformat.cc +++ b/src/yb/rocksdb/db/dbformat.cc @@ -45,12 +45,18 @@ uint64_t PackSequenceAndType(uint64_t seq, ValueType t) { return (seq << 8) | t; } -void UnPackSequenceAndType(uint64_t packed, uint64_t* seq, ValueType* t) { - *seq = packed >> 8; - *t = static_cast(packed & 0xff); +SequenceAndType UnPackSequenceAndTypeFromEnd(const void* end) { + uint64_t packed; + memcpy(&packed, static_cast(end) - kLastInternalComponentSize, sizeof(packed)); + auto result = SequenceAndType { + .sequence = packed >> 8, + .type = static_cast(packed & 0xff), + }; - assert(*seq <= kMaxSequenceNumber); - assert(IsValueType(*t)); + assert(result.sequence <= kMaxSequenceNumber); + assert(IsValueType(result.type)); + + return result; } void AppendInternalKey(std::string* result, const ParsedInternalKey& key) { diff --git a/src/yb/rocksdb/db/dbformat.h b/src/yb/rocksdb/db/dbformat.h index 8534dee6a726..8873147f51b2 100644 --- a/src/yb/rocksdb/db/dbformat.h +++ b/src/yb/rocksdb/db/dbformat.h @@ -108,9 +108,12 @@ inline size_t InternalKeyEncodingLength(const ParsedInternalKey& key) { // Pack a sequence number and a ValueType into a uint64_t extern uint64_t PackSequenceAndType(uint64_t seq, ValueType t); -// Given the result of PackSequenceAndType, store the sequence number in *seq -// and the ValueType in *t. -extern void UnPackSequenceAndType(uint64_t packed, uint64_t* seq, ValueType* t); +struct SequenceAndType { + SequenceNumber sequence; + ValueType type; +}; + +SequenceAndType UnPackSequenceAndTypeFromEnd(const void* end); // Append the serialization of "key" to *result. 
extern void AppendInternalKey(std::string* result, diff --git a/src/yb/rocksdb/db/flush_job_test.cc b/src/yb/rocksdb/db/flush_job_test.cc index 73d002dfba3a..771ab117ddc8 100644 --- a/src/yb/rocksdb/db/flush_job_test.cc +++ b/src/yb/rocksdb/db/flush_job_test.cc @@ -142,7 +142,10 @@ TEST_F(FlushJobTest, NonEmpty) { for (int i = 1; i < 10000; ++i) { std::string key(ToString((i + 1000) % 10000)); std::string value("value" + key); - new_mem->Add(SequenceNumber(i), kTypeValue, key, value); + Slice key_slice(key); + Slice value_slice(value); + new_mem->Add( + SequenceNumber(i), kTypeValue, SliceParts(&key_slice, 1), SliceParts(&value_slice, 1)); InternalKey internal_key(key, SequenceNumber(i), kTypeValue); inserted_keys.emplace(internal_key.Encode().ToBuffer(), value); values.Feed(key); @@ -202,11 +205,14 @@ TEST_F(FlushJobTest, Snapshots) { auto inserted_keys = mock::MakeMockFile(); for (int i = 1; i < keys; ++i) { std::string key(ToString(i)); + Slice key_slice(key); int insertions = rnd.Uniform(max_inserts_per_keys); for (int j = 0; j < insertions; ++j) { std::string value(test::RandomHumanReadableString(&rnd, 10)); + Slice value_slice(value); auto seqno = ++current_seqno; - new_mem->Add(SequenceNumber(seqno), kTypeValue, key, value); + new_mem->Add(SequenceNumber(seqno), kTypeValue, SliceParts(&key_slice, 1), + SliceParts(&value_slice, 1)); // a key is visible only if: // 1. it's the last one written (j == insertions - 1) // 2. there's a snapshot pointing at it diff --git a/src/yb/rocksdb/db/memtable.cc b/src/yb/rocksdb/db/memtable.cc index 44fe5a74e8b1..95711df0c065 100644 --- a/src/yb/rocksdb/db/memtable.cc +++ b/src/yb/rocksdb/db/memtable.cc @@ -387,16 +387,24 @@ uint64_t MemTable::ApproximateSize(const Slice& start_ikey, return entry_count * (data_size / n); } -void MemTable::Add(SequenceNumber s, ValueType type, - const Slice& key, /* user key */ - const Slice& value, bool allow_concurrent) { +void MemTable::Add(SequenceNumber seq, ValueType type, const SliceParts& key, + const SliceParts& value, bool allow_concurrent) { + PreparedAdd prepared_add; + auto handle = PrepareAdd(seq, type, key, value, &prepared_add); + ApplyPreparedAdd(&handle, 1, prepared_add, allow_concurrent); +} + +KeyHandle MemTable::PrepareAdd(SequenceNumber s, ValueType type, + const SliceParts& key, + const SliceParts& value, + PreparedAdd* prepared_add) { // Format of an entry is concatenation of: // key_size : varint32 of internal_key.size() // key bytes : char[internal_key.size()] // value_size : varint32 of value.size() // value bytes : char[value.size()] - uint32_t key_size = static_cast(key.size()); - uint32_t val_size = static_cast(value.size()); + uint32_t key_size = static_cast(key.SumSizes()); + uint32_t val_size = static_cast(value.SumSizes()); uint32_t internal_key_size = key_size + 8; const uint32_t encoded_len = VarintLength(internal_key_size) + internal_key_size + VarintLength(val_size) + @@ -405,39 +413,56 @@ void MemTable::Add(SequenceNumber s, ValueType type, KeyHandle handle = table_->Allocate(encoded_len, &buf); char* p = EncodeVarint32(buf, internal_key_size); - memcpy(p, key.data(), key_size); - p += key_size; + p = key.CopyAllTo(p); uint64_t packed = PackSequenceAndType(s, type); EncodeFixed64(p, packed); p += 8; p = EncodeVarint32(p, val_size); - memcpy(p, value.data(), val_size); - assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len); + p = value.CopyAllTo(p); + assert((unsigned)(p - buf) == (unsigned)encoded_len); + + if (prefix_bloom_) { + assert(prefix_extractor_); + 
prefix_bloom_->Add(prefix_extractor_->Transform(key.TheOnlyPart())); + } + + if (!prepared_add->min_seq_no) { + prepared_add->min_seq_no = s; + } + prepared_add->total_encoded_len += encoded_len; + if (type == ValueType::kTypeDeletion) { + ++prepared_add->num_deletes; + } + return handle; +} + +void MemTable::ApplyPreparedAdd( + const KeyHandle* handle, size_t count, const PreparedAdd& prepared_add, bool allow_concurrent) { if (!allow_concurrent) { - table_->Insert(handle); + for (const auto* end = handle + count; handle != end; ++handle) { + table_->Insert(*handle); + } // this is a bit ugly, but is the way to avoid locked instructions // when incrementing an atomic - num_entries_.store(num_entries_.load(std::memory_order_relaxed) + 1, + num_entries_.store(num_entries_.load(std::memory_order_relaxed) + count, std::memory_order_relaxed); - data_size_.store(data_size_.load(std::memory_order_relaxed) + encoded_len, + data_size_.store(data_size_.load(std::memory_order_relaxed) + prepared_add.total_encoded_len, std::memory_order_relaxed); - if (type == kTypeDeletion) { - num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + 1, + if (prepared_add.num_deletes) { + num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + prepared_add.num_deletes, std::memory_order_relaxed); } - if (prefix_bloom_) { - assert(prefix_extractor_); - prefix_bloom_->Add(prefix_extractor_->Transform(key)); - } - // The first sequence number inserted into the memtable. // Multiple occurences of the same sequence number in the write batch are allowed // as long as they touch different keys. - assert(first_seqno_ == 0 || s >= first_seqno_); + DCHECK(first_seqno_ == 0 || prepared_add.min_seq_no >= first_seqno_) + << "first_seqno_: " << first_seqno_ << ", prepared_add.min_seq_no: " + << prepared_add.min_seq_no; + if (first_seqno_ == 0) { - first_seqno_.store(s, std::memory_order_relaxed); + first_seqno_.store(prepared_add.min_seq_no, std::memory_order_relaxed); if (earliest_seqno_ == kMaxSequenceNumber) { earliest_seqno_.store(GetFirstSequenceNumber(), @@ -446,29 +471,27 @@ void MemTable::Add(SequenceNumber s, ValueType type, DCHECK_GE(first_seqno_.load(), earliest_seqno_.load()); } } else { - table_->InsertConcurrently(handle); - - num_entries_.fetch_add(1, std::memory_order_relaxed); - data_size_.fetch_add(encoded_len, std::memory_order_relaxed); - if (type == kTypeDeletion) { - num_deletes_.fetch_add(1, std::memory_order_relaxed); + for (const auto* end = handle + count; handle != end; ++handle) { + table_->InsertConcurrently(*handle); } - if (prefix_bloom_) { - assert(prefix_extractor_); - prefix_bloom_->AddConcurrently(prefix_extractor_->Transform(key)); + num_entries_.fetch_add(count, std::memory_order_relaxed); + data_size_.fetch_add(prepared_add.total_encoded_len, std::memory_order_relaxed); + if (prepared_add.num_deletes) { + num_deletes_.fetch_add(prepared_add.num_deletes, std::memory_order_relaxed); } // atomically update first_seqno_ and earliest_seqno_. 
uint64_t cur_seq_num = first_seqno_.load(std::memory_order_relaxed); - while ((cur_seq_num == 0 || s < cur_seq_num) && - !first_seqno_.compare_exchange_weak(cur_seq_num, s)) { + while ((cur_seq_num == 0 || prepared_add.min_seq_no < cur_seq_num) && + !first_seqno_.compare_exchange_weak(cur_seq_num, prepared_add.min_seq_no)) { } uint64_t cur_earliest_seqno = earliest_seqno_.load(std::memory_order_relaxed); while ( - (cur_earliest_seqno == kMaxSequenceNumber || s < cur_earliest_seqno) && - !first_seqno_.compare_exchange_weak(cur_earliest_seqno, s)) { + (cur_earliest_seqno == kMaxSequenceNumber || + prepared_add.min_seq_no < cur_earliest_seqno) && + !first_seqno_.compare_exchange_weak(cur_earliest_seqno, prepared_add.min_seq_no)) { } } @@ -601,11 +624,10 @@ static bool SaveValue(void* arg, const char* entry) { if (s->mem->GetInternalKeyComparator().user_comparator()->Equal( Slice(key_ptr, key_length - 8), s->key->user_key())) { // Correct user key - const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); - ValueType type; - UnPackSequenceAndType(tag, &s->seq, &type); + auto seq_and_type = UnPackSequenceAndTypeFromEnd(key_ptr + key_length); + s->seq = seq_and_type.sequence; - switch (type) { + switch (seq_and_type.type) { case kTypeValue: { if (s->inplace_update_support) { s->mem->GetLock(s->key->user_key())->ReadLock(); @@ -767,11 +789,8 @@ void MemTable::Update(SequenceNumber seq, if (comparator_.comparator.user_comparator()->Equal( Slice(key_ptr, key_length - 8), lkey.user_key())) { // Correct user key - const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); - ValueType type; - SequenceNumber unused; - UnPackSequenceAndType(tag, &unused, &type); - switch (type) { + auto seq_and_type = UnPackSequenceAndTypeFromEnd(key_ptr + key_length); + switch (seq_and_type.type) { case kTypeValue: { Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); uint32_t prev_size = static_cast(prev_value.size()); @@ -795,14 +814,14 @@ void MemTable::Update(SequenceNumber seq, default: // If the latest value is kTypeDeletion, kTypeMerge or kTypeLogData // we don't have enough space for update inplace - Add(seq, kTypeValue, key, value); + Add(seq, kTypeValue, SliceParts(&key, 1), SliceParts(&value, 1)); return; } } } // key doesn't exist - Add(seq, kTypeValue, key, value); + Add(seq, kTypeValue, SliceParts(&key, 1), SliceParts(&value, 1)); } bool MemTable::UpdateCallback(SequenceNumber seq, @@ -831,11 +850,7 @@ bool MemTable::UpdateCallback(SequenceNumber seq, if (comparator_.comparator.user_comparator()->Equal( Slice(key_ptr, key_length - 8), lkey.user_key())) { // Correct user key - const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); - ValueType type; - uint64_t unused; - UnPackSequenceAndType(tag, &unused, &type); - switch (type) { + switch (UnPackSequenceAndTypeFromEnd(key_ptr + key_length).type) { case kTypeValue: { Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); uint32_t prev_size = static_cast(prev_value.size()); @@ -863,7 +878,8 @@ bool MemTable::UpdateCallback(SequenceNumber seq, UpdateFlushState(); return true; } else if (status == UpdateStatus::UPDATED) { - Add(seq, kTypeValue, key, Slice(str_value)); + Slice value(str_value); + Add(seq, kTypeValue, SliceParts(&key, 1), SliceParts(&value, 1)); RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN); UpdateFlushState(); return true; @@ -905,11 +921,7 @@ size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) { break; } - const uint64_t tag = DecodeFixed64(iter_key_ptr + key_length - 8); - ValueType 
type;
-    uint64_t unused;
-    UnPackSequenceAndType(tag, &unused, &type);
-    if (type != kTypeMerge) {
+    if (UnPackSequenceAndTypeFromEnd(iter_key_ptr + key_length).type != kTypeMerge) {
       break;
     }
diff --git a/src/yb/rocksdb/db/memtable.h b/src/yb/rocksdb/db/memtable.h
index 37698b5612f0..3202c5e6cf7e 100644
--- a/src/yb/rocksdb/db/memtable.h
+++ b/src/yb/rocksdb/db/memtable.h
@@ -84,6 +84,12 @@ struct MemTableOptions {
 
 YB_DEFINE_ENUM(FlushState, (kNotRequested)(kRequested)(kScheduled));
 
+struct PreparedAdd {
+  SequenceNumber min_seq_no = 0;
+  size_t total_encoded_len = 0;
+  size_t num_deletes = 0;
+};
+
 // Note: Many of the methods in this class have comments indicating that
 // external synchromization is required as these methods are not thread-safe.
 // It is up to higher layers of code to decide how to prevent concurrent
@@ -186,8 +192,16 @@ class MemTable {
   //
   // REQUIRES: if allow_concurrent = false, external synchronization to prevent
   // simultaneous operations on the same MemTable.
-  void Add(SequenceNumber seq, ValueType type, const Slice& key,
-           const Slice& value, bool allow_concurrent = false);
+  void Add(SequenceNumber seq, ValueType type, const SliceParts& key,
+           const SliceParts& value, bool allow_concurrent = false);
+
+  KeyHandle PrepareAdd(
+      SequenceNumber s, ValueType type, const SliceParts& key, const SliceParts& value,
+      PreparedAdd* prepared_add);
+
+  void ApplyPreparedAdd(
+      const KeyHandle* handle, size_t count, const PreparedAdd& prepared_add,
+      bool allow_concurrent);
 
   // If memtable contains a value for key, store it in *value and return true.
   // If memtable contains a deletion for key, store a NotFound() error
diff --git a/src/yb/rocksdb/db/user_op_id_test.cc b/src/yb/rocksdb/db/user_op_id_test.cc
index 8db094bdf524..e17e64de577c 100644
--- a/src/yb/rocksdb/db/user_op_id_test.cc
+++ b/src/yb/rocksdb/db/user_op_id_test.cc
@@ -32,11 +32,11 @@ namespace {
 struct UserOpIdTestHandler : public WriteBatch::Handler {
   Status PutCF(
       uint32_t column_family_id,
-      const Slice& key,
-      const Slice& value) override {
+      const SliceParts& key,
+      const SliceParts& value) override {
     StartOutputLine(__FUNCTION__);
-    OutputField("key", key);
-    OutputField("value", value);
+    OutputField("key", key.TheOnlyPart());
+    OutputField("value", value.TheOnlyPart());
     FinishOutputLine();
     return Status::OK();
   }
diff --git a/src/yb/rocksdb/db/write_batch.cc b/src/yb/rocksdb/db/write_batch.cc
index 2b7cb82e458d..4041d617e173 100644
--- a/src/yb/rocksdb/db/write_batch.cc
+++ b/src/yb/rocksdb/db/write_batch.cc
@@ -76,7 +76,7 @@ enum ContentFlags : uint32_t {
 struct BatchContentClassifier : public WriteBatch::Handler {
   uint32_t content_flags = 0;
 
-  CHECKED_STATUS PutCF(uint32_t, const Slice&, const Slice&) override {
+  CHECKED_STATUS PutCF(uint32_t, const SliceParts&, const SliceParts&) override {
     content_flags |= ContentFlags::HAS_PUT;
     return Status::OK();
   }
@@ -102,6 +102,35 @@ struct BatchContentClassifier : public WriteBatch::Handler {
   }
 };
 
+class DirectWriteHandlerImpl : public DirectWriteHandler {
+ public:
+  explicit DirectWriteHandlerImpl(MemTable* mem_table, SequenceNumber seq)
+      : mem_table_(mem_table), seq_(seq) {}
+
+  void Put(const SliceParts& key, const SliceParts& value) override {
+    keys_.push_back(
+        mem_table_->PrepareAdd(seq_++, ValueType::kTypeValue, key, value, &prepared_add_));
+  }
+
+  size_t Complete() {
+    auto compare =
+        [comparator = &mem_table_->GetInternalKeyComparator()](KeyHandle lhs, KeyHandle rhs) {
+      auto lhs_slice = GetLengthPrefixedSlice(static_cast<const char*>(lhs));
+      auto rhs_slice = GetLengthPrefixedSlice(static_cast<const char*>(rhs));
+      return comparator->Compare(lhs_slice, rhs_slice) < 0;
+    };
+    std::sort(keys_.begin(), keys_.end(), compare);
+    mem_table_->ApplyPreparedAdd(keys_.data(), keys_.size(), prepared_add_, false);
+    return keys_.size();
+  }
+
+ private:
+  MemTable* mem_table_;
+  SequenceNumber seq_;
+  PreparedAdd prepared_add_;
+  boost::container::small_vector keys_;
+};
+
 } // anon namespace
 
 // WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
@@ -278,6 +307,8 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag,
   return Status::OK();
 }
 
+Result<size_t> DirectInsert(WriteBatch::Handler* handler, DirectWriter* writer);
+
 Status WriteBatch::Iterate(Handler* handler) const {
   Slice input(rep_);
   if (input.size() < kHeader) {
@@ -292,6 +323,14 @@ Status WriteBatch::Iterate(Handler* handler) const {
   if (frontiers_) {
     s = handler->Frontiers(*frontiers_);
   }
+  if (s.ok() && direct_writer_) {
+    auto result = DirectInsert(handler, direct_writer_);
+    if (result.ok()) {
+      direct_entries_ = *result;
+    } else {
+      s = result.status();
+    }
+  }
   while (s.ok() && !input.empty() && handler->Continue()) {
     char tag = 0;
     uint32_t column_family = 0;  // default
@@ -307,7 +346,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
       case kTypeValue:
         assert(content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
-        s = handler->PutCF(column_family, key, value);
+        s = handler->PutCF(column_family, SliceParts(&key, 1), SliceParts(&value, 1));
         found++;
         break;
       case kTypeColumnFamilyDeletion:
@@ -633,8 +672,8 @@ class MemTableInserter : public WriteBatch::Handler {
     return true;
   }
 
-  virtual CHECKED_STATUS PutCF(uint32_t column_family_id, const Slice& key,
-                               const Slice& value) override {
+  CHECKED_STATUS PutCF(
+      uint32_t column_family_id, const SliceParts& key, const SliceParts& value) override {
     Status seek_status;
     if (!SeekToColumnFamily(column_family_id, &seek_status)) {
       ++sequence_;
@@ -647,12 +686,12 @@
               insert_flags_.Test(InsertFlag::kConcurrentMemtableWrites));
     } else if (moptions->inplace_callback == nullptr) {
       assert(!insert_flags_.Test(InsertFlag::kConcurrentMemtableWrites));
-      mem->Update(CurrentSequenceNumber(), key, value);
+      mem->Update(CurrentSequenceNumber(), key.TheOnlyPart(), value.TheOnlyPart());
       RecordTick(moptions->statistics, NUMBER_KEYS_UPDATED);
     } else {
       assert(!insert_flags_.Test(InsertFlag::kConcurrentMemtableWrites));
       SequenceNumber current_seq = CurrentSequenceNumber();
-      if (mem->UpdateCallback(current_seq, key, value)) {
+      if (mem->UpdateCallback(current_seq, key.TheOnlyPart(), value.TheOnlyPart())) {
       } else {
         // key not found in memtable. Do sst get, update, add
         SnapshotImpl read_from_snapshot;
@@ -667,20 +706,22 @@
         if (cf_handle == nullptr) {
           cf_handle = db_->DefaultColumnFamily();
         }
-        Status s = db_->Get(ropts, cf_handle, key, &prev_value);
+        Status s = db_->Get(ropts, cf_handle, key.TheOnlyPart(), &prev_value);
         char* prev_buffer = const_cast<char*>(prev_value.c_str());
         uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
         auto status = moptions->inplace_callback(s.ok() ? prev_buffer : nullptr,
                                                  s.ok() ? &prev_size : nullptr,
-                                                 value, &merged_value);
+                                                 value.TheOnlyPart(), &merged_value);
         if (status == UpdateStatus::UPDATED_INPLACE) {
           // prev_value is updated in-place with final value.
-          mem->Add(current_seq, kTypeValue, key, Slice(prev_buffer, prev_size));
+          Slice new_value(prev_buffer, prev_size);
+          mem->Add(current_seq, kTypeValue, key, SliceParts(&new_value, 1));
           RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
         } else if (status == UpdateStatus::UPDATED) {
           // merged_value contains the final value.
-          mem->Add(current_seq, kTypeValue, key, Slice(merged_value));
+          Slice new_value(merged_value);
+          mem->Add(current_seq, kTypeValue, key, SliceParts(&new_value, 1));
           RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
         }
       }
 
@@ -723,7 +764,7 @@
         return Status::OK();
       }
     }
-    mem->Add(CurrentSequenceNumber(), delete_type, key, Slice(),
+    mem->Add(CurrentSequenceNumber(), delete_type, SliceParts(&key, 1), SliceParts(),
              insert_flags_.Test(InsertFlag::kConcurrentMemtableWrites));
     sequence_++;
     CheckMemtableFull();
@@ -808,13 +849,14 @@
         perform_merge = false;
       } else {
         // 3) Add value to memtable
-        mem->Add(current_seq, kTypeValue, key, new_value);
+        Slice value_slice(new_value);
+        mem->Add(current_seq, kTypeValue, SliceParts(&key, 1), SliceParts(&value_slice, 1));
       }
     }
 
     if (!perform_merge) {
       // Add merge operator to memtable
-      mem->Add(current_seq, kTypeMerge, key, value);
+      mem->Add(current_seq, kTypeMerge, SliceParts(&key, 1), SliceParts(&value, 1));
     }
 
     sequence_++;
@@ -914,4 +956,20 @@ size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,
   }
 }
 
+Result<size_t> DirectInsert(WriteBatch::Handler* handler, DirectWriter* writer) {
+  auto mem_table_inserter = down_cast<MemTableInserter*>(handler);
+  auto* mems = mem_table_inserter->cf_mems_;
+  auto current = mems->current();
+  if (!current) {
+    mems->Seek(0);
+    current = mems->current();
+  }
+  DirectWriteHandlerImpl direct_write_handler(
+      current->mem(), mem_table_inserter->sequence_);
+  RETURN_NOT_OK(writer->Apply(&direct_write_handler));
+  auto result = direct_write_handler.Complete();
+  mem_table_inserter->CheckMemtableFull();
+  return result;
+}
+
 } // namespace rocksdb
diff --git a/src/yb/rocksdb/db/write_batch_test.cc b/src/yb/rocksdb/db/write_batch_test.cc
index 777af885d57f..181d4921d896 100644
--- a/src/yb/rocksdb/db/write_batch_test.cc
+++ b/src/yb/rocksdb/db/write_batch_test.cc
@@ -208,13 +208,14 @@ namespace {
 struct TestHandler : public WriteBatch::Handler {
   std::string seen;
 
-  virtual Status PutCF(uint32_t column_family_id, const Slice& key,
-                       const Slice& value) override {
+  virtual Status PutCF(uint32_t column_family_id, const SliceParts& key,
+                       const SliceParts& value) override {
    if (column_family_id == 0) {
-      seen += "Put(" + key.ToDebugString() + ", " + value.ToDebugString() + ")";
+      seen += "Put(" + key.TheOnlyPart().ToDebugString() + ", " +
+              value.TheOnlyPart().ToDebugString() + ")";
     } else {
       seen += "PutCF(" + ToString(column_family_id) + ", " +
-              key.ToDebugString() + ", " + value.ToDebugString() + ")";
+              key.TheOnlyPart().ToDebugString() + ", " + value.TheOnlyPart().ToDebugString() + ")";
     }
     return Status::OK();
   }
@@ -354,14 +355,14 @@ TEST_F(WriteBatchTest, DISABLED_ManyUpdates) {
   struct NoopHandler : public WriteBatch::Handler {
     uint32_t num_seen = 0;
     char expected_char = 'A';
-    virtual Status PutCF(uint32_t column_family_id, const Slice& key,
-                         const Slice& value) override {
-      EXPECT_EQ(kKeyValueSize, key.size());
-      EXPECT_EQ(kKeyValueSize, value.size());
-      EXPECT_EQ(expected_char, key[0]);
-      EXPECT_EQ(expected_char, value[0]);
-      EXPECT_EQ(expected_char, key[kKeyValueSize - 1]);
-      EXPECT_EQ(expected_char, value[kKeyValueSize - 1]);
+    virtual Status PutCF(uint32_t column_family_id, const SliceParts& key,
+                         const SliceParts& value) override {
+      EXPECT_EQ(kKeyValueSize, key.TheOnlyPart().size());
+      EXPECT_EQ(kKeyValueSize, value.TheOnlyPart().size());
+      EXPECT_EQ(expected_char, key.TheOnlyPart()[0]);
+      EXPECT_EQ(expected_char, value.TheOnlyPart()[0]);
+      EXPECT_EQ(expected_char, key.TheOnlyPart()[kKeyValueSize - 1]);
+      EXPECT_EQ(expected_char, value.TheOnlyPart()[kKeyValueSize - 1]);
       expected_char++;
       if (expected_char > 'Z') {
         expected_char = 'A';
@@ -409,14 +410,14 @@ TEST_F(WriteBatchTest, DISABLED_LargeKeyValue) {
   struct NoopHandler : public WriteBatch::Handler {
     int num_seen = 0;
 
-    virtual Status PutCF(uint32_t column_family_id, const Slice& key,
-                         const Slice& value) override {
-      EXPECT_EQ(kKeyValueSize, key.size());
-      EXPECT_EQ(kKeyValueSize, value.size());
-      EXPECT_EQ('A' + num_seen, key[0]);
-      EXPECT_EQ('A' + num_seen, value[0]);
-      EXPECT_EQ('A' - num_seen, key[kKeyValueSize - 1]);
-      EXPECT_EQ('A' - num_seen, value[kKeyValueSize - 1]);
+    virtual Status PutCF(uint32_t column_family_id, const SliceParts& key,
+                         const SliceParts& value) override {
+      EXPECT_EQ(kKeyValueSize, key.TheOnlyPart().size());
+      EXPECT_EQ(kKeyValueSize, value.TheOnlyPart().size());
+      EXPECT_EQ('A' + num_seen, key.TheOnlyPart()[0]);
+      EXPECT_EQ('A' + num_seen, value.TheOnlyPart()[0]);
+      EXPECT_EQ('A' - num_seen, key.TheOnlyPart()[kKeyValueSize - 1]);
+      EXPECT_EQ('A' - num_seen, value.TheOnlyPart()[kKeyValueSize - 1]);
       ++num_seen;
       return Status::OK();
     }
@@ -448,8 +449,8 @@ TEST_F(WriteBatchTest, Continue) {
   struct Handler : public TestHandler {
     int num_seen = 0;
 
-    virtual Status PutCF(uint32_t column_family_id, const Slice& key,
-                         const Slice& value) override {
+    virtual Status PutCF(uint32_t column_family_id, const SliceParts& key,
+                         const SliceParts& value) override {
      ++num_seen;
      return TestHandler::PutCF(column_family_id, key, value);
     }
diff --git a/src/yb/rocksdb/table/table_test.cc b/src/yb/rocksdb/table/table_test.cc
index 95679d05fdc2..aa7227eef555 100644
--- a/src/yb/rocksdb/table/table_test.cc
+++ b/src/yb/rocksdb/table/table_test.cc
@@ -434,7 +434,9 @@ class MemTableConstructor: public Constructor {
     memtable_->Ref();
     int seq = 1;
     for (const auto& kv : kv_map) {
-      memtable_->Add(seq, kTypeValue, kv.first, kv.second);
+      Slice key(kv.first);
+      Slice value(kv.second);
+      memtable_->Add(seq, kTypeValue, SliceParts(&key, 1), SliceParts(&value, 1));
       seq++;
     }
     return Status::OK();
diff --git a/src/yb/rocksdb/utilities/transactions/transaction_impl.cc b/src/yb/rocksdb/utilities/transactions/transaction_impl.cc
index e8397bac3e7f..0b8ec0e06127 100644
--- a/src/yb/rocksdb/utilities/transactions/transaction_impl.cc
+++ b/src/yb/rocksdb/utilities/transactions/transaction_impl.cc
@@ -210,9 +210,9 @@ Status TransactionImpl::LockBatch(WriteBatch* batch,
     }
   }
 
-  virtual Status PutCF(uint32_t column_family_id, const Slice& key,
-                       const Slice& value) override {
-    RecordKey(column_family_id, key);
+  virtual Status PutCF(uint32_t column_family_id, const SliceParts& key,
+                       const SliceParts& value) override {
+    RecordKey(column_family_id, key.TheOnlyPart());
     return Status::OK();
   }
   virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
diff --git a/src/yb/rocksdb/utilities/ttl/db_ttl_impl.cc b/src/yb/rocksdb/utilities/ttl/db_ttl_impl.cc
index 369f9d119cee..8e439db4222e 100644
--- a/src/yb/rocksdb/utilities/ttl/db_ttl_impl.cc
+++ b/src/yb/rocksdb/utilities/ttl/db_ttl_impl.cc
@@ -260,14 +260,15 @@ Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
     explicit Handler(Env* env) : env_(env) {}
     WriteBatch updates_ttl;
     Status batch_rewrite_status;
-    virtual Status PutCF(uint32_t column_family_id, const Slice& key,
-                         const Slice& value) override {
+
+    Status PutCF(
+        uint32_t column_family_id, const SliceParts& key, const SliceParts& value) override {
       std::string value_with_ts;
-      Status st = AppendTS(value, &value_with_ts, env_);
+      Status st = AppendTS(value.TheOnlyPart(), &value_with_ts, env_);
       if (!st.ok()) {
        batch_rewrite_status = st;
      } else {
-        WriteBatchInternal::Put(&updates_ttl, column_family_id, key,
+        WriteBatchInternal::Put(&updates_ttl, column_family_id, key.TheOnlyPart(),
                                 value_with_ts);
       }
       return Status::OK();
diff --git a/src/yb/rocksdb/utilities/write_batch_with_index/write_batch_with_index_test.cc b/src/yb/rocksdb/utilities/write_batch_with_index/write_batch_with_index_test.cc
index d9bd04aa1b7f..82ab10a2879c 100644
--- a/src/yb/rocksdb/utilities/write_batch_with_index/write_batch_with_index_test.cc
+++ b/src/yb/rocksdb/utilities/write_batch_with_index/write_batch_with_index_test.cc
@@ -62,10 +62,10 @@ struct Entry {
 struct TestHandler : public WriteBatch::Handler {
   std::map<uint32_t, std::vector<Entry>> seen;
-  Status PutCF(uint32_t column_family_id, const Slice& key, const Slice& value) override {
+  Status PutCF(uint32_t column_family_id, const SliceParts& key, const SliceParts& value) override {
     Entry e;
-    e.key = key.ToBuffer();
-    e.value = value.ToBuffer();
+    e.key = key.TheOnlyPart().ToBuffer();
+    e.value = value.TheOnlyPart().ToBuffer();
     e.type = kPutRecord;
     seen[column_family_id].push_back(e);
     return Status::OK();
   }
diff --git a/src/yb/rocksdb/write_batch.h b/src/yb/rocksdb/write_batch.h
index 6d31d6616632..5b00bfc2fa02 100644
--- a/src/yb/rocksdb/write_batch.h
+++ b/src/yb/rocksdb/write_batch.h
@@ -57,6 +57,23 @@ class ColumnFamilyHandle;
 struct SavePoints;
 class UserFrontiers;
 
+class DirectWriteHandler {
+ public:
+  virtual void Put(const SliceParts& key, const SliceParts& value) = 0;
+
+  virtual ~DirectWriteHandler() = default;
+};
+
+// A DirectWriter can be attached to a WriteBatch. In that case, when the write batch is applied
+// to RocksDB, the direct writer is invoked with a DirectWriteHandler, which can be used to add
+// entries directly to the mem table.
+class DirectWriter {
+ public:
+  virtual CHECKED_STATUS Apply(DirectWriteHandler* handler) = 0;
+
+  virtual ~DirectWriter() = default;
+};
+
 class WriteBatch : public WriteBatchBase {
  public:
  explicit WriteBatch(size_t reserved_bytes = 0);
@@ -153,13 +170,13 @@ class WriteBatch : public WriteBatchBase {
   // default implementation will just call Put without column family for
   // backwards compatibility. If the column family is not default,
   // the function is noop
-  virtual CHECKED_STATUS PutCF(uint32_t column_family_id, const Slice& key,
-                               const Slice& value) {
+  virtual CHECKED_STATUS PutCF(uint32_t column_family_id, const SliceParts& key,
+                               const SliceParts& value) {
     if (column_family_id == 0) {
       // Put() historically doesn't return status. We didn't want to be
       // backwards incompatible so we didn't change the return status
       // (this is a public API). We do an ordinary get and return Status::OK()
-      Put(key, value);
+      Put(key.TheOnlyPart(), value.TheOnlyPart());
       return Status::OK();
     }
     return STATUS(InvalidArgument,
@@ -250,6 +267,18 @@ class WriteBatch : public WriteBatchBase {
   void SetFrontiers(const UserFrontiers* value) { frontiers_ = value; }
   const UserFrontiers* Frontiers() const { return frontiers_; }
 
+  void SetDirectWriter(DirectWriter* direct_writer) {
+    direct_writer_ = direct_writer;
+  }
+
+  bool HasDirectWriter() const {
+    return direct_writer_ != nullptr;
+  }
+
+  size_t DirectEntries() const {
+    return direct_entries_;
+  }
+
  private:
   friend class WriteBatchInternal;
   std::unique_ptr<SavePoints> save_points_;
@@ -263,6 +292,8 @@ class WriteBatch : public WriteBatchBase {
 protected:
   std::string rep_;  // See comment in write_batch.cc for the format of rep_
   const UserFrontiers* frontiers_ = nullptr;
+  DirectWriter* direct_writer_ = nullptr;
+  mutable size_t direct_entries_ = 0;
 
  // Intentionally copyable
 };
diff --git a/src/yb/rocksutil/write_batch_formatter.cc b/src/yb/rocksutil/write_batch_formatter.cc
index dbfa4a60123b..c08defa183fb 100644
--- a/src/yb/rocksutil/write_batch_formatter.cc
+++ b/src/yb/rocksutil/write_batch_formatter.cc
@@ -27,12 +27,12 @@ namespace yb {
 
 rocksdb::Status WriteBatchFormatter::PutCF(
     uint32_t column_family_id,
-    const Slice& key,
-    const Slice& value) {
+    const SliceParts& key,
+    const SliceParts& value) {
   StartOutputLine(__FUNCTION__);
-  OutputKey(key);
+  OutputKey(key.TheOnlyPart());
   AddSeparator();
-  OutputValue(key, value);
+  OutputValue(key.TheOnlyPart(), value.TheOnlyPart());
   FinishOutputLine();
   return Status::OK();
 }
diff --git a/src/yb/rocksutil/write_batch_formatter.h b/src/yb/rocksutil/write_batch_formatter.h
index bdd9e2f8a8bf..ba025cac2c68 100644
--- a/src/yb/rocksutil/write_batch_formatter.h
+++ b/src/yb/rocksutil/write_batch_formatter.h
@@ -46,8 +46,8 @@ class WriteBatchFormatter : public rocksdb::WriteBatch::Handler {
 
   virtual CHECKED_STATUS PutCF(
       uint32_t column_family_id,
-      const rocksdb::Slice& key,
-      const rocksdb::Slice& value) override;
+      const rocksdb::SliceParts& key,
+      const rocksdb::SliceParts& value) override;
 
   virtual CHECKED_STATUS DeleteCF(
       uint32_t column_family_id,
diff --git a/src/yb/tablet/tablet.cc b/src/yb/tablet/tablet.cc
index 140196d2ad08..34a41df07da5 100644
--- a/src/yb/tablet/tablet.cc
+++ b/src/yb/tablet/tablet.cc
@@ -69,6 +69,7 @@
 #include "yb/docdb/pgsql_operation.h"
 #include "yb/docdb/ql_rocksdb_storage.h"
 #include "yb/docdb/redis_operation.h"
+#include "yb/docdb/rocksdb_writer.h"
 
 #include "yb/gutil/casts.h"
 
@@ -1154,21 +1155,23 @@ Status Tablet::ApplyOperation(
       batch_idx, write_batch, frontiers_ptr, hybrid_time, already_applied_to_regular_db);
 }
 
-Status Tablet::PrepareTransactionWriteBatch(
+Status Tablet::WriteTransactionalBatch(
     int64_t batch_idx,
     const KeyValueWriteBatchPB& put_batch,
     HybridTime hybrid_time,
-    rocksdb::WriteBatch* rocksdb_write_batch) {
+    const rocksdb::UserFrontiers* frontiers) {
   auto transaction_id = CHECK_RESULT(
       FullyDecodeTransactionId(put_batch.transaction().transaction_id()));
+
+  bool store_metadata = false;
   if (put_batch.transaction().has_isolation()) {
     // Store transaction metadata (status tablet, isolation level etc.)
-    if (!transaction_participant()->Add(put_batch.transaction(), rocksdb_write_batch)) {
-      auto status = STATUS_EC_FORMAT(
-          TryAgain, PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE),
-          "Transaction was recently aborted: $0", transaction_id);
-      return status.CloneAndAddErrorCode(TransactionError(TransactionErrorCode::kAborted));
+    auto metadata = VERIFY_RESULT(TransactionMetadata::FromPB(put_batch.transaction()));
+    auto add_result = transaction_participant()->Add(metadata);
+    if (!add_result.ok()) {
+      return add_result.status();
     }
+    store_metadata = add_result.get();
   }
   boost::container::small_vector encoded_replicated_batch_idx_set;
   auto prepare_batch_data = transaction_participant()->PrepareBatchData(
@@ -1184,12 +1187,23 @@ Status Tablet::PrepareTransactionWriteBatch(
 
   auto isolation_level = prepare_batch_data->first;
   auto& last_batch_data = prepare_batch_data->second;
-  yb::docdb::PrepareTransactionWriteBatch(
-      put_batch, hybrid_time, rocksdb_write_batch, transaction_id, isolation_level,
+
+  docdb::TransactionalWriter writer(
+      put_batch, hybrid_time, transaction_id, isolation_level,
       docdb::PartialRangeKeyIntents(metadata_->UsePartialRangeKeyIntents()),
       Slice(encoded_replicated_batch_idx_set.data(), encoded_replicated_batch_idx_set.size()),
-      &last_batch_data.next_write_id);
+      last_batch_data.next_write_id);
+  if (store_metadata) {
+    writer.SetMetadataToStore(&put_batch.transaction());
+  }
+  rocksdb::WriteBatch write_batch;
+  write_batch.SetDirectWriter(&writer);
+  RequestScope request_scope(transaction_participant_.get());
+
+  WriteToRocksDB(frontiers, &write_batch, StorageDbType::kIntents);
+  last_batch_data.hybrid_time = hybrid_time;
+  last_batch_data.next_write_id = writer.intra_txn_write_id();
 
   transaction_participant()->BatchReplicated(transaction_id, last_batch_data);
 
   return Status::OK();
@@ -1211,21 +1225,16 @@ Status Tablet::ApplyKeyValueRowOperations(
   // In all other cases we should crash instead of skipping apply.
 
   if (put_batch.has_transaction()) {
-    rocksdb::WriteBatch write_batch;
-    RequestScope request_scope(transaction_participant_.get());
-    RETURN_NOT_OK(PrepareTransactionWriteBatch(batch_idx, put_batch, hybrid_time, &write_batch));
-    WriteToRocksDB(frontiers, &write_batch, StorageDbType::kIntents);
+    RETURN_NOT_OK(WriteTransactionalBatch(batch_idx, put_batch, hybrid_time, frontiers));
   } else {
     rocksdb::WriteBatch regular_write_batch;
     auto* regular_write_batch_ptr = !already_applied_to_regular_db ?
        &regular_write_batch : nullptr;
-    // See comments for PrepareNonTransactionWriteBatch.
+
+    // See comments for PrepareExternalWriteBatch.
     rocksdb::WriteBatch intents_write_batch;
-    PrepareNonTransactionWriteBatch(
+    bool has_non_external_records = PrepareExternalWriteBatch(
         put_batch, hybrid_time, intents_db_.get(), regular_write_batch_ptr, &intents_write_batch);
-    if (regular_write_batch.Count() != 0) {
-      WriteToRocksDB(frontiers, regular_write_batch_ptr, StorageDbType::kRegular);
-    }
     if (intents_write_batch.Count() != 0) {
       if (!metadata_->is_under_twodc_replication()) {
         RETURN_NOT_OK(metadata_->SetIsUnderTwodcReplicationAndFlush(true));
       }
       WriteToRocksDB(frontiers, &intents_write_batch, StorageDbType::kIntents);
     }
 
+    docdb::NonTransactionalWriter writer(put_batch, hybrid_time);
+    if (!already_applied_to_regular_db && has_non_external_records) {
+      regular_write_batch.SetDirectWriter(&writer);
+    }
+    if (regular_write_batch.Count() != 0 || regular_write_batch.HasDirectWriter()) {
+      WriteToRocksDB(frontiers, &regular_write_batch, StorageDbType::kRegular);
+    }
+
     if (snapshot_coordinator_) {
       for (const auto& pair : put_batch.write_pairs()) {
         WARN_NOT_OK(snapshot_coordinator_->ApplyWritePair(pair.key(), pair.value()),
@@ -1248,9 +1265,6 @@ void Tablet::WriteToRocksDB(
     const rocksdb::UserFrontiers* frontiers, rocksdb::WriteBatch* write_batch,
     docdb::StorageDbType storage_db_type) {
-  if (write_batch->Count() == 0) {
-    return;
-  }
   rocksdb::DB* dest_db = nullptr;
   switch (storage_db_type) {
     case StorageDbType::kRegular: dest_db = regular_db_.get(); break;
diff --git a/src/yb/tablet/tablet.h b/src/yb/tablet/tablet.h
index cb8e238125b4..fbe32a65d82f 100644
--- a/src/yb/tablet/tablet.h
+++ b/src/yb/tablet/tablet.h
@@ -731,11 +731,11 @@ class Tablet : public AbstractTablet, public TransactionIntentApplier {
 
   void DocDBDebugDump(std::vector<std::string> *lines);
 
-  CHECKED_STATUS PrepareTransactionWriteBatch(
+  CHECKED_STATUS WriteTransactionalBatch(
      int64_t batch_idx,  // index of this batch in its transaction
      const docdb::KeyValueWriteBatchPB& put_batch,
      HybridTime hybrid_time,
-      rocksdb::WriteBatch* rocksdb_write_batch);
+      const rocksdb::UserFrontiers* frontiers);
 
   Result CreateTransactionOperationContext(
       const boost::optional<TransactionId>& transaction_id,
diff --git a/src/yb/tablet/transaction_participant.cc b/src/yb/tablet/transaction_participant.cc
index 618592a8bcd5..afb8013ab92a 100644
--- a/src/yb/tablet/transaction_participant.cc
+++ b/src/yb/tablet/transaction_participant.cc
@@ -24,6 +24,7 @@
 #include "yb/client/transaction_rpc.h"
 
 #include "yb/common/pgsql_error.h"
+#include "yb/common/transaction_error.h"
 
 #include "yb/consensus/consensus_util.h"
 
@@ -185,41 +186,26 @@ class TransactionParticipant::Impl
   }
 
   // Adds new running transaction.
-  bool Add(const TransactionMetadataPB& data, rocksdb::WriteBatch *write_batch) {
-    auto metadata = TransactionMetadata::FromPB(data);
-    if (!metadata.ok()) {
-      LOG_WITH_PREFIX(DFATAL) << "Invalid transaction id: " << metadata.status().ToString();
+  Result<bool> Add(const TransactionMetadata& metadata) {
+    loader_.WaitLoaded(metadata.transaction_id);
+
+    MinRunningNotifier min_running_notifier(&applier_);
+    std::lock_guard<std::mutex> lock(mutex_);
+    auto it = transactions_.find(metadata.transaction_id);
+    if (it != transactions_.end()) {
       return false;
     }
-    loader_.WaitLoaded(metadata->transaction_id);
-    bool store = false;
-    {
-      MinRunningNotifier min_running_notifier(&applier_);
-      std::lock_guard<std::mutex> lock(mutex_);
-      auto it = transactions_.find(metadata->transaction_id);
-      if (it == transactions_.end()) {
-        if (WasTransactionRecentlyRemoved(metadata->transaction_id)) {
-          return false;
-        }
-        if (cleanup_cache_.Erase(metadata->transaction_id) != 0) {
-          return false;
-        }
-        VLOG_WITH_PREFIX(4) << "Create new transaction: " << metadata->transaction_id;
-        transactions_.insert(std::make_shared<RunningTransaction>(
-            *metadata, TransactionalBatchData(), OneWayBitmap(), metadata->start_time, this));
-        TransactionsModifiedUnlocked(&min_running_notifier);
-        store = true;
-      }
-    }
-    if (store) {
-      docdb::KeyBytes key;
-      AppendTransactionKeyPrefix(metadata->transaction_id, &key);
-      auto data_copy = data;
-      // We use hybrid time only for backward compatibility, actually wall time is required.
-      data_copy.set_metadata_write_time(GetCurrentTimeMicros());
-      auto value = data_copy.SerializeAsString();
-      write_batch->Put(key.AsSlice(), value);
+    if (WasTransactionRecentlyRemoved(metadata.transaction_id) ||
+        cleanup_cache_.Erase(metadata.transaction_id) != 0) {
+      auto status = STATUS_EC_FORMAT(
+          TryAgain, PgsqlError(YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE),
+          "Transaction was recently aborted: $0", metadata.transaction_id);
+      return status.CloneAndAddErrorCode(TransactionError(TransactionErrorCode::kAborted));
     }
+    VLOG_WITH_PREFIX(4) << "Create new transaction: " << metadata.transaction_id;
+    transactions_.insert(std::make_shared<RunningTransaction>(
+        metadata, TransactionalBatchData(), OneWayBitmap(), metadata.start_time, this));
+    TransactionsModifiedUnlocked(&min_running_notifier);
     return true;
   }
 
@@ -1569,9 +1555,8 @@ void TransactionParticipant::Start() {
   impl_->Start();
 }
 
-bool TransactionParticipant::Add(
-    const TransactionMetadataPB& data, rocksdb::WriteBatch *write_batch) {
-  return impl_->Add(data, write_batch);
+Result<bool> TransactionParticipant::Add(const TransactionMetadata& metadata) {
+  return impl_->Add(metadata);
 }
 
 Result<TransactionMetadata> TransactionParticipant::PrepareMetadata(
diff --git a/src/yb/tablet/transaction_participant.h b/src/yb/tablet/transaction_participant.h
index 202a18df54e3..04b6155d0815 100644
--- a/src/yb/tablet/transaction_participant.h
+++ b/src/yb/tablet/transaction_participant.h
@@ -111,8 +111,8 @@ class TransactionParticipant : public TransactionStatusManager {
   void Start();
 
   // Adds new running transaction.
-  MUST_USE_RESULT bool Add(
-      const TransactionMetadataPB& data, rocksdb::WriteBatch *write_batch);
+  // Returns true if the transaction was added, false if it was already present.
+  Result<bool> Add(const TransactionMetadata& metadata);
 
   Result<TransactionMetadata> PrepareMetadata(const TransactionMetadataPB& id) override;
 
diff --git a/src/yb/util/slice.cc b/src/yb/util/slice.cc
index 66bf83c25513..ae4689ff9e8f 100644
--- a/src/yb/util/slice.cc
+++ b/src/yb/util/slice.cc
@@ -199,15 +199,20 @@ size_t SliceParts::SumSizes() const {
   return result;
 }
 
-void SliceParts::CopyAllTo(void* out) const {
-  char* buf = static_cast<char*>(out);
+char* SliceParts::CopyAllTo(char* out) const {
   for (int i = 0; i != num_parts; ++i) {
     if (!parts[i].size()) {
       continue;
     }
-    memcpy(buf, parts[i].data(), parts[i].size());
-    buf += parts[i].size();
+    memcpy(out, parts[i].data(), parts[i].size());
+    out += parts[i].size();
   }
+  return out;
+}
+
+Slice SliceParts::TheOnlyPart() const {
+  CHECK_EQ(num_parts, 1);
+  return parts[0];
 }
 
 } // namespace yb
diff --git a/src/yb/util/slice.h b/src/yb/util/slice.h
index bbde23563dec..f7563a3c1cd6 100644
--- a/src/yb/util/slice.h
+++ b/src/yb/util/slice.h
@@ -304,7 +304,13 @@ struct SliceParts {
   size_t SumSizes() const;
 
   // Copy content of all slice to specified buffer.
-  void CopyAllTo(void* out) const;
+  void* CopyAllTo(void* out) const {
+    return CopyAllTo(static_cast<char*>(out));
+  }
+
+  char* CopyAllTo(char* out) const;
+
+  Slice TheOnlyPart() const;
 
   const Slice* parts;
   int num_parts;
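
The SliceParts helpers touched above are easiest to see in a short sketch. The code below is illustrative only and is not part of this patch; the function name, the sample strings, and the use of glog CHECK macros are assumptions made for the example.

// Illustrative sketch of the SliceParts helpers (not part of this patch).
#include <string>

#include <glog/logging.h>

#include "yb/util/slice.h"

void SlicePartsExample() {
  yb::Slice parts[2] = { yb::Slice("hello_"), yb::Slice("world") };

  // CopyAllTo(char*) now returns the end of the copied region, so several
  // SliceParts can be concatenated into one buffer without re-computing sizes.
  yb::SliceParts two_parts(parts, 2);
  std::string buffer(two_parts.SumSizes(), '\0');
  char* end = two_parts.CopyAllTo(&buffer[0]);
  CHECK_EQ(end, buffer.data() + buffer.size());
  CHECK_EQ(buffer, "hello_world");

  // TheOnlyPart() is a checked shortcut for the common handler case where a
  // SliceParts wraps exactly one Slice, e.g. the PutCF(key, value) overrides above.
  yb::SliceParts one_part(parts, 1);
  CHECK_EQ(one_part.TheOnlyPart().ToBuffer(), "hello_");
}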
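
For reference, here is a minimal sketch of how the new DirectWriter hook is meant to be driven from the RocksDB side. Everything in it is illustrative and not part of this patch: the ExampleDirectWriter class, the sample keys, and the assumption that DB::Write accepts a batch whose only payload arrives through the attached direct writer, which is what the tablet code above relies on.

// Usage sketch (illustrative only, not part of this patch).
#include <string>

#include "yb/rocksdb/db.h"
#include "yb/rocksdb/write_batch.h"

namespace example {

// Emits a few key/value pairs straight into the memtable, bypassing the
// WriteBatch record encoding entirely.
class ExampleDirectWriter : public rocksdb::DirectWriter {
 public:
  rocksdb::Status Apply(rocksdb::DirectWriteHandler* handler) override {
    for (int i = 0; i != 3; ++i) {
      std::string key = "key_" + std::to_string(i);
      std::string value = "value_" + std::to_string(i);
      rocksdb::Slice key_slice(key);
      rocksdb::Slice value_slice(value);
      // Entries may be emitted in any order; DirectWriteHandlerImpl sorts the
      // prepared handles with the memtable comparator before applying them.
      handler->Put(rocksdb::SliceParts(&key_slice, 1), rocksdb::SliceParts(&value_slice, 1));
    }
    return rocksdb::Status::OK();
  }
};

rocksdb::Status WriteDirectly(rocksdb::DB* db) {
  ExampleDirectWriter writer;
  rocksdb::WriteBatch batch;
  batch.SetDirectWriter(&writer);
  // The batch itself stays empty; during DB::Write the memtable inserter calls
  // DirectInsert(), which hands the writer a DirectWriteHandler for the current memtable.
  return db->Write(rocksdb::WriteOptions(), &batch);
}

}  // namespace example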