Skip to content

Commit

Permalink
Fix orphaned files in SstFileManager (#13015)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #13015

`Close()`ing a database now releases tracked files in `SstFileManager`. Previously this space would be leaked until the database was later reopened.

Differential Revision: D62590773
  • Loading branch information
Nick Brekhus authored and facebook-github-bot committed Sep 18, 2024
1 parent 4d3d200 commit 2daccfb
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 41 deletions.
68 changes: 68 additions & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,11 @@ Status DBImpl::MaybeReleaseTimestampedSnapshotsAndCheck() {
return Status::OK();
}

void DBImpl::UntrackDataFiles() {
TrackOrUntrackFiles(/*existing_data_files=*/{},
/*track=*/false);
}

Status DBImpl::CloseHelper() {
// Guarantee that there is no background error recovery in progress before
// continuing with the shutdown
Expand Down Expand Up @@ -669,6 +674,13 @@ Status DBImpl::CloseHelper() {
delete txn_entry.second;
}

// Return an unowned SstFileManager to a consistent state
if (immutable_db_options_.sst_file_manager && !own_sfm_) {
mutex_.Unlock();
UntrackDataFiles();
mutex_.Lock();
}

// versions need to be destroyed before table_cache since it can hold
// references to table_cache.
{
Expand Down Expand Up @@ -6747,6 +6759,62 @@ void DBImpl::RecordSeqnoToTimeMapping(uint64_t populate_historical_seconds) {
}
}

void DBImpl::TrackOrUntrackFiles(
const std::vector<std::string>& existing_data_files, bool track) {
auto sfm = static_cast_with_check<SstFileManagerImpl>(
immutable_db_options_.sst_file_manager.get());
assert(sfm);
std::vector<ColumnFamilyMetaData> metadata;
GetAllColumnFamilyMetaData(&metadata);
auto action = [&](const std::string& file_path,
std::optional<uint64_t> size) {
if (track) {
if (size) {
sfm->OnAddFile(file_path, *size).PermitUncheckedError();
} else {
sfm->OnAddFile(file_path).PermitUncheckedError();
}
} else {
sfm->OnUntrackFile(file_path).PermitUncheckedError();
}
};

std::unordered_set<std::string> referenced_files;
for (const auto& md : metadata) {
for (const auto& lmd : md.levels) {
for (const auto& fmd : lmd.files) {
// We're assuming that each sst file name exists in at most one of
// the paths.
std::string file_path =
fmd.directory + kFilePathSeparator + fmd.relative_filename;
action(file_path, fmd.size);
referenced_files.insert(file_path);
}
}
for (const auto& bmd : md.blob_files) {
std::string name = bmd.blob_file_name;
// The BlobMetaData.blob_file_name may start with "/".
if (!name.empty() && name[0] == kFilePathSeparator) {
name = name.substr(1);
}
// We're assuming that each blob file name exists in at most one of
// the paths.
std::string file_path = bmd.blob_file_path + kFilePathSeparator + name;
action(file_path, bmd.blob_file_size);
referenced_files.insert(file_path);
}
}

for (const auto& file_path : existing_data_files) {
if (referenced_files.find(file_path) != referenced_files.end()) {
continue;
}
// There shouldn't be any duplicated files. In case there is, SstFileManager
// will take care of deduping it.
action(file_path, /*size=*/std::nullopt);
}
}

void DBImpl::InstallSeqnoToTimeMappingInSV(
std::vector<SuperVersionContext>* sv_contexts) {
mutex_.AssertHeld();
Expand Down
10 changes: 10 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1618,9 +1618,15 @@ class DBImpl : public DB {
// vast majority of all files), since it already has the file size
// on record, we don't need to query the file system. Otherwise, we query the
// file system for the size of an unreferenced file.
// REQUIRES: mutex unlocked
void TrackExistingDataFiles(
const std::vector<std::string>& existing_data_files);

// Untrack data files in sst manager. This is only called during DB::Close on
// an unowned SstFileManager, to return it to a consistent state.
// REQUIRES: mutex unlocked
void UntrackDataFiles();

// SetDbSessionId() should be called in the constuctor DBImpl()
// to ensure that db_session_id_ gets updated every time the DB is opened
void SetDbSessionId();
Expand Down Expand Up @@ -2190,6 +2196,10 @@ class DBImpl : public DB {
JobContext* job_context, LogBuffer* log_buffer,
CompactionJobInfo* compaction_job_info);

// REQUIRES: mutex unlocked
void TrackOrUntrackFiles(const std::vector<std::string>& existing_data_files,
bool track);

ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name);

void MaybeScheduleFlushOrCompaction();
Expand Down
41 changes: 1 addition & 40 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1988,46 +1988,7 @@ IOStatus DBImpl::CreateWAL(const WriteOptions& write_options,

void DBImpl::TrackExistingDataFiles(
const std::vector<std::string>& existing_data_files) {
auto sfm = static_cast<SstFileManagerImpl*>(
immutable_db_options_.sst_file_manager.get());
assert(sfm);
std::vector<ColumnFamilyMetaData> metadata;
GetAllColumnFamilyMetaData(&metadata);

std::unordered_set<std::string> referenced_files;
for (const auto& md : metadata) {
for (const auto& lmd : md.levels) {
for (const auto& fmd : lmd.files) {
// We're assuming that each sst file name exists in at most one of
// the paths.
std::string file_path =
fmd.directory + kFilePathSeparator + fmd.relative_filename;
sfm->OnAddFile(file_path, fmd.size).PermitUncheckedError();
referenced_files.insert(file_path);
}
}
for (const auto& bmd : md.blob_files) {
std::string name = bmd.blob_file_name;
// The BlobMetaData.blob_file_name may start with "/".
if (!name.empty() && name[0] == kFilePathSeparator) {
name = name.substr(1);
}
// We're assuming that each blob file name exists in at most one of
// the paths.
std::string file_path = bmd.blob_file_path + kFilePathSeparator + name;
sfm->OnAddFile(file_path, bmd.blob_file_size).PermitUncheckedError();
referenced_files.insert(file_path);
}
}

for (const auto& file_path : existing_data_files) {
if (referenced_files.find(file_path) != referenced_files.end()) {
continue;
}
// There shouldn't be any duplicated files. In case there is, SstFileManager
// will take care of deduping it.
sfm->OnAddFile(file_path).PermitUncheckedError();
}
TrackOrUntrackFiles(existing_data_files, /*track=*/true);
}

Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
Expand Down
24 changes: 24 additions & 0 deletions db/db_sst_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -383,12 +383,16 @@ TEST_F(DBSSTTest, DBWithSstFileManager) {
ASSERT_EQ(files_moved, 0);

Close();
ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";
Reopen(options);
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
ASSERT_EQ(sfm->GetTotalSize(), total_files_size);

// Verify that we track all the files again after the DB is closed and opened
Close();
ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";
sst_file_manager.reset(NewSstFileManager(env_));
options.sst_file_manager = sst_file_manager;
sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
Expand Down Expand Up @@ -439,6 +443,11 @@ TEST_F(DBSSTTest, DBWithSstFileManagerForBlobFiles) {

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnMoveFile", [&](void* /*arg*/) { files_moved++; });

int64_t untracked_files = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnUntrackFile",
[&](void* /*arg*/) { ++untracked_files; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

Options options = CurrentOptions();
Expand Down Expand Up @@ -485,13 +494,21 @@ TEST_F(DBSSTTest, DBWithSstFileManagerForBlobFiles) {
}
ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
Close();
ASSERT_EQ(untracked_files, files_in_db.size());
untracked_files = 0;
ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";

Reopen(options);
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
ASSERT_EQ(sfm->GetTotalSize(), total_files_size);

// Verify that we track all the files again after the DB is closed and opened.
Close();
ASSERT_EQ(untracked_files, files_in_db.size());
untracked_files = 0;
ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";

sst_file_manager.reset(NewSstFileManager(env_));
options.sst_file_manager = sst_file_manager;
Expand All @@ -507,6 +524,10 @@ TEST_F(DBSSTTest, DBWithSstFileManagerForBlobFiles) {
ASSERT_EQ(files_deleted, 0);
ASSERT_EQ(files_scheduled_to_delete, 0);
Close();
ASSERT_EQ(untracked_files, files_in_db.size());
untracked_files = 0;
ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::ScheduleUnaccountedFileDeletion", [&](void* arg) {
assert(arg);
Expand Down Expand Up @@ -666,6 +687,9 @@ TEST_F(DBSSTTest, DBWithSstFileManagerForBlobFilesWithGC) {
}

Close();
ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::ScheduleUnaccountedFileDeletion", [&](void* arg) {
assert(arg);
Expand Down
10 changes: 10 additions & 0 deletions file/sst_file_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,16 @@ Status SstFileManagerImpl::OnMoveFile(const std::string& old_path,
return Status::OK();
}

Status SstFileManagerImpl::OnUntrackFile(const std::string& file_path) {
{
MutexLock l(&mu_);
OnDeleteFileImpl(file_path);
}
TEST_SYNC_POINT_CALLBACK("SstFileManagerImpl::OnUntrackFile",
const_cast<std::string*>(&file_path));
return Status::OK();
}

void SstFileManagerImpl::SetMaxAllowedSpaceUsage(uint64_t max_allowed_space) {
MutexLock l(&mu_);
max_allowed_space_ = max_allowed_space;
Expand Down
4 changes: 3 additions & 1 deletion file/sst_file_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ class SstFileManagerImpl : public SstFileManager {
Status OnMoveFile(const std::string& old_path, const std::string& new_path,
uint64_t* file_size = nullptr);

// DB will call OnUntrackFile when closing with an unowned SstFileManager.
Status OnUntrackFile(const std::string& file_path);

// Update the maximum allowed space that should be used by RocksDB, if
// the total size of the SST and blob files exceeds max_allowed_space, writes
// to RocksDB will fail.
Expand Down Expand Up @@ -217,4 +220,3 @@ class SstFileManagerImpl : public SstFileManager {
};

} // namespace ROCKSDB_NAMESPACE

Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DB::Close now untracks files in SstFileManager, making avaialble any space used
by them. Prior to this change they would be orphaned until the DB is re-opened.

0 comments on commit 2daccfb

Please sign in to comment.