Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exclude tasks based on serverless status #1760

Merged
merged 12 commits into from
Aug 23, 2023
1 change: 1 addition & 0 deletions docs/track.rst
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ The ``schedule`` element contains a list of tasks that are executed by Rally, i.
* ``target-throughput`` (optional): Defines the benchmark mode. If it is not defined, Rally assumes this is a throughput benchmark and will run the task as fast as it can. This is mostly needed for batch-style operations where it is more important to achieve the best throughput instead of an acceptable latency. If it is defined, it specifies the number of requests per second over all clients. E.g. if you specify ``target-throughput: 1000`` with 8 clients, it means that each client will issue 125 (= 1000 / 8) requests per second. In total, all clients will issue 1000 requests each second. If Rally reports less than the specified throughput then Elasticsearch simply cannot reach it.
* ``target-interval`` (optional): This is just ``1 / target-throughput`` (in seconds) and may be more convenient for cases where the throughput is less than one operation per second. Define either ``target-throughput`` or ``target-interval`` but not both (otherwise Rally will raise an error).
* ``ignore-response-error-level`` (optional): Controls whether to ignore errors encountered during task execution when the benchmark is run with :ref:`on-error=abort <command_line_reference_on_error>`. The only allowable value is ``non-fatal`` which, combined with the cli option ``--on-error=abort``, will ignore non-fatal errors during the execution of the task.
* ``run-on-serverless`` (optional, default to unset): By default, Rally skips operations that are not supported in Elasticsearch Serverless, such as legacy index templates. Setting this option to ``true`` or ``false`` will override that detection.

.. note::

Expand Down
4 changes: 2 additions & 2 deletions esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,8 +709,8 @@ def prepare_benchmark(self, t):
else:
self.wait_for_rest_api(es_clients)
self.target.cluster_details = self.retrieve_cluster_info(es_clients)
serverless_mode = self.config.opts("driver", "serverless.mode")
serverless_operator = self.config.opts("driver", "serverless.operator")
serverless_mode = convert.to_bool(self.config.opts("driver", "serverless.mode", mandatory=False, default_value=False))
serverless_operator = convert.to_bool(self.config.opts("driver", "serverless.operator", mandatory=False, default_value=False))
if serverless_mode and serverless_operator:
build_hash = self.retrieve_build_hash_from_nodes_info(es_clients)
self.logger.info("Retrieved actual build hash [%s] from serverless cluster.", build_hash)
Expand Down
2 changes: 2 additions & 0 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2575,6 +2575,8 @@ class Composite(Runner):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Since Composite is marked as ServerlessStatus.Public, only add public
# operation types here.
self.supported_op_types = [
"open-point-in-time",
"close-point-in-time",
Expand Down
17 changes: 8 additions & 9 deletions esrally/racecontrol.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,6 @@ def __init__(self, cfg):
self.current_challenge = None

def setup(self, sources=False):
serverless_mode = False
serverless_operator = False

# to load the track we need to know the correct cluster distribution version. Usually, this value should be set
# but there are rare cases (external pipeline and user did not specify the distribution version) where we need
# to derive it ourselves. For source builds we always assume "main"
Expand All @@ -202,9 +199,12 @@ def setup(self, sources=False):
self.cfg.add(config.Scope.benchmark, "mechanic", "distribution.version", distribution_version)
self.cfg.add(config.Scope.benchmark, "mechanic", "distribution.flavor", distribution_flavor)
if versions.is_serverless(distribution_flavor):
serverless_mode = True
# operator privileges assumed for now
serverless_operator = True
if not self.cfg.exists("driver", "serverless.mode"):
self.cfg.add(config.Scope.benchmark, "driver", "serverless.mode", True)

if not self.cfg.exists("driver", "serverless.operator"):
# operator privileges assumed for now
self.cfg.add(config.Scope.benchmark, "driver", "serverless.operator", True)
else:
min_es_version = versions.Version.from_string(version.minimum_es_version())
specified_version = versions.Version.from_string(distribution_version)
Expand All @@ -213,9 +213,6 @@ def setup(self, sources=False):
f"Cluster version must be at least [{min_es_version}] but was [{distribution_version}]"
)

self.cfg.add(config.Scope.benchmark, "driver", "serverless.mode", serverless_mode)
self.cfg.add(config.Scope.benchmark, "driver", "serverless.operator", serverless_operator)

