From 578c356a680c1d9a9f279d8d3f49bdda137181fb Mon Sep 17 00:00:00 2001 From: chejinge <945997690@qq.com> Date: Wed, 3 Apr 2024 13:36:05 +0800 Subject: [PATCH] fix:some conf load error (#2561) * fix:slotmigrate failed when some keys migrate failed --------- Co-authored-by: chejinge --- include/pika_conf.h | 7 +++- src/pika_admin.cc | 29 ++++++++----- src/pika_conf.cc | 15 ++++--- tests/integration/server_test.go | 71 ++++++++++++++++++++++++++++++++ 4 files changed, 102 insertions(+), 20 deletions(-) diff --git a/include/pika_conf.h b/include/pika_conf.h index ff51ef993..7006d6a9e 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -492,16 +492,19 @@ class PikaConf : public pstd::BaseConf { TryPushDiffCommands("masterauth", value); masterauth_ = value; } - void SetSlotMigrate(const std::string& value) { + void SetSlotMigrate(const bool value) { std::lock_guard l(rwlock_); - slotmigrate_ = (value == "yes"); + TryPushDiffCommands("slotmigrate", value ? "yes" : "no"); + slotmigrate_.store(value); } void SetSlotMigrateThreadNum(const int value) { std::lock_guard l(rwlock_); + TryPushDiffCommands("slotmigrate-thread-num", std::to_string(value)); slotmigrate_thread_num_ = value; } void SetThreadMigrateKeysNum(const int value) { std::lock_guard l(rwlock_); + TryPushDiffCommands("thread-migrate-keys-num", std::to_string(value)); thread_migrate_keys_num_ = value; } void SetExpireLogsNums(const int value) { diff --git a/src/pika_admin.cc b/src/pika_admin.cc index d0c3ff754..8cc10ad86 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -1682,13 +1682,13 @@ void ConfigCmd::ConfigGet(std::string& ret) { EncodeString(&config_body, g_pika_conf->slotmigrate() ? "yes" : "no"); } - if (pstd::stringmatch(pattern.data(), "slotmigrate-thread-num", 1)) { + if (pstd::stringmatch(pattern.data(), "slotmigrate-thread-num", 1)!= 0) { elements += 2; EncodeString(&config_body, "slotmigrate-thread-num"); EncodeNumber(&config_body, g_pika_conf->slotmigrate_thread_num()); } - if (pstd::stringmatch(pattern.data(), "thread-migrate-keys-num", 1)) { + if (pstd::stringmatch(pattern.data(), "thread-migrate-keys-num", 1)!= 0) { elements += 2; EncodeString(&config_body, "thread-migrate-keys-num"); EncodeNumber(&config_body, g_pika_conf->thread_migrate_keys_num()); @@ -2231,9 +2231,6 @@ void ConfigCmd::ConfigSet(std::shared_ptr db) { } else if (set_item == "masterauth") { g_pika_conf->SetMasterAuth(value); res_.AppendStringRaw("+OK\r\n"); - } else if (set_item == "slotmigrate") { - g_pika_conf->SetSlotMigrate(value); - res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "dump-prefix") { g_pika_conf->SetBgsavePrefix(value); res_.AppendStringRaw("+OK\r\n"); @@ -2282,19 +2279,19 @@ void ConfigCmd::ConfigSet(std::shared_ptr db) { res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "slotmigrate-thread-num") { if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival <= 0) { - res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'expire-logs-nums'\r\n"); + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slotmigrate-thread-num'\r\n"); return; } - long int migrate_thread_num = (0 > ival || 24 < ival) ? 8 : ival; - g_pika_conf->SetSlotMigrateThreadNum(static_cast(ival)); + long int migrate_thread_num = (1 > ival || 24 < ival) ? 8 : ival; + g_pika_conf->SetSlotMigrateThreadNum(migrate_thread_num); res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "thread-migrate-keys-num") { if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival <= 0) { - res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'expire-logs-nums'\r\n"); + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'thread-migrate-keys-num'\r\n"); return; } long int thread_migrate_keys_num = (8 > ival || 128 < ival) ? 64 : ival; - g_pika_conf->SetThreadMigrateKeysNum(static_cast(ival)); + g_pika_conf->SetThreadMigrateKeysNum(thread_migrate_keys_num); res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "slowlog-write-errorlog") { bool is_write_errorlog; @@ -2308,6 +2305,18 @@ void ConfigCmd::ConfigSet(std::shared_ptr db) { } g_pika_conf->SetSlowlogWriteErrorlog(is_write_errorlog); res_.AppendStringRaw("+OK\r\n"); + } else if (set_item == "slotmigrate") { + bool slotmigrate; + if (value == "yes") { + slotmigrate = true; + } else if (value == "no") { + slotmigrate = false; + } else { + res_.AppendStringRaw( "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slotmigrate'\r\n"); + return; + } + g_pika_conf->SetSlotMigrate(slotmigrate); + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "slowlog-log-slower-than") { if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) { res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slowlog-log-slower-than'\r\n"); diff --git a/src/pika_conf.cc b/src/pika_conf.cc index d17d3ebef..ee2f539b0 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -67,9 +67,9 @@ int PikaConf::Load() { slowlog_write_errorlog_.store(swe == "yes" ? true : false); // slot migrate - std::string smgrt = "no"; + std::string smgrt; GetConfStr("slotmigrate", &smgrt); - slotmigrate_ = (smgrt == "yes") ? true : false; + slotmigrate_.store(smgrt == "yes" ? true : false); int binlog_writer_num = 1; GetConfInt("binlog-writer-num", &binlog_writer_num); @@ -280,13 +280,13 @@ int PikaConf::Load() { // arena_block_size GetConfInt64Human("slotmigrate-thread-num", &slotmigrate_thread_num_); - if (slotmigrate_thread_num_ < 0 || slotmigrate_thread_num_ > 24) { + if (slotmigrate_thread_num_ < 1 || slotmigrate_thread_num_ > 24) { slotmigrate_thread_num_ = 8; // 1/8 of the write_buffer_size_ } // arena_block_size GetConfInt64Human("thread-migrate-keys-num", &thread_migrate_keys_num_); - if (thread_migrate_keys_num_ < 64 || thread_migrate_keys_num_ > 128) { + if (thread_migrate_keys_num_ < 8 || thread_migrate_keys_num_ > 128) { thread_migrate_keys_num_ = 64; // 1/8 of the write_buffer_size_ } @@ -687,13 +687,12 @@ int PikaConf::ConfigRewrite() { SetConfInt("max-write-buffer-num", max_write_buffer_num_); SetConfInt64("write-buffer-size", write_buffer_size_); SetConfInt64("arena-block-size", arena_block_size_); - SetConfInt64("slotmigrate", slotmigrate_); + SetConfStr("slotmigrate", slotmigrate_.load() ? "yes" : "no"); + SetConfInt64("slotmigrate-thread-num", slotmigrate_thread_num_); + SetConfInt64("thread-migrate-keys-num", thread_migrate_keys_num_); // slaveof config item is special SetConfStr("slaveof", slaveof_); // cache config - SetConfStr("share-block-cache", share_block_cache_ ? "yes" : "no"); - SetConfInt("block-size", block_size_); - SetConfInt("block-cache", block_cache_); SetConfStr("cache-index-and-filter-blocks", cache_index_and_filter_blocks_ ? "yes" : "no"); SetConfInt("cache-model", cache_model_); SetConfInt("zset-cache-start-direction", zset_cache_start_direction_); diff --git a/tests/integration/server_test.go b/tests/integration/server_test.go index c836ff1c5..b07b98cc9 100644 --- a/tests/integration/server_test.go +++ b/tests/integration/server_test.go @@ -333,12 +333,83 @@ var _ = Describe("Server", func() { //Expect(configSet.Val()).To(Equal("OK")) }) + It("should ConfigGet", func() { + configGet := client.ConfigGet(ctx, "slotmigrate") + Expect(configGet.Err()).NotTo(HaveOccurred()) + Expect(configGet.Val()).To(Equal(map[string]string{"slotmigrate": "no"})) + }) + + It("should ConfigSet", func() { + configSet := client.ConfigSet(ctx, "slotmigrate", "yes") + Expect(configSet.Err()).NotTo(HaveOccurred()) + Expect(configSet.Val()).To(Equal("OK")) + }) + + It("should ConfigGet", func() { + configGet1 := client.ConfigGet(ctx, "slotmigrate-thread-num") + Expect(configGet1.Err()).NotTo(HaveOccurred()) + Expect(configGet1.Val()).NotTo(Equal("0")) + }) + + It("should ConfigGet", func() { + configGet2 := client.ConfigGet(ctx, "thread-migrate-keys-num") + Expect(configGet2.Err()).NotTo(HaveOccurred()) + Expect(configGet2.Val()).NotTo(Equal("0")) + }) + + It("should ConfigSet", func() { + configSet1 := client.ConfigSet(ctx, "slotmigrate-thread-num", "4") + Expect(configSet1.Err()).NotTo(HaveOccurred()) + Expect(configSet1.Val()).To(Equal("OK")) + }) + + It("should ConfigSet", func() { + configSet2 := client.ConfigSet(ctx, "thread-migrate-keys-num", "64") + Expect(configSet2.Err()).NotTo(HaveOccurred()) + Expect(configSet2.Val()).To(Equal("OK")) + }) + + It("should ConfigGet", func() { + configGet2 := client.ConfigGet(ctx, "block-cache") + Expect(configGet2.Err()).NotTo(HaveOccurred()) + Expect(configGet2.Val()).NotTo(Equal("0")) + }) + It("should ConfigRewrite", func() { configRewrite := client.ConfigRewrite(ctx) Expect(configRewrite.Err()).NotTo(HaveOccurred()) Expect(configRewrite.Val()).To(Equal("OK")) }) + It("should ConfigGet", func() { + configGet3 := client.ConfigGet(ctx, "block-cache") + Expect(configGet3.Err()).NotTo(HaveOccurred()) + Expect(configGet3.Val()).To(Equal(map[string]string{"block-cache": "8388608"})) + }) + + It("should ConfigGet", func() { + configGet4 := client.ConfigGet(ctx, "slotmigrate-thread-num") + Expect(configGet4.Err()).NotTo(HaveOccurred()) + Expect(configGet4.Val()).To(Equal(map[string]string{"slotmigrate-thread-num": "4"})) + }) + + It("should ConfigGet", func() { + configGet5 := client.ConfigGet(ctx, "thread-migrate-keys-num") + Expect(configGet5.Err()).NotTo(HaveOccurred()) + Expect(configGet5.Val()).To(Equal(map[string]string{"thread-migrate-keys-num": "64"})) + }) + + It("should ConfigSet", func() { + configSet := client.ConfigSet(ctx, "slotmigrate", "no") + Expect(configSet.Err()).NotTo(HaveOccurred()) + Expect(configSet.Val()).To(Equal("OK")) + }) + + It("should ConfigRewrite", func() { + configRewrite := client.ConfigRewrite(ctx) + Expect(configRewrite.Err()).NotTo(HaveOccurred()) + Expect(configRewrite.Val()).To(Equal("OK")) + }) //It("should DBSize", func() { // Expect(client.Set(ctx, "key", "value", 0).Val()).To(Equal("OK")) // Expect(client.Do(ctx, "info", "keyspace", "1").Err()).NotTo(HaveOccurred())