Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include a new downsampling operation #1574

Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
1f4e757
feature: include a new downsampling operation
salvatore-campagna Sep 7, 2022
126e99c
fix: documentation format violations
salvatore-campagna Sep 7, 2022
54b9829
fix: documentation format violations
salvatore-campagna Sep 7, 2022
61ade59
fix: use f-string for path
salvatore-campagna Sep 8, 2022
edf2e08
fix: use 'downsample'
salvatore-campagna Sep 8, 2022
81db5c6
fix: use 'downsample' instead of 'downsampling'
salvatore-campagna Sep 8, 2022
1446d12
fix: get rid of the body parameter
salvatore-campagna Sep 8, 2022
b81526d
fix: make Downsample operation administrative
salvatore-campagna Sep 8, 2022
433726c
fix: remove body parameter
salvatore-campagna Sep 8, 2022
97e2922
fix: make Downsample non-administrative
salvatore-campagna Sep 8, 2022
e4a8d81
fix: remove body parameter
salvatore-campagna Sep 8, 2022
d813ce0
test: fix endpoint and do not require body
salvatore-campagna Sep 8, 2022
01d0d75
Update docs/track.rst
salvatore-campagna Sep 8, 2022
571b712
Update docs/track.rst
salvatore-campagna Sep 8, 2022
aaac8c5
Update docs/track.rst
salvatore-campagna Sep 8, 2022
2424935
Update docs/track.rst
salvatore-campagna Sep 8, 2022
1f3ed4b
fix: include a param source for the downsample operation
salvatore-campagna Sep 8, 2022
693a9d3
fix: do not fail if index does not exist
salvatore-campagna Sep 14, 2022
476a573
Merge branch 'master' into feature/downsampling-operation
salvatore-campagna Sep 15, 2022
d5e5ffe
fix: use get_target instead of static default value
salvatore-campagna Sep 15, 2022
9d1c257
fix: remove ignore_unavailable param
salvatore-campagna Sep 15, 2022
cad9f53
fix: params.get_target attribute does not exist
salvatore-campagna Sep 15, 2022
7e063e2
test: use index instead of source-index
salvatore-campagna Sep 15, 2022
ad1a9a6
fix: support both index and source-index params
salvatore-campagna Sep 15, 2022
104a715
fix: explicitly check the default fixed_interval
salvatore-campagna Sep 15, 2022
21cc596
test: check both default index and source index
salvatore-campagna Sep 15, 2022
029babb
docs: update source-index param documentation
salvatore-campagna Sep 15, 2022
86e05ee
fix: do not assert source-index
salvatore-campagna Sep 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions docs/track.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3056,6 +3056,39 @@ The following meta data is always returned:
* ``weight``: The number of fetched pages. Should always equal to the ``pages`` parameter.
* ``unit``: The unit in which to interpret ``weight``. Always "ops".

downsample
~~~~~~~~~~

Executes a downsampling operation on an index producing a new index whose data is aggregated on the @timestamp field.


Properties
""""""""""

* ``fixed_interval`` (optional, defaults to ``1h``): The aggregation interval key defined as in `https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-datehistogram-aggregation.html#fixed_intervals`.
* ``source-index`` (optional, defaults to ``tsdb-index``): The index containing data to aggregate which includes a ``@timestamp`` field. Note that this index should be marked read-only prior to the execution of this operation.
Copy link
Contributor

@DJRickyB DJRickyB Sep 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* ``source-index`` (optional, defaults to ``tsdb-index``): The index containing data to aggregate which includes a ``@timestamp`` field. Note that this index should be marked read-only prior to the execution of this operation.
* ``source-index`` (optional): The index containing data to aggregate which includes a ``@timestamp`` field. Note that this index should be marked read-only prior to the execution of this operation. If there is only one index defined in the ``indices`` of the track definition, that will be used as the default.

* ``target-index`` (optional, defaults to ``{source-index}-{fixed-interval}``): Tne new target index created by the downsampling operation and including aggregated data.

**Example**

Executes a downsampling operation aggregating data in the source index (test-source-index) and creating a new target index (test-target-index) applying an aggregation
interval of 1 minute on the @timestamp field::

{
"name": "downsample",
"operation": {
"operation-type": "downsample",
"fixed-interval": "1m",
"source-index": "test-source-index",
"target-index": "tsdb-target-index"
}
}

Meta-data
"""""""""

The operation returns no meta-data.