self.current_track = track.load_track(self.cfg, install_dependencies=True)
self.track_revision = self.cfg.opts("track", "repository.revision", mandatory=False)
challenge_name = self.cfg.opts("track", "challenge.name")
Expand All @@ -228,6 +225,8 @@ def setup(self, sources=False):
)
if self.current_challenge.user_info:
console.info(self.current_challenge.user_info)
for message in self.current_challenge.serverless_info:
console.info(message)
self.race = metrics.create_race(self.cfg, self.current_track, self.current_challenge, self.track_revision)

self.metrics_store = metrics.metrics_store(
Expand Down
4 changes: 4 additions & 0 deletions esrally/resources/track-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@
"ignore-response-error-level": {
"type": "string",
"description": "Overrides non-fatal error handling globally defined via the cli arg on-error=abort. Only 'non-fatal' may be defined which will continue unless a fatal error occurs."
},
"run-on-serverless": {
"type": "boolean",
"description": "Overrides automatic skip of unsupported serverless operations. Can be set to true or false."
}
},
"required": [
Expand Down
49 changes: 48 additions & 1 deletion esrally/track/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

from esrally import PROGRAM_NAME, config, exceptions, paths, time, version
from esrally.track import params, track
from esrally.track.track import Parallel
from esrally.utils import collections, console, convert, io, modules, net, opts, repo


Expand Down Expand Up @@ -79,7 +80,7 @@ def on_prepare_track(self, track: track.Track, data_root_dir: str) -> Generator[

class TrackProcessorRegistry:
def __init__(self, cfg):
self.required_processors = [TaskFilterTrackProcessor(cfg), TestModeTrackProcessor(cfg)]
self.required_processors = [TaskFilterTrackProcessor(cfg), ServerlessFilterTrackProcessor(cfg), TestModeTrackProcessor(cfg)]
self.track_processors = []
self.offline = cfg.opts("system", "offline.mode")
self.test_mode = cfg.opts("track", "test.mode.enabled", mandatory=False, default_value=False)
Expand Down Expand Up @@ -892,6 +893,52 @@ def on_after_load_track(self, track):
return track


class ServerlessFilterTrackProcessor(TrackProcessor):
def __init__(self, cfg):
self.logger = logging.getLogger(__name__)
self.serverless_mode = convert.to_bool(cfg.opts("driver", "serverless.mode", mandatory=False, default_value=False))
self.serverless_operator = convert.to_bool(cfg.opts("driver", "serverless.operator", mandatory=False, default_value=False))

def _is_filtered_task(self, operation):
if operation.run_on_serverless is not None:
return not operation.run_on_serverless

if operation.type == "raw-request":
self.logger.info("Treating raw-request operation for operation [%s] as public.", operation.name)

try:
op = track.OperationType.from_hyphenated_string(operation.type)
# Comparisons rely on the ordering of auto() in track.ServerlessStatus which is an IntEnum
if self.serverless_operator:
return op.serverless_status < track.ServerlessStatus.Internal
else:
return op.serverless_status < track.ServerlessStatus.Public
gbanasiak marked this conversation as resolved.
Show resolved Hide resolved
except KeyError:
self.logger.info("Treating user-provided operation type [%s] for operation [%s] as public.", operation.type, operation.name)
return False

def on_after_load_track(self, track):
if not self.serverless_mode:
return track

for challenge in track.challenges:
# don't modify the schedule while iterating over it
tasks_to_remove = []
for task in challenge.schedule:
if isinstance(task, Parallel):
challenge.serverless_info.append(f"Treating parallel task in challenge [{challenge}] as public.")
gbanasiak marked this conversation as resolved.
Show resolved Hide resolved
elif self._is_filtered_task(task.operation):
tasks_to_remove.append(task)
for task in tasks_to_remove:
challenge.remove_task(task)

if tasks_to_remove:
task_str = ", ".join(f"[{task}]" for task in tasks_to_remove)
challenge.serverless_info.append(f"Excluding {task_str} as challenge [{challenge}] is run on serverless.")

return track


class TestModeTrackProcessor(TrackProcessor):
def __init__(self, cfg):
self.test_mode_enabled = cfg.opts("track", "test.mode.enabled", mandatory=False, default_value=False)
Expand Down
146 changes: 87 additions & 59 deletions esrally/track/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import collections
import numbers
import re
from enum import Enum, unique
from enum import Enum, IntEnum, auto, unique

from esrally import exceptions

Expand Down Expand Up @@ -611,6 +611,7 @@ def __init__(
self.meta_data = meta_data if meta_data else {}
self.description = description
self.user_info = user_info
self.serverless_info = []
self.default = default
self.selected = selected
self.auto_generated = auto_generated
Expand Down Expand Up @@ -665,71 +666,94 @@ def __eq__(self, othr):
)


@unique
class AdminStatus(Enum):
# We can't use True/False as they are keywords
Yes = auto()
No = auto()


@unique
class ServerlessStatus(IntEnum):
Blocked = auto()
Internal = auto()
Public = auto()


@unique
class OperationType(Enum):
# for the time being we are not considering this action as administrative
IndexStats = 1
NodeStats = 2
Search = 3
Bulk = 4
RawRequest = 5
WaitForRecovery = 6
WaitForSnapshotCreate = 7
Composite = 8
SubmitAsyncSearch = 9
GetAsyncSearch = 10
DeleteAsyncSearch = 11
PaginatedSearch = 12
ScrollSearch = 13
OpenPointInTime = 14
ClosePointInTime = 15
Sql = 16
FieldCaps = 17
CompositeAgg = 18
WaitForCurrentSnapshotsCreate = 19
Downsample = 20
# TODO replace manual counts with auto() when we drop support for Python 3.10
# https://docs.python.org/3/library/enum.html#enum.auto
IndexStats = (1, AdminStatus.No, ServerlessStatus.Internal)
NodeStats = (2, AdminStatus.No, ServerlessStatus.Internal)
Search = (3, AdminStatus.No, ServerlessStatus.Public)
Bulk = (4, AdminStatus.No, ServerlessStatus.Public)
# Public as we can't verify the actual status
RawRequest = (5, AdminStatus.No, ServerlessStatus.Public)
gbanasiak marked this conversation as resolved.
Show resolved Hide resolved
WaitForRecovery = (6, AdminStatus.No, ServerlessStatus.Internal)
WaitForSnapshotCreate = (7, AdminStatus.No, ServerlessStatus.Internal)
# Public as all supported operation types are Public too (including RawRequest as
# mentioned above)
Composite = (8, AdminStatus.No, ServerlessStatus.Public)
SubmitAsyncSearch = (9, AdminStatus.No, ServerlessStatus.Public)
GetAsyncSearch = (10, AdminStatus.No, ServerlessStatus.Public)
DeleteAsyncSearch = (11, AdminStatus.No, ServerlessStatus.Public)
PaginatedSearch = (12, AdminStatus.No, ServerlessStatus.Public)
ScrollSearch = (13, AdminStatus.No, ServerlessStatus.Public)
OpenPointInTime = (14, AdminStatus.No, ServerlessStatus.Public)
ClosePointInTime = (15, AdminStatus.No, ServerlessStatus.Public)
Sql = (16, AdminStatus.No, ServerlessStatus.Public)
FieldCaps = (17, AdminStatus.No, ServerlessStatus.Public)
CompositeAgg = (18, AdminStatus.No, ServerlessStatus.Public)
WaitForCurrentSnapshotsCreate = (19, AdminStatus.No, ServerlessStatus.Internal)
Downsample = (20, AdminStatus.No, ServerlessStatus.Internal)

# administrative actions
ForceMerge = 1001
ClusterHealth = 1002
PutPipeline = 1003
Refresh = 1004
CreateIndex = 1005
DeleteIndex = 1006
CreateIndexTemplate = 1007
DeleteIndexTemplate = 1008
ShrinkIndex = 1009
CreateMlDatafeed = 1010
DeleteMlDatafeed = 1011
StartMlDatafeed = 1012
StopMlDatafeed = 1013
CreateMlJob = 1014
DeleteMlJob = 1015
OpenMlJob = 1016
CloseMlJob = 1017
Sleep = 1018
DeleteSnapshotRepository = 1019
CreateSnapshotRepository = 1020
CreateSnapshot = 1021
RestoreSnapshot = 1022
PutSettings = 1023
CreateTransform = 1024
StartTransform = 1025
WaitForTransform = 1026
DeleteTransform = 1027
CreateDataStream = 1028
DeleteDataStream = 1029
CreateComposableTemplate = 1030
DeleteComposableTemplate = 1031
CreateComponentTemplate = 1032
DeleteComponentTemplate = 1033
TransformStats = 1034
CreateIlmPolicy = 1035
DeleteIlmPolicy = 1036
ForceMerge = (21, AdminStatus.Yes, ServerlessStatus.Internal)
ClusterHealth = (22, AdminStatus.Yes, ServerlessStatus.Internal)
PutPipeline = (23, AdminStatus.Yes, ServerlessStatus.Public)
Refresh = (24, AdminStatus.Yes, ServerlessStatus.Public)
CreateIndex = (25, AdminStatus.Yes, ServerlessStatus.Public)
DeleteIndex = (26, AdminStatus.Yes, ServerlessStatus.Public)
CreateIndexTemplate = (27, AdminStatus.Yes, ServerlessStatus.Blocked)
DeleteIndexTemplate = (28, AdminStatus.Yes, ServerlessStatus.Blocked)
ShrinkIndex = (29, AdminStatus.Yes, ServerlessStatus.Blocked)
CreateMlDatafeed = (30, AdminStatus.Yes, ServerlessStatus.Public)
DeleteMlDatafeed = (31, AdminStatus.Yes, ServerlessStatus.Public)
StartMlDatafeed = (32, AdminStatus.Yes, ServerlessStatus.Public)
StopMlDatafeed = (33, AdminStatus.Yes, ServerlessStatus.Public)
CreateMlJob = (34, AdminStatus.Yes, ServerlessStatus.Public)
DeleteMlJob = (35, AdminStatus.Yes, ServerlessStatus.Public)
OpenMlJob = (36, AdminStatus.Yes, ServerlessStatus.Public)
CloseMlJob = (37, AdminStatus.Yes, ServerlessStatus.Public)
Sleep = (38, AdminStatus.Yes, ServerlessStatus.Public)
DeleteSnapshotRepository = (39, AdminStatus.Yes, ServerlessStatus.Internal)
CreateSnapshotRepository = (40, AdminStatus.Yes, ServerlessStatus.Internal)
CreateSnapshot = (41, AdminStatus.Yes, ServerlessStatus.Internal)
RestoreSnapshot = (42, AdminStatus.Yes, ServerlessStatus.Internal)
PutSettings = (43, AdminStatus.Yes, ServerlessStatus.Internal)
CreateTransform = (44, AdminStatus.Yes, ServerlessStatus.Public)
StartTransform = (45, AdminStatus.Yes, ServerlessStatus.Public)
WaitForTransform = (46, AdminStatus.Yes, ServerlessStatus.Public)
DeleteTransform = (47, AdminStatus.Yes, ServerlessStatus.Public)
CreateDataStream = (48, AdminStatus.Yes, ServerlessStatus.Public)
DeleteDataStream = (49, AdminStatus.Yes, ServerlessStatus.Public)
CreateComposableTemplate = (50, AdminStatus.Yes, ServerlessStatus.Public)
DeleteComposableTemplate = (51, AdminStatus.Yes, ServerlessStatus.Public)
CreateComponentTemplate = (52, AdminStatus.Yes, ServerlessStatus.Public)
DeleteComponentTemplate = (53, AdminStatus.Yes, ServerlessStatus.Public)
TransformStats = (54, AdminStatus.Yes, ServerlessStatus.Public)
CreateIlmPolicy = (55, AdminStatus.Yes, ServerlessStatus.Blocked)
DeleteIlmPolicy = (56, AdminStatus.Yes, ServerlessStatus.Blocked)

def __init__(self, id: int, admin_status: AdminStatus, serverless_status: ServerlessStatus):
self.id = id
self.admin_status = admin_status
self.serverless_status = serverless_status

@property
def admin_op(self):
return self.value > 1000
return self.admin_status == AdminStatus.Yes

def to_hyphenated_string(self):
"""
Expand Down Expand Up @@ -1150,6 +1174,10 @@ def __init__(self, name, operation_type, meta_data=None, params=None, param_sour
def include_in_reporting(self):
return self.params.get("include-in-reporting", True)

@property
def run_on_serverless(self):
return self.params.get("run-on-serverless", None)

def __hash__(self):
return hash(self.name)

Expand Down
Loading