Skip to content

Commit

Permalink
Add the ignorable flag for the settings.
Browse files Browse the repository at this point in the history
  • Loading branch information
Vitaly Baranov committed Nov 17, 2019
1 parent 64c1f0b commit e40c140
Show file tree
Hide file tree
Showing 6 changed files with 371 additions and 346 deletions.
551 changes: 277 additions & 274 deletions dbms/src/Core/Settings.h

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions dbms/src/Core/SettingsCollection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,18 @@ namespace details
return name;
}

void SettingsCollectionUtils::serializeFlag(bool flag, WriteBuffer & buf)
{
buf.write(flag);
}

bool SettingsCollectionUtils::deserializeFlag(ReadBuffer & buf)
{
char c;
buf.readStrict(c);
return c;
}

void SettingsCollectionUtils::skipValue(ReadBuffer & buf)
{
/// Ignore a string written by the function writeStringBinary().
Expand Down
11 changes: 6 additions & 5 deletions dbms/src/Core/SettingsCollection.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ using SettingLogsLevel = SettingEnum<LogsLevel>;
enum class SettingsBinaryFormat
{
OLD, /// Part of the settings are serialized as strings, and other part as varints. This is the old behaviour.
STRINGS, /// All settings are serialized as strings.
STRINGS, /// All settings are serialized as strings. Before each value the flag `is_ignorable` is serialized.
DEFAULT = STRINGS,
};

Expand All @@ -285,9 +285,9 @@ enum class SettingsBinaryFormat
* struct MySettings : public SettingsCollection<MySettings>
* {
* # define APPLY_FOR_MYSETTINGS(M) \
* M(SettingUInt64, a, 100, "Description of a") \
* M(SettingFloat, f, 3.11, "Description of f") \
* M(SettingString, s, "default", "Description of s")
* M(SettingUInt64, a, 100, "Description of a", 0) \
* M(SettingFloat, f, 3.11, "Description of f", IGNORABLE) // IGNORABLE - means the setting can be ignored by older versions) \
* M(SettingString, s, "default", "Description of s", 0)
*
* DECLARE_SETTINGS_COLLECTION(MySettings, APPLY_FOR_MYSETTINGS)
* };
Expand Down Expand Up @@ -316,6 +316,7 @@ class SettingsCollection

StringRef name;
StringRef description;
bool is_ignorable;
IsChangedFunction is_changed;
GetStringFunction get_string;
GetFieldFunction get_field;
Expand Down Expand Up @@ -511,6 +512,6 @@ class SettingsCollection
#define DECLARE_SETTINGS_COLLECTION(LIST_OF_SETTINGS_MACRO) \
LIST_OF_SETTINGS_MACRO(DECLARE_SETTINGS_COLLECTION_DECLARE_VARIABLES_HELPER_)

#define DECLARE_SETTINGS_COLLECTION_DECLARE_VARIABLES_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION) \
#define DECLARE_SETTINGS_COLLECTION_DECLARE_VARIABLES_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION, FLAGS) \
TYPE NAME {DEFAULT};
}
15 changes: 12 additions & 3 deletions dbms/src/Core/SettingsCollectionImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ namespace details
{
static void serializeName(const StringRef & name, WriteBuffer & buf);
static String deserializeName(ReadBuffer & buf);
static void serializeFlag(bool flag, WriteBuffer & buf);
static bool deserializeFlag(ReadBuffer & buf);
static void skipValue(ReadBuffer & buf);
static void warningNameNotFound(const StringRef & name);
[[noreturn]] static void throwNameNotFound(const StringRef & name);
Expand Down Expand Up @@ -251,6 +253,8 @@ void SettingsCollection<Derived>::serialize(WriteBuffer & buf, SettingsBinaryFor
if (member.is_changed(castToDerived()))
{
details::SettingsCollectionUtils::serializeName(member.name, buf);
if (format >= SettingsBinaryFormat::STRINGS)
details::SettingsCollectionUtils::serializeFlag(member.is_ignorable, buf);
member.serialize(castToDerived(), buf, format);
}
}
Expand All @@ -268,11 +272,12 @@ void SettingsCollection<Derived>::deserialize(ReadBuffer & buf, SettingsBinaryFo
if (name.empty() /* empty string is a marker of the end of settings */)
break;
auto * member = the_members.find(name);
bool is_ignorable = (format >= SettingsBinaryFormat::STRINGS) ? details::SettingsCollectionUtils::deserializeFlag(buf) : false;
if (member)
{
member->deserialize(castToDerived(), buf, format);
}
else if (format >= SettingsBinaryFormat::STRINGS)
else if (is_ignorable)
{
details::SettingsCollectionUtils::warningNameNotFound(name);
details::SettingsCollectionUtils::skipValue(buf);
Expand All @@ -283,6 +288,7 @@ void SettingsCollection<Derived>::deserialize(ReadBuffer & buf, SettingsBinaryFo
}


//-V:IMPLEMENT_SETTINGS_COLLECTION:501
#define IMPLEMENT_SETTINGS_COLLECTION(DERIVED_CLASS_NAME, LIST_OF_SETTINGS_MACRO) \
template<> \
SettingsCollection<DERIVED_CLASS_NAME>::MemberInfos::MemberInfos() \
Expand All @@ -292,6 +298,8 @@ void SettingsCollection<Derived>::deserialize(ReadBuffer & buf, SettingsBinaryFo
{ \
LIST_OF_SETTINGS_MACRO(IMPLEMENT_SETTINGS_COLLECTION_DEFINE_FUNCTIONS_HELPER_) \
}; \
constexpr int IGNORABLE = 1; \
UNUSED(IGNORABLE); \
LIST_OF_SETTINGS_MACRO(IMPLEMENT_SETTINGS_COLLECTION_ADD_MEMBER_INFO_HELPER_) \
} \
/** \
Expand All @@ -301,7 +309,7 @@ void SettingsCollection<Derived>::deserialize(ReadBuffer & buf, SettingsBinaryFo
template class SettingsCollection<DERIVED_CLASS_NAME>;


#define IMPLEMENT_SETTINGS_COLLECTION_DEFINE_FUNCTIONS_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION) \
#define IMPLEMENT_SETTINGS_COLLECTION_DEFINE_FUNCTIONS_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION, FLAGS) \
static String NAME##_getString(const Derived & collection) { return collection.NAME.toString(); } \
static Field NAME##_getField(const Derived & collection) { return collection.NAME.toField(); } \
static void NAME##_setString(Derived & collection, const String & value) { collection.NAME.set(value); } \
Expand All @@ -312,8 +320,9 @@ void SettingsCollection<Derived>::deserialize(ReadBuffer & buf, SettingsBinaryFo
static Field NAME##_valueToCorrespondingType(const Field & value) { TYPE temp{DEFAULT}; temp.set(value); return temp.toField(); } \


#define IMPLEMENT_SETTINGS_COLLECTION_ADD_MEMBER_INFO_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION) \
#define IMPLEMENT_SETTINGS_COLLECTION_ADD_MEMBER_INFO_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION, FLAGS) \
add({StringRef(#NAME, strlen(#NAME)), StringRef(DESCRIPTION, strlen(DESCRIPTION)), \
FLAGS & IGNORABLE, \
[](const Derived & d) { return d.NAME.changed; }, \
&Functions::NAME##_getString, &Functions::NAME##_getField, \
&Functions::NAME##_setString, &Functions::NAME##_setField, \
Expand Down
20 changes: 10 additions & 10 deletions dbms/src/Storages/Kafka/KafkaSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ struct KafkaSettings : public SettingsCollection<KafkaSettings>


#define LIST_OF_KAFKA_SETTINGS(M) \
M(SettingString, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.") \
M(SettingString, kafka_topic_list, "", "A list of Kafka topics.") \
M(SettingString, kafka_group_name, "", "A group of Kafka consumers.") \
M(SettingString, kafka_format, "", "The message format for Kafka engine.") \
M(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.") \
M(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine") \
M(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.") \
M(SettingUInt64, kafka_max_block_size, 0, "The maximum block size per table for Kafka engine.") \
M(SettingUInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block") \
M(SettingUInt64, kafka_commit_every_batch, 0, "Commit every consumed and handled batch instead of a single commit after writing a whole block")
M(SettingString, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.", 0) \
M(SettingString, kafka_topic_list, "", "A list of Kafka topics.", 0) \
M(SettingString, kafka_group_name, "", "A group of Kafka consumers.", 0) \
M(SettingString, kafka_format, "", "The message format for Kafka engine.", 0) \
M(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.", 0) \
M(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine", 0) \
M(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.", 0) \
M(SettingUInt64, kafka_max_block_size, 0, "The maximum block size per table for Kafka engine.", 0) \
M(SettingUInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block", 0) \
M(SettingUInt64, kafka_commit_every_batch, 0, "Commit every consumed and handled batch instead of a single commit after writing a whole block", 0)

DECLARE_SETTINGS_COLLECTION(LIST_OF_KAFKA_SETTINGS)

Expand Down
Loading

0 comments on commit e40c140

Please sign in to comment.