field-caps
~~~~~~~~~~~~~~~~~~~

Expand Down
44 changes: 44 additions & 0 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def register_default_runners():
# these requests should not be retried as they are not idempotent
register_runner(track.OperationType.CreateSnapshot, CreateSnapshot(), async_runner=True)
register_runner(track.OperationType.RestoreSnapshot, RestoreSnapshot(), async_runner=True)
register_runner(track.OperationType.Downsample, Downsample(), async_runner=True)
# We treat the following as administrative commands and thus already start to wrap them in a retry.
register_runner(track.OperationType.ClusterHealth, Retry(ClusterHealth()), async_runner=True)
register_runner(track.OperationType.PutPipeline, Retry(PutPipeline()), async_runner=True)
Expand Down Expand Up @@ -1369,6 +1370,7 @@ async def __call__(self, es, params):
try:
for index_name in indices:
if not only_if_exists:
request_params["ignore_unavailable"] = "true"
DJRickyB marked this conversation as resolved.
Show resolved Hide resolved
await es.indices.delete(index=index_name, params=request_params)
ops += 1
elif only_if_exists:
Expand Down Expand Up @@ -2706,6 +2708,48 @@ def __repr__(self, *args, **kwargs):
return "sql"


class Downsample(Runner):
"""
Executes a downsampling operation creating the target index and aggregating data in the source index on the @timestamp field.
"""

async def __call__(self, es, params):

request_params, request_headers = self._transport_request_params(params)

fixed_interval = mandatory(params, "fixed-interval", self)
if fixed_interval is None:
raise exceptions.DataError(
"Parameter source for operation 'downsample' did not provide the mandatory parameter 'fixed-interval'. "
"Add it to your parameter source and try again."
)

source_index = mandatory(params, "source-index", self)
if source_index is None:
raise exceptions.DataError(
"Parameter source for operation 'downsample' did not provide the mandatory parameter 'source-index'. "
"Add it to your parameter source and try again."
)

target_index = mandatory(params, "target-index", self)
if target_index is None:
raise exceptions.DataError(
"Parameter source for operation 'downsample' did not provide the mandatory parameter 'target-index'. "
"Add it to your parameter source and try again."
)

path = f"/{source_index}/_downsample/{target_index}"

await es.perform_request(
method="POST", path=path, body={"fixed_interval": fixed_interval}, params=request_params, headers=request_headers
)

return {"weight": 1, "unit": "ops", "success": True}

def __repr__(self, *args, **kwargs):
return "downsample"


