Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support data streams in create-track #1531

Merged
merged 18 commits into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions esrally/rally.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,17 @@ def add_track_source(subparser):
required=True,
help="Name of the generated track",
)
create_track_parser.add_argument(
indices_or_data_streams_group = create_track_parser.add_mutually_exclusive_group(required=True)
indices_or_data_streams_group.add_argument(
"--indices",
type=non_empty_list,
required=True,
help="Comma-separated list of indices to include in the track",
)
indices_or_data_streams_group.add_argument(
"--data-streams",
type=non_empty_list,
help="Comma-separated list of data streams to include in the track",
)
create_track_parser.add_argument(
"--target-hosts",
default="",
Expand Down Expand Up @@ -987,9 +992,16 @@ def dispatch_sub_command(arg_parser, args, cfg):
cfg.add(config.Scope.applicationOverride, "generator", "output.path", args.output_path)
generate(cfg)
elif sub_command == "create-track":
cfg.add(config.Scope.applicationOverride, "generator", "indices", args.indices)
cfg.add(config.Scope.applicationOverride, "generator", "output.path", args.output_path)
cfg.add(config.Scope.applicationOverride, "track", "track.name", args.track)
if args.data_streams is not None:
cfg.add(config.Scope.applicationOverride, "generator", "indices", "*")
cfg.add(config.Scope.applicationOverride, "generator", "data_streams", args.data_streams)
cfg.add(config.Scope.applicationOverride, "generator", "output.path", args.output_path)
cfg.add(config.Scope.applicationOverride, "track", "track.name", args.track)
elif args.indices is not None:
cfg.add(config.Scope.applicationOverride, "generator", "indices", args.indices)
cfg.add(config.Scope.applicationOverride, "generator", "data_streams", args.data_streams)
cfg.add(config.Scope.applicationOverride, "generator", "output.path", args.output_path)
cfg.add(config.Scope.applicationOverride, "track", "track.name", args.track)
configure_connection_params(arg_parser, args, cfg)

tracker.create_track(cfg)
Expand Down
33 changes: 26 additions & 7 deletions esrally/tracker/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,28 +46,29 @@ def update_index_setting_parameters(settings):
settings[s] = param.format(orig=orig_value)


def is_valid(index_name):
def is_valid(index_name, flag_data_streams):
if len(index_name) == 0:
return False, "Index name is empty"
if index_name.startswith("."):
if index_name.startswith(".") and not flag_data_streams:
return False, f"Index [{index_name}] is hidden"
return True, None


def extract_index_mapping_and_settings(client, index_pattern):
def extract_index_mapping_and_settings(client, index_pattern, flag_data_streams):
"""
Calls index GET to retrieve mapping + settings, filtering settings
so they can be used to re-create this index
:param client: Elasticsearch client
:param index_pattern: name of index
:param flag_data_streams: boolean variable denotes whether flag data-streams is used
:return: index creation dictionary
"""
results = {}
logger = logging.getLogger(__name__)
# the response might contain multiple indices if a wildcard was provided
response = client.indices.get(index=index_pattern)
response = client.indices.get(index=index_pattern, params={"expand_wildcards": "all"})
for index, details in response.items():
valid, reason = is_valid(index)
valid, reason = is_valid(index, flag_data_streams)
if valid:
mappings = details["mappings"]
index_settings = filter_ephemeral_index_settings(details["settings"]["index"])
Expand All @@ -79,17 +80,18 @@ def extract_index_mapping_and_settings(client, index_pattern):
return results


def extract(client, outdir, index_pattern):
def extract(client, outdir, index_pattern, flag_data_streams):
"""
Request index information to format in "index.json" for Rally
:param client: Elasticsearch client
:param outdir: destination directory
:param index_pattern: name of index
:param flag_data_streams: boolean variable denotes whether flag data-streams is used
:return: Dict of template variables representing the index for use in track
"""
results = []

index_obj = extract_index_mapping_and_settings(client, index_pattern)
index_obj = extract_index_mapping_and_settings(client, index_pattern, flag_data_streams)
for index, details in index_obj.items():
filename = f"{index}.json"
outpath = os.path.join(outdir, filename)
Expand All @@ -104,3 +106,20 @@ def extract(client, outdir, index_pattern):
}
)
return results


def extract_indices_from_data_stream(client, data_stream_pattern):
"""
Calls Elasticsearch client get_data_stream function to retrieve list of indices
:param client: Elasticsearch client
:param data_stream_pattern: name of data stream
:return: list of index names
"""
results = []
# the response might contain multiple indices if a wildcard was provided
params_defined = {"expand_wildcards": "all", "filter_path": "data_streams.name"}
results_data_streams = client.indices.get_data_stream(name=data_stream_pattern, params=params_defined)

for indices in results_data_streams["data_streams"]:
results.append(indices.get("name"))
return results
30 changes: 25 additions & 5 deletions esrally/tracker/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,27 @@ def process_template(templates_path, template_filename, template_vars, output_pa
f.write(template.render(template_vars))


def extract_mappings_and_corpora(client, output_path, indices_to_extract):
def extract_indices_from_data_streams(client, data_streams_to_extract):
indices = []
# first extract index metadata (which is cheap) and defer extracting data to reduce the potential for
# errors due to invalid index names late in the process.
for data_stream_name in data_streams_to_extract:
try:
indices += index.extract_indices_from_data_stream(client, data_stream_name)
except ElasticsearchException:
logging.getLogger(__name__).exception("Failed to extract indices from data stream [%s]", data_stream_name)

return indices


def extract_mappings_and_corpora(client, output_path, indices_to_extract, flag_data_streams):
indices = []
corpora = []
# first extract index metadata (which is cheap) and defer extracting data to reduce the potential for
# errors due to invalid index names late in the process.
for index_name in indices_to_extract:
try:
indices += index.extract(client, output_path, index_name)
indices += index.extract(client, output_path, index_name, flag_data_streams)
except ElasticsearchException:
logging.getLogger(__name__).exception("Failed to extract index [%s]", index_name)

Expand All @@ -63,8 +76,8 @@ def create_track(cfg):
root_path = cfg.opts("generator", "output.path")
target_hosts = cfg.opts("client", "hosts")
client_options = cfg.opts("client", "options")

logger.info("Creating track [%s] matching indices [%s]", track_name, indices)
data_streams = cfg.opts("generator", "data_streams")
flag_data_streams = False

client = EsClientFactory(
hosts=target_hosts.all_hosts[opts.TargetHosts.DEFAULT], client_options=client_options.all_client_options[opts.TargetHosts.DEFAULT]
Expand All @@ -76,7 +89,14 @@ def create_track(cfg):
output_path = os.path.abspath(os.path.join(io.normalize_path(root_path), track_name))
io.ensure_dir(output_path)

indices, corpora = extract_mappings_and_corpora(client, output_path, indices)
if data_streams is not None:
flag_data_streams = True
logger.info("Creating track [%s] matching data streams [%s]", track_name, data_streams)
extracted_indices = extract_indices_from_data_streams(client, data_streams)
indices = extracted_indices
logger.info("Creating track [%s] matching indices [%s]", track_name, indices)

indices, corpora = extract_mappings_and_corpora(client, output_path, indices, flag_data_streams)
if len(indices) == 0:
raise RuntimeError("Failed to extract any indices for track!")

Expand Down
2 changes: 1 addition & 1 deletion tests/tracker/index_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,5 +145,5 @@ def test_extract_index_create(client):
},
},
}
res = extract_index_mapping_and_settings(client, "_all")
res = extract_index_mapping_and_settings(client, "_all", False)
assert res == expected