Skip to content

Commit

Permalink
utils: move storage info to a separate file
Browse files Browse the repository at this point in the history
This is the preliminary refactoring before solving the original issue.

Part of #412
Part of #415
  • Loading branch information
DifferentialOrange committed Jan 22, 2024
1 parent 5ebc3c5 commit cb041bb
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 107 deletions.
17 changes: 9 additions & 8 deletions crud.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ local utils = require('crud.common.utils')
local stats = require('crud.stats')
local readview = require('crud.readview')
local schema = require('crud.schema')
local storage_info = require('crud.storage_info')

local crud = {}

Expand Down Expand Up @@ -145,9 +146,9 @@ crud.stats = stats.get
-- @function reset_stats
crud.reset_stats = stats.reset

-- @refer utils.storage_info
-- @refer storage_info.call
-- @function storage_info
crud.storage_info = utils.storage_info
crud.storage_info = storage_info.call

-- @refer readview.new
-- @function readview
Expand All @@ -174,8 +175,8 @@ function crud.init_storage()
user = utils.get_this_replica_user() or 'guest'
end

if rawget(_G, '_crud') == nil then
rawset(_G, '_crud', {})
if rawget(_G, utils.STORAGE_NAMESPACE) == nil then
rawset(_G, utils.STORAGE_NAMESPACE, {})
end

insert.init(user)
Expand All @@ -195,9 +196,9 @@ function crud.init_storage()
sharding_metadata.init(user)
readview.init(user)

utils.init_storage_call(user, 'storage_info_on_storage',
utils.storage_info_on_storage
)
-- Must be initialized last: properly working storage info is the flag
-- of initialization success.
storage_info.init(user)
end

function crud.init_router()
Expand All @@ -209,7 +210,7 @@ function crud.stop_router()
end

function crud.stop_storage()
rawset(_G, '_crud', nil)
rawset(_G, utils.STORAGE_NAMESPACE, nil)
end

return crud
103 changes: 4 additions & 99 deletions crud/common/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ local GetSpaceError = errors.new_class('GetSpaceError')
local GetSpaceFormatError = errors.new_class('GetSpaceFormatError', {capture_stack = false})
local FilterFieldsError = errors.new_class('FilterFieldsError', {capture_stack = false})
local NotInitializedError = errors.new_class('NotInitialized')
local StorageInfoError = errors.new_class('StorageInfoError')
local VshardRouterError = errors.new_class('VshardRouterError', {capture_stack = false})
local UtilsInternalError = errors.new_class('UtilsInternalError', {capture_stack = false})
local fiber = require('fiber')

local utils = {}

utils.STORAGE_NAMESPACE = '_crud'

--- Returns a full call string for a storage function name.
--
-- @param string name a base name of the storage function.
Expand All @@ -35,11 +36,9 @@ local utils = {}
function utils.get_storage_call(name)
dev_checks('string')

return '_crud.' .. name
return ('%s.%s'):format(utils.STORAGE_NAMESPACE, name)
end

local CRUD_STORAGE_INFO_FUNC_NAME = utils.get_storage_call('storage_info_on_storage')

local space_format_cache = setmetatable({}, {__mode = 'k'})

-- copy from LuaJIT lj_char.c
Expand Down Expand Up @@ -1131,100 +1130,6 @@ function utils.list_slice(list, start_index, end_index)
return slice
end

--- Polls replicas for storage state
--
-- @function storage_info
--
-- @tparam ?number opts.timeout
-- Function call timeout
--
-- @tparam ?string|table opts.vshard_router
-- Cartridge vshard group name or vshard router instance.
--
-- @return a table of storage states by replica id.
function utils.storage_info(opts)
opts = opts or {}

local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
if err ~= nil then
return nil, StorageInfoError:new(err)
end

local replicasets, err = vshard_router:routeall()
if replicasets == nil then
return nil, StorageInfoError:new("Failed to get router replicasets: %s", err.err)
end

local futures_by_replicas = {}
local replica_state_by_id = {}
local async_opts = {is_async = true}
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

for _, replicaset in pairs(replicasets) do
for replica_id, replica in pairs(replicaset.replicas) do
local master = utils.get_replicaset_master(replicaset, {cached = false})

replica_state_by_id[replica_id] = {
status = "error",
is_master = master == replica
}

local ok, res = pcall(replica.conn.call, replica.conn, CRUD_STORAGE_INFO_FUNC_NAME,
{}, async_opts)
if ok then
futures_by_replicas[replica_id] = res
else
local err_msg = string.format("Error getting storage info for %s", replica_id)
if res ~= nil then
log.error("%s: %s", err_msg, res)
replica_state_by_id[replica_id].message = tostring(res)
else
log.error(err_msg)
replica_state_by_id[replica_id].message = err_msg
end
end
end
end

