Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exclude tasks based on serverless status #1760

Merged
merged 12 commits into from
Aug 23, 2023
2 changes: 2 additions & 0 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2575,6 +2575,8 @@ class Composite(Runner):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Since Composite is marked as ServerlessStatus.Public, only add public
# operation types here.
self.supported_op_types = [
"open-point-in-time",
"close-point-in-time",
Expand Down
12 changes: 4 additions & 8 deletions esrally/racecontrol.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,6 @@ def __init__(self, cfg):
self.current_challenge = None

def setup(self, sources=False):
serverless_mode = False
serverless_operator = False

# to load the track we need to know the correct cluster distribution version. Usually, this value should be set
# but there are rare cases (external pipeline and user did not specify the distribution version) where we need
# to derive it ourselves. For source builds we always assume "main"
Expand All @@ -202,9 +199,9 @@ def setup(self, sources=False):
self.cfg.add(config.Scope.benchmark, "mechanic", "distribution.version", distribution_version)
self.cfg.add(config.Scope.benchmark, "mechanic", "distribution.flavor", distribution_flavor)
if versions.is_serverless(distribution_flavor):
serverless_mode = True
self.cfg.add(config.Scope.benchmark, "driver", "serverless.mode", True)
# operator privileges assumed for now
serverless_operator = True
self.cfg.add(config.Scope.benchmark, "driver", "serverless.operator", True)
else:
min_es_version = versions.Version.from_string(version.minimum_es_version())
specified_version = versions.Version.from_string(distribution_version)
Expand All @@ -213,9 +210,6 @@ def setup(self, sources=False):
f"Cluster version must be at least [{min_es_version}] but was [{distribution_version}]"
)

self.cfg.add(config.Scope.benchmark, "driver", "serverless.mode", serverless_mode)
self.cfg.add(config.Scope.benchmark, "driver", "serverless.operator", serverless_operator)

self.current_track = track.load_track(self.cfg, install_dependencies=True)
self.track_revision = self.cfg.opts("track", "repository.revision", mandatory=False)
challenge_name = self.cfg.opts("track", "challenge.name")
Expand All @@ -228,6 +222,8 @@ def setup(self, sources=False):
)
if self.current_challenge.user_info:
console.info(self.current_challenge.user_info)
for message in self.current_challenge.serverless_info:
console.info(message)
self.race = metrics.create_race(self.cfg, self.current_track, self.current_challenge, self.track_revision)

