Skip to content

Commit

Permalink
stats: add detailed statistics for select/pairs
Browse files Browse the repository at this point in the history
After this patch, statistics `select` section additionally contains
`details` collectors.

```
crud.stats('my_space').select.details
---
- map_reduces: 4
  tuples_fetched: 10500
  tuples_lookup: 238000
...
```

`map_reduces` is the count of planned map reduces (including those not
executed successfully). `tuples_fetched` is the count of tuples fetched
from storages during execution, `tuples_lookup` is the count of tuples
looked up on storages while collecting responses for calls (including
scrolls for multibatch requests). Details data is updated as part of
the request process, so you may get new details before `select`/`pairs`
call is finished and observed with count, latency and time collectors.

Part of #224
  • Loading branch information
DifferentialOrange committed Feb 25, 2022
1 parent aeb21e6 commit af30790
Show file tree
Hide file tree
Showing 12 changed files with 408 additions and 31 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,23 @@ returns. `count` is total requests count since instance start
or stats restart. `latency` is average time of requests execution,
`time` is the total time of requests execution.

`select` section additionally contains `details` collectors.
```lua
crud.stats('my_space').select.details
---
- map_reduces: 4
tuples_fetched: 10500
tuples_lookup: 238000
...
```
`map_reduces` is the count of planned map reduces (including those not
executed successfully). `tuples_fetched` is the count of tuples fetched
from storages during execution, `tuples_lookup` is the count of tuples
looked up on storages while collecting responses for calls (including
scrolls for multibatch requests). Details data is updated as part of
the request process, so you may get new details before `select`/`pairs`
call is finished and observed with count, latency and time collectors.

Since `pairs` request behavior differs from any other crud request, its
statistics collection also has specific behavior. Statistics (`select`
section) are updated after `pairs` cycle is finished: you
Expand Down
13 changes: 9 additions & 4 deletions crud/select.lua
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ local function select_on_storage(space_name, index_id, conditions, opts)
end