local deadline = fiber.clock() + timeout
for replica_id, future in pairs(futures_by_replicas) do
local wait_timeout = deadline - fiber.clock()
if wait_timeout < 0 then
wait_timeout = 0
end

local result, err = future:wait_result(wait_timeout)
if result == nil then
future:discard()
local err_msg = string.format("Error getting storage info for %s", replica_id)
if err ~= nil then
if err.type == 'ClientError' and err.code == box.error.NO_SUCH_PROC then
replica_state_by_id[replica_id].status = "uninitialized"
else
log.error("%s: %s", err_msg, err)
replica_state_by_id[replica_id].message = tostring(err)
end
else
log.error(err_msg)
replica_state_by_id[replica_id].message = err_msg
end
else
replica_state_by_id[replica_id].status = result[1].status or "uninitialized"
end
end

return replica_state_by_id
end

--- Storage status information.
--
-- @function storage_info_on_storage
--
-- @return a table with storage status.
function utils.storage_info_on_storage()
return {status = "running"}
end

--- Initializes a storage function by its name.
--
-- It adds the function into the global scope by its name and required
Expand All @@ -1240,7 +1145,7 @@ end
function utils.init_storage_call(user, name, func)
dev_checks('?string', 'string', 'function')

rawset(_G['_crud'], name, func)
rawset(_G[utils.STORAGE_NAMESPACE], name, func)

if user ~= nil then
name = utils.get_storage_call(name)
Expand Down
119 changes: 119 additions & 0 deletions crud/storage_info.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
local checks = require('checks')
local errors = require('errors')
local fiber = require('fiber')
local log = require('log')

local const = require('crud.common.const')
local utils = require('crud.common.utils')

local StorageInfoError = errors.new_class('StorageInfoError')

local storage_info = {}

local STORAGE_INFO_FUNC_NAME = 'storage_info_on_storage'
local CRUD_STORAGE_INFO_FUNC_NAME = utils.get_storage_call(STORAGE_INFO_FUNC_NAME)

--- Storage status information.
--
-- @function storage_info_on_storage
--
-- @return a table with storage status.
local function storage_info_on_storage()
return {status = "running"}
end

function storage_info.init(user)
utils.init_storage_call(user, STORAGE_INFO_FUNC_NAME, storage_info_on_storage)
end

--- Polls replicas for storage state
--
-- @function call
--
-- @tparam ?number opts.timeout
-- Function call timeout
--
-- @tparam ?string|table opts.vshard_router
-- Cartridge vshard group name or vshard router instance.
--
-- @return a table of storage states by replica id.
function storage_info.call(opts)
checks({
timeout = '?number',
vshard_router = '?string|table',
})

opts = opts or {}

local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
if err ~= nil then
return nil, StorageInfoError:new(err)
end

local replicasets, err = vshard_router:routeall()
if replicasets == nil then
return nil, StorageInfoError:new("Failed to get router replicasets: %s", err.err)
end

local futures_by_replicas = {}
local replica_state_by_id = {}
local async_opts = {is_async = true}
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

for _, replicaset in pairs(replicasets) do
for replica_id, replica in pairs(replicaset.replicas) do
local master = utils.get_replicaset_master(replicaset, {cached = false})

replica_state_by_id[replica_id] = {
status = "error",
is_master = master == replica
}

local ok, res = pcall(replica.conn.call, replica.conn, CRUD_STORAGE_INFO_FUNC_NAME,
{}, async_opts)
if ok then
futures_by_replicas[replica_id] = res
else
local err_msg = string.format("Error getting storage info for %s", replica_id)
if res ~= nil then
log.error("%s: %s", err_msg, res)
replica_state_by_id[replica_id].message = tostring(res)
else
log.error(err_msg)
replica_state_by_id[replica_id].message = err_msg
end
end
end
end

local deadline = fiber.clock() + timeout
for replica_id, future in pairs(futures_by_replicas) do
local wait_timeout = deadline - fiber.clock()
if wait_timeout < 0 then
wait_timeout = 0
end

local result, err = future:wait_result(wait_timeout)
if result == nil then
future:discard()
local err_msg = string.format("Error getting storage info for %s", replica_id)
if err ~= nil then
if err.type == 'ClientError' and err.code == box.error.NO_SUCH_PROC then
replica_state_by_id[replica_id].status = "uninitialized"
else
log.error("%s: %s", err_msg, err)
replica_state_by_id[replica_id].message = tostring(err)
end
else
log.error(err_msg)
replica_state_by_id[replica_id].message = err_msg
end
else
replica_state_by_id[replica_id].status = result[1].status or "uninitialized"
end
end

return replica_state_by_id
end

return storage_info

0 comments on commit cb041bb

Please sign in to comment.