self.metrics_store = metrics.metrics_store(
Expand Down
8 changes: 8 additions & 0 deletions esrally/rally.py
Original file line number Diff line number Diff line change
Expand Up @@ -1063,6 +1063,11 @@ def configure_reporting_params(args, cfg):
cfg.add(config.Scope.applicationOverride, "reporting", "numbers.align", args.report_numbers_align)


def configure_default_serverless_params(cfg):
cfg.add(config.Scope.benchmark, "driver", "serverless.mode", False)
cfg.add(config.Scope.benchmark, "driver", "serverless.operator", False)


def dispatch_sub_command(arg_parser, args, cfg):
sub_command = args.subcommand

Expand All @@ -1083,6 +1088,7 @@ def dispatch_sub_command(arg_parser, args, cfg):
cfg.add(config.Scope.applicationOverride, "system", "list.to_date", args.to_date)
configure_mechanic_params(args, cfg, command_requires_car=False)
configure_track_params(arg_parser, args, cfg, command_requires_track=False)
configure_default_serverless_params(cfg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a benefit of calling configure_default_serverless_params 3 times in list, delete and info instead of calling it once at the top of dispatch_sub_command ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, it felt weird to set those values for commands where it's not needed. Happy to change this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy to keep current form as long as all sub-commands have been tested. Our test coverage is not complete hence the thought of moving it to the top.

dispatch_list(cfg)
elif sub_command == "delete":
cfg.add(config.Scope.applicationOverride, "system", "delete.config.option", args.configuration)
Expand Down Expand Up @@ -1159,6 +1165,7 @@ def dispatch_sub_command(arg_parser, args, cfg):
configure_connection_params(arg_parser, args, cfg)
configure_telemetry_params(args, cfg)
configure_mechanic_params(args, cfg)
configure_default_serverless_params(cfg)
cfg.add(config.Scope.applicationOverride, "mechanic", "runtime.jdk", args.runtime_jdk)
cfg.add(config.Scope.applicationOverride, "mechanic", "source.revision", args.revision)
cfg.add(config.Scope.applicationOverride, "mechanic", "source.build.method", args.source_build_method)
Expand Down Expand Up @@ -1186,6 +1193,7 @@ def dispatch_sub_command(arg_parser, args, cfg):
tracker.create_track(cfg)
elif sub_command == "info":
configure_track_params(arg_parser, args, cfg)
configure_default_serverless_params(cfg)
track.track_info(cfg)
else:
raise exceptions.SystemSetupError(f"Unknown subcommand [{sub_command}]")
Expand Down
40 changes: 39 additions & 1 deletion esrally/track/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

from esrally import PROGRAM_NAME, config, exceptions, paths, time, version
from esrally.track import params, track
from esrally.track.track import Parallel
from esrally.utils import collections, console, convert, io, modules, net, opts, repo


Expand Down Expand Up @@ -79,7 +80,7 @@ def on_prepare_track(self, track: track.Track, data_root_dir: str) -> Generator[

class TrackProcessorRegistry:
def __init__(self, cfg):
self.required_processors = [TaskFilterTrackProcessor(cfg), TestModeTrackProcessor(cfg)]
self.required_processors = [TaskFilterTrackProcessor(cfg), ServerlessFilterTrackProcessor(cfg), TestModeTrackProcessor(cfg)]
self.track_processors = []
self.offline = cfg.opts("system", "offline.mode")
self.test_mode = cfg.opts("track", "test.mode.enabled", mandatory=False, default_value=False)
Expand Down Expand Up @@ -892,6 +893,43 @@ def on_after_load_track(self, track):
return track


class ServerlessFilterTrackProcessor(TrackProcessor):
def __init__(self, cfg):
self.logger = logging.getLogger(__name__)
self.serverless_mode = cfg.opts("driver", "serverless.mode")
self.serverless_operator = cfg.opts("driver", "serverless.operator")

def _is_filtered_task(self, operation):
try:
op = track.OperationType.from_hyphenated_string(operation.type)
if self.serverless_operator:
return op.serverless_status < track.ServerlessStatus.Internal
else:
return op.serverless_status < track.ServerlessStatus.Public
gbanasiak marked this conversation as resolved.
Show resolved Hide resolved
except KeyError:
self.logger.info("Treating user-provided operation type [%s] for operation [%s] as public.", operation.type, operation.name)
return True
gbanasiak marked this conversation as resolved.
Show resolved Hide resolved

def on_after_load_track(self, track):
if not self.serverless_mode:
return track

for challenge in track.challenges:
# don't modify the schedule while iterating over it
tasks_to_remove = []
for task in challenge.schedule:
if isinstance(task, Parallel):
challenge.serverless_info.append(f"Treating parallel task in challenge [{challenge}] as public.")
gbanasiak marked this conversation as resolved.
Show resolved Hide resolved
elif self._is_filtered_task(task.operation):
tasks_to_remove.append(task)
for task in tasks_to_remove:
challenge.remove_task(task)
task_str = ", ".join(f"[{task}]" for task in tasks_to_remove)
challenge.serverless_info.append(f"Excluding {task_str} as challenge [{challenge}] is run on serverless.")
gbanasiak marked this conversation as resolved.
Show resolved Hide resolved

return track


class TestModeTrackProcessor(TrackProcessor):
def __init__(self, cfg):
self.test_mode_enabled = cfg.opts("track", "test.mode.enabled", mandatory=False, default_value=False)
Expand Down
139 changes: 80 additions & 59 deletions esrally/track/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import collections
import numbers
import re
from enum import Enum, unique
from enum import Enum, IntEnum, auto, unique

from esrally import exceptions

Expand Down Expand Up @@ -611,6 +611,7 @@ def __init__(
self.meta_data = meta_data if meta_data else {}
self.description = description
self.user_info = user_info
self.serverless_info = []
self.default = default
self.selected = selected
self.auto_generated = auto_generated
Expand Down Expand Up @@ -666,70 +667,90 @@ def __eq__(self, othr):


@unique
class AdminStatus(Enum):
# We can't use True/False as they are keywords
Yes = auto()
No = auto()


@unique
class ServerlessStatus(IntEnum):
Blocked = auto()
Internal = auto()
Public = auto()
gbanasiak marked this conversation as resolved.
Show resolved Hide resolved


class OperationType(Enum):
# for the time being we are not considering this action as administrative
IndexStats = 1
NodeStats = 2
Search = 3
Bulk = 4
RawRequest = 5
WaitForRecovery = 6
WaitForSnapshotCreate = 7
Composite = 8
SubmitAsyncSearch = 9
GetAsyncSearch = 10
DeleteAsyncSearch = 11
PaginatedSearch = 12
ScrollSearch = 13
OpenPointInTime = 14
ClosePointInTime = 15
Sql = 16
FieldCaps = 17
CompositeAgg = 18
WaitForCurrentSnapshotsCreate = 19
Downsample = 20
# TODO replace manual counts with auto() when we drop support for Python 3.10
# https://docs.python.org/3/library/enum.html#enum.auto
IndexStats = (1, AdminStatus.No, ServerlessStatus.Internal)
NodeStats = (2, AdminStatus.No, ServerlessStatus.Internal)
Search = (3, AdminStatus.No, ServerlessStatus.Public)
Bulk = (4, AdminStatus.No, ServerlessStatus.Public)
RawRequest = (5, AdminStatus.No, ServerlessStatus.Public)
gbanasiak marked this conversation as resolved.
Show resolved Hide resolved
WaitForRecovery = (6, AdminStatus.No, ServerlessStatus.Internal)
WaitForSnapshotCreate = (7, AdminStatus.No, ServerlessStatus.Internal)
# Public as all supported operation types are Public too
Composite = (8, AdminStatus.No, ServerlessStatus.Public)
gbanasiak marked this conversation as resolved.
Show resolved Hide resolved
SubmitAsyncSearch = (8, AdminStatus.No, ServerlessStatus.Public)
GetAsyncSearch = (9, AdminStatus.No, ServerlessStatus.Public)
DeleteAsyncSearch = (10, AdminStatus.No, ServerlessStatus.Public)
PaginatedSearch = (11, AdminStatus.No, ServerlessStatus.Public)
ScrollSearch = (12, AdminStatus.No, ServerlessStatus.Public)
OpenPointInTime = (13, AdminStatus.No, ServerlessStatus.Public)
ClosePointInTime = (14, AdminStatus.No, ServerlessStatus.Public)
Sql = (15, AdminStatus.No, ServerlessStatus.Public)
FieldCaps = (16, AdminStatus.No, ServerlessStatus.Public)
CompositeAgg = (17, AdminStatus.No, ServerlessStatus.Public)
WaitForCurrentSnapshotsCreate = (18, AdminStatus.No, ServerlessStatus.Internal)
Downsample = (19, AdminStatus.No, ServerlessStatus.Internal)

# administrative actions
ForceMerge = 1001
ClusterHealth = 1002
PutPipeline = 1003
Refresh = 1004
CreateIndex = 1005
DeleteIndex = 1006
CreateIndexTemplate = 1007
DeleteIndexTemplate = 1008
ShrinkIndex = 1009
CreateMlDatafeed = 1010
DeleteMlDatafeed = 1011
StartMlDatafeed = 1012
StopMlDatafeed = 1013
CreateMlJob = 1014
DeleteMlJob = 1015
OpenMlJob = 1016
CloseMlJob = 1017
Sleep = 1018
DeleteSnapshotRepository = 1019
CreateSnapshotRepository = 1020
CreateSnapshot = 1021
RestoreSnapshot = 1022
PutSettings = 1023
CreateTransform = 1024
StartTransform = 1025
WaitForTransform = 1026
DeleteTransform = 1027
CreateDataStream = 1028
DeleteDataStream = 1029
CreateComposableTemplate = 1030
DeleteComposableTemplate = 1031
CreateComponentTemplate = 1032
DeleteComponentTemplate = 1033
TransformStats = 1034
CreateIlmPolicy = 1035
DeleteIlmPolicy = 1036
ForceMerge = (20, AdminStatus.Yes, ServerlessStatus.Internal)
ClusterHealth = (21, AdminStatus.Yes, ServerlessStatus.Internal)
PutPipeline = (22, AdminStatus.Yes, ServerlessStatus.Public)
Refresh = (23, AdminStatus.Yes, ServerlessStatus.Public)
CreateIndex = (24, AdminStatus.Yes, ServerlessStatus.Public)
DeleteIndex = (25, AdminStatus.Yes, ServerlessStatus.Public)
CreateIndexTemplate = (26, AdminStatus.Yes, ServerlessStatus.Blocked)
DeleteIndexTemplate = (27, AdminStatus.Yes, ServerlessStatus.Blocked)
ShrinkIndex = (28, AdminStatus.Yes, ServerlessStatus.Blocked)
CreateMlDatafeed = (29, AdminStatus.Yes, ServerlessStatus.Public)
DeleteMlDatafeed = (30, AdminStatus.Yes, ServerlessStatus.Public)
StartMlDatafeed = (31, AdminStatus.Yes, ServerlessStatus.Public)
StopMlDatafeed = (32, AdminStatus.Yes, ServerlessStatus.Public)
CreateMlJob = (33, AdminStatus.Yes, ServerlessStatus.Public)
DeleteMlJob = (34, AdminStatus.Yes, ServerlessStatus.Public)
OpenMlJob = (35, AdminStatus.Yes, ServerlessStatus.Public)
CloseMlJob = (36, AdminStatus.Yes, ServerlessStatus.Public)
Sleep = (37, AdminStatus.Yes, ServerlessStatus.Public)
DeleteSnapshotRepository = (38, AdminStatus.Yes, ServerlessStatus.Internal)
CreateSnapshotRepository = (39, AdminStatus.Yes, ServerlessStatus.Internal)
CreateSnapshot = (40, AdminStatus.Yes, ServerlessStatus.Internal)
RestoreSnapshot = (41, AdminStatus.Yes, ServerlessStatus.Internal)
PutSettings = (42, AdminStatus.Yes, ServerlessStatus.Internal)
CreateTransform = (43, AdminStatus.Yes, ServerlessStatus.Public)
StartTransform = (44, AdminStatus.Yes, ServerlessStatus.Public)
WaitForTransform = (45, AdminStatus.Yes, ServerlessStatus.Public)
DeleteTransform = (46, AdminStatus.Yes, ServerlessStatus.Public)
CreateDataStream = (47, AdminStatus.Yes, ServerlessStatus.Public)
DeleteDataStream = (48, AdminStatus.Yes, ServerlessStatus.Public)
CreateComposableTemplate = (49, AdminStatus.Yes, ServerlessStatus.Public)
DeleteComposableTemplate = (50, AdminStatus.Yes, ServerlessStatus.Public)
CreateComponentTemplate = (51, AdminStatus.Yes, ServerlessStatus.Public)
DeleteComponentTemplate = (52, AdminStatus.Yes, ServerlessStatus.Public)
TransformStats = (53, AdminStatus.Yes, ServerlessStatus.Public)
CreateIlmPolicy = (54, AdminStatus.Yes, ServerlessStatus.Blocked)
DeleteIlmPolicy = (55, AdminStatus.Yes, ServerlessStatus.Blocked)

def __init__(self, id: int, admin_status: AdminStatus, serverless_status: ServerlessStatus):
self.id = id
self.admin_status = admin_status
self.serverless_status = serverless_status

@property
def admin_op(self):
return self.value > 1000
return self.admin_status == AdminStatus.Yes

def to_hyphenated_string(self):
"""
Expand Down
Loading
Loading