Skip to content

Commit

Permalink
Add a flush option wait_for_results_readable
Browse files Browse the repository at this point in the history
  • Loading branch information
jowlyzhang committed Oct 1, 2024
1 parent 389e66b commit f306b73
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 10 deletions.
1 change: 1 addition & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5987,6 +5987,7 @@ Status DBImpl::IngestExternalFiles(

if (status.ok() && at_least_one_cf_need_flush) {
FlushOptions flush_opts;
flush_opts.wait_for_results_readable = true;
flush_opts.allow_write_stall = true;
if (immutable_db_options_.atomic_flush) {
mutex_.Unlock();
Expand Down
8 changes: 5 additions & 3 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2078,15 +2078,17 @@ class DBImpl : public DB {
// from background error.
Status WaitForFlushMemTable(ColumnFamilyData* cfd,
const uint64_t* flush_memtable_id = nullptr,
bool resuming_from_bg_err = false) {
bool resuming_from_bg_err = false,
bool wait_for_results_readable = false) {
return WaitForFlushMemTables({cfd}, {flush_memtable_id},
resuming_from_bg_err);
resuming_from_bg_err,
wait_for_results_readable);
}
// Wait for memtables to be flushed for multiple column families.
Status WaitForFlushMemTables(
const autovector<ColumnFamilyData*>& cfds,
const autovector<const uint64_t*>& flush_memtable_ids,
bool resuming_from_bg_err);
bool resuming_from_bg_err, bool wait_for_results_readable);

inline void WaitForPendingWrites() {
mutex_.AssertHeld();
Expand Down
17 changes: 12 additions & 5 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2407,7 +2407,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
}
s = WaitForFlushMemTables(
cfds, flush_memtable_ids,
flush_reason == FlushReason::kErrorRecovery /* resuming_from_bg_err */);
flush_reason == FlushReason::kErrorRecovery /* resuming_from_bg_err */,
flush_options.wait_for_results_readable);
InstrumentedMutexLock lock_guard(&mutex_);
for (auto* tmp_cfd : cfds) {
tmp_cfd->UnrefAndTryDelete();
Expand Down Expand Up @@ -2549,7 +2550,8 @@ Status DBImpl::AtomicFlushMemTables(
}
s = WaitForFlushMemTables(
cfds, flush_memtable_ids,
flush_reason == FlushReason::kErrorRecovery /* resuming_from_bg_err */);
flush_reason == FlushReason::kErrorRecovery /* resuming_from_bg_err */,
flush_options.wait_for_results_readable);
InstrumentedMutexLock lock_guard(&mutex_);
for (auto* cfd : cfds) {
cfd->UnrefAndTryDelete();
Expand Down Expand Up @@ -2612,7 +2614,8 @@ Status DBImpl::RetryFlushesForErrorRecovery(FlushReason flush_reason,
flush_memtable_id_ptrs.push_back(&flush_memtable_id);
}
s = WaitForFlushMemTables(cfds, flush_memtable_id_ptrs,
true /* resuming_from_bg_err */);
true /* resuming_from_bg_err */,
/* wait_for_results_readable= */ false);
mutex_.Lock();
}

Expand Down Expand Up @@ -2712,7 +2715,7 @@ Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
Status DBImpl::WaitForFlushMemTables(
const autovector<ColumnFamilyData*>& cfds,
const autovector<const uint64_t*>& flush_memtable_ids,
bool resuming_from_bg_err) {
bool resuming_from_bg_err, bool wait_for_results_readable) {
int num = static_cast<int>(cfds.size());
// Wait until the compaction completes
InstrumentedMutexLock l(&mutex_);
Expand Down Expand Up @@ -2750,7 +2753,11 @@ Status DBImpl::WaitForFlushMemTables(
(flush_memtable_ids[i] != nullptr &&
cfds[i]->imm()->GetEarliestMemTableID() >
*flush_memtable_ids[i])) {
++num_finished;
if (!wait_for_results_readable ||
cfds[i]->GetSuperVersion()->imm->GetID() ==
cfds[i]->imm()->current()->GetID()) {
++num_finished;
}
}
}
if (1 == num_dropped && 1 == num) {
Expand Down
2 changes: 2 additions & 0 deletions db/memtable_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,9 @@ void MemTableList::InstallNewVersion() {
} else {
// somebody else holds the current version, we need to create new one
MemTableListVersion* version = current_;
uint64_t mlv_id = last_memtable_list_version_id_.fetch_add(1) + 1;
current_ = new MemTableListVersion(&current_memory_usage_, *version);
current_->SetID(mlv_id);
current_->Ref();
version->Unref();
}
Expand Down
16 changes: 15 additions & 1 deletion db/memtable_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ class MemTableListVersion {
// Return kMaxSequenceNumber if the list is empty.
SequenceNumber GetFirstSequenceNumber() const;

// REQUIRES: db_mutex held.
void SetID(uint64_t id) { id_ = id; }

// REQUIRES: db_mutex held.
uint64_t GetID() const { return id_; }

private:
friend class MemTableList;

Expand Down Expand Up @@ -205,6 +211,9 @@ class MemTableListVersion {
int refs_ = 0;

size_t* parent_memtable_list_memory_usage_;

// MemtableListVersion id to track for flush results checking.
uint64_t id_ = 0;
};

// This class stores references to all the immutable memtables.
Expand Down Expand Up @@ -235,7 +244,8 @@ class MemTableList {
flush_requested_(false),
current_memory_usage_(0),
current_memory_allocted_bytes_excluding_last_(0),
current_has_history_(false) {
current_has_history_(false),
last_memtable_list_version_id_(0) {
current_->Ref();
}

Expand Down Expand Up @@ -500,6 +510,10 @@ class MemTableList {

// Cached value of current_->HasHistory().
std::atomic<bool> current_has_history_;

// Last memtabe list version id, increase by 1 each time a new
// MemtableListVersion is installed.
std::atomic<uint64_t> last_memtable_list_version_id_;
};

// Installs memtable atomic flush results.
Expand Down
14 changes: 13 additions & 1 deletion include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -2037,7 +2037,19 @@ struct FlushOptions {
// is performed by someone else (foreground call or background thread).
// Default: false
bool allow_write_stall;
FlushOptions() : wait(true), allow_write_stall(false) {}

// This flag is only effective if wait is also true. If true, the flush will
// wait until the flushed files are available for user reads. This means after
// this wait returns, all future reads will read the flushed files as opposed
// to the memtables. If false and wait is true, we just wait until flush is
// done, but not necessarily when the files are available for user reads. This
// means upcoming reads could still be reading the memtables.
// Default: false
bool wait_for_results_readable;
FlushOptions()
: wait(true),
allow_write_stall(false),
wait_for_results_readable(false) {}
};

// Create a Logger from provided DBOptions
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*New FlushOptions.wait_for_results_readable option will wait until the flushed files are available for read requests. After such a wait returns, all future reads are guaranteed to read the flushed files as opposed to the memtables.

0 comments on commit f306b73

Please sign in to comment.