diff --git a/README.md b/README.md index 6a419017..9396b207 100644 --- a/README.md +++ b/README.md @@ -753,6 +753,23 @@ returns. `count` is total requests count since instance start or stats restart. `latency` is average time of requests execution, `time` is the total time of requests execution. +`select` section additionally contains `details` collectors. +```lua +crud.stats('my_space').select.details +--- +- map_reduces: 4 + tuples_fetched: 10500 + tuples_lookup: 238000 +... +``` +`map_reduces` is the count of planned map reduces (including those not +executed successfully). `tuples_fetched` is the count of tuples fetched +from storages during execution, `tuples_lookup` is the count of tuples +looked up on storages while collecting responses for calls (including +scrolls for multibatch requests). Details data is updated as part of +the request process, so you may get new details before `select`/`pairs` +call is finished and observed with count, latency and time collectors. + Since `pairs` request behavior differs from any other crud request, its statistics collection also has specific behavior. Statistics (`select` section) are updated after `pairs` cycle is finished: you diff --git a/crud/select.lua b/crud/select.lua index a633f86b..b0d1ef9b 100644 --- a/crud/select.lua +++ b/crud/select.lua @@ -59,7 +59,7 @@ local function select_on_storage(space_name, index_id, conditions, opts) end -- execute select - local tuples, err = select_executor.execute(space, index, filter_func, { + local resp, err = select_executor.execute(space, index, filter_func, { scan_value = opts.scan_value, after_tuple = opts.after_tuple, tarantool_iter = opts.tarantool_iter, @@ -70,15 +70,20 @@ local function select_on_storage(space_name, index_id, conditions, opts) end local cursor - if #tuples < opts.limit or opts.limit == 0 then + if resp.tuples_fetched < opts.limit or opts.limit == 0 then cursor = {is_end = true} else - cursor = make_cursor(tuples) + cursor = make_cursor(resp.tuples) end + cursor.stats = { + tuples_lookup = resp.tuples_lookup, + tuples_fetched = resp.tuples_fetched, + } + -- getting tuples with user defined fields (if `fields` option is specified) -- and fields that are needed for comparison on router (primary key + scan key) - return cursor, schema.filter_tuples_fields(tuples, opts.field_names) + return cursor, schema.filter_tuples_fields(resp.tuples, opts.field_names) end function select_module.init() diff --git a/crud/select/compat/select.lua b/crud/select/compat/select.lua index 1984a87a..a05e7a93 100644 --- a/crud/select/compat/select.lua +++ b/crud/select/compat/select.lua @@ -8,6 +8,7 @@ local dev_checks = require('crud.common.dev_checks') local common = require('crud.select.compat.common') local schema = require('crud.common.schema') local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') +local stats = require('crud.stats') local compare_conditions = require('crud.compare.conditions') local select_plan = require('crud.compare.plan') @@ -115,6 +116,8 @@ local function build_select_iterator(space_name, user_conditions, opts) if err ~= nil then return nil, err, true end + else + stats.update_map_reduces(space_name) end local tuples_limit = opts.first diff --git a/crud/select/compat/select_old.lua b/crud/select/compat/select_old.lua index 1cf88744..a4d79e85 100644 --- a/crud/select/compat/select_old.lua +++ b/crud/select/compat/select_old.lua @@ -9,6 +9,7 @@ local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') +local stats = require('crud.stats') local compare_conditions = require('crud.compare.conditions') local select_plan = require('crud.compare.plan') @@ -59,6 +60,14 @@ local function select_iteration(space_name, plan, opts) local tuples = {} for replicaset_uuid, replicaset_results in pairs(results) do + -- Stats extracted with callback here and not passed + -- outside to wrapper because fetch for pairs can be + -- called even after pairs() return from generators. + local cursor = replicaset_results[1] + if cursor.stats ~= nil then + stats.update_fetch_stats(cursor.stats, space_name) + end + tuples[replicaset_uuid] = replicaset_results[2] end @@ -141,6 +150,8 @@ local function build_select_iterator(space_name, user_conditions, opts) if err ~= nil then return nil, err, true end + else + stats.update_map_reduces(space_name) end -- generate tuples comparator diff --git a/crud/select/executor.lua b/crud/select/executor.lua index 6d6f7483..10309be2 100644 --- a/crud/select/executor.lua +++ b/crud/select/executor.lua @@ -1,4 +1,5 @@ local errors = require('errors') +local fun = require('fun') local dev_checks = require('crud.common.dev_checks') local select_comparators = require('crud.compare.comparators') @@ -68,13 +69,12 @@ function executor.execute(space, index, filter_func, opts) opts = opts or {} + local resp = { tuples_fetched = 0, tuples_lookup = 0, tuples = {} } + if opts.limit == 0 then - return {} + return resp end - local tuples = {} - local tuples_count = 0 - local value = opts.scan_value if opts.after_tuple ~= nil then local new_value = generate_value(opts.after_tuple, opts.scan_value, index.parts, opts.tarantool_iter) @@ -84,7 +84,16 @@ function executor.execute(space, index, filter_func, opts) end local tuple - local gen = index:pairs(value, {iterator = opts.tarantool_iter}) + local raw_gen, param, state = index:pairs(value, {iterator = opts.tarantool_iter}) + local gen = fun.wrap(function(param, state) + local next_state, var = raw_gen(param, state) + + if var ~= nil then + resp.tuples_lookup = resp.tuples_lookup + 1 + end + + return next_state, var + end, param, state) if opts.after_tuple ~= nil then local err @@ -94,7 +103,7 @@ function executor.execute(space, index, filter_func, opts) end if tuple == nil then - return {} + return resp end end @@ -110,10 +119,10 @@ function executor.execute(space, index, filter_func, opts) local matched, early_exit = filter_func(tuple) if matched then - table.insert(tuples, tuple) - tuples_count = tuples_count + 1 + table.insert(resp.tuples, tuple) + resp.tuples_fetched = resp.tuples_fetched + 1 - if opts.limit ~= nil and tuples_count >= opts.limit then + if opts.limit ~= nil and resp.tuples_fetched >= opts.limit then break end elseif early_exit then @@ -123,7 +132,7 @@ function executor.execute(space, index, filter_func, opts) gen.state, tuple = gen(gen.param, gen.state) end - return tuples + return resp end return executor diff --git a/crud/select/merger.lua b/crud/select/merger.lua index fa443b84..e3c1bdf4 100644 --- a/crud/select/merger.lua +++ b/crud/select/merger.lua @@ -7,6 +7,7 @@ local compat = require('crud.common.compat') local merger_lib = compat.require('tuple.merger', 'merger') local Keydef = require('crud.compare.keydef') +local stats = require('crud.stats') local function bswap_u16(num) return bit.rshift(bit.bswap(tonumber(num)), 16) @@ -93,6 +94,7 @@ local function fetch_chunk(context, state) local replicaset = context.replicaset local vshard_call_name = context.vshard_call_name local timeout = context.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT + local space_name = context.space_name local future = state.future -- The source was entirely drained. @@ -109,6 +111,14 @@ local function fetch_chunk(context, state) -- Decode metainfo, leave data to be processed by the merger. local cursor = decode_metainfo(buf) + -- Extract stats info. + -- Stats extracted with callback here and not passed + -- outside to wrapper because fetch for pairs can be + -- called even after pairs() return from generators. + if cursor.stats ~= nil then + stats.update_fetch_stats(cursor.stats, space_name) + end + -- Check whether we need the next call. if cursor.is_end then local next_state = {} @@ -157,6 +167,7 @@ local function new(replicasets, space, index_id, func_name, func_args, opts) replicaset = replicaset, vshard_call_name = vshard_call_name, timeout = call_opts.timeout, + space_name = space.name, } local state = {future = future} local source = merger_lib.new_buffer_source(fetch_chunk, context, state) diff --git a/crud/stats/init.lua b/crud/stats/init.lua index 1e1a8f83..2714ee13 100644 --- a/crud/stats/init.lua +++ b/crud/stats/init.lua @@ -255,6 +255,62 @@ function stats.wrap(func, op, opts) end end +local storage_stats_schema = { tuples_fetched = 'number', tuples_lookup = 'number' } +--- Callback to collect storage tuples stats (select/pairs). +-- +-- @function update_fetch_stats +-- +-- @tab storage_stats +-- Statistics from select storage call. +-- +-- @number storage_stats.tuples_fetched +-- Count of tuples fetched during storage call. +-- +-- @number storage_stats.tuples_lookup +-- Count of tuples looked up on storages while collecting response. +-- +-- @string space_name +-- Name of space. +-- +-- @treturn boolean Returns `true`. +-- +function stats.update_fetch_stats(storage_stats, space_name) + dev_checks(storage_stats_schema, 'string') + + if not stats.is_enabled() then + return true + end + + registry.observe_fetch( + storage_stats.tuples_fetched, + storage_stats.tuples_lookup, + space_name + ) + + return true +end + +--- Callback to collect planned map reduces stats (select/pairs). +-- +-- @function update_map_reduces +-- +-- @string space_name +-- Name of space. +-- +-- @treturn boolean Returns `true`. +-- +function stats.update_map_reduces(space_name) + dev_checks('string') + + if not stats.is_enabled() then + return true + end + + registry.observe_map_reduces(1, space_name) + + return true +end + --- Table with CRUD operation lables. -- -- @tfield string INSERT diff --git a/crud/stats/local_registry.lua b/crud/stats/local_registry.lua index c5e125f1..9626f75b 100644 --- a/crud/stats/local_registry.lua +++ b/crud/stats/local_registry.lua @@ -4,6 +4,7 @@ local dev_checks = require('crud.common.dev_checks') local stash = require('crud.common.stash') +local op_module = require('crud.stats.operation') local registry_utils = require('crud.stats.registry_utils') local registry = {} @@ -98,4 +99,56 @@ function registry.observe(latency, space_name, op, status) return true end +--- Increase statistics of storage select/pairs calls +-- +-- @function observe_fetch +-- +-- @string space_name +-- Name of space. +-- +-- @number tuples_fetched +-- Count of tuples fetched during storage call. +-- +-- @number tuples_lookup +-- Count of tuples looked up on storages while collecting response. +-- +-- @treturn boolean Returns true. +-- +function registry.observe_fetch(tuples_fetched, tuples_lookup, space_name) + dev_checks('number', 'number', 'string') + + local op = op_module.SELECT + registry_utils.init_collectors_if_required(internal.registry.spaces, space_name, op) + local collectors = internal.registry.spaces[space_name][op].details + + collectors.tuples_fetched = collectors.tuples_fetched + tuples_fetched + collectors.tuples_lookup = collectors.tuples_lookup + tuples_lookup + + return true +end + +--- Increase statistics of planned map reduces during select/pairs +-- +-- @function observe_map_reduces +-- +-- @number count +-- Count of map reduces planned. +-- +-- @string space_name +-- Name of space. +-- +-- @treturn boolean Returns true. +-- +function registry.observe_map_reduces(count, space_name) + dev_checks('number', 'string') + + local op = op_module.SELECT + registry_utils.init_collectors_if_required(internal.registry.spaces, space_name, op) + local collectors = internal.registry.spaces[space_name][op].details + + collectors.map_reduces = collectors.map_reduces + count + + return true +end + return registry diff --git a/crud/stats/registry_utils.lua b/crud/stats/registry_utils.lua index 2c99f8a3..95654461 100644 --- a/crud/stats/registry_utils.lua +++ b/crud/stats/registry_utils.lua @@ -3,6 +3,7 @@ -- local dev_checks = require('crud.common.dev_checks') +local op_module = require('crud.stats.operation') local registry_utils = {} @@ -10,10 +11,17 @@ local registry_utils = {} -- -- @function build_collectors -- +-- @string op +-- Label of registry collectors. +-- Use `require('crud.stats').op` to pick one. +-- -- @treturn table Returns collectors for success and error requests. --- Collectors store 'count', 'latency' and 'time' values. +-- Collectors store 'count', 'latency' and 'time' values. Also +-- returns additional collectors for select operation. -- -function registry_utils.build_collectors() +function registry_utils.build_collectors(op) + dev_checks('string') + local collectors = { ok = { count = 0, @@ -27,6 +35,14 @@ function registry_utils.build_collectors() }, } + if op == op_module.SELECT then + collectors.details = { + tuples_fetched = 0, + tuples_lookup = 0, + map_reduces = 0, + } + end + return collectors end @@ -53,7 +69,7 @@ function registry_utils.init_collectors_if_required(spaces, space_name, op) local space_collectors = spaces[space_name] if space_collectors[op] == nil then - space_collectors[op] = registry_utils.build_collectors() + space_collectors[op] = registry_utils.build_collectors(op) end end diff --git a/test/integration/stats_test.lua b/test/integration/stats_test.lua index 5af04db9..cf937ca8 100644 --- a/test/integration/stats_test.lua +++ b/test/integration/stats_test.lua @@ -292,6 +292,80 @@ local simple_operation_cases = { }, } +local prepare_select_data = function(g) + helpers.insert_objects(g, space_name, { + -- Storage is s-2. + { + id = 1, name = "Elizabeth", last_name = "Jackson", + age = 12, city = "New York", + }, + -- Storage is s-2. + { + id = 2, name = "Mary", last_name = "Brown", + age = 46, city = "Los Angeles", + }, + -- Storage is s-1. + { + id = 3, name = "David", last_name = "Smith", + age = 33, city = "Los Angeles", + }, + -- Storage is s-2. + { + id = 4, name = "William", last_name = "White", + age = 81, city = "Chicago", + } + }) +end + +local select_cases = { + select_by_primary_index = { + func = 'crud.select', + conditions = {{ '==', 'id_index', 3 }}, + map_reduces = 0, + tuples_fetched = 1, + tuples_lookup = 1, + }, + select_by_secondary_index = { + func = 'crud.select', + conditions = {{ '==', 'age_index', 46 }}, + map_reduces = 1, + tuples_fetched = 1, + tuples_lookup = 1, + }, + select_full_scan = { + func = 'crud.select', + conditions = {{ '>', 'id_index', 0 }, { '==', 'city', 'Kyoto' }}, + map_reduces = 1, + tuples_fetched = 0, + tuples_lookup = 4, + }, + pairs_by_primary_index = { + eval = eval.pairs, + conditions = {{ '==', 'id_index', 3 }}, + map_reduces = 0, + tuples_fetched = 1, + -- Since batch_size == 1, extra lookup is generated with + -- after_tuple scroll for second batch. + tuples_lookup = 2, + }, + pairs_by_secondary_index = { + eval = eval.pairs, + conditions = {{ '==', 'age_index', 46 }}, + map_reduces = 1, + tuples_fetched = 1, + -- Since batch_size == 1, extra lookup is generated with + -- after_tuple scroll for second batch. + tuples_lookup = 2, + }, + pairs_full_scan = { + eval = eval.pairs, + conditions = {{ '>', 'id_index', 0 }, { '==', 'city', 'Kyoto' }}, + map_reduces = 1, + tuples_fetched = 0, + tuples_lookup = 4, + }, +} + -- Generate non-null stats for all cases. local function generate_stats(g) for _, case in pairs(simple_operation_cases) do @@ -316,6 +390,19 @@ local function generate_stats(g) t.assert_not_equals(err, nil) end end + + -- Generate non-null select details. + prepare_select_data(g) + for _, case in pairs(select_cases) do + local _, err + if case.eval ~= nil then + _, err = g.router:eval(case.eval, { space_name, case.conditions }) + else + _, err = g.router:call(case.func, { space_name, case.conditions }) + end + + t.assert_equals(err, nil) + end end @@ -433,6 +520,53 @@ g.test_non_existing_space = function(g) end +for name, case in pairs(select_cases) do + local test_name = ('test_%s_details'):format(name) + + g.before_test(test_name, prepare_select_data) + + g[test_name] = function(g) + local op = 'select' + local space_name = space_name + + -- Collect stats before call. + local stats_before = g:get_stats(space_name) + t.assert_type(stats_before, 'table') + + -- Call operation. + local _, err + if case.eval ~= nil then + _, err = g.router:eval(case.eval, { space_name, case.conditions }) + else + _, err = g.router:call(case.func, { space_name, case.conditions }) + end + + t.assert_equals(err, nil) + + -- Collect stats after call. + local stats_after = g:get_stats(space_name) + t.assert_type(stats_after, 'table') + + local op_before = set_defaults_if_empty(stats_before, op) + local details_before = op_before.details + local op_after = set_defaults_if_empty(stats_after, op) + local details_after = op_after.details + + local tuples_fetched_diff = details_after.tuples_fetched - details_before.tuples_fetched + t.assert_equals(tuples_fetched_diff, case.tuples_fetched, + 'Expected count of tuples fetched') + + local tuples_lookup_diff = details_after.tuples_lookup - details_before.tuples_lookup + t.assert_equals(tuples_lookup_diff, case.tuples_lookup, + 'Expected count of tuples looked up on storage') + + local map_reduces_diff = details_after.map_reduces - details_before.map_reduces + t.assert_equals(map_reduces_diff, case.map_reduces, + 'Expected count of map reduces planned') + end +end + + g.before_test( 'test_role_reload_do_not_reset_observations', generate_stats) diff --git a/test/unit/select_executor_test.lua b/test/unit/select_executor_test.lua index ff09b9ca..da014875 100644 --- a/test/unit/select_executor_test.lua +++ b/test/unit/select_executor_test.lua @@ -105,7 +105,7 @@ g.test_one_condition_no_index = function() tarantool_iter = plan.tarantool_iter, scan_condition_num = plan.scan_condition_num, }) - t.assert_equals(get_ids(results), {2, 3}) + t.assert_equals(get_ids(results.tuples), {2, 3}) -- after tuple 2 local after_tuple = space:frommap(customers[2]):totable() @@ -115,7 +115,7 @@ g.test_one_condition_no_index = function() after_tuple = after_tuple, tarantool_iter = plan.tarantool_iter, }) - t.assert_equals(get_ids(results), {3}) + t.assert_equals(get_ids(results.tuples), {3}) -- after tuple 3 local after_tuple = space:frommap(customers[3]):totable() @@ -125,7 +125,7 @@ g.test_one_condition_no_index = function() after_tuple = after_tuple, tarantool_iter = plan.tarantool_iter, }) - t.assert_equals(#results, 0) + t.assert_equals(#results.tuples, 0) end g.test_one_condition_with_index = function() @@ -164,7 +164,7 @@ g.test_one_condition_with_index = function() scan_value = plan.scan_value, tarantool_iter = plan.tarantool_iter, }) - t.assert_equals(get_ids(results), {3, 2, 4}) -- in age order + t.assert_equals(get_ids(results.tuples), {3, 2, 4}) -- in age order -- after tuple 3 local after_tuple = space:frommap(customers[3]):totable() @@ -174,7 +174,7 @@ g.test_one_condition_with_index = function() after_tuple = after_tuple, tarantool_iter = plan.tarantool_iter, }) - t.assert_equals(get_ids(results), {2, 4}) -- in age order + t.assert_equals(get_ids(results.tuples), {2, 4}) -- in age order end g.test_multiple_conditions = function() @@ -220,7 +220,7 @@ g.test_multiple_conditions = function() scan_value = plan.scan_value, tarantool_iter = plan.tarantool_iter, }) - t.assert_equals(get_ids(results), {5, 2}) -- in age order + t.assert_equals(get_ids(results.tuples), {5, 2}) -- in age order -- after tuple 5 local after_tuple = space:frommap(customers[5]):totable() @@ -230,7 +230,7 @@ g.test_multiple_conditions = function() after_tuple = after_tuple, tarantool_iter = plan.tarantool_iter, }) - t.assert_equals(get_ids(results), {2}) + t.assert_equals(get_ids(results.tuples), {2}) end g.test_composite_index = function() @@ -271,7 +271,7 @@ g.test_composite_index = function() scan_value = plan.scan_value, tarantool_iter = plan.tarantool_iter, }) - t.assert_equals(get_ids(results), {2, 1, 4}) -- in full_name order + t.assert_equals(get_ids(results.tuples), {2, 1, 4}) -- in full_name order -- after tuple 2 local after_tuple = space:frommap(customers[2]):totable() @@ -281,7 +281,7 @@ g.test_composite_index = function() after_tuple = after_tuple, tarantool_iter = plan.tarantool_iter, }) - t.assert_equals(get_ids(results), {1, 4}) + t.assert_equals(get_ids(results.tuples), {1, 4}) end g.test_get_by_id = function() @@ -319,7 +319,7 @@ g.test_get_by_id = function() scan_value = plan.scan_value, tarantool_iter = plan.tarantool_iter, }) - t.assert_equals(get_ids(results), {2}) + t.assert_equals(get_ids(results.tuples), {2}) end g.test_early_exit = function() @@ -360,7 +360,7 @@ g.test_early_exit = function() scan_value = plan.scan_value, tarantool_iter = plan.tarantool_iter, }) - t.assert_equals(get_ids(results), {4, 2}) + t.assert_equals(get_ids(results.tuples), {4, 2}) end g.test_select_all = function() @@ -397,7 +397,7 @@ g.test_select_all = function() scan_value = plan.scan_value, tarantool_iter = plan.tarantool_iter, }) - t.assert_equals(get_ids(results), {1, 2, 3, 4}) + t.assert_equals(get_ids(results.tuples), {1, 2, 3, 4}) end g.test_limit = function() @@ -435,7 +435,7 @@ g.test_limit = function() tarantool_iter = plan.tarantool_iter, limit = 0, }) - t.assert_equals(#results, 0) + t.assert_equals(#results.tuples, 0) -- limit 2 local results = select_executor.execute(space, index, filter_func, { @@ -443,5 +443,5 @@ g.test_limit = function() tarantool_iter = plan.tarantool_iter, limit = 2, }) - t.assert_equals(get_ids(results), {1, 2}) + t.assert_equals(get_ids(results.tuples), {1, 2}) end diff --git a/test/unit/stats_test.lua b/test/unit/stats_test.lua index 628a9266..8fced621 100644 --- a/test/unit/stats_test.lua +++ b/test/unit/stats_test.lua @@ -171,6 +171,19 @@ for name, case in pairs(observe_cases) do }, 'Other status collectors initialized after observations' ) + + -- SELECT collectors have additional details section. + if op == stats_module.op.SELECT then + t.assert_equals( + op_stats.details, + { + tuples_fetched = 0, + tuples_lookup = 0, + map_reduces = 0, + }, + 'Detail collectors initialized after select observations' + ) + end end end end @@ -553,3 +566,52 @@ g.test_reset_for_disabled_stats_does_not_init_module = function(g) local stats_after = g:get_stats() t.assert_equals(stats_after, {}, "Stats is still empty") end + +g.test_fetch_stats_update = function(g) + local storage_cursor_stats = { tuples_fetched = 5, tuples_lookup = 25 } + + g.router:eval([[ stats_module.update_fetch_stats(...) ]], + { storage_cursor_stats, space_name }) + + local op = stats_module.op.SELECT + local stats = g:get_stats(space_name) + + t.assert_not_equals(stats[op], nil, + 'Fetch stats update inits SELECT collectors') + + local details = stats[op].details + + t.assert_equals(details.tuples_fetched, 5, + 'tuples_fetched is inremented by expected value') + t.assert_equals(details.tuples_lookup, 25, + 'tuples_lookup is inremented by expected value') +end + +g.test_disable_stats_do_not_break_fetch_stats_update_call = function(g) + local storage_cursor_stats = { tuples_fetched = 5, tuples_lookup = 25 } + + g:disable_stats() + + local _, err = g.router:eval([[ stats_module.update_fetch_stats(...) ]], + { storage_cursor_stats, space_name }) + t.assert_equals(err, nil) +end + +g.test_map_reduce_increment = function(g) + local op = stats_module.op.SELECT + + local _, err = g.router:eval([[ stats_module.update_map_reduces(...) ]], { space_name }) + t.assert_equals(err, nil) + + local stats = g:get_stats() + + t.assert_equals(stats.spaces[space_name][op].details.map_reduces, 1, + "Counter of map reduces incremented") +end + +g.test_disable_stats_do_not_break_map_reduce_update_call = function(g) + g:disable_stats() + + local _, err = g.router:eval([[ stats_module.update_map_reduces(...) ]], { space_name }) + t.assert_equals(err, nil) +end