Add looped mode to bulk operation (#1830)
Adds looped mode to the bulk operation. This is useful in benchmarks where
we want Rally to ingest for a specific amount of time even if the corpus is
not big enough to sustain indexing at a given rate for that long.

This mode is meant to be used together with `time-period` or
`iterations` specified at the task level. Otherwise Rally will never
complete the task.
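For illustration, such a task might look like the sketch below, written as a Python dict that mirrors the JSON track definition (the bulk size, client count, and throughput are invented for the example; the point is the combination of `looped` on the operation and `time-period` on the task).

```python
# Hypothetical task definition (values invented for illustration).
# "looped": True keeps the bulk operation re-reading the corpus, and the
# task-level "time-period" (in seconds) is what finally ends the task.
looped_bulk_task = {
    "operation": {
        "operation-type": "bulk",
        "bulk-size": 5000,
        "looped": True,
    },
    "clients": 8,
    "time-period": 1800,     # ingest for 30 minutes, regardless of corpus size
    "target-throughput": 4,  # bulk requests per second across all clients
}
```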
gbanasiak committed Feb 21, 2024
1 parent ac2cef9 commit 43e56f6
Showing 3 changed files with 57 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs/track.rst
@@ -759,6 +759,7 @@ Properties
* ``detailed-results`` (optional, defaults to ``false``): Records more detailed meta-data for bulk requests. As it analyzes the corresponding bulk response in more detail, this might incur additional overhead which can skew measurement results. See the section below for the meta-data that are returned. This property must be set to ``true`` for individual bulk request failures to be logged by Rally.
* ``timeout`` (optional, defaults to ``1m``): Defines the `time period that Elasticsearch will wait per action <https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-query-params>`_ until it has finished processing the following operations: automatic index creation, dynamic mapping updates, waiting for active shards.
* ``refresh`` (optional): Control Elasticsearch refresh behavior for bulk requests via the ``refresh`` bulk API query parameter. Valid values are ``true``, ``wait_for``, and ``false``. Parameter values are specified as a string. If ``true``, Elasticsearch will refresh target shards in the background. If ``wait_for``, Elasticsearch blocks bulk requests until affected shards have been refreshed. If ``false``, Elasticsearch will use the default refresh behavior.
* ``looped`` (optional, defaults to ``false``): If set to ``true``, the ``bulk`` operation will continue from the beginning of the corpus once it has been fully ingested. This option should be combined with the ``time-period`` or ``iterations`` property at the task level, otherwise Rally will never finish the task.

With multiple ``clients``, Rally will split each document corpus into as many chunks as there are ``clients``. This ensures that the bulk index operations are efficiently parallelized, but has the drawback that documents are not ingested in their original order. For example, if ``clients`` is set to 2, one client will index documents starting from the beginning of the corpus, while the other will start from the middle.
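To make that split concrete, here is a minimal sketch of the offset arithmetic (illustrative only, not Rally's actual implementation) for dividing a corpus across clients:

```python
def client_ranges(number_of_documents: int, clients: int):
    """Carve a corpus into one contiguous [start, end) slice per client.

    Because each client gets a contiguous slice, with two clients one starts
    at the beginning and the other at the middle, so the global document
    order is not preserved during ingestion.
    """
    base, remainder = divmod(number_of_documents, clients)
    ranges = []
    start = 0
    for client in range(clients):
        size = base + (1 if client < remainder else 0)
        ranges.append((start, start + size))
        start += size
    return ranges


# client_ranges(1000, 2) -> [(0, 500), (500, 1000)]
```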

12 changes: 11 additions & 1 deletion esrally/track/params.py
@@ -635,6 +635,7 @@ def __init__(self, track, params, **kwargs):

self.ingest_percentage = self.float_param(params, name="ingest-percentage", default_value=100, min_value=0, max_value=100)
self.refresh = params.get("refresh")
self.looped = params.get("looped", False)
self.param_source = PartitionBulkIndexParamSource(
self.corpora,
self.batch_size,
@@ -646,6 +647,7 @@ def __init__(self, track, params, **kwargs):
self.recency,
self.pipeline,
self.refresh,
self.looped,
self._params,
)

@@ -708,6 +710,7 @@ def __init__(
recency,
pipeline=None,
refresh=None,
looped: bool = False,
original_params=None,
):
"""
@@ -726,6 +729,7 @@ def __init__(
If "true", Elasticsearch refreshes the affected shards in the background.
If "wait_for", the client is blocked until Elasticsearch finishes the refresh operation.
If "false", Elasticsearch will use the default refresh behavior.
:param looped: Set to True for looped mode, in which bulk requests start again from the beginning once the entire corpus has been ingested.
:param original_params: The original dict passed to the parent parameter source.
"""
self.corpora = corpora
@@ -740,6 +744,7 @@ def __init__(
self.recency = recency
self.pipeline = pipeline
self.refresh = refresh
self.looped = looped
self.original_params = original_params
# this is only intended for unit-testing
self.create_reader = original_params.pop("__create_reader", create_default_reader)
@@ -763,7 +768,12 @@ def params(self):
# self.internal_params always reads all files. This is necessary to ensure we terminate early in case
# the user has specified ingest percentage.
if self.current_bulk == self.total_bulks:
raise StopIteration()
# start from the beginning in looped mode, otherwise stop the run
if self.looped:
self.current_bulk = 0
self._init_internal_params()
else:
raise StopIteration()
self.current_bulk += 1
return next(self.internal_params)
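For context, a minimal sketch (not Rally's actual scheduler) of how a caller might drive such a partition: with ``looped=True`` the ``StopIteration`` above never fires, so an external limit such as the task-level ``time-period`` is the only thing that ends the run.

```python
import time


def drive(partition, time_period_seconds: float) -> int:
    """Illustrative driver: request bulks until the corpus runs out or time is up."""
    deadline = time.monotonic() + time_period_seconds
    issued_bulks = 0
    while time.monotonic() < deadline:
        try:
            partition.params()  # next bulk request body and meta-data
        except StopIteration:
            break  # corpus fully ingested and looped mode is off
        issued_bulks += 1
    return issued_bulks
```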

45 changes: 45 additions & 0 deletions tests/track/params_test.py
@@ -1278,6 +1278,51 @@ def schedule(param_source):
assert partition.total_bulks == 3
assert len(list(schedule(partition))) == 3

def test_looped_mode(self):
def create_unit_test_reader(*args):
return StaticBulkReader(
"idx",
"doc",
bulks=[
['{"location" : [-0.1485188, 51.5250666]}'],
['{"location" : [-0.1479949, 51.5252071]}'],
],
)

corpora = [
track.DocumentCorpus(
name="default",
documents=[
track.Documents(
source_format=track.Documents.SOURCE_FORMAT_BULK,
number_of_documents=2,
target_index="test-idx",
target_type="test-type",
)
],
),
]

source = params.BulkIndexParamSource(
track=track.Track(name="unit-test", corpora=corpora),
params={
"bulk-size": 2,
"looped": True,
"__create_reader": create_unit_test_reader,
},
)

partition = source.partition(0, 1)
partition.params()
# should issue 1 bulk with the size of 2
assert partition.total_bulks == 1
assert partition.current_bulk == 1

partition.params()
# should have looped back to the beginning
assert partition.total_bulks == 1
assert partition.current_bulk == 1

def test_create_with_conflict_probability_zero(self):
corpus = track.DocumentCorpus(
name="default",
