From 5794d4a79eb360aa414ba0ccf0c77f84e4c9c318 Mon Sep 17 00:00:00 2001 From: Georgy Moiseev Date: Mon, 25 Mar 2024 11:06:54 +0300 Subject: [PATCH] test: wait replication is finished for replicas Wait until replication is finished if test case is a read request from replica. Part of #412 Part of #415 --- test/helper.lua | 48 +++++++++++++++ .../read_calls_strategies_test.lua | 6 +- test/vshard_helpers/server.lua | 38 +----------- test/vshard_helpers/vclock.lua | 58 +++++++++++++++++++ 4 files changed, 111 insertions(+), 39 deletions(-) create mode 100644 test/vshard_helpers/vclock.lua diff --git a/test/helper.lua b/test/helper.lua index fc7cb6eb..ef1f443b 100644 --- a/test/helper.lua +++ b/test/helper.lua @@ -2,6 +2,7 @@ require('strict').on() local t = require('luatest') local vtest = require('test.vshard_helpers.vtest') +local vclock_utils = require('test.vshard_helpers.vclock') local luatest_capture = require('luatest.capture') local luatest_helpers = require('luatest.helpers') @@ -745,6 +746,17 @@ function helpers.start_cluster(g, cartridge_cfg, vshard_cfg, opts) g.cfg = cfg g.cluster = helpers.Cluster:new(cfg) + + for k, server in ipairs(g.cluster.servers) do + local mt = getmetatable(server) + + local extended_mt = table.deepcopy(mt) + extended_mt.__index = vclock_utils.extend_with_vclock_methods(extended_mt.__index) + + g.cluster.servers[k] = setmetatable(server, extended_mt) + end + g.cluster.main_server = g.cluster.servers[1] + g.cluster:start() elseif g.params.backend == helpers.backend.VSHARD then local cfg = table.deepcopy(vshard_cfg) @@ -1110,4 +1122,40 @@ function helpers.merge_tables(t1, t2, ...) return helpers.merge_tables(res, ...) end +function helpers.wait_cluster_replication_finished(g) + if g.params.backend == helpers.backend.CARTRIDGE then + for _, replicaset in ipairs(g.cfg.replicasets) do + local server_names = {} + for _, server in ipairs(replicaset.servers) do + table.insert(server_names, server.alias) + end + + helpers.wait_replicaset_replication_finished(g, server_names) + end + elseif g.params.backend == helpers.backend.VSHARD then + for _, storage_replicaset in pairs(g.cfg.sharding) do + local server_names = {} + for name, _ in pairs(storage_replicaset.replicas) do + table.insert(server_names, name) + end + + helpers.wait_replicaset_replication_finished(g, server_names) + end + end +end + +function helpers.wait_replicaset_replication_finished(g, server_names) + for _, etalon_server_name in ipairs(server_names) do + local etalon_server = g.cluster:server(etalon_server_name) + + for _, current_server_name in ipairs(server_names) do + local current_server = g.cluster:server(current_server_name) + + if current_server ~= etalon_server then + current_server:wait_vclock_of(etalon_server) + end + end + end +end + return helpers diff --git a/test/integration/read_calls_strategies_test.lua b/test/integration/read_calls_strategies_test.lua index 19bb4f4d..29e1275c 100644 --- a/test/integration/read_calls_strategies_test.lua +++ b/test/integration/read_calls_strategies_test.lua @@ -55,16 +55,14 @@ pgroup.before_all(function(g) -- patch vshard.router.call* functions local vshard_call_names = {'callro', 'callbro', 'callre', 'callbre', 'callrw'} g.router:call('patch_vshard_calls', {vshard_call_names}) + + helpers.wait_cluster_replication_finished(g) end) pgroup.after_all(function(g) helpers.stop_cluster(g.cluster, g.params.backend) end) -pgroup.before_each(function(g) - helpers.truncate_space_on_cluster(g.cluster, 'customers') -end) - pgroup.test_get = function(g) g.clear_vshard_calls() local _, err = g.router:call('crud.get', {'customers', 1, { diff --git a/test/vshard_helpers/server.lua b/test/vshard_helpers/server.lua index 1b504f16..c60bc34d 100644 --- a/test/vshard_helpers/server.lua +++ b/test/vshard_helpers/server.lua @@ -6,12 +6,12 @@ local fio = require('fio') local fun = require('fun') local json = require('json') local errno = require('errno') -local log = require('log') -local yaml = require('yaml') local checks = require('checks') local luatest = require('luatest') +local vclock_utils = require('test.vshard_helpers.vclock') + ffi.cdef([[ int kill(pid_t pid, int sig); ]]) @@ -341,38 +341,6 @@ function Server:grep_log(what, bytes, opts) return found end -function Server:get_vclock() - return self:exec(function() return box.info.vclock end) -end - -function Server:wait_vclock(to_vclock) - while true do - local vclock = self:get_vclock() - local ok = true - - for server_id, to_lsn in pairs(to_vclock) do - local lsn = vclock[server_id] - if lsn == nil or lsn < to_lsn then - ok = false - break - end - end - - if ok then - return - end - - log.info("wait vclock: %s to %s", - yaml.encode(vclock), yaml.encode(to_vclock)) - fiber.sleep(0.001) - end -end - -function Server:wait_vclock_of(other_server) - local vclock = other_server:get_vclock() - -- First component is for local changes. - vclock[0] = nil - return self:wait_vclock(vclock) -end +vclock_utils.extend_with_vclock_methods(Server) return Server diff --git a/test/vshard_helpers/vclock.lua b/test/vshard_helpers/vclock.lua new file mode 100644 index 00000000..415389ee --- /dev/null +++ b/test/vshard_helpers/vclock.lua @@ -0,0 +1,58 @@ +local fiber = require('fiber') +local log = require('log') +local yaml = require('yaml') + +-- Simple implementation without metatables. +local function extend_with(t, new_fields) + for k, v in pairs(new_fields) do + t[k] = v + end + + return t +end + +local function get_vclock(self) + return self:exec(function() return box.info.vclock end) +end + +local function wait_vclock(self, to_vclock) + while true do + local vclock = self:get_vclock() + local ok = true + + for server_id, to_lsn in pairs(to_vclock) do + local lsn = vclock[server_id] + if lsn == nil or lsn < to_lsn then + ok = false + break + end + end + + if ok then + return + end + + log.info("wait vclock: %s to %s", + yaml.encode(vclock), yaml.encode(to_vclock)) + fiber.sleep(0.001) + end +end + +local function wait_vclock_of(self, other_server) + local vclock = other_server:get_vclock() + -- First component is for local changes. + vclock[0] = nil + return self:wait_vclock(vclock) +end + +local function extend_with_vclock_methods(server) + return extend_with(server, { + get_vclock = get_vclock, + wait_vclock = wait_vclock, + wait_vclock_of = wait_vclock_of, + }) +end + +return { + extend_with_vclock_methods = extend_with_vclock_methods, +}