diff --git a/crud.lua b/crud.lua index 50f87218..1efa46b6 100644 --- a/crud.lua +++ b/crud.lua @@ -22,6 +22,7 @@ local utils = require('crud.common.utils') local stats = require('crud.stats') local readview = require('crud.readview') local schema = require('crud.schema') +local storage_info = require('crud.storage_info') local crud = {} @@ -145,9 +146,9 @@ crud.stats = stats.get -- @function reset_stats crud.reset_stats = stats.reset --- @refer utils.storage_info +-- @refer storage_info.call -- @function storage_info -crud.storage_info = utils.storage_info +crud.storage_info = storage_info.call -- @refer readview.new -- @function readview @@ -174,8 +175,8 @@ function crud.init_storage() user = utils.get_this_replica_user() or 'guest' end - if rawget(_G, '_crud') == nil then - rawset(_G, '_crud', {}) + if rawget(_G, utils.STORAGE_NAMESPACE) == nil then + rawset(_G, utils.STORAGE_NAMESPACE, {}) end insert.init(user) @@ -195,9 +196,9 @@ function crud.init_storage() sharding_metadata.init(user) readview.init(user) - utils.init_storage_call(user, 'storage_info_on_storage', - utils.storage_info_on_storage - ) + -- Must be initialized last: properly working storage info is the flag + -- of initialization success. + storage_info.init(user) end function crud.init_router() @@ -209,7 +210,7 @@ function crud.stop_router() end function crud.stop_storage() - rawset(_G, '_crud', nil) + rawset(_G, utils.STORAGE_NAMESPACE, nil) end return crud diff --git a/crud/common/utils.lua b/crud/common/utils.lua index 6af07b41..aec3377f 100644 --- a/crud/common/utils.lua +++ b/crud/common/utils.lua @@ -20,13 +20,14 @@ local GetSpaceError = errors.new_class('GetSpaceError') local GetSpaceFormatError = errors.new_class('GetSpaceFormatError', {capture_stack = false}) local FilterFieldsError = errors.new_class('FilterFieldsError', {capture_stack = false}) local NotInitializedError = errors.new_class('NotInitialized') -local StorageInfoError = errors.new_class('StorageInfoError') local VshardRouterError = errors.new_class('VshardRouterError', {capture_stack = false}) local UtilsInternalError = errors.new_class('UtilsInternalError', {capture_stack = false}) local fiber = require('fiber') local utils = {} +utils.STORAGE_NAMESPACE = '_crud' + --- Returns a full call string for a storage function name. -- -- @param string name a base name of the storage function. @@ -35,11 +36,9 @@ local utils = {} function utils.get_storage_call(name) dev_checks('string') - return '_crud.' .. name + return ('%s.%s'):format(utils.STORAGE_NAMESPACE, name) end -local CRUD_STORAGE_INFO_FUNC_NAME = utils.get_storage_call('storage_info_on_storage') - local space_format_cache = setmetatable({}, {__mode = 'k'}) -- copy from LuaJIT lj_char.c @@ -1131,100 +1130,6 @@ function utils.list_slice(list, start_index, end_index) return slice end ---- Polls replicas for storage state --- --- @function storage_info --- --- @tparam ?number opts.timeout --- Function call timeout --- --- @tparam ?string|table opts.vshard_router --- Cartridge vshard group name or vshard router instance. --- --- @return a table of storage states by replica id. -function utils.storage_info(opts) - opts = opts or {} - - local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router) - if err ~= nil then - return nil, StorageInfoError:new(err) - end - - local replicasets, err = vshard_router:routeall() - if replicasets == nil then - return nil, StorageInfoError:new("Failed to get router replicasets: %s", err.err) - end - - local futures_by_replicas = {} - local replica_state_by_id = {} - local async_opts = {is_async = true} - local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT - - for _, replicaset in pairs(replicasets) do - for replica_id, replica in pairs(replicaset.replicas) do - local master = utils.get_replicaset_master(replicaset, {cached = false}) - - replica_state_by_id[replica_id] = { - status = "error", - is_master = master == replica - } - - local ok, res = pcall(replica.conn.call, replica.conn, CRUD_STORAGE_INFO_FUNC_NAME, - {}, async_opts) - if ok then - futures_by_replicas[replica_id] = res - else - local err_msg = string.format("Error getting storage info for %s", replica_id) - if res ~= nil then - log.error("%s: %s", err_msg, res) - replica_state_by_id[replica_id].message = tostring(res) - else - log.error(err_msg) - replica_state_by_id[replica_id].message = err_msg - end - end - end - end - - local deadline = fiber.clock() + timeout - for replica_id, future in pairs(futures_by_replicas) do - local wait_timeout = deadline - fiber.clock() - if wait_timeout < 0 then - wait_timeout = 0 - end - - local result, err = future:wait_result(wait_timeout) - if result == nil then - future:discard() - local err_msg = string.format("Error getting storage info for %s", replica_id) - if err ~= nil then - if err.type == 'ClientError' and err.code == box.error.NO_SUCH_PROC then - replica_state_by_id[replica_id].status = "uninitialized" - else - log.error("%s: %s", err_msg, err) - replica_state_by_id[replica_id].message = tostring(err) - end - else - log.error(err_msg) - replica_state_by_id[replica_id].message = err_msg - end - else - replica_state_by_id[replica_id].status = result[1].status or "uninitialized" - end - end - - return replica_state_by_id -end - ---- Storage status information. --- --- @function storage_info_on_storage --- --- @return a table with storage status. -function utils.storage_info_on_storage() - return {status = "running"} -end - --- Initializes a storage function by its name. -- -- It adds the function into the global scope by its name and required @@ -1240,7 +1145,7 @@ end function utils.init_storage_call(user, name, func) dev_checks('?string', 'string', 'function') - rawset(_G['_crud'], name, func) + rawset(_G[utils.STORAGE_NAMESPACE], name, func) if user ~= nil then name = utils.get_storage_call(name) diff --git a/crud/storage_info.lua b/crud/storage_info.lua new file mode 100644 index 00000000..839487a1 --- /dev/null +++ b/crud/storage_info.lua @@ -0,0 +1,119 @@ +local checks = require('checks') +local errors = require('errors') +local fiber = require('fiber') +local log = require('log') + +local const = require('crud.common.const') +local utils = require('crud.common.utils') + +local StorageInfoError = errors.new_class('StorageInfoError') + +local storage_info = {} + +local STORAGE_INFO_FUNC_NAME = 'storage_info_on_storage' +local CRUD_STORAGE_INFO_FUNC_NAME = utils.get_storage_call(STORAGE_INFO_FUNC_NAME) + +--- Storage status information. +-- +-- @function storage_info_on_storage +-- +-- @return a table with storage status. +local function storage_info_on_storage() + return {status = "running"} +end + +function storage_info.init(user) + utils.init_storage_call(user, STORAGE_INFO_FUNC_NAME, storage_info_on_storage) +end + +--- Polls replicas for storage state +-- +-- @function call +-- +-- @tparam ?number opts.timeout +-- Function call timeout +-- +-- @tparam ?string|table opts.vshard_router +-- Cartridge vshard group name or vshard router instance. +-- +-- @return a table of storage states by replica id. +function storage_info.call(opts) + checks({ + timeout = '?number', + vshard_router = '?string|table', + }) + + opts = opts or {} + + local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router) + if err ~= nil then + return nil, StorageInfoError:new(err) + end + + local replicasets, err = vshard_router:routeall() + if replicasets == nil then + return nil, StorageInfoError:new("Failed to get router replicasets: %s", err.err) + end + + local futures_by_replicas = {} + local replica_state_by_id = {} + local async_opts = {is_async = true} + local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT + + for _, replicaset in pairs(replicasets) do + for replica_id, replica in pairs(replicaset.replicas) do + local master = utils.get_replicaset_master(replicaset, {cached = false}) + + replica_state_by_id[replica_id] = { + status = "error", + is_master = master == replica + } + + local ok, res = pcall(replica.conn.call, replica.conn, CRUD_STORAGE_INFO_FUNC_NAME, + {}, async_opts) + if ok then + futures_by_replicas[replica_id] = res + else + local err_msg = string.format("Error getting storage info for %s", replica_id) + if res ~= nil then + log.error("%s: %s", err_msg, res) + replica_state_by_id[replica_id].message = tostring(res) + else + log.error(err_msg) + replica_state_by_id[replica_id].message = err_msg + end + end + end + end + + local deadline = fiber.clock() + timeout + for replica_id, future in pairs(futures_by_replicas) do + local wait_timeout = deadline - fiber.clock() + if wait_timeout < 0 then + wait_timeout = 0 + end + + local result, err = future:wait_result(wait_timeout) + if result == nil then + future:discard() + local err_msg = string.format("Error getting storage info for %s", replica_id) + if err ~= nil then + if err.type == 'ClientError' and err.code == box.error.NO_SUCH_PROC then + replica_state_by_id[replica_id].status = "uninitialized" + else + log.error("%s: %s", err_msg, err) + replica_state_by_id[replica_id].message = tostring(err) + end + else + log.error(err_msg) + replica_state_by_id[replica_id].message = err_msg + end + else + replica_state_by_id[replica_id].status = result[1].status or "uninitialized" + end + end + + return replica_state_by_id +end + +return storage_info