Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote Compaction - Remove CFOptions in CompactionServiceOptionsOverride #12981

Closed

Conversation

jaykorean
Copy link
Contributor

@jaykorean jaykorean commented Aug 28, 2024

Summary

In the initial version of the remote compaction, CompactionServiceOptionsOverride was introduced temporarily as a workaround to set the pointer configurations explicitly because passing them to the remote worker was not available at that time.

In the Meta's internal Offload infra, we are now able to register these objects set in the CFOptions including CompactionFilterFactory, MergeOperator and others. These pointer configs are passed to the remote worker as CustomSharedPtr, CustomUniquePtr, or CustomRawPtr Option types, which can be created by Type::createFromString() on the remote side as long as they are registered in the ObjectRegistry. (As defined in

static std::unordered_map<std::string, OptionTypeInfo>
cf_mutable_options_type_info = {
{"report_bg_io_stats",
{offsetof(struct MutableCFOptions, report_bg_io_stats),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"disable_auto_compactions",
{offsetof(struct MutableCFOptions, disable_auto_compactions),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"filter_deletes",
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated,
OptionTypeFlags::kMutable}},
{"check_flush_compaction_key_order",
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated,
OptionTypeFlags::kMutable}},
{"paranoid_file_checks",
{offsetof(struct MutableCFOptions, paranoid_file_checks),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"verify_checksums_in_compaction",
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated,
OptionTypeFlags::kMutable}},
{"soft_pending_compaction_bytes_limit",
{offsetof(struct MutableCFOptions,
soft_pending_compaction_bytes_limit),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"hard_pending_compaction_bytes_limit",
{offsetof(struct MutableCFOptions,
hard_pending_compaction_bytes_limit),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"hard_rate_limit",
{0, OptionType::kDouble, OptionVerificationType::kDeprecated,
OptionTypeFlags::kMutable}},
{"soft_rate_limit",
{0, OptionType::kDouble, OptionVerificationType::kDeprecated,
OptionTypeFlags::kMutable}},
{"max_compaction_bytes",
{offsetof(struct MutableCFOptions, max_compaction_bytes),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"ignore_max_compaction_bytes_for_input",
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated,
OptionTypeFlags::kMutable}},
{"expanded_compaction_factor",
{0, OptionType::kInt, OptionVerificationType::kDeprecated,
OptionTypeFlags::kMutable}},
{"level0_file_num_compaction_trigger",
{offsetof(struct MutableCFOptions, level0_file_num_compaction_trigger),
OptionType::kInt, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"level0_slowdown_writes_trigger",
{offsetof(struct MutableCFOptions, level0_slowdown_writes_trigger),
OptionType::kInt, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"level0_stop_writes_trigger",
{offsetof(struct MutableCFOptions, level0_stop_writes_trigger),
OptionType::kInt, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"max_grandparent_overlap_factor",
{0, OptionType::kInt, OptionVerificationType::kDeprecated,
OptionTypeFlags::kMutable}},
{"max_write_buffer_number",
{offsetof(struct MutableCFOptions, max_write_buffer_number),
OptionType::kInt, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"source_compaction_factor",
{0, OptionType::kInt, OptionVerificationType::kDeprecated,
OptionTypeFlags::kMutable}},
{"target_file_size_multiplier",
{offsetof(struct MutableCFOptions, target_file_size_multiplier),
OptionType::kInt, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"arena_block_size",
{offsetof(struct MutableCFOptions, arena_block_size),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"inplace_update_num_locks",
{offsetof(struct MutableCFOptions, inplace_update_num_locks),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"max_successive_merges",
{offsetof(struct MutableCFOptions, max_successive_merges),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"strict_max_successive_merges",
{offsetof(struct MutableCFOptions, strict_max_successive_merges),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"memtable_huge_page_size",
{offsetof(struct MutableCFOptions, memtable_huge_page_size),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"memtable_prefix_bloom_huge_page_tlb_size",
{0, OptionType::kSizeT, OptionVerificationType::kDeprecated,
OptionTypeFlags::kMutable}},
{"write_buffer_size",
{offsetof(struct MutableCFOptions, write_buffer_size),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"memtable_prefix_bloom_bits",
{0, OptionType::kUInt32T, OptionVerificationType::kDeprecated,
OptionTypeFlags::kMutable}},
{"memtable_prefix_bloom_size_ratio",
{offsetof(struct MutableCFOptions, memtable_prefix_bloom_size_ratio),
OptionType::kDouble, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"memtable_prefix_bloom_probes",
{0, OptionType::kUInt32T, OptionVerificationType::kDeprecated,
OptionTypeFlags::kMutable}},
{"memtable_whole_key_filtering",
{offsetof(struct MutableCFOptions, memtable_whole_key_filtering),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"min_partial_merge_operands",
{0, OptionType::kUInt32T, OptionVerificationType::kDeprecated,
OptionTypeFlags::kMutable}},
{"max_bytes_for_level_base",
{offsetof(struct MutableCFOptions, max_bytes_for_level_base),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"snap_refresh_nanos",
{0, OptionType::kUInt64T, OptionVerificationType::kDeprecated,
OptionTypeFlags::kMutable}},
{"max_bytes_for_level_multiplier",
{offsetof(struct MutableCFOptions, max_bytes_for_level_multiplier),
OptionType::kDouble, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"max_bytes_for_level_multiplier_additional",
OptionTypeInfo::Vector<int>(
offsetof(struct MutableCFOptions,
max_bytes_for_level_multiplier_additional),
OptionVerificationType::kNormal, OptionTypeFlags::kMutable,
{0, OptionType::kInt})},
{"max_sequential_skip_in_iterations",
{offsetof(struct MutableCFOptions, max_sequential_skip_in_iterations),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"target_file_size_base",
{offsetof(struct MutableCFOptions, target_file_size_base),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"compression",
{offsetof(struct MutableCFOptions, compression),
OptionType::kCompressionType, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"prefix_extractor",
OptionTypeInfo::AsCustomSharedPtr<const SliceTransform>(
offsetof(struct MutableCFOptions, prefix_extractor),
OptionVerificationType::kByNameAllowNull,
(OptionTypeFlags::kMutable | OptionTypeFlags::kAllowNull))},
{"compaction_options_fifo",
OptionTypeInfo::Struct(
"compaction_options_fifo", &fifo_compaction_options_type_info,
offsetof(struct MutableCFOptions, compaction_options_fifo),
OptionVerificationType::kNormal, OptionTypeFlags::kMutable)
.SetParseFunc([](const ConfigOptions& opts,
const std::string& name, const std::string& value,
void* addr) {
// This is to handle backward compatibility, where
// compaction_options_fifo could be assigned a single scalar
// value, say, like "23", which would be assigned to
// max_table_files_size.
if (name == "compaction_options_fifo" &&
value.find('=') == std::string::npos) {
// Old format. Parse just a single uint64_t value.
auto options = static_cast<CompactionOptionsFIFO*>(addr);
options->max_table_files_size = ParseUint64(value);
return Status::OK();
} else {
return OptionTypeInfo::ParseStruct(
opts, "compaction_options_fifo",
&fifo_compaction_options_type_info, name, value, addr);
}
})},
{"compaction_options_universal",
OptionTypeInfo::Struct(
"compaction_options_universal",
&universal_compaction_options_type_info,
offsetof(struct MutableCFOptions, compaction_options_universal),
OptionVerificationType::kNormal, OptionTypeFlags::kMutable)},
{"ttl",
{offsetof(struct MutableCFOptions, ttl), OptionType::kUInt64T,
OptionVerificationType::kNormal, OptionTypeFlags::kMutable}},
{"periodic_compaction_seconds",
{offsetof(struct MutableCFOptions, periodic_compaction_seconds),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"bottommost_temperature",
{0, OptionType::kTemperature, OptionVerificationType::kDeprecated,
OptionTypeFlags::kMutable}},
{"last_level_temperature",
{offsetof(struct MutableCFOptions, last_level_temperature),
OptionType::kTemperature, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"default_write_temperature",
{offsetof(struct MutableCFOptions, default_write_temperature),
OptionType::kTemperature, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"enable_blob_files",
{offsetof(struct MutableCFOptions, enable_blob_files),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"min_blob_size",
{offsetof(struct MutableCFOptions, min_blob_size),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"blob_file_size",
{offsetof(struct MutableCFOptions, blob_file_size),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"blob_compression_type",
{offsetof(struct MutableCFOptions, blob_compression_type),
OptionType::kCompressionType, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"enable_blob_garbage_collection",
{offsetof(struct MutableCFOptions, enable_blob_garbage_collection),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"blob_garbage_collection_age_cutoff",
{offsetof(struct MutableCFOptions, blob_garbage_collection_age_cutoff),
OptionType::kDouble, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"blob_garbage_collection_force_threshold",
{offsetof(struct MutableCFOptions,
blob_garbage_collection_force_threshold),
OptionType::kDouble, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"blob_compaction_readahead_size",
{offsetof(struct MutableCFOptions, blob_compaction_readahead_size),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"blob_file_starting_level",
{offsetof(struct MutableCFOptions, blob_file_starting_level),
OptionType::kInt, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"prepopulate_blob_cache",
OptionTypeInfo::Enum<PrepopulateBlobCache>(
offsetof(struct MutableCFOptions, prepopulate_blob_cache),
&prepopulate_blob_cache_string_map, OptionTypeFlags::kMutable)},
{"sample_for_compression",
{offsetof(struct MutableCFOptions, sample_for_compression),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"bottommost_compression",
{offsetof(struct MutableCFOptions, bottommost_compression),
OptionType::kCompressionType, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"compression_per_level",
OptionTypeInfo::Vector<CompressionType>(
offsetof(struct MutableCFOptions, compression_per_level),
OptionVerificationType::kNormal, OptionTypeFlags::kMutable,
{0, OptionType::kCompressionType})},
{"experimental_mempurge_threshold",
{offsetof(struct MutableCFOptions, experimental_mempurge_threshold),
OptionType::kDouble, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"memtable_protection_bytes_per_key",
{offsetof(struct MutableCFOptions, memtable_protection_bytes_per_key),
OptionType::kUInt32T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"bottommost_file_compaction_delay",
{offsetof(struct MutableCFOptions, bottommost_file_compaction_delay),
OptionType::kUInt32T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"uncache_aggressiveness",
{offsetof(struct MutableCFOptions, uncache_aggressiveness),
OptionType::kUInt32T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"block_protection_bytes_per_key",
{offsetof(struct MutableCFOptions, block_protection_bytes_per_key),
OptionType::kUInt8T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"paranoid_memory_checks",
{offsetof(struct MutableCFOptions, paranoid_memory_checks),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{kOptNameCompOpts,
OptionTypeInfo::Struct(
kOptNameCompOpts, &compression_options_type_info,
offsetof(struct MutableCFOptions, compression_opts),
OptionVerificationType::kNormal,
(OptionTypeFlags::kMutable | OptionTypeFlags::kCompareNever),
[](const ConfigOptions& opts, const std::string& name,
const std::string& value, void* addr) {
// This is to handle backward compatibility, where
// compression_options was a ":" separated list.
if (name == kOptNameCompOpts &&
value.find('=') == std::string::npos) {
auto* compression = static_cast<CompressionOptions*>(addr);
return ParseCompressionOptions(value, name, *compression);
} else {
return OptionTypeInfo::ParseStruct(
opts, kOptNameCompOpts, &compression_options_type_info,
name, value, addr);
}
})},
{kOptNameBMCompOpts,
OptionTypeInfo::Struct(
kOptNameBMCompOpts, &compression_options_type_info,
offsetof(struct MutableCFOptions, bottommost_compression_opts),
OptionVerificationType::kNormal,
(OptionTypeFlags::kMutable | OptionTypeFlags::kCompareNever),
[](const ConfigOptions& opts, const std::string& name,
const std::string& value, void* addr) {
// This is to handle backward compatibility, where
// compression_options was a ":" separated list.
if (name == kOptNameBMCompOpts &&
value.find('=') == std::string::npos) {
auto* compression = static_cast<CompressionOptions*>(addr);
return ParseCompressionOptions(value, name, *compression);
} else {
return OptionTypeInfo::ParseStruct(
opts, kOptNameBMCompOpts, &compression_options_type_info,
name, value, addr);
}
})},
// End special case properties
{"memtable_max_range_deletions",
{offsetof(struct MutableCFOptions, memtable_max_range_deletions),
OptionType::kUInt32T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
};
static std::unordered_map<std::string, OptionTypeInfo>
cf_immutable_options_type_info = {
/* not yet supported
CompressionOptions compression_opts;
TablePropertiesCollectorFactories table_properties_collector_factories;
using TablePropertiesCollectorFactories =
std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>;
UpdateStatus (*inplace_callback)(char* existing_value,
uint34_t* existing_value_size,
Slice delta_value,
std::string* merged_value);
std::vector<DbPath> cf_paths;
*/
{"compaction_measure_io_stats",
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated,
OptionTypeFlags::kNone}},
{"purge_redundant_kvs_while_flush",
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated,
OptionTypeFlags::kNone}},
{"inplace_update_support",
{offsetof(struct ImmutableCFOptions, inplace_update_support),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"level_compaction_dynamic_level_bytes",
{offsetof(struct ImmutableCFOptions,
level_compaction_dynamic_level_bytes),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"level_compaction_dynamic_file_size",
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated,
OptionTypeFlags::kNone}},
{"optimize_filters_for_hits",
{offsetof(struct ImmutableCFOptions, optimize_filters_for_hits),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"force_consistency_checks",
{offsetof(struct ImmutableCFOptions, force_consistency_checks),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"default_temperature",
{offsetof(struct ImmutableCFOptions, default_temperature),
OptionType::kTemperature, OptionVerificationType::kNormal,
OptionTypeFlags::kCompareNever}},
{"preclude_last_level_data_seconds",
{offsetof(struct ImmutableCFOptions, preclude_last_level_data_seconds),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"preserve_internal_time_seconds",
{offsetof(struct ImmutableCFOptions, preserve_internal_time_seconds),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
// Need to keep this around to be able to read old OPTIONS files.
{"max_mem_compaction_level",
{0, OptionType::kInt, OptionVerificationType::kDeprecated,
OptionTypeFlags::kNone}},
{"max_write_buffer_number_to_maintain",
{offsetof(struct ImmutableCFOptions,
max_write_buffer_number_to_maintain),
OptionType::kInt, OptionVerificationType::kNormal,
OptionTypeFlags::kNone, nullptr}},
{"max_write_buffer_size_to_maintain",
{offsetof(struct ImmutableCFOptions,
max_write_buffer_size_to_maintain),
OptionType::kInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"min_write_buffer_number_to_merge",
{offsetof(struct ImmutableCFOptions, min_write_buffer_number_to_merge),
OptionType::kInt, OptionVerificationType::kNormal,
OptionTypeFlags::kNone, nullptr}},
{"num_levels",
{offsetof(struct ImmutableCFOptions, num_levels), OptionType::kInt,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"bloom_locality",
{offsetof(struct ImmutableCFOptions, bloom_locality),
OptionType::kUInt32T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"rate_limit_delay_max_milliseconds",
{0, OptionType::kUInt, OptionVerificationType::kDeprecated,
OptionTypeFlags::kNone}},
{"comparator",
OptionTypeInfo::AsCustomRawPtr<const Comparator>(
offsetof(struct ImmutableCFOptions, user_comparator),
OptionVerificationType::kByName, OptionTypeFlags::kCompareLoose)
.SetSerializeFunc(
// Serializes a Comparator
[](const ConfigOptions& opts, const std::string&,
const void* addr, std::string* value) {
// it's a const pointer of const Comparator*
const auto* ptr =
static_cast<const Comparator* const*>(addr);
if (*ptr == nullptr) {
*value = kNullptrString;
} else if (opts.mutable_options_only) {
*value = "";
} else {
*value = (*ptr)->ToString(opts);
}
return Status::OK();
})},
{"memtable_insert_with_hint_prefix_extractor",
OptionTypeInfo::AsCustomSharedPtr<const SliceTransform>(
offsetof(struct ImmutableCFOptions,
memtable_insert_with_hint_prefix_extractor),
OptionVerificationType::kByNameAllowNull, OptionTypeFlags::kNone)},
{"memtable_factory",
{offsetof(struct ImmutableCFOptions, memtable_factory),
OptionType::kCustomizable, OptionVerificationType::kByName,
OptionTypeFlags::kShared,
[](const ConfigOptions& opts, const std::string&,
const std::string& value, void* addr) {
std::unique_ptr<MemTableRepFactory> factory;
auto* shared =
static_cast<std::shared_ptr<MemTableRepFactory>*>(addr);
Status s =
MemTableRepFactory::CreateFromString(opts, value, shared);
return s;
}}},
{"memtable",
{offsetof(struct ImmutableCFOptions, memtable_factory),
OptionType::kCustomizable, OptionVerificationType::kAlias,
OptionTypeFlags::kShared,
[](const ConfigOptions& opts, const std::string&,
const std::string& value, void* addr) {
std::unique_ptr<MemTableRepFactory> factory;
auto* shared =
static_cast<std::shared_ptr<MemTableRepFactory>*>(addr);
Status s =
MemTableRepFactory::CreateFromString(opts, value, shared);
return s;
}}},
{"table_factory",
OptionTypeInfo::AsCustomSharedPtr<TableFactory>(
offsetof(struct ImmutableCFOptions, table_factory),
OptionVerificationType::kByName,
(OptionTypeFlags::kCompareLoose |
OptionTypeFlags::kStringNameOnly |
OptionTypeFlags::kDontPrepare))},
{"block_based_table_factory",
{offsetof(struct ImmutableCFOptions, table_factory),
OptionType::kCustomizable, OptionVerificationType::kAlias,
OptionTypeFlags::kShared | OptionTypeFlags::kCompareLoose,
// Parses the input value and creates a BlockBasedTableFactory
[](const ConfigOptions& opts, const std::string& name,
const std::string& value, void* addr) {
BlockBasedTableOptions* old_opts = nullptr;
auto table_factory =
static_cast<std::shared_ptr<TableFactory>*>(addr);
if (table_factory->get() != nullptr) {
old_opts =
table_factory->get()->GetOptions<BlockBasedTableOptions>();
}
if (name == "block_based_table_factory") {
std::unique_ptr<TableFactory> new_factory;
if (old_opts != nullptr) {
new_factory.reset(NewBlockBasedTableFactory(*old_opts));
} else {
new_factory.reset(NewBlockBasedTableFactory());
}
Status s = new_factory->ConfigureFromString(opts, value);
if (s.ok()) {
table_factory->reset(new_factory.release());
}
return s;
} else if (old_opts != nullptr) {
return table_factory->get()->ConfigureOption(opts, name, value);
} else {
return Status::NotFound("Mismatched table option: ", name);
}
}}},
{"plain_table_factory",
{offsetof(struct ImmutableCFOptions, table_factory),
OptionType::kCustomizable, OptionVerificationType::kAlias,
OptionTypeFlags::kShared | OptionTypeFlags::kCompareLoose,
// Parses the input value and creates a PlainTableFactory
[](const ConfigOptions& opts, const std::string& name,
const std::string& value, void* addr) {
PlainTableOptions* old_opts = nullptr;
auto table_factory =
static_cast<std::shared_ptr<TableFactory>*>(addr);
if (table_factory->get() != nullptr) {
old_opts = table_factory->get()->GetOptions<PlainTableOptions>();
}
if (name == "plain_table_factory") {
std::unique_ptr<TableFactory> new_factory;
if (old_opts != nullptr) {
new_factory.reset(NewPlainTableFactory(*old_opts));
} else {
new_factory.reset(NewPlainTableFactory());
}
Status s = new_factory->ConfigureFromString(opts, value);
if (s.ok()) {
table_factory->reset(new_factory.release());
}
return s;
} else if (old_opts != nullptr) {
return table_factory->get()->ConfigureOption(opts, name, value);
} else {
return Status::NotFound("Mismatched table option: ", name);
}
}}},
{"table_properties_collectors",
OptionTypeInfo::Vector<
std::shared_ptr<TablePropertiesCollectorFactory>>(
offsetof(struct ImmutableCFOptions,
table_properties_collector_factories),
OptionVerificationType::kByName, OptionTypeFlags::kNone,
OptionTypeInfo::AsCustomSharedPtr<TablePropertiesCollectorFactory>(
0, OptionVerificationType::kByName, OptionTypeFlags::kNone))},
{"compaction_filter",
OptionTypeInfo::AsCustomRawPtr<const CompactionFilter>(
offsetof(struct ImmutableCFOptions, compaction_filter),
OptionVerificationType::kByName, OptionTypeFlags::kAllowNull)},
{"compaction_filter_factory",
OptionTypeInfo::AsCustomSharedPtr<CompactionFilterFactory>(
offsetof(struct ImmutableCFOptions, compaction_filter_factory),
OptionVerificationType::kByName, OptionTypeFlags::kAllowNull)},
{"merge_operator",
OptionTypeInfo::AsCustomSharedPtr<MergeOperator>(
offsetof(struct ImmutableCFOptions, merge_operator),
OptionVerificationType::kByNameAllowFromNull,
OptionTypeFlags::kCompareLoose | OptionTypeFlags::kAllowNull)},
{"compaction_style",
{offsetof(struct ImmutableCFOptions, compaction_style),
OptionType::kCompactionStyle, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"compaction_pri",
{offsetof(struct ImmutableCFOptions, compaction_pri),
OptionType::kCompactionPri, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"sst_partitioner_factory",
OptionTypeInfo::AsCustomSharedPtr<SstPartitionerFactory>(
offsetof(struct ImmutableCFOptions, sst_partitioner_factory),
OptionVerificationType::kByName, OptionTypeFlags::kAllowNull)},
{"blob_cache",
{offsetof(struct ImmutableCFOptions, blob_cache), OptionType::kUnknown,
OptionVerificationType::kNormal,
(OptionTypeFlags::kCompareNever | OptionTypeFlags::kDontSerialize),
// Parses the input value as a Cache
[](const ConfigOptions& opts, const std::string&,
const std::string& value, void* addr) {
auto* cache = static_cast<std::shared_ptr<Cache>*>(addr);
return Cache::CreateFromString(opts, value, cache);
}}},
{"persist_user_defined_timestamps",
{offsetof(struct ImmutableCFOptions, persist_user_defined_timestamps),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kCompareLoose}},
};
)

In this PR, we are also not allowing remote compactions with options.compaction_filter set. options.compaction_filter is serialized as CustomRawPtr and CustomRawPtr objects are loaded as static objects. This can become a problem if the remote worker processes compactions from multiple different primary hosts with the same compaction filter with different options. To workaround this, we are only allowing CompactionFilterFactory. For compactions with options.compaction_filter set, they will fall back to local compaction for now.

Test Plan

Updated compaction_service_test::CompactionFilter test and added Updated compaction_service_test::CompactionFilterFactory

./compaction_service_test

Also tested with Meta's internal Offload infra

@jaykorean jaykorean changed the title CompactionFilter and other CFOptions from ObjectRegistry in Remote Compaction Remote Compaction - CompactionFilter and other CFOptions from ObjectRegistry Aug 28, 2024
@jaykorean jaykorean marked this pull request as ready for review August 28, 2024 22:58
@facebook-github-bot
Copy link
Contributor

@jaykorean has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@@ -2289,13 +2289,17 @@ struct CompactionServiceOptionsOverride {
Env* env = Env::Default();
std::shared_ptr<FileChecksumGenFactory> file_checksum_gen_factory = nullptr;

// DEPRECATED - CFOption pointer configurations will be passed as static name
// These will need to be registered in the remote worker's ObjectRegistry,
// then created by Type::CreateFromString() when needed.
Copy link
Member

@cbi42 cbi42 Aug 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since remove compaction APIs are experiment, would it be safe to just remove these?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an old prototype code in fbcode still referencing this, so that's why I didn't remove them at first, but I guess can clean up the old prototype now before we merge this PR into fbcode.

@facebook-github-bot
Copy link
Contributor

@jaykorean has updated the pull request. You must reimport the pull request before landing.

@facebook-github-bot
Copy link
Contributor

@jaykorean has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@jaykorean
Copy link
Contributor Author

Plan changed

@jaykorean jaykorean closed this Aug 30, 2024
@jaykorean jaykorean reopened this Sep 6, 2024
@facebook-github-bot
Copy link
Contributor

@jaykorean has updated the pull request. You must reimport the pull request before landing.

@jaykorean jaykorean changed the title Remote Compaction - CompactionFilter and other CFOptions from ObjectRegistry Remote Compaction - Remove CFOptions in CompactionServiceOptionsOverride Sep 6, 2024
@facebook-github-bot
Copy link
Contributor

@jaykorean has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@jaykorean jaykorean closed this Sep 9, 2024
@jaykorean jaykorean deleted the remote_compaction_ptr_options branch September 11, 2024 22:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants