Add documentation for background job's state transition (#12994)
Summary:
The `SchedulePending*` APIs are a bit confusing: they do not immediately schedule the work, so they can easily be confused with the actual scheduling step. This change renames them to `EnqueuePending*` and adds documentation for the corresponding state transitions of these background jobs; a short sketch of the resulting call pattern appears right after this commit message.

Pull Request resolved: #12994

Test Plan: existing tests

Reviewed By: cbi42

Differential Revision: D62252746

Pulled By: jowlyzhang

fbshipit-source-id: ee68be6ed33070cad9a5004b7b3e16f5bcb041bf
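
For context, here is a minimal sketch of the call pattern the rename is meant to make explicit, modeled on `DBImpl::ScheduleFlushes` in the diff below. It is an illustration only, not code from this commit: the fragment would sit inside a `DBImpl` method with the DB mutex held, and the column family `cfd` and the flush reason are placeholder choices.

  FlushRequest flush_req;
  // Build a flush request for one column family; the reason is illustrative.
  GenerateFlushRequest({cfd}, FlushReason::kWriteBufferFull, &flush_req);
  // Enqueue only: the work becomes "pending"/"unscheduled"; nothing runs yet.
  EnqueuePendingFlush(flush_req);
  // Separately ask for a background thread; the work may now become "scheduled".
  MaybeScheduleFlushOrCompaction();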
jowlyzhang authored and facebook-github-bot committed Sep 6, 2024
1 parent 0bea5a2 commit a24574e
Showing 7 changed files with 40 additions and 26 deletions.
2 changes: 1 addition & 1 deletion db/db_compaction_test.cc
@@ -6146,7 +6146,7 @@ TEST_F(DBCompactionTest, CompactionLimiter) {

std::vector<std::string> pending_compaction_cfs;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SchedulePendingCompaction::cfd", [&](void* arg) {
"EnqueuePendingCompaction::cfd", [&](void* arg) {
const std::string& cf_name =
static_cast<ColumnFamilyData*>(arg)->GetName();
pending_compaction_cfs.emplace_back(cf_name);
4 changes: 2 additions & 2 deletions db/db_impl/db_impl.cc
@@ -473,7 +473,7 @@ Status DBImpl::ResumeImpl(DBRecoverContext context) {

if (s.ok()) {
for (auto cfd : *versions_->GetColumnFamilySet()) {
SchedulePendingCompaction(cfd);
EnqueuePendingCompaction(cfd);
}
MaybeScheduleFlushOrCompaction();
}
@@ -4282,7 +4282,7 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
->storage_info()
->BottommostFilesMarkedForCompaction()
.empty()) {
SchedulePendingCompaction(cfd);
EnqueuePendingCompaction(cfd);
MaybeScheduleFlushOrCompaction();
cf_scheduled.push_back(cfd);
}
21 changes: 19 additions & 2 deletions db/db_impl/db_impl.h
@@ -2216,10 +2216,27 @@ class DBImpl : public DB {
void GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
FlushReason flush_reason, FlushRequest* req);

// The functions below execute flushes and compactions in the background. A
// queue is the communication channel between the threads that request the
// work and the available threads in the thread pool that pick it up and
// execute it. We use the following terminology to describe the state of the
// work and its transitions:
// 1) The work becomes pending once it is successfully enqueued into the
// corresponding queue; work in this state is also called unscheduled.
// The counter `unscheduled_*_` counts work in this state.
// 2) When `MaybeScheduleFlushOrCompaction` schedules a thread to run `BGWork*`
// for the work, the work becomes scheduled.
// The counter `bg_*_scheduled_` counts work in this state.
// 3) Once the thread starts to execute `BGWork*`, the work is popped from the
// queue and is now in the running state.
// The counter `num_running_*_` counts work in this state.
// 4) Eventually, the work finishes. We don't need to track finished work
// explicitly.

// Returns true if `req` is successfully enqueued.
bool SchedulePendingFlush(const FlushRequest& req);
bool EnqueuePendingFlush(const FlushRequest& req);

void SchedulePendingCompaction(ColumnFamilyData* cfd);
void EnqueuePendingCompaction(ColumnFamilyData* cfd);
void SchedulePendingPurge(std::string fname, std::string dir_to_sync,
FileType type, uint64_t number, int job_id);
static void BGWorkCompaction(void* arg);
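
The state transitions documented in the new `db_impl.h` comment above can be summarized with the following reading aid. This is purely illustrative: `WorkState` is not a type in RocksDB, and the code tracks these states only through the counters named in the comment.

  // Illustrative sketch only; RocksDB tracks these states via counters, not an enum.
  enum class WorkState {
    kPending,    // enqueued via EnqueuePendingFlush()/EnqueuePendingCompaction();
                 // also called "unscheduled"; counted by unscheduled_*_
    kScheduled,  // MaybeScheduleFlushOrCompaction() scheduled a thread to run
                 // BGWork*; counted by bg_*_scheduled_
    kRunning,    // BGWork* popped the work off its queue and is executing it;
                 // counted by num_running_*_
    kFinished,   // done; not tracked explicitly
  };

In the diffs below, callers consistently pair an `EnqueuePending*` call with `MaybeScheduleFlushOrCompaction()` to move work from pending to scheduled.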
23 changes: 10 additions & 13 deletions db/db_impl/db_impl_compaction_flush.cc
@@ -2377,7 +2377,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
ColumnFamilyData* loop_cfd =
req.cfd_to_max_mem_id_to_persist.begin()->first;
bool already_queued_for_flush = loop_cfd->queued_for_flush();
bool flush_req_enqueued = SchedulePendingFlush(req);
bool flush_req_enqueued = EnqueuePendingFlush(req);
if (already_queued_for_flush || flush_req_enqueued) {
loop_cfd->SetFlushSkipReschedule();
}
@@ -2528,7 +2528,7 @@ Status DBImpl::AtomicFlushMemTables(
}
}
GenerateFlushRequest(cfds, flush_reason, &flush_req);
SchedulePendingFlush(flush_req);
EnqueuePendingFlush(flush_req);
MaybeScheduleFlushOrCompaction();
}

@@ -2583,7 +2583,7 @@ Status DBImpl::RetryFlushesForErrorRecovery(FlushReason flush_reason,
if (immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest(cfds, flush_reason, &flush_req);
SchedulePendingFlush(flush_req);
EnqueuePendingFlush(flush_req);
for (auto& iter : flush_req.cfd_to_max_mem_id_to_persist) {
flush_memtable_ids.push_back(iter.second);
}
@@ -2597,7 +2597,7 @@ Status DBImpl::RetryFlushesForErrorRecovery(FlushReason flush_reason,
flush_reason,
{{cfd,
std::numeric_limits<uint64_t>::max() /* max_mem_id_to_persist */}}};
if (SchedulePendingFlush(flush_req)) {
if (EnqueuePendingFlush(flush_req)) {
cfd->SetFlushSkipReschedule();
};
}
@@ -2950,6 +2950,7 @@ void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
cfd->Ref();
compaction_queue_.push_back(cfd);
cfd->set_queued_for_compaction(true);
++unscheduled_compactions_;
}

ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
@@ -3005,7 +3006,7 @@ ColumnFamilyData* DBImpl::PickCompactionFromQueue(
return cfd;
}

bool DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
bool DBImpl::EnqueuePendingFlush(const FlushRequest& flush_req) {
mutex_.AssertHeld();
bool enqueued = false;
if (reject_new_background_jobs_) {
@@ -3041,16 +3042,15 @@ bool DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
return enqueued;
}

void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
void DBImpl::EnqueuePendingCompaction(ColumnFamilyData* cfd) {
mutex_.AssertHeld();
if (reject_new_background_jobs_) {
return;
}
if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) {
TEST_SYNC_POINT_CALLBACK("SchedulePendingCompaction::cfd",
TEST_SYNC_POINT_CALLBACK("EnqueuePendingCompaction::cfd",
static_cast<void*>(cfd));
AddToCompactionQueue(cfd);
++unscheduled_compactions_;
}
}

@@ -3218,7 +3218,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
#ifndef NDEBUG
flush_req.reschedule_count += 1;
#endif /* !NDEBUG */
SchedulePendingFlush(flush_req);
EnqueuePendingFlush(flush_req);
*reason = flush_reason;
*flush_rescheduled_to_retain_udt = true;
return Status::TryAgain();
@@ -3678,7 +3678,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
->ComputeCompactionScore(*(c->immutable_options()),
*(c->mutable_cf_options()));
AddToCompactionQueue(cfd);
++unscheduled_compactions_;

c.reset();
// Don't need to sleep here, because BackgroundCallCompaction
@@ -3707,7 +3706,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
if (cfd->NeedsCompaction()) {
// Yes, we need more compactions!
AddToCompactionQueue(cfd);
++unscheduled_compactions_;
MaybeScheduleFlushOrCompaction();
}
}
@@ -3997,7 +3995,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
*(c->mutable_cf_options()));
if (!cfd->queued_for_compaction()) {
AddToCompactionQueue(cfd);
++unscheduled_compactions_;
}
}
}
@@ -4269,7 +4266,7 @@ void DBImpl::InstallSuperVersionAndScheduleWork(

// Whenever we install new SuperVersion, we might need to issue new flushes or
// compactions.
SchedulePendingCompaction(cfd);
EnqueuePendingCompaction(cfd);
MaybeScheduleFlushOrCompaction();

// Update max_total_in_memory_state_
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_experimental.cc
@@ -47,7 +47,7 @@ Status DBImpl::SuggestCompactRange(ColumnFamilyHandle* column_family,
// compaction score
vstorage->ComputeCompactionScore(*cfd->ioptions(),
*cfd->GetLatestMutableCFOptions());
SchedulePendingCompaction(cfd);
EnqueuePendingCompaction(cfd);
MaybeScheduleFlushOrCompaction();
}
return Status::OK();
12 changes: 6 additions & 6 deletions db/db_impl/db_impl_write.cc
@@ -1789,13 +1789,13 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
if (!immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest({cfd}, FlushReason::kWalFull, &flush_req);
SchedulePendingFlush(flush_req);
EnqueuePendingFlush(flush_req);
}
}
if (immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest(cfds, FlushReason::kWalFull, &flush_req);
SchedulePendingFlush(flush_req);
EnqueuePendingFlush(flush_req);
}
MaybeScheduleFlushOrCompaction();
}
@@ -1881,13 +1881,13 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) {
FlushRequest flush_req;
GenerateFlushRequest({cfd}, FlushReason::kWriteBufferManager,
&flush_req);
SchedulePendingFlush(flush_req);
EnqueuePendingFlush(flush_req);
}
}
if (immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest(cfds, FlushReason::kWriteBufferManager, &flush_req);
SchedulePendingFlush(flush_req);
EnqueuePendingFlush(flush_req);
}
MaybeScheduleFlushOrCompaction();
}
@@ -2163,12 +2163,12 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) {
AssignAtomicFlushSeq(cfds);
FlushRequest flush_req;
GenerateFlushRequest(cfds, FlushReason::kWriteBufferFull, &flush_req);
SchedulePendingFlush(flush_req);
EnqueuePendingFlush(flush_req);
} else {
for (auto* cfd : cfds) {
FlushRequest flush_req;
GenerateFlushRequest({cfd}, FlushReason::kWriteBufferFull, &flush_req);
SchedulePendingFlush(flush_req);
EnqueuePendingFlush(flush_req);
}
}
MaybeScheduleFlushOrCompaction();
2 changes: 1 addition & 1 deletion db/flush_job.cc
@@ -630,7 +630,7 @@ Status FlushJob::MemPurge() {
new_mem->SetNextLogNumber(mems_[0]->GetNextLogNumber());

// This addition will not trigger another flush, because
// we do not call SchedulePendingFlush().
// we do not call EnqueuePendingFlush().
cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free);
new_mem->Ref();
// Piggyback FlushJobInfo on the first flushed memtable.