Skip to content

Commit

Permalink
Include a new downsampling operation (#1574)
Browse files Browse the repository at this point in the history
The new downsampling operation allows Rally to hit the Elasticsearch
/<source-index>/_downsample/<target-index> endpoint to start a downsampling
operation on the source index. Expected parameters include:

* fixed-interval: the aggregation granularity using the same interval
  definition used by date histograms
* source-index: the index to downsample applying the aggregation
* target-index: the index created as a result of the aggregation
  on the @timestamp field
  • Loading branch information
salvatore-campagna committed Sep 19, 2022
1 parent 067b27c commit af0c464
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 0 deletions.
33 changes: 33 additions & 0 deletions docs/track.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3056,6 +3056,39 @@ The following meta data is always returned:
* ``weight``: The number of fetched pages. Should always equal to the ``pages`` parameter.
* ``unit``: The unit in which to interpret ``weight``. Always "ops".

downsample
~~~~~~~~~~

Executes a downsampling operation on an index producing a new index whose data is aggregated on the @timestamp field.


Properties
""""""""""

* ``fixed-interval`` (optional, defaults to ``1h``): The aggregation interval key defined as in `fixed intervals <https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-datehistogram-aggregation.html#fixed_intervals>`_.
* ``source-index`` (optional): The index containing data to aggregate which includes a ``@timestamp`` field. Note that this index should be marked read-only prior to the execution of this operation. If there is only one index defined in the ``indices`` of the track definition, that will be used as the default.
* ``target-index`` (optional, defaults to ``{source-index}-{fixed-interval}``): The new target index created by the downsampling operation and including aggregated data.

**Example**

Executes a downsampling operation aggregating data in the source index (test-source-index) and creating a new target index (tsdb-target-index) applying an aggregation
interval of 1 minute on the @timestamp field::

{
"name": "downsample",
"operation": {
"operation-type": "downsample",
"fixed-interval": "1m",
"source-index": "test-source-index",
"target-index": "tsdb-target-index"
}
}

Meta-data
"""""""""

The operation returns no meta-data.

field-caps
~~~~~~~~~~~~~~~~~~~

Expand Down
43 changes: 43 additions & 0 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def register_default_runners():
# these requests should not be retried as they are not idempotent
register_runner(track.OperationType.CreateSnapshot, CreateSnapshot(), async_runner=True)
register_runner(track.OperationType.RestoreSnapshot, RestoreSnapshot(), async_runner=True)
register_runner(track.OperationType.Downsample, Downsample(), async_runner=True)
# We treat the following as administrative commands and thus already start to wrap them in a retry.
register_runner(track.OperationType.ClusterHealth, Retry(ClusterHealth()), async_runner=True)
register_runner(track.OperationType.PutPipeline, Retry(PutPipeline()), async_runner=True)
Expand Down Expand Up @@ -2706,6 +2707,48 @@ def __repr__(self, *args, **kwargs):
return "sql"


class Downsample(Runner):
    """
    Executes a downsampling operation creating the target index and aggregating data in the source index on the @timestamp field.

    The runner reads three parameters, all of which must be present and not ``None``:

    * ``fixed-interval``: forwarded as ``fixed_interval`` in the request body.
    * ``source-index``: first segment of the request path.
    * ``target-index``: last segment of the request path.
    """

    async def __call__(self, es, params):
        """
        Issues ``POST /<source-index>/_downsample/<target-index>`` with the fixed interval as body.

        :param es: Client object; only its awaitable ``perform_request`` is used here.
        :param params: Operation parameters (see class docstring); also passed to
                       ``_transport_request_params`` to derive request params and headers.
        :return: ``{"weight": 1, "unit": "ops", "success": True}`` once the request has been awaited.
        :raises exceptions.DataError: if one of the mandatory parameters resolves to ``None``
                                      (``mandatory`` may additionally reject absent keys).
        """
        request_params, request_headers = self._transport_request_params(params)

        # Validation order is kept stable so that the first offending parameter is the one reported.
        fixed_interval = self._required(params, "fixed-interval")
        source_index = self._required(params, "source-index")
        target_index = self._required(params, "target-index")

        await es.perform_request(
            method="POST",
            path=f"/{source_index}/_downsample/{target_index}",
            body={"fixed_interval": fixed_interval},
            params=request_params,
            headers=request_headers,
        )

        return {"weight": 1, "unit": "ops", "success": True}

    def _required(self, params, key):
        """Returns the value of ``key`` via ``mandatory`` and rejects an explicit ``None``."""
        value = mandatory(params, key, self)
        if value is None:
            raise exceptions.DataError(
                f"Parameter source for operation 'downsample' did not provide the mandatory parameter '{key}'. "
                "Add it to your parameter source and try again."
            )
        return value

    def __repr__(self, *args, **kwargs):
        return "downsample"


class FieldCaps(Runner):
"""
Retrieve `the capabilities of fields among indices.
Expand Down
15 changes: 15 additions & 0 deletions esrally/track/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,20 @@ def params(self):
return parsed_params


class DownsampleParamSource(ParamSource):
    """
    Parameter source for the ``downsample`` operation.

    Resolves three values at construction time:

    * ``fixed-interval``: taken from ``params``, defaulting to ``"1h"``.
    * ``source-index``: resolved by ``get_target`` with ``source-index`` supplied as its ``index`` entry.
    * ``target-index``: taken from ``params``, defaulting to ``"<source-index>-<fixed-interval>"``.
    """

    def __init__(self, track, params, **kwargs):
        super().__init__(track, params, **kwargs)
        self._fixed_interval = params.get("fixed-interval", "1h")
        # ``get_target`` is fed the source index under the "index" key. Use a shallow copy so the
        # caller's ``params`` dict is not modified as a side effect of constructing this source.
        target_lookup = {**params, "index": params.get("source-index")}
        self._source_index = get_target(track, target_lookup)
        # NOTE(review): if no source index could be resolved, the default below is still formatted
        # from whatever ``get_target`` returned - confirm whether that case should be rejected instead.
        self._target_index = params.get("target-index", f"{self._source_index}-{self._fixed_interval}")

    def params(self):
        """Returns the resolved downsample parameters merged with the values from ``_client_params()``."""
        parsed_params = {
            "fixed-interval": self._fixed_interval,
            "source-index": self._source_index,
            "target-index": self._target_index,
        }
        parsed_params.update(self._client_params())
        return parsed_params


def get_target(track, params):
if len(track.indices) == 1:
default_target = track.indices[0].name
Expand Down Expand Up @@ -1342,6 +1356,7 @@ def read_bulk(self):
register_param_source_for_operation(track.OperationType.DeleteComposableTemplate, DeleteComposableTemplateParamSource)
register_param_source_for_operation(track.OperationType.Sleep, SleepParamSource)
register_param_source_for_operation(track.OperationType.ForceMerge, ForceMergeParamSource)
register_param_source_for_operation(track.OperationType.Downsample, DownsampleParamSource)

# Also register by name, so users can use it too
register_param_source_for_name("file-reader", BulkIndexParamSource)
3 changes: 3 additions & 0 deletions esrally/track/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@ class OperationType(Enum):
FieldCaps = 17
CompositeAgg = 18
WaitForCurrentSnapshotsCreate = 19
Downsample = 20

# administrative actions
ForceMerge = 1001
Expand Down Expand Up @@ -852,6 +853,8 @@ def from_hyphenated_string(cls, v):
return OperationType.Sql
elif v == "field-caps":
return OperationType.FieldCaps
elif v == "downsample":
return OperationType.Downsample
else:
raise KeyError(f"No enum value for [{v}]")

Expand Down
69 changes: 69 additions & 0 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5275,6 +5275,75 @@ async def test_mandatory_query_in_body_param(self, es):
)


class TestDownsampleRunner:
    """Tests for ``runner.Downsample``: the request it sends and its mandatory-parameter errors."""

    default_response = {}

    @mock.patch("elasticsearch.Elasticsearch")
    @pytest.mark.asyncio
    async def test_index_downsample(self, es):
        # The runner only needs an awaitable ``perform_request``; its payload is not inspected.
        response_body = io.StringIO(json.dumps(self.default_response))
        es.perform_request = mock.AsyncMock(return_value=response_body)

        params = {
            "operation-type": "downsample",
            "fixed-interval": "1d",
            "source-index": "source-index",
            "target-index": "target-index",
        }
        downsample = runner.Downsample()

        async with downsample:
            outcome = await downsample(es, params)

        assert outcome == {"success": True, "weight": 1, "unit": "ops"}

        es.perform_request.assert_awaited_once_with(
            method="POST",
            path="/source-index/_downsample/target-index",
            body={"fixed_interval": params.get("fixed-interval")},
            params={},
            headers={},
        )

    @mock.patch("elasticsearch.Elasticsearch")
    @pytest.mark.asyncio
    async def test_mandatory_fixed_interval_in_body_param(self, es):
        params = {"operation-type": "downsample", "source-index": "source-index", "target-index": "target-index"}
        expected_message = (
            "Parameter source for operation 'downsample' did not provide the mandatory parameter 'fixed-interval'. "
            "Add it to your parameter source and try again."
        )
        downsample = runner.Downsample()

        with pytest.raises(exceptions.DataError) as exc:
            await downsample(es, params)
        assert exc.value.args[0] == expected_message

    @mock.patch("elasticsearch.Elasticsearch")
    @pytest.mark.asyncio
    async def test_mandatory_source_index_in_body_param(self, es):
        params = {"operation-type": "downsample", "fixed-interval": "1d", "target-index": "target-index"}
        expected_message = (
            "Parameter source for operation 'downsample' did not provide the mandatory parameter 'source-index'. "
            "Add it to your parameter source and try again."
        )
        downsample = runner.Downsample()

        with pytest.raises(exceptions.DataError) as exc:
            await downsample(es, params)
        assert exc.value.args[0] == expected_message

    @mock.patch("elasticsearch.Elasticsearch")
    @pytest.mark.asyncio
    async def test_mandatory_target_index_in_body_param(self, es):
        params = {"operation-type": "downsample", "fixed-interval": "1d", "source-index": "source-index"}
        expected_message = (
            "Parameter source for operation 'downsample' did not provide the mandatory parameter 'target-index'. "
            "Add it to your parameter source and try again."
        )
        downsample = runner.Downsample()

        with pytest.raises(exceptions.DataError) as exc:
            await downsample(es, params)
        assert exc.value.args[0] == expected_message


class TestSubmitAsyncSearch:
@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
Expand Down
49 changes: 49 additions & 0 deletions tests/track/params_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2772,3 +2772,52 @@ def test_force_merge_all_params(self):
assert p["request-timeout"] == 30
assert p["max-num-segments"] == 1
assert p["mode"] == "polling"


class TestDownsampleParamSource:
    """Tests for ``params.DownsampleParamSource``: explicit values, track-derived defaults and fallbacks."""

    def test_downsample_all_params(self):
        # Every parameter is given explicitly, so nothing is derived from the track.
        empty_track = track.Track(name="unit-test")
        param_source = params.DownsampleParamSource(
            empty_track,
            params={"source-index": "test-source-index", "target-index": "test-target-index", "fixed-interval": "1m"},
        )

        generated = param_source.params()

        assert generated["fixed-interval"] == "1m"
        assert generated["source-index"] == "test-source-index"
        assert generated["target-index"] == "test-target-index"

    def test_downsample_default_index_param(self):
        # No source index given: the single index declared on the track is expected to be used.
        single_index_track = track.Track(name="unit-test", indices=[track.Index(name="test-source-index", body="index.json")])
        param_source = params.DownsampleParamSource(
            single_index_track,
            params={"fixed-interval": "1m", "target-index": "test-target-index"},
        )

        generated = param_source.params()

        assert generated["fixed-interval"] == "1m"
        assert generated["source-index"] == "test-source-index"
        assert generated["target-index"] == "test-target-index"

    def test_downsample_source_index_override_default_index_param(self):
        # An explicit source index is expected to win over the track's only index.
        single_index_track = track.Track(name="unit-test", indices=[track.Index(name="test-source-index", body="index.json")])
        param_source = params.DownsampleParamSource(
            single_index_track,
            params={"source-index": "another-index", "fixed-interval": "1m", "target-index": "test-target-index"},
        )

        generated = param_source.params()

        assert generated["fixed-interval"] == "1m"
        assert generated["source-index"] == "another-index"
        assert generated["target-index"] == "test-target-index"

    def test_downsample_empty_params(self):
        # With no parameters at all, the interval falls back to "1h" and the target name is derived.
        empty_track = track.Track(name="unit-test")
        param_source = params.DownsampleParamSource(
            empty_track,
            params={},
        )

        generated = param_source.params()
        derived_target = f"{generated['source-index']}-{generated['fixed-interval']}"

        assert generated["fixed-interval"] == "1h"
        assert generated["target-index"] == derived_target

0 comments on commit af0c464

Please sign in to comment.