class FieldCaps(Runner):
"""
Retrieve `the capabilities of fields among indices.
Expand Down
14 changes: 14 additions & 0 deletions esrally/track/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,19 @@ def params(self):
return parsed_params


class DownsampleParamSource(ParamSource):
def __init__(self, track, params, **kwargs):
super().__init__(track, params, **kwargs)
self._fixed_interval = params.get("fixed-interval", "1h")
self._source_index = params.get("source-index", "tsdb-index")
salvatore-campagna marked this conversation as resolved.
Show resolved Hide resolved
self._target_index = params.get("target-index", f"{self._source_index}-{self._fixed_interval}")

def params(self):
parsed_params = {"fixed-interval": self._fixed_interval, "source-index": self._source_index, "target-index": self._target_index}
parsed_params.update(self._client_params())
return parsed_params


def get_target(track, params):
if len(track.indices) == 1:
default_target = track.indices[0].name
Expand Down Expand Up @@ -1342,6 +1355,7 @@ def read_bulk(self):
register_param_source_for_operation(track.OperationType.DeleteComposableTemplate, DeleteComposableTemplateParamSource)
register_param_source_for_operation(track.OperationType.Sleep, SleepParamSource)
register_param_source_for_operation(track.OperationType.ForceMerge, ForceMergeParamSource)
register_param_source_for_operation(track.OperationType.Downsample, DownsampleParamSource)

# Also register by name, so users can use it too
register_param_source_for_name("file-reader", BulkIndexParamSource)
3 changes: 3 additions & 0 deletions esrally/track/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@ class OperationType(Enum):
FieldCaps = 17
CompositeAgg = 18
WaitForCurrentSnapshotsCreate = 19
Downsample = 20

# administrative actions
ForceMerge = 1001
Expand Down Expand Up @@ -852,6 +853,8 @@ def from_hyphenated_string(cls, v):
return OperationType.Sql
elif v == "field-caps":
return OperationType.FieldCaps
elif v == "downsample":
return OperationType.Downsample
else:
raise KeyError(f"No enum value for [{v}]")

Expand Down
69 changes: 69 additions & 0 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5275,6 +5275,75 @@ async def test_mandatory_query_in_body_param(self, es):
)


class TestDownsampleRunner:
default_response = {}

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_index_downsample(self, es):
es.perform_request = mock.AsyncMock(return_value=io.StringIO(json.dumps(self.default_response)))

sql_runner = runner.Downsample()
params = {
"operation-type": "downsample",
"fixed-interval": "1d",
"source-index": "source-index",
"target-index": "target-index",
}

async with sql_runner:
result = await sql_runner(es, params)

assert result == {"success": True, "weight": 1, "unit": "ops"}

es.perform_request.assert_awaited_once_with(
method="POST",
path="/source-index/_downsample/target-index",
body={"fixed_interval": params.get("fixed-interval")},
params={},
headers={},
)

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_mandatory_fixed_interval_in_body_param(self, es):
sql_runner = runner.Downsample()
params = {"operation-type": "downsample", "source-index": "source-index", "target-index": "target-index"}

with pytest.raises(exceptions.DataError) as exc:
await sql_runner(es, params)
assert exc.value.args[0] == (
"Parameter source for operation 'downsample' did not provide the mandatory parameter 'fixed-interval'. "
"Add it to your parameter source and try again."
)

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_mandatory_source_index_in_body_param(self, es):
sql_runner = runner.Downsample()
params = {"operation-type": "downsample", "fixed-interval": "1d", "target-index": "target-index"}

with pytest.raises(exceptions.DataError) as exc:
await sql_runner(es, params)
assert exc.value.args[0] == (
"Parameter source for operation 'downsample' did not provide the mandatory parameter 'source-index'. "
"Add it to your parameter source and try again."
)

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_mandatory_target_index_in_body_param(self, es):
sql_runner = runner.Downsample()
params = {"operation-type": "downsample", "fixed-interval": "1d", "source-index": "source-index"}

with pytest.raises(exceptions.DataError) as exc:
await sql_runner(es, params)
assert exc.value.args[0] == (
"Parameter source for operation 'downsample' did not provide the mandatory parameter 'target-index'. "
"Add it to your parameter source and try again."
)


class TestSubmitAsyncSearch:
@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
Expand Down
38 changes: 38 additions & 0 deletions tests/track/params_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2772,3 +2772,41 @@ def test_force_merge_all_params(self):
assert p["request-timeout"] == 30
assert p["max-num-segments"] == 1
assert p["mode"] == "polling"


class TestDownsampleParamSource:
def test_downsample_all_params(self):
source = params.DownsampleParamSource(
track.Track(name="unit-test"),
params={"source-index": "test-source-index", "target-index": "test-target-index", "fixed-interval": "1m"},
)

p = source.params()

assert p["fixed-interval"] == "1m"
assert p["source-index"] == "test-source-index"
assert p["target-index"] == "test-target-index"

def test_downsample_target_index_param(self):
source = params.DownsampleParamSource(
track.Track(name="unit-test"),
params={"source-index": "tsdb", "fixed-interval": "1m"},
)

p = source.params()

assert p["fixed-interval"] == "1m"
assert p["source-index"] == "tsdb"
assert p["target-index"] == "tsdb-1m"

def test_downsample_empty_params(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what this test buys us? Checked with a debugger and captured the value of p:

{'fixed-interval': '1h', 'source-index': None, 'target-index': 'None-1h', 'request-timeout': None, 'headers': None, 'opaque-id': None}

Might be better instead to have the Track constructor call include an indices param with value like [{"name": "test-source-index", "body": "index.json"}] and assert that source-index gets provided as test-source-index

source = params.DownsampleParamSource(
track.Track(name="unit-test"),
params={},
)

p = source.params()

assert p["fixed-interval"] != ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be the documented default, and should need changing if the default changes

assert p["source-index"] != ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To put my above comment slightly differently, this assert only passes because the source-index is None instead of "", technically different but not a going concern. You've covered my larger concern in test_downsample_default_index_param, where you ensure there's a default when there is supposed to be a default, so perhaps this can just be deleted and the test is left to assert the default fixed-interval?

assert p["target-index"] == f"{p['source-index']}-{p['fixed-interval']}"