Skip to content

Commit

Permalink
test: wait replication is finished for replicas
Browse files Browse the repository at this point in the history
Wait until replication is finished if test case is a read request from
replica.

Part of #412
Part of #415
  • Loading branch information
DifferentialOrange committed Apr 16, 2024
1 parent 8c8b4dc commit 5794d4a
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 39 deletions.
48 changes: 48 additions & 0 deletions test/helper.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ require('strict').on()

local t = require('luatest')
local vtest = require('test.vshard_helpers.vtest')
local vclock_utils = require('test.vshard_helpers.vclock')

local luatest_capture = require('luatest.capture')
local luatest_helpers = require('luatest.helpers')
Expand Down Expand Up @@ -745,6 +746,17 @@ function helpers.start_cluster(g, cartridge_cfg, vshard_cfg, opts)

g.cfg = cfg
g.cluster = helpers.Cluster:new(cfg)

for k, server in ipairs(g.cluster.servers) do
local mt = getmetatable(server)

local extended_mt = table.deepcopy(mt)
extended_mt.__index = vclock_utils.extend_with_vclock_methods(extended_mt.__index)

g.cluster.servers[k] = setmetatable(server, extended_mt)
end
g.cluster.main_server = g.cluster.servers[1]

g.cluster:start()
elseif g.params.backend == helpers.backend.VSHARD then
local cfg = table.deepcopy(vshard_cfg)
Expand Down Expand Up @@ -1110,4 +1122,40 @@ function helpers.merge_tables(t1, t2, ...)
return helpers.merge_tables(res, ...)
end

function helpers.wait_cluster_replication_finished(g)
if g.params.backend == helpers.backend.CARTRIDGE then
for _, replicaset in ipairs(g.cfg.replicasets) do
local server_names = {}
for _, server in ipairs(replicaset.servers) do
table.insert(server_names, server.alias)
end

helpers.wait_replicaset_replication_finished(g, server_names)
end
elseif g.params.backend == helpers.backend.VSHARD then
for _, storage_replicaset in pairs(g.cfg.sharding) do
local server_names = {}
for name, _ in pairs(storage_replicaset.replicas) do
table.insert(server_names, name)
end

helpers.wait_replicaset_replication_finished(g, server_names)
end
end
end

function helpers.wait_replicaset_replication_finished(g, server_names)
for _, etalon_server_name in ipairs(server_names) do
local etalon_server = g.cluster:server(etalon_server_name)

for _, current_server_name in ipairs(server_names) do
local current_server = g.cluster:server(current_server_name)

if current_server ~= etalon_server then
current_server:wait_vclock_of(etalon_server)
end
end
end
end

return helpers
6 changes: 2 additions & 4 deletions test/integration/read_calls_strategies_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,14 @@ pgroup.before_all(function(g)
-- patch vshard.router.call* functions
local vshard_call_names = {'callro', 'callbro', 'callre', 'callbre', 'callrw'}
g.router:call('patch_vshard_calls', {vshard_call_names})

helpers.wait_cluster_replication_finished(g)
end)

pgroup.after_all(function(g)
helpers.stop_cluster(g.cluster, g.params.backend)
end)

pgroup.before_each(function(g)
helpers.truncate_space_on_cluster(g.cluster, 'customers')
end)

pgroup.test_get = function(g)
g.clear_vshard_calls()
local _, err = g.router:call('crud.get', {'customers', 1, {
Expand Down
38 changes: 3 additions & 35 deletions test/vshard_helpers/server.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ local fio = require('fio')
local fun = require('fun')
local json = require('json')
local errno = require('errno')
local log = require('log')
local yaml = require('yaml')

local checks = require('checks')
local luatest = require('luatest')

local vclock_utils = require('test.vshard_helpers.vclock')

ffi.cdef([[
int kill(pid_t pid, int sig);
]])
Expand Down Expand Up @@ -341,38 +341,6 @@ function Server:grep_log(what, bytes, opts)
return found
end

function Server:get_vclock()
return self:exec(function() return box.info.vclock end)
end

function Server:wait_vclock(to_vclock)
while true do
local vclock = self:get_vclock()
local ok = true

for server_id, to_lsn in pairs(to_vclock) do
local lsn = vclock[server_id]
if lsn == nil or lsn < to_lsn then
ok = false
break
end
end

if ok then
return
end

log.info("wait vclock: %s to %s",
yaml.encode(vclock), yaml.encode(to_vclock))
fiber.sleep(0.001)
end
end

function Server:wait_vclock_of(other_server)
local vclock = other_server:get_vclock()
-- First component is for local changes.
vclock[0] = nil
return self:wait_vclock(vclock)
end
vclock_utils.extend_with_vclock_methods(Server)

return Server
58 changes: 58 additions & 0 deletions test/vshard_helpers/vclock.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
local fiber = require('fiber')
local log = require('log')
local yaml = require('yaml')

-- Simple implementation without metatables.
local function extend_with(t, new_fields)
for k, v in pairs(new_fields) do
t[k] = v
end

return t
end

local function get_vclock(self)
return self:exec(function() return box.info.vclock end)
end

local function wait_vclock(self, to_vclock)
while true do
local vclock = self:get_vclock()
local ok = true

for server_id, to_lsn in pairs(to_vclock) do
local lsn = vclock[server_id]
if lsn == nil or lsn < to_lsn then
ok = false
break
end
end

if ok then
return
end

log.info("wait vclock: %s to %s",
yaml.encode(vclock), yaml.encode(to_vclock))
fiber.sleep(0.001)
end
end

local function wait_vclock_of(self, other_server)
local vclock = other_server:get_vclock()
-- First component is for local changes.
vclock[0] = nil
return self:wait_vclock(vclock)
end

local function extend_with_vclock_methods(server)
return extend_with(server, {
get_vclock = get_vclock,
wait_vclock = wait_vclock,
wait_vclock_of = wait_vclock_of,
})
end

return {
extend_with_vclock_methods = extend_with_vclock_methods,
}

0 comments on commit 5794d4a

Please sign in to comment.