Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote Compaction - Remove CFOptions in CompactionServiceOptionsOverride #12981

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions db/compaction/compaction_service_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,20 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
MakeTableFileName(file->fd.GetNumber()));
}
}

// CompactionFilter explicitly set in options.compaction_filter is not yet
// supported. Use CompactionFilterFactory instead.
if (compaction->column_family_data()
->initial_cf_options()
.compaction_filter != nullptr) {
return CompactionServiceJobStatus::kUseLocal;
}

compaction_input.column_family.name =
compaction->column_family_data()->GetName();
compaction_input.column_family.options =
compaction->column_family_data()->GetLatestCFOptions();
compaction_input.column_family.options = BuildColumnFamilyOptions(
compaction->column_family_data()->initial_cf_options(),
*compaction->mutable_cf_options());
compaction_input.db_options =
BuildDBOptions(db_options_, mutable_db_options_copy_);
compaction_input.snapshots = existing_snapshots_;
Expand Down
108 changes: 81 additions & 27 deletions db/compaction/compaction_service_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,44 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).


#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/utilities/object_registry.h"
#include "table/unique_id_impl.h"

namespace ROCKSDB_NAMESPACE {

class PartialDeleteCompactionFilter : public CompactionFilter {
public:
CompactionFilter::Decision FilterV2(
int /*level*/, const Slice& key, ValueType /*value_type*/,
const Slice& /*existing_value*/, std::string* /*new_value*/,
std::string* /*skip_until*/) const override {
int i = std::stoi(key.ToString().substr(3));
if (i > 5 && i <= 105) {
return CompactionFilter::Decision::kRemove;
}
return CompactionFilter::Decision::kKeep;
}

static const char* kClassName() { return "PartialDeleteCompactionFilter"; }
const char* Name() const override { return kClassName(); }
};

class PartialDeleteCompactionFilterFactory : public CompactionFilterFactory {
public:
static const char* kClassName() {
return "PartialDeleteCompactionFilterFactory";
}
const char* Name() const override { return kClassName(); }

std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& /*context*/) override {
return std::unique_ptr<CompactionFilter>(
new PartialDeleteCompactionFilter());
}
};