-- execute select
local tuples, err = select_executor.execute(space, index, filter_func, {
local resp, err = select_executor.execute(space, index, filter_func, {
scan_value = opts.scan_value,
after_tuple = opts.after_tuple,
tarantool_iter = opts.tarantool_iter,
Expand All @@ -70,15 +70,20 @@ local function select_on_storage(space_name, index_id, conditions, opts)
end

local cursor
if #tuples < opts.limit or opts.limit == 0 then
if resp.tuples_fetched < opts.limit or opts.limit == 0 then
cursor = {is_end = true}
else
cursor = make_cursor(tuples)
cursor = make_cursor(resp.tuples)
end

cursor.stats = {
tuples_lookup = resp.tuples_lookup,
tuples_fetched = resp.tuples_fetched,
}

-- getting tuples with user defined fields (if `fields` option is specified)
-- and fields that are needed for comparison on router (primary key + scan key)
return cursor, schema.filter_tuples_fields(tuples, opts.field_names)
return cursor, schema.filter_tuples_fields(resp.tuples, opts.field_names)
end

function select_module.init()
Expand Down
3 changes: 3 additions & 0 deletions crud/select/compat/select.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ local dev_checks = require('crud.common.dev_checks')
local common = require('crud.select.compat.common')
local schema = require('crud.common.schema')
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
local stats = require('crud.stats')

local compare_conditions = require('crud.compare.conditions')
local select_plan = require('crud.compare.plan')
Expand Down Expand Up @@ -115,6 +116,8 @@ local function build_select_iterator(space_name, user_conditions, opts)
if err ~= nil then
return nil, err, true
end
else
stats.update_map_reduces(space_name)
end

local tuples_limit = opts.first
Expand Down
11 changes: 11 additions & 0 deletions crud/select/compat/select_old.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ local sharding = require('crud.common.sharding')
local dev_checks = require('crud.common.dev_checks')
local schema = require('crud.common.schema')
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
local stats = require('crud.stats')

local compare_conditions = require('crud.compare.conditions')
local select_plan = require('crud.compare.plan')
Expand Down Expand Up @@ -59,6 +60,14 @@ local function select_iteration(space_name, plan, opts)

local tuples = {}
for replicaset_uuid, replicaset_results in pairs(results) do
-- Stats extracted with callback here and not passed
-- outside to wrapper because fetch for pairs can be
-- called even after pairs() return from generators.
local cursor = replicaset_results[1]
if cursor.stats ~= nil then
stats.update_fetch_stats(cursor.stats, space_name)
end

tuples[replicaset_uuid] = replicaset_results[2]
end

Expand Down Expand Up @@ -141,6 +150,8 @@ local function build_select_iterator(space_name, user_conditions, opts)
if err ~= nil then
return nil, err, true
end
else
stats.update_map_reduces(space_name)
end

-- generate tuples comparator
Expand Down
29 changes: 19 additions & 10 deletions crud/select/executor.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
local errors = require('errors')
local fun = require('fun')

local dev_checks = require('crud.common.dev_checks')
local select_comparators = require('crud.compare.comparators')
Expand Down Expand Up @@ -68,13 +69,12 @@ function executor.execute(space, index, filter_func, opts)

opts = opts or {}

local resp = { tuples_fetched = 0, tuples_lookup = 0, tuples = {} }

if opts.limit == 0 then
return {}
return resp
end

local tuples = {}
local tuples_count = 0

local value = opts.scan_value
if opts.after_tuple ~= nil then
local new_value = generate_value(opts.after_tuple, opts.scan_value, index.parts, opts.tarantool_iter)
Expand All @@ -84,7 +84,16 @@ function executor.execute(space, index, filter_func, opts)
end

local tuple
local gen = index:pairs(value, {iterator = opts.tarantool_iter})
local raw_gen, param, state = index:pairs(value, {iterator = opts.tarantool_iter})
local gen = fun.wrap(function(param, state)
local next_state, var = raw_gen(param, state)

if var ~= nil then
resp.tuples_lookup = resp.tuples_lookup + 1
end

return next_state, var
end, param, state)

if opts.after_tuple ~= nil then
local err
Expand All @@ -94,7 +103,7 @@ function executor.execute(space, index, filter_func, opts)
end

if tuple == nil then
return {}
return resp
end
end

Expand All @@ -110,10 +119,10 @@ function executor.execute(space, index, filter_func, opts)
local matched, early_exit = filter_func(tuple)

if matched then
table.insert(tuples, tuple)
tuples_count = tuples_count + 1
table.insert(resp.tuples, tuple)
resp.tuples_fetched = resp.tuples_fetched + 1

if opts.limit ~= nil and tuples_count >= opts.limit then
if opts.limit ~= nil and resp.tuples_fetched >= opts.limit then
break
end
elseif early_exit then
Expand All @@ -123,7 +132,7 @@ function executor.execute(space, index, filter_func, opts)
gen.state, tuple = gen(gen.param, gen.state)
end

return tuples
return resp
end

return executor
11 changes: 11 additions & 0 deletions crud/select/merger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ local compat = require('crud.common.compat')
local merger_lib = compat.require('tuple.merger', 'merger')

local Keydef = require('crud.compare.keydef')
local stats = require('crud.stats')

local function bswap_u16(num)
return bit.rshift(bit.bswap(tonumber(num)), 16)
Expand Down Expand Up @@ -93,6 +94,7 @@ local function fetch_chunk(context, state)
local replicaset = context.replicaset
local vshard_call_name = context.vshard_call_name
local timeout = context.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
local space_name = context.space_name
local future = state.future

-- The source was entirely drained.
Expand All @@ -109,6 +111,14 @@ local function fetch_chunk(context, state)
-- Decode metainfo, leave data to be processed by the merger.
local cursor = decode_metainfo(buf)

-- Extract stats info.
-- Stats extracted with callback here and not passed
-- outside to wrapper because fetch for pairs can be
-- called even after pairs() return from generators.
if cursor.stats ~= nil then
stats.update_fetch_stats(cursor.stats, space_name)
end

-- Check whether we need the next call.
if cursor.is_end then
local next_state = {}
Expand Down Expand Up @@ -157,6 +167,7 @@ local function new(replicasets, space, index_id, func_name, func_args, opts)
replicaset = replicaset,
vshard_call_name = vshard_call_name,
timeout = call_opts.timeout,
space_name = space.name,
}
local state = {future = future}
local source = merger_lib.new_buffer_source(fetch_chunk, context, state)
Expand Down
56 changes: 56 additions & 0 deletions crud/stats/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,62 @@ function stats.wrap(func, op, opts)
end
end

local storage_stats_schema = { tuples_fetched = 'number', tuples_lookup = 'number' }
--- Callback to collect storage tuples stats (select/pairs).
--
-- @function update_fetch_stats
--
-- @tab storage_stats
-- Statistics from select storage call.
--
-- @number storage_stats.tuples_fetched
-- Count of tuples fetched during storage call.
--
-- @number storage_stats.tuples_lookup
-- Count of tuples looked up on storages while collecting response.
--
-- @string space_name
-- Name of space.
--
-- @treturn boolean Returns `true`.
--
function stats.update_fetch_stats(storage_stats, space_name)
dev_checks(storage_stats_schema, 'string')

if not stats.is_enabled() then
return true
end

registry.observe_fetch(
storage_stats.tuples_fetched,
storage_stats.tuples_lookup,
space_name
)

return true
end

--- Callback to collect planned map reduces stats (select/pairs).
--
-- @function update_map_reduces
--
-- @string space_name
-- Name of space.
--
-- @treturn boolean Returns `true`.
--
function stats.update_map_reduces(space_name)
dev_checks('string')

if not stats.is_enabled() then
return true
end

registry.observe_map_reduces(1, space_name)

return true
end

--- Table with CRUD operation lables.
--
-- @tfield string INSERT
Expand Down
53 changes: 53 additions & 0 deletions crud/stats/local_registry.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

local dev_checks = require('crud.common.dev_checks')
local stash = require('crud.common.stash')
local op_module = require('crud.stats.operation')
local registry_utils = require('crud.stats.registry_utils')

local registry = {}
Expand Down Expand Up @@ -98,4 +99,56 @@ function registry.observe(latency, space_name, op, status)
return true
end

--- Increase statistics of storage select/pairs calls
--
-- @function observe_fetch
--
-- @string space_name
-- Name of space.
--
-- @number tuples_fetched
-- Count of tuples fetched during storage call.
--
-- @number tuples_lookup
-- Count of tuples looked up on storages while collecting response.
--
-- @treturn boolean Returns true.
--
function registry.observe_fetch(tuples_fetched, tuples_lookup, space_name)
dev_checks('number', 'number', 'string')

local op = op_module.SELECT
registry_utils.init_collectors_if_required(internal.registry.spaces, space_name, op)
local collectors = internal.registry.spaces[space_name][op].details

collectors.tuples_fetched = collectors.tuples_fetched + tuples_fetched
collectors.tuples_lookup = collectors.tuples_lookup + tuples_lookup

return true
end

--- Increase statistics of planned map reduces during select/pairs
--
-- @function observe_map_reduces
--
-- @number count
-- Count of map reduces planned.
--
-- @string space_name
-- Name of space.
--
-- @treturn boolean Returns true.
--
function registry.observe_map_reduces(count, space_name)
dev_checks('number', 'string')

local op = op_module.SELECT
registry_utils.init_collectors_if_required(internal.registry.spaces, space_name, op)
local collectors = internal.registry.spaces[space_name][op].details

collectors.map_reduces = collectors.map_reduces + count

return true
end

return registry
22 changes: 19 additions & 3 deletions crud/stats/registry_utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,25 @@
--

local dev_checks = require('crud.common.dev_checks')
local op_module = require('crud.stats.operation')

local registry_utils = {}

--- Build collectors for local registry.
--
-- @function build_collectors
--
-- @string op
-- Label of registry collectors.
-- Use `require('crud.stats').op` to pick one.
--
-- @treturn table Returns collectors for success and error requests.
-- Collectors store 'count', 'latency' and 'time' values.
-- Collectors store 'count', 'latency' and 'time' values. Also
-- returns additional collectors for select operation.
--
function registry_utils.build_collectors()
function registry_utils.build_collectors(op)
dev_checks('string')

local collectors = {
ok = {
count = 0,
Expand All @@ -27,6 +35,14 @@ function registry_utils.build_collectors()
},
}

if op == op_module.SELECT then
collectors.details = {
tuples_fetched = 0,
tuples_lookup = 0,
map_reduces = 0,
}
end

return collectors
end

Expand All @@ -53,7 +69,7 @@ function registry_utils.init_collectors_if_required(spaces, space_name, op)

local space_collectors = spaces[space_name]
if space_collectors[op] == nil then
space_collectors[op] = registry_utils.build_collectors()
space_collectors[op] = registry_utils.build_collectors(op)
end
end

Expand Down
Loading

0 comments on commit af30790

Please sign in to comment.