Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exclude telemetry devices based on serverless status #1770

Merged
merged 7 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ def create_es_clients(self):
).create()
return es

def prepare_telemetry(self, es, enable, index_names, data_stream_names, build_hash):
def prepare_telemetry(self, es, enable, index_names, data_stream_names, build_hash, serverless_mode, serverless_operator):
enabled_devices = self.config.opts("telemetry", "devices")
telemetry_params = self.config.opts("telemetry", "params")
log_root = paths.race_root(self.config)
Expand Down Expand Up @@ -649,7 +649,12 @@ def prepare_telemetry(self, es, enable, index_names, data_stream_names, build_ha
]
else:
devices = []
self.telemetry = telemetry.Telemetry(enabled_devices, devices=devices)
self.telemetry = telemetry.Telemetry(
enabled_devices,
devices=devices,
serverless_mode=serverless_mode,
serverless_operator=serverless_operator,
)

def wait_for_rest_api(self, es):
es_default = es["default"]
Expand Down Expand Up @@ -706,6 +711,8 @@ def prepare_benchmark(self, t):

skip_rest_api_check = self.config.opts("mechanic", "skip.rest.api.check")
uses_static_responses = self.config.opts("client", "options").uses_static_responses
serverless_mode = False
serverless_operator = False
build_hash = None
if skip_rest_api_check:
self.logger.info("Skipping REST API check as requested explicitly.")
Expand All @@ -729,6 +736,8 @@ def prepare_benchmark(self, t):
index_names=self.track.index_names(),
data_stream_names=self.track.data_stream_names(),
build_hash=build_hash,
serverless_mode=serverless_mode,
serverless_operator=serverless_operator,
)