class MyTestCompactionService : public CompactionService {
public:
MyTestCompactionService(
Expand All @@ -25,7 +56,28 @@ class MyTestCompactionService : public CompactionService {
wait_info_("na", "na", "na", 0, Env::TOTAL),
listeners_(listeners),
table_properties_collector_factories_(
std::move(table_properties_collector_factories)) {}
std::move(table_properties_collector_factories)) {
// Register Compaction Filter Factory
static std::once_flag once;
std::call_once(once, [&]() {
ObjectRegistry::Default()->AddLibrary(
"MyTestCompactionService", RegisterCompactionFilterFactory, "");
});
}

static int RegisterCompactionFilterFactory(ObjectLibrary& library,
const std::string& /*arg*/) {
library.AddFactory<CompactionFilterFactory>(
PartialDeleteCompactionFilterFactory::kClassName(),
[](const std::string& /*uri*/,
std::unique_ptr<CompactionFilterFactory>* guard,
std::string* /* errmsg */) {
guard->reset(new PartialDeleteCompactionFilterFactory());
return guard->get();
});
size_t num_types;
return static_cast<int>(library.GetFactoryCount(&num_types));
}

static const char* kClassName() { return "MyTestCompactionService"; }

Expand Down Expand Up @@ -73,19 +125,10 @@ class MyTestCompactionService : public CompactionService {
options_override.env = options_.env;
options_override.file_checksum_gen_factory =
options_.file_checksum_gen_factory;
options_override.comparator = options_.comparator;
options_override.merge_operator = options_.merge_operator;
options_override.compaction_filter = options_.compaction_filter;
options_override.compaction_filter_factory =
options_.compaction_filter_factory;
options_override.prefix_extractor = options_.prefix_extractor;
options_override.table_factory = options_.table_factory;
options_override.sst_partitioner_factory = options_.sst_partitioner_factory;
options_override.statistics = statistics_;
if (!listeners_.empty()) {
options_override.listeners = listeners_;
}

if (!table_properties_collector_factories_.empty()) {
options_override.table_properties_collector_factories =
table_properties_collector_factories_;
Expand Down Expand Up @@ -477,26 +520,12 @@ TEST_F(CompactionServiceTest, SubCompaction) {
ASSERT_GE(compaction_num, 2);
}

class PartialDeleteCompactionFilter : public CompactionFilter {
public:
CompactionFilter::Decision FilterV2(
int /*level*/, const Slice& key, ValueType /*value_type*/,
const Slice& /*existing_value*/, std::string* /*new_value*/,
std::string* /*skip_until*/) const override {
int i = std::stoi(key.ToString().substr(3));
if (i > 5 && i <= 105) {
return CompactionFilter::Decision::kRemove;
}
return CompactionFilter::Decision::kKeep;
}

const char* Name() const override { return "PartialDeleteCompactionFilter"; }
};

TEST_F(CompactionServiceTest, CompactionFilter) {
Options options = CurrentOptions();
std::unique_ptr<CompactionFilter> delete_comp_filter(
new PartialDeleteCompactionFilter());
// CompactionFilter explicitly set by options.compaction_filter is not
// supported by Remote Compaction, yet. Use Local compaction instead
options.compaction_filter = delete_comp_filter.get();
ReopenWithCompactionService(&options);
GenerateTestData();
Expand All @@ -513,6 +542,31 @@ TEST_F(CompactionServiceTest, CompactionFilter) {
ASSERT_EQ(result, "value_new" + std::to_string(i));
}
}
// Verify that the compaction is done locally
auto my_cs = GetCompactionService();
ASSERT_EQ(my_cs->GetCompactionNum(), 0);
}

TEST_F(CompactionServiceTest, CompactionFilterFactory) {
Options options = CurrentOptions();
options.compaction_filter_factory =
std::make_shared<PartialDeleteCompactionFilterFactory>();
ReopenWithCompactionService(&options);
GenerateTestData();
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// verify result
for (int i = 0; i < 200; i++) {
auto result = Get(Key(i));
if (i > 5 && i <= 105) {
ASSERT_EQ(result, "NOT_FOUND");
} else if (i % 2) {
ASSERT_EQ(result, "value" + std::to_string(i));
} else {
ASSERT_EQ(result, "value_new" + std::to_string(i));
}
}
// Verify that the compaction is done remotely
auto my_cs = GetCompactionService();
ASSERT_GE(my_cs->GetCompactionNum(), 1);
}
Expand Down
17 changes: 2 additions & 15 deletions db/db_impl/db_impl_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -956,23 +956,10 @@ Status DB::OpenAndCompact(
compaction_input.db_options.file_checksum_gen_factory =
override_options.file_checksum_gen_factory;
compaction_input.db_options.statistics = override_options.statistics;
compaction_input.column_family.options.comparator =
override_options.comparator;
compaction_input.column_family.options.merge_operator =
override_options.merge_operator;
compaction_input.column_family.options.compaction_filter =
override_options.compaction_filter;
compaction_input.column_family.options.compaction_filter_factory =
override_options.compaction_filter_factory;
compaction_input.column_family.options.prefix_extractor =
override_options.prefix_extractor;
compaction_input.column_family.options.table_factory =
override_options.table_factory;
compaction_input.column_family.options.sst_partitioner_factory =
override_options.sst_partitioner_factory;
compaction_input.db_options.listeners = override_options.listeners;

compaction_input.column_family.options.table_properties_collector_factories =
override_options.table_properties_collector_factories;
compaction_input.db_options.listeners = override_options.listeners;
jaykorean marked this conversation as resolved.
Show resolved Hide resolved

std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(compaction_input.column_family);
Expand Down
8 changes: 0 additions & 8 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -2293,14 +2293,6 @@ struct CompactionServiceOptionsOverride {
Env* env = Env::Default();
std::shared_ptr<FileChecksumGenFactory> file_checksum_gen_factory = nullptr;

const Comparator* comparator = BytewiseComparator();
std::shared_ptr<MergeOperator> merge_operator = nullptr;
const CompactionFilter* compaction_filter = nullptr;
std::shared_ptr<CompactionFilterFactory> compaction_filter_factory = nullptr;
std::shared_ptr<const SliceTransform> prefix_extractor = nullptr;
std::shared_ptr<TableFactory> table_factory;
std::shared_ptr<SstPartitionerFactory> sst_partitioner_factory = nullptr;

// Only subsets of events are triggered in remote compaction worker, like:
// `OnTableFileCreated`, `OnTableFileCreationStarted`,
// `ShouldBeNotifiedOnFileIO` `OnSubcompactionBegin`,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The following pointer configurations to pass to remote worker are no longer explicitly set by CompactionServiceOptionsOverride - comparator, merge_operator, compaction_filter, compaction_filter_factory, prefix_extractor, table_factory, and sst_partitioner_factory. They need to be registered in the ObjectRegistry of the remote worker prior to the compaction. Please note that Remote Compaction APIs are still marked as experimental and subject to change.
Loading