[#23890] docdb: Add filtering for bootstrap intent iterators based on min_replay_txn_start_ht

Summary:
This diff adds filtering for intent SST files during the transaction loading step of tablet bootstrap, using the newly introduced `min_replay_txn_start_ht`.

When CDC is enabled, we persist intent SST files longer than they are otherwise needed, until CDC has streamed all transactions in the SST file and moved the retention barrier far enough ahead. This can lead to a large buildup of intent SST files which are not actually needed at bootstrap time.

8b23a4e / D35639 added periodic saving of `min_running_ht` and intent SST file filtering at bootstrap time based on those saved values. This can lead to data loss if there is a transaction T for which all of the following are true:
- T has been applied (APPLIED written to WALs)
- T has intents that have been flushed to disk (this is rare but possible when CDC is disabled since in the ideal non-CDC case we never flush intents)
- Changes made by T on regulardb have not been flushed
- The metadata record for T is in an intent SST file whose max HT is less than `min_running_ht` after T is applied (i.e. an intentsdb flush happened between T's writes and its apply)
- Tablet bootstrap state has been saved after T has committed

Under these conditions, a `min_running_ht > T.start_time` is written to disk and loaded during tablet bootstrap. Since regulardb changes have not been flushed, WAL replay will start from a point that includes T. However, the transaction loader will not load T, because its metadata record has been excluded by the SST file filter. As a result, the changes made by T are dropped even though T has successfully committed.
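
The failure mode can be illustrated with a toy model. This is only a sketch: `IntentSstFile`, `SkippedByFilter`, and `LoaderSeesT` are hypothetical names, hybrid times are plain integers, and the filter rule (skip a file whose max HT is below the threshold) is assumed from the description above rather than taken from the actual implementation.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a hybrid time; not the YugabyteDB HybridTime type.
using HybridTimeValue = uint64_t;

// Hypothetical summary of one intent SST file.
struct IntentSstFile {
  HybridTimeValue max_ht;   // largest hybrid time of any record in the file
  bool has_metadata_for_t;  // whether T's metadata record lives in this file
};

// Assumed filter rule: a file is skipped when all of its records are older
// than the threshold loaded from the saved bootstrap state.
inline bool SkippedByFilter(const IntentSstFile& file, HybridTimeValue threshold) {
  return file.max_ht < threshold;
}

// Returns true if the transaction loader would still read T's metadata record.
inline bool LoaderSeesT(const std::vector<IntentSstFile>& files,
                        HybridTimeValue threshold) {
  for (const auto& file : files) {
    if (file.has_metadata_for_t && !SkippedByFilter(file, threshold)) {
      return true;
    }
  }
  return false;
}
```

With T starting at time 90, its metadata in a file with max HT 100, and `min_running_ht` having advanced to 200 after T was applied, `LoaderSeesT(files, 200)` is false, which is the data loss case. A threshold no greater than T's start time (for example 90) keeps the file visible.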

This change introduces a new `min_replay_txn_start_ht` and changes the intent SST file filter to be based off of periodically saved values of this new `min_replay_txn_start_ht`.

`min_replay_txn_start_ht` is defined as the minimum of:
- `min_running_ht`
- `start_ht` of any transaction which may be read during WAL replay

WAL replay begins at `bootstrap_start_op_id = min(intentsdb flushed_op_id, rocksdb flushed_op_id, retryable requests last_flushed_op_id)`. We calculate `min_replay_txn_start_ht` by maintaining a set of `(applied_op_id, start_ht)` for recently applied transactions. Transactions are added into this set when they are applied and cleaned from memory (removed from `transactions_`) and are removed when `bootstrap_start_op_id` is increased past `applied_op_id`. `min_replay_txn_start_ht` is then the minimum of `start_ht` of this set and `min_running_ht`.
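
The bookkeeping described above could be sketched roughly as follows. This is a simplified illustration, not the code in this diff: `RecentlyAppliedTracker`, `BootstrapStartOpId`, and the integer stand-ins for `OpId` and `HybridTime` are all hypothetical, and the sketch ignores invalid values and locking.

```cpp
#include <algorithm>
#include <cstdint>
#include <map>

// Hypothetical integer stand-ins for OpId and HybridTime.
using OpIndex = int64_t;
using HybridTimeValue = uint64_t;

// WAL replay starts at the smallest of the three flushed op ids.
inline OpIndex BootstrapStartOpId(OpIndex intents_flushed, OpIndex regular_flushed,
                                  OpIndex retryable_requests_flushed) {
  return std::min({intents_flushed, regular_flushed, retryable_requests_flushed});
}

class RecentlyAppliedTracker {
 public:
  // Called when an applied transaction is removed from the in-memory set.
  void Add(OpIndex applied_op_id, HybridTimeValue start_ht) {
    recently_applied_.emplace(applied_op_id, start_ht);
  }

  // Drops entries whose applied_op_id is below the new replay start point,
  // since WAL replay can no longer read their records.
  void Cleanup(OpIndex bootstrap_start_op_id) {
    recently_applied_.erase(recently_applied_.begin(),
                            recently_applied_.lower_bound(bootstrap_start_op_id));
  }

  // Minimum of min_running_ht and the start_ht of every tracked transaction.
  HybridTimeValue MinReplayTxnStartHt(HybridTimeValue min_running_ht) const {
    HybridTimeValue result = min_running_ht;
    for (const auto& entry : recently_applied_) {
      result = std::min(result, entry.second);
    }
    return result;
  }

 private:
  // Ordered by applied_op_id so that cleanup is a single range erase.
  std::multimap<OpIndex, HybridTimeValue> recently_applied_;
};
```

Keeping the set ordered by `applied_op_id` makes removal a prefix erase, while the minimum `start_ht` is recomputed with a linear scan. A real implementation may index the set differently.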

Since `replay_start_op_id` is only updated after flushes to disk, this ensures that any transaction whose metadata record is filtered out by the intent SST file filter will not be incorrectly loaded during WAL replay: such a transaction would have `apply_op_id < replay_start_op_id` (the `replay_start_op_id` calculated at bootstrap time), so none of its records are read by WAL replay.

**Upgrade/Rollback safety:**

The `min_running_ht` field in `TabletBootstrapStatePB` was retired and a new `min_replay_txn_start_ht` field was added. No autoflags were added because `min_replay_txn_start_ht` is only used for an optimization (intent SST file filtering), so its absence post-upgrade does not affect correctness, and its presence post-rollback is simply ignored. `min_running_ht` was only used for an incorrect implementation of the optimization which was off by default, so its absence post-rollback does not harm correctness (and actually improves it if the optimization was turned on), and its presence after upgrade is ignored.

A different field was used for this change to ensure that values of `min_running_ht` set before upgrade are not used, since using them is unsafe.
Jira: DB-12794

Test Plan:
Added a test case to reproduce the data loss scenario that occurred when the filter was using `min_running_ht`:
```
./yb_build.sh --cxx_test pgwrapper_pg_mini-test --gtest_filter PgMiniTestSingleNode.TestBootstrapOnAppliedTransactionWithIntents
```

Also confirmed that CDC stress tests stop failing after these changes.

Reviewers: sergei, qhu

Reviewed By: sergei

Subscribers: rthallam, ybase, yql

Differential Revision: https://phorge.dev.yugabyte.com/D37792
es1024 committed Sep 17, 2024
1 parent 41f5afd commit d6bbf59
Showing 18 changed files with 409 additions and 68 deletions.
15 changes: 15 additions & 0 deletions src/yb/common/opid.h
@@ -84,6 +84,9 @@ struct OpId {

std::string ToString() const;

static OpId MinValid(const OpId& lhs, const OpId& rhs);
static OpId MaxValid(const OpId& lhs, const OpId& rhs);

// Parse OpId from TERM.INDEX string.
static Result<OpId> FromString(Slice input);
};
@@ -112,6 +115,18 @@ inline bool operator>=(const OpId& lhs, const OpId& rhs) {
return !(lhs < rhs);
}

inline OpId OpId::MinValid(const OpId& lhs, const OpId& rhs) {
if (!lhs.valid()) return rhs;
if (!rhs.valid()) return lhs;
return std::min(lhs, rhs);
}

inline OpId OpId::MaxValid(const OpId& lhs, const OpId& rhs) {
if (!lhs.valid()) return rhs;
if (!rhs.valid()) return lhs;
return std::max(lhs, rhs);
}

std::ostream& operator<<(std::ostream& out, const OpId& op_id);

size_t hash_value(const OpId& op_id) noexcept;
5 changes: 4 additions & 1 deletion src/yb/consensus/consensus.proto
@@ -613,7 +613,10 @@ message ClientReplicatedRetryableRequestRangesPB {
message TabletBootstrapStatePB {
optional OpIdPB last_op_id = 1;
repeated ClientReplicatedRetryableRequestRangesPB client_requests = 2;
optional fixed64 min_running_ht = 3;

reserved 3;

optional fixed64 min_replay_txn_start_ht = 4;
}

// A Raft implementation.
47 changes: 36 additions & 11 deletions src/yb/tablet/tablet.cc
@@ -491,11 +491,36 @@ Result<HybridTime> CheckSafeTime(HybridTime time, HybridTime min_allowed) {

} // namespace

class Tablet::RegularRocksDbListener : public rocksdb::EventListener {
class Tablet::RocksDbListener : public rocksdb::EventListener {
public:
RegularRocksDbListener(Tablet* tablet, const std::string& log_prefix)
: tablet_(*CHECK_NOTNULL(tablet)),
log_prefix_(log_prefix) {}
RocksDbListener(Tablet& tablet, const std::string& log_prefix)
: tablet_(tablet), log_prefix_(log_prefix) {}

void OnFlushCompleted(rocksdb::DB*, const rocksdb::FlushJobInfo&) override {
if (auto* participant = tablet_.transaction_participant()) {
VLOG_WITH_PREFIX_AND_FUNC(2)
<< "RocksDB flush completed, triggering cleanup of recently applied transactions";
auto status = participant->ProcessRecentlyAppliedTransactions();
if (!status.ok() && !tablet_.shutdown_requested_.load(std::memory_order_acquire)) {
LOG_WITH_PREFIX_AND_FUNC(DFATAL)
<< "Failed to clean up recently applied transactions: " << status;
}
}
}

protected:
const std::string& LogPrefix() const {
return log_prefix_;
}

Tablet& tablet_;
const std::string log_prefix_;
};

class Tablet::RegularRocksDbListener : public Tablet::RocksDbListener {
public:
RegularRocksDbListener(Tablet& tablet, const std::string& log_prefix)
: RocksDbListener(tablet, log_prefix) {}

void OnCompactionCompleted(rocksdb::DB* db, const rocksdb::CompactionJobInfo& ci) override {
auto& metadata = *CHECK_NOTNULL(tablet_.metadata());
@@ -528,7 +553,7 @@ class Tablet::RegularRocksDbListener : public rocksdb::EventListener {
{
auto scoped_read_operation = tablet_.CreateScopedRWOperationNotBlockingRocksDbShutdownStart();
if (!scoped_read_operation.ok()) {
VLOG_WITH_FUNC(4) << "Skip";
VLOG_WITH_PREFIX_AND_FUNC(4) << "Skip";
return;
}

@@ -552,7 +577,7 @@ class Tablet::RegularRocksDbListener : public rocksdb::EventListener {
if(!tablet_.metadata()->colocated()) {
auto schema_version = tablet_.get_min_xcluster_schema_version_(primary_table_id,
kColocationIdNotSet);
VLOG_WITH_FUNC(4) <<
VLOG_WITH_PREFIX_AND_FUNC(4) <<
Format("MinNonXClusterSchemaVersion, MinXClusterSchemaVersion for $0:$1,$2",
primary_table_id, min_schema_versions[Uuid::Nil()], schema_version);
if (schema_version < min_schema_versions[Uuid::Nil()]) {
@@ -566,7 +591,7 @@ class Tablet::RegularRocksDbListener : public rocksdb::EventListener {
ColocationId colocation_id = colocated_tables[table_id.ToHexString()];
auto xcluster_min_schema_version = tablet_.get_min_xcluster_schema_version_(primary_table_id,
colocation_id);
VLOG_WITH_FUNC(4) <<
VLOG_WITH_PREFIX_AND_FUNC(4) <<
Format("MinNonXClusterSchemaVersion, MinXClusterSchemaVersion for $0,$1:$2,$3",
primary_table_id, colocation_id, min_schema_versions[table_id],
xcluster_min_schema_version);
@@ -595,9 +620,6 @@ class Tablet::RegularRocksDbListener : public rocksdb::EventListener {
smallest.MakeExternalSchemaVersionsAtMost(table_id_to_min_schema_version);
}
}

Tablet& tablet_;
const std::string log_prefix_;
};

Tablet::Tablet(const TabletInitData& data)
@@ -972,7 +994,7 @@ Status Tablet::OpenKeyValueTablet() {

rocksdb::Options regular_rocksdb_options(rocksdb_options);
regular_rocksdb_options.listeners.push_back(
std::make_shared<RegularRocksDbListener>(this, regular_rocksdb_options.log_prefix));
std::make_shared<RegularRocksDbListener>(*this, regular_rocksdb_options.log_prefix));

const string db_dir = metadata()->rocksdb_dir();
RETURN_NOT_OK(CreateTabletDirectories(db_dir, metadata()->fs_manager()));
@@ -1020,6 +1042,9 @@ Status Tablet::OpenKeyValueTablet() {
}
intents_rocksdb_options.statistics = intentsdb_statistics_;

intents_rocksdb_options.listeners.push_back(
std::make_shared<RocksDbListener>(*this, intents_rocksdb_options.log_prefix));

rocksdb::DB* intents_db = nullptr;
RETURN_NOT_OK(
rocksdb::DB::Open(intents_rocksdb_options, db_dir + kIntentsDBSuffix, &intents_db));
1 change: 1 addition & 0 deletions src/yb/tablet/tablet.h
@@ -978,6 +978,7 @@ class Tablet : public AbstractTablet,
friend class ScopedReadOperation;
friend class TabletComponent;

class RocksDbListener;
class RegularRocksDbListener;

FRIEND_TEST(TestTablet, TestGetLogRetentionSizeForIndex);
14 changes: 7 additions & 7 deletions src/yb/tablet/tablet_bootstrap.cc
@@ -136,8 +136,8 @@ DEFINE_RUNTIME_bool(skip_flushed_entries_in_first_replayed_segment, true,
"If applicable, only replay entries that are not flushed to RocksDB or necessary "
"to bootstrap retryable requests in the first replayed wal segment.");

DEFINE_RUNTIME_bool(use_bootstrap_intent_ht_filter, false,
"Use min running hybrid time filter for bootstrap.");
DEFINE_RUNTIME_bool(use_bootstrap_intent_ht_filter, true,
"Use min replay txn start time filter for bootstrap.");

DECLARE_int32(retryable_request_timeout_secs);

@@ -506,22 +506,22 @@ class TabletBootstrap {
}

std::optional<consensus::TabletBootstrapStatePB> bootstrap_state_pb = std::nullopt;
HybridTime min_running_ht = HybridTime::kInvalid;
HybridTime min_replay_txn_start_ht = HybridTime::kInvalid;
if (GetAtomicFlag(&FLAGS_enable_flush_retryable_requests) && data_.bootstrap_state_manager) {
auto result = data_.bootstrap_state_manager->LoadFromDisk();
if (result.ok()) {
bootstrap_state_pb = std::move(*result);

if (GetAtomicFlag(&FLAGS_use_bootstrap_intent_ht_filter)) {
const auto& bootstrap_state = data_.bootstrap_state_manager->bootstrap_state();
min_running_ht = bootstrap_state.GetMinRunningHybridTime();
min_replay_txn_start_ht = bootstrap_state.GetMinReplayTxnStartTime();
}
} else if (!result.status().IsNotFound()) {
return result.status();
}
}

const bool has_blocks = VERIFY_RESULT(OpenTablet(min_running_ht));
const bool has_blocks = VERIFY_RESULT(OpenTablet(min_replay_txn_start_ht));

if (data_.retryable_requests) {
const auto retryable_request_timeout_secs = meta_->IsSysCatalog()
@@ -623,7 +623,7 @@ class TabletBootstrap {
}

// Sets result to true if there was any data on disk for this tablet.
Result<bool> OpenTablet(HybridTime min_running_ht) {
Result<bool> OpenTablet(HybridTime min_replay_txn_start_ht) {
CleanupSnapshots();
// Use operator new instead of make_shared for creating the shared_ptr. That way, we would have
// the shared_ptr's control block hold a raw pointer to the Tablet object as opposed to the
@@ -637,7 +637,7 @@ class TabletBootstrap {

auto participant = tablet->transaction_participant();
if (participant) {
participant->SetMinRunningHybridTimeLowerBound(min_running_ht);
participant->SetMinReplayTxnStartTimeLowerBound(min_replay_txn_start_ht);
}

// Doing nothing for now except opening a tablet locally.
2 changes: 1 addition & 1 deletion src/yb/tablet/tablet_bootstrap_state_flusher.cc
@@ -125,7 +125,7 @@ Status TabletBootstrapStateFlusher::FlushBootstrapState(TabletBootstrapFlushStat
SetIdleAndNotifyAll();
});
TEST_PAUSE_IF_FLAG(TEST_pause_before_flushing_bootstrap_state);
return bootstrap_state_manager_->SaveToDisk(*raft_consensus_);
return bootstrap_state_manager_->SaveToDisk(tablet_, *raft_consensus_);
}

Status TabletBootstrapStateFlusher::SubmitFlushBootstrapStateTask() {
3 changes: 3 additions & 0 deletions src/yb/tablet/tablet_bootstrap_state_flusher.h
@@ -40,10 +40,12 @@ class TabletBootstrapStateFlusher :
public:
TabletBootstrapStateFlusher(
const std::string& tablet_id,
TabletWeakPtr tablet,
std::shared_ptr<consensus::RaftConsensus> raft_consensus,
std::shared_ptr<TabletBootstrapStateManager> bootstrap_state_manager,
std::unique_ptr<ThreadPoolToken> flush_bootstrap_state_pool_token)
: tablet_id_(tablet_id),
tablet_(std::move(tablet)),
raft_consensus_(raft_consensus),
bootstrap_state_manager_(bootstrap_state_manager),
flush_bootstrap_state_pool_token_(std::move(flush_bootstrap_state_pool_token)) {}
@@ -77,6 +79,7 @@ class TabletBootstrapStateFlusher :
mutable std::condition_variable flush_cond_;
std::atomic<TabletBootstrapFlushState> flush_state_{TabletBootstrapFlushState::kFlushIdle};
TabletId tablet_id_;
TabletWeakPtr tablet_;
std::shared_ptr<consensus::RaftConsensus> raft_consensus_;
std::shared_ptr<TabletBootstrapStateManager> bootstrap_state_manager_;
std::unique_ptr<ThreadPoolToken> flush_bootstrap_state_pool_token_;
49 changes: 39 additions & 10 deletions src/yb/tablet/tablet_bootstrap_state_manager.cc
@@ -21,32 +21,36 @@
#include "yb/consensus/retryable_requests.h"
#include "yb/consensus/opid_util.h"

#include "yb/tablet/tablet.h"
#include "yb/tablet/transaction_participant.h"

#include "yb/util/debug-util.h"
#include "yb/util/env_util.h"

namespace yb::tablet {

TabletBootstrapState::TabletBootstrapState(const TabletBootstrapState& rhs):
min_running_ht_(rhs.min_running_ht_.load()) {}
min_replay_txn_start_ht_(rhs.min_replay_txn_start_ht_.load()) {}

TabletBootstrapState::TabletBootstrapState(TabletBootstrapState&& rhs):
min_running_ht_(rhs.min_running_ht_.load()) {}
min_replay_txn_start_ht_(rhs.min_replay_txn_start_ht_.load()) {}

void TabletBootstrapState::operator=(TabletBootstrapState&& rhs) {
min_running_ht_.store(rhs.min_running_ht_.load());
min_replay_txn_start_ht_.store(rhs.min_replay_txn_start_ht_.load());
}

void TabletBootstrapState::CopyFrom(const TabletBootstrapState& rhs) {
min_running_ht_.store(rhs.min_running_ht_.load());
min_replay_txn_start_ht_.store(rhs.min_replay_txn_start_ht_.load());
}

void TabletBootstrapState::ToPB(consensus::TabletBootstrapStatePB* pb) const {
pb->set_min_running_ht(min_running_ht_.load().ToUint64());
pb->set_min_replay_txn_start_ht(min_replay_txn_start_ht_.load().ToUint64());
}

void TabletBootstrapState::FromPB(const consensus::TabletBootstrapStatePB& pb) {
min_running_ht_.store(
pb.has_min_running_ht() ? HybridTime(pb.min_running_ht()) : HybridTime::kInvalid);
min_replay_txn_start_ht_.store(
pb.has_min_replay_txn_start_ht() ? HybridTime(pb.min_replay_txn_start_ht())
: HybridTime::kInvalid);
}

TabletBootstrapStateManager::TabletBootstrapStateManager() { }
@@ -71,15 +75,32 @@ Status TabletBootstrapStateManager::Init() {
return Status::OK();
}

Status TabletBootstrapStateManager::SaveToDisk(consensus::RaftConsensus& raft_consensus) {
Status TabletBootstrapStateManager::SaveToDisk(
const TabletWeakPtr& tablet_ptr, consensus::RaftConsensus& raft_consensus) {
auto retryable_requests = VERIFY_RESULT(raft_consensus.TakeSnapshotOfRetryableRequests());
if (!retryable_requests) {
LOG(INFO) << "Nothing to save";
return Status::OK();
}

auto max_replicated_op_id = retryable_requests->GetMaxReplicatedOpId();

TabletBootstrapState bootstrap_state(bootstrap_state_);

// Set min replay txn start time to what it will be after this flush succeeds - this is safe
// because if the flush succeeds, replay start op id will be calculated from the new value.
auto tablet = tablet_ptr.lock();
TransactionParticipant* participant = nullptr;
if (tablet) {
participant = tablet->transaction_participant();
if (participant) {
auto start_ht = VERIFY_RESULT(participant->SimulateProcessRecentlyAppliedTransactions(
max_replicated_op_id));
VLOG(1) << "Using min_replay_txn_start_ht = " << start_ht;
bootstrap_state.SetMinReplayTxnStartTime(start_ht);
}
}

consensus::TabletBootstrapStatePB pb;
retryable_requests->ToPB(&pb);
bootstrap_state.ToPB(&pb);
@@ -101,8 +122,16 @@ Status TabletBootstrapStateManager::SaveToDisk(consensus::RaftConsensus& raft_co
has_file_on_disk_ = true;
RETURN_NOT_OK(env->SyncDir(dir_));

auto max_replicated_op_id = retryable_requests->GetMaxReplicatedOpId();
return raft_consensus.SetLastFlushedOpIdInRetryableRequests(max_replicated_op_id);
RETURN_NOT_OK(raft_consensus.SetLastFlushedOpIdInRetryableRequests(max_replicated_op_id));

if (participant) {
VLOG(1)
<< "Bootstrap state saved to disk, triggering cleanup of recently applied transactions";
participant->SetRetryableRequestsFlushedOpId(max_replicated_op_id);
return participant->ProcessRecentlyAppliedTransactions();
}

return Status::OK();
}

Result<consensus::TabletBootstrapStatePB> TabletBootstrapStateManager::LoadFromDisk() {
11 changes: 7 additions & 4 deletions src/yb/tablet/tablet_bootstrap_state_manager.h
@@ -42,14 +42,17 @@ class TabletBootstrapState {

void CopyFrom(const TabletBootstrapState& rhs);

void SetMinRunningHybridTime(HybridTime min_running_ht) { min_running_ht_.store(min_running_ht); }
HybridTime GetMinRunningHybridTime() const { return min_running_ht_.load(); }
void SetMinReplayTxnStartTime(HybridTime min_replay_txn_start_ht) {
min_replay_txn_start_ht_.store(min_replay_txn_start_ht);
}

HybridTime GetMinReplayTxnStartTime() const { return min_replay_txn_start_ht_.load(); }

void ToPB(consensus::TabletBootstrapStatePB* pb) const;
void FromPB(const consensus::TabletBootstrapStatePB& pb);

private:
std::atomic<HybridTime> min_running_ht_{HybridTime::kInvalid};
std::atomic<HybridTime> min_replay_txn_start_ht_{HybridTime::kInvalid};
};

class TabletBootstrapStateManager {
@@ -74,7 +77,7 @@ class TabletBootstrapStateManager {
}

// Flush the pb as the latest version.
Status SaveToDisk(consensus::RaftConsensus& raft_consensus);
Status SaveToDisk(const TabletWeakPtr& tablet_ptr, consensus::RaftConsensus& raft_consensus);

// Load the latest version from disk if any.
Result<consensus::TabletBootstrapStatePB> LoadFromDisk();