for host in self.config.opts("driver", "load_driver_hosts"):
Expand Down
2 changes: 1 addition & 1 deletion esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2575,7 +2575,7 @@ class Composite(Runner):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Since Composite is marked as ServerlessStatus.Public, only add public
# Since Composite is marked as serverless.Status.Public, only add public
# operation types here.
self.supported_op_types = [
"open-point-in-time",
Expand Down
39 changes: 37 additions & 2 deletions esrally/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

from esrally import exceptions, metrics, time
from esrally.metrics import MetaInfoScope
from esrally.utils import console, io, opts, process, sysstats
from esrally.utils import console, io, opts, process, serverless, sysstats
from esrally.utils.versions import Version


Expand Down Expand Up @@ -55,13 +55,15 @@ def list_telemetry():


class Telemetry:
def __init__(self, enabled_devices=None, devices=None):
def __init__(self, enabled_devices=None, devices=None, serverless_mode=False, serverless_operator=False):
if devices is None:
devices = []
if enabled_devices is None:
enabled_devices = []
self.enabled_devices = enabled_devices
self.devices = devices
self.serverless_mode = serverless_mode
self.serverless_operator = serverless_operator

def instrument_candidate_java_opts(self):
opts = []
Expand Down Expand Up @@ -90,6 +92,11 @@ def detach_from_node(self, node, running):
def on_benchmark_start(self):
    """
    Notify every enabled telemetry device that the benchmark is starting.

    In serverless mode, devices that are not available on serverless are skipped. A console
    message is printed only when the skipped device's command is listed in ``enabled_devices``.
    """
    for device in self.devices:
        if not self._enabled(device):
            continue

        unavailable = self.serverless_mode and not self._available_on_serverless(device)
        if unavailable:
            # Devices may lack a ``command`` attribute, hence the defensive lookup.
            command = getattr(device, "command", None)
            if command in self.enabled_devices:
                console.info(f"Excluding telemetry device [{device.command}] as it is unavailable on serverless.")
            continue

        device.on_benchmark_start()

def on_benchmark_stop(self):
Expand All @@ -105,6 +112,12 @@ def store_system_metrics(self, node, metrics_store):
def _enabled(self, device):
return device.internal or device.command in self.enabled_devices

def _available_on_serverless(self, device):
    """
    Tell whether ``device`` may be used against a serverless target.

    With ``self.serverless_operator`` set, a device qualifies when its ``serverless_status`` is
    at least ``serverless.Status.Internal``; otherwise the status must be exactly
    ``serverless.Status.Public``.
    """
    if not self.serverless_operator:
        return device.serverless_status == serverless.Status.Public
    # Relies on the ordering of serverless.Status members.
    return device.serverless_status >= serverless.Status.Internal


########################################################################################
#
Expand Down Expand Up @@ -340,6 +353,7 @@ def detach_from_node(self, node, running):

class SegmentStats(TelemetryDevice):
internal = False
serverless_status = serverless.Status.Internal
command = "segment-stats"
human_name = "Segment Stats"
help = "Determines segment stats at the end of the benchmark."
Expand All @@ -363,6 +377,7 @@ def on_benchmark_stop(self):

class CcrStats(TelemetryDevice):
internal = False
serverless_status = serverless.Status.Blocked
command = "ccr-stats"
human_name = "CCR Stats"
help = "Regularly samples Cross Cluster Replication (CCR) related stats"
Expand Down Expand Up @@ -507,6 +522,7 @@ def record_stats_per_index(self, name, stats):

class RecoveryStats(TelemetryDevice):
internal = False
serverless_status = serverless.Status.Internal
command = "recovery-stats"
human_name = "Recovery Stats"
help = "Regularly samples shard recovery stats"
Expand Down Expand Up @@ -633,6 +649,7 @@ class ShardStats(TelemetryDevice):
"""

internal = False
serverless_status = serverless.Status.Internal
command = "shard-stats"
human_name = "Shard Stats"
help = "Regularly samples nodes stats at shard level"
Expand Down Expand Up @@ -740,6 +757,7 @@ class NodeStats(TelemetryDevice):
"""

internal = False
serverless_status = serverless.Status.Internal
command = "node-stats"
human_name = "Node Stats"
help = "Regularly samples node stats"
Expand Down Expand Up @@ -924,6 +942,7 @@ def sample(self):

class TransformStats(TelemetryDevice):
internal = False
serverless_status = serverless.Status.Public
command = "transform-stats"
human_name = "Transform Stats"
help = "Regularly samples transform stats"
Expand Down Expand Up @@ -1102,6 +1121,7 @@ def record_stats_per_transform(self, transform_id, stats, prefix=""):

class SearchableSnapshotsStats(TelemetryDevice):
internal = False
serverless_status = serverless.Status.Blocked
command = "searchable-snapshots-stats"
human_name = "Searchable Snapshots Stats"
help = "Regularly samples searchable snapshots stats"
Expand Down Expand Up @@ -1297,6 +1317,7 @@ class DataStreamStats(TelemetryDevice):
"""

internal = False
serverless_status = serverless.Status.Public
command = "data-stream-stats"
human_name = "Data Stream Stats"
help = "Regularly samples data stream stats"
Expand Down Expand Up @@ -1415,6 +1436,7 @@ def record(self):

class IngestPipelineStats(InternalTelemetryDevice):
command = "ingest-pipeline-stats"
serverless_status = serverless.Status.Internal
human_name = "Ingest Pipeline Stats"
help = "Reports Ingest Pipeline stats at the end of the benchmark."

Expand Down Expand Up @@ -1762,6 +1784,8 @@ class ClusterEnvironmentInfo(InternalTelemetryDevice):
Gathers static environment information on a cluster level (e.g. version numbers).
"""

serverless_status = serverless.Status.Public

def __init__(self, client, metrics_store, revision_override):
super().__init__()
self.metrics_store = metrics_store
Expand Down Expand Up @@ -1818,6 +1842,8 @@ class ExternalEnvironmentInfo(InternalTelemetryDevice):
Gathers static environment information for externally provisioned clusters.
"""

serverless_status = serverless.Status.Internal

def __init__(self, client, metrics_store):
super().__init__()
self.metrics_store = metrics_store
Expand Down Expand Up @@ -1862,6 +1888,8 @@ class JvmStatsSummary(InternalTelemetryDevice):
Gathers a summary of various JVM statistics during the whole race.
"""

serverless_status = serverless.Status.Internal

def __init__(self, client, metrics_store):
super().__init__()
self.metrics_store = metrics_store
Expand Down Expand Up @@ -1950,6 +1978,8 @@ class IndexStats(InternalTelemetryDevice):
Gathers statistics via the Elasticsearch index stats API
"""

serverless_status = serverless.Status.Internal

def __init__(self, client, metrics_store):
super().__init__()
self.client = client
Expand Down Expand Up @@ -2078,6 +2108,8 @@ def extract_value(self, primaries, path, default_value=None):


class MlBucketProcessingTime(InternalTelemetryDevice):
serverless_status = serverless.Status.Public

def __init__(self, client, metrics_store):
super().__init__()
self.client = client
Expand Down Expand Up @@ -2176,6 +2208,7 @@ class MasterNodeStats(InternalTelemetryDevice):
Collects and pushes the current master node name to the metric store.
"""

serverless_status = serverless.Status.Internal
command = "master-node-stats"
human_name = "Master Node Stats"
help = "Regularly samples master node name"
Expand Down Expand Up @@ -2266,6 +2299,7 @@ class DiskUsageStats(TelemetryDevice):
"""

internal = False
serverless_status = serverless.Status.Internal
command = "disk-usage-stats"
human_name = "Disk usage of each field"
help = "Runs the indices disk usage API after benchmarking"
Expand Down Expand Up @@ -2368,6 +2402,7 @@ def handle_telemetry_usage(self, response):

class BlobStoreStats(TelemetryDevice):
internal = False
serverless_status = serverless.Status.Internal
command = "blob-store-stats"
human_name = "Blob Store Stats"
help = "Regularly samples blob store stats, only applicable to serverless Elasticsearch"
Expand Down
18 changes: 14 additions & 4 deletions esrally/track/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,17 @@
from esrally import PROGRAM_NAME, config, exceptions, paths, time, version
from esrally.track import params, track
from esrally.track.track import Parallel
from esrally.utils import collections, console, convert, io, modules, net, opts, repo
from esrally.utils import (
collections,
console,
convert,
io,
modules,
net,
opts,
repo,
serverless,
)


class TrackSyntaxError(exceptions.InvalidSyntax):
Expand Down Expand Up @@ -908,11 +918,11 @@ def _is_filtered_task(self, operation):

try:
op = track.OperationType.from_hyphenated_string(operation.type)
# Comparisons rely on the ordering of auto() in track.ServerlessStatus which is an IntEnum
# Comparisons rely on the ordering in serverless.Status which is an IntEnum
if self.serverless_operator:
return op.serverless_status < track.ServerlessStatus.Internal
return op.serverless_status < serverless.Status.Internal
else:
return op.serverless_status < track.ServerlessStatus.Public
return op.serverless_status < serverless.Status.Public
except KeyError:
self.logger.info("Treating user-provided operation type [%s] for operation [%s] as public.", operation.type, operation.name)
return False
Expand Down
Loading