From de0aecf844bea58dddf4b2aedf7ae3a5c5b1d02f Mon Sep 17 00:00:00 2001 From: Alexander Calhoun Date: Thu, 8 Aug 2019 14:07:16 -0400 Subject: [PATCH] Added common logging and py_es_bulk --- fio-wrapper/__init__.py | 0 fio-wrapper/fio-wrapper.py | 515 ++++++++---------------------------- fio-wrapper/fio_analyzer.py | 136 ++++++++++ fio-wrapper/trigger_fio.py | 288 ++++++++++++++++++++ utils/__init__.py | 0 utils/common_logging.py | 81 ++++++ utils/py_es_bulk.py | 197 ++++++++++++++ 7 files changed, 807 insertions(+), 410 deletions(-) create mode 100644 fio-wrapper/__init__.py create mode 100644 fio-wrapper/fio_analyzer.py create mode 100644 fio-wrapper/trigger_fio.py create mode 100644 utils/__init__.py create mode 100644 utils/common_logging.py create mode 100644 utils/py_es_bulk.py diff --git a/fio-wrapper/__init__.py b/fio-wrapper/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/fio-wrapper/fio-wrapper.py b/fio-wrapper/fio-wrapper.py index 911f468a..354fa9cf 100644 --- a/fio-wrapper/fio-wrapper.py +++ b/fio-wrapper/fio-wrapper.py @@ -14,415 +14,38 @@ # This wrapper assumes the following in fiojob # per_job_logs=true # - -import argparse -from datetime import datetime -from copy import deepcopy -from fio_hist_parser import compute_percentiles_from_logs -import re import os import sys -import json -import subprocess -import elasticsearch -import numpy as np -import configparser -import statistics -import time - -_log_files={'bw':{'metric':'bandwidth'},'iops':{'metric':'iops'},'lat':{'metric':'latency'},'clat':{'metric':'latency'},'slat':{'metric':'latency'}} # ,'clat_hist_processed' -_data_direction={0:'read',1:'write',2:'trim'} - - -class Fio_Analyzer: - """ - Fio Analyzer - this class will consume processed fio json results and calculate the average total iops for x number of samples. - results are analyzed based on operation and io size, this is a static evaluation and future enhancements could evalute results based on - other properties dynamically. 
- """ - def __init__(self, uuid, user): - self.uuid = uuid - self.user = user - self.suffix = "analyzed_result" - self.fio_processed_results_list = [] - self.sample_list = [] - self.operation_list = [] - self.io_size_list = [] - self.sumdoc = {} - - def add_fio_result_documents(self, document_list, starttime): - """ - for each new document add it to the results list with its starttime - """ - for document in document_list: - fio_result = {} - fio_result["document"] = document - fio_result["starttime"] = starttime - self.fio_processed_results_list.append(fio_result) - - - def calculate_iops_sum(self): - """ - will loop through all documents and will populate parameter lists and sum total iops across all host - for a specific operation and io size - """ - - for fio_result in self.fio_processed_results_list: - if fio_result['document']['fio']['jobname'] != 'All clients': - sample = fio_result['document']['sample'] - bs = fio_result['document']['global_options']['bs'] - rw = fio_result['document']['fio']['job options']['rw'] - - if sample not in self.sample_list: self.sample_list.append(sample) - if rw not in self.operation_list: self.operation_list.append(rw) - if bs not in self.io_size_list: self.io_size_list.append(bs) - - for sample in self.sample_list: - self.sumdoc[sample] = {} - for rw in self.operation_list: - self.sumdoc[sample][rw] = {} - for bs in self.io_size_list: - self.sumdoc[sample][rw][bs] = {} - - #get measurements - - for fio_result in self.fio_processed_results_list: - if fio_result['document']['fio']['jobname'] != 'All clients': - sample = fio_result['document']['sample'] - bs = fio_result['document']['global_options']['bs'] - rw = fio_result['document']['fio']['job options']['rw'] - - if not self.sumdoc[sample][rw][bs]: - time_s = fio_result['starttime'] / 1000.0 - self.sumdoc[sample][rw][bs]['date'] = time.strftime('%Y-%m-%dT%H:%M:%S.000Z', time.gmtime(time_s)) - self.sumdoc[sample][rw][bs]['write'] = 0 - self.sumdoc[sample][rw][bs]['read'] = 0 - - self.sumdoc[sample][rw][bs]['write'] += int(fio_result['document']['fio']["write"]["iops"]) - self.sumdoc[sample][rw][bs]['read'] += int(fio_result['document']['fio']["read"]["iops"]) - - def emit_payload(self): - """ - Will calculate the average iops across multiple samples and return list containing items for each result based on operation/io size - """ - - importdoc = {"ceph_benchmark_test": {"test_data": {}}, - "uuid": self.uuid, - "user": self.user - } - - self.calculate_iops_sum() - payload_list = [] - - for oper in self.operation_list: - for io_size in self.io_size_list: - average_write_result_list = [] - average_read_result_list = [] - total_ary = [] - tmp_doc = {} - tmp_doc['object_size'] = io_size # set document's object size - tmp_doc['operation'] = oper # set documents operation - firstrecord = True - calcuate_percent_std_dev = False - - for itera in self.sample_list: # - average_write_result_list.append(self.sumdoc[itera][oper][io_size]['write']) - average_read_result_list.append(self.sumdoc[itera][oper][io_size]['read']) - - if firstrecord: - importdoc['date'] = self.sumdoc[itera][oper][io_size]['date'] - firstrecord = True - - read_average = (sum(average_read_result_list)/len(average_read_result_list)) - if read_average > 0.0: - tmp_doc['read-iops'] = read_average - if len(average_read_result_list) > 1: - calcuate_percent_std_dev = True - else: - tmp_doc['read-iops'] = 0 - - write_average = (sum(average_write_result_list)/len(average_write_result_list)) - if write_average > 0.0: - tmp_doc['write-iops'] = 
write_average - if len(average_write_result_list) > 1: - calcuate_percent_std_dev = True - else: - tmp_doc['write-iops'] = 0 - - tmp_doc['total-iops'] = (tmp_doc['write-iops'] + tmp_doc['read-iops']) - - if calcuate_percent_std_dev: - if "read" in oper: - tmp_doc['std-dev-%s' % io_size] = round(((statistics.stdev(average_read_result_list) / read_average) * 100), 3) - elif "write" in oper: - tmp_doc['std-dev-%s' % io_size] = round(((statistics.stdev(average_write_result_list) / write_average) * 100), 3) - elif "randrw" in oper: - tmp_doc['std-dev-%s' % io_size] = round((((statistics.stdev(average_read_result_list) + statistics.stdev(average_write_result_list)) / tmp_doc['total-iops'])* 100), 3) - - importdoc['ceph_benchmark_test']['test_data'] = tmp_doc - payload_list.append(importdoc) - - return payload_list - - -def _document_payload(data, user, uuid, sample, list_hosts, end_time, fio_version, fio_jobs_dict): #pod_details, - processed = [] - fio_starttime = {} - earliest_starttime = float('inf') - for result in data["client_stats"] : - document = { - "uuid": uuid, - "user": user, - "hosts": list_hosts, - "fio-version": fio_version, - "timestamp_end": int(end_time)*1000, #this is in ms - #"nodeName": pod_details["hostname"], - "sample": int(sample), - "fio": result - } - if 'global' in fio_jobs_dict.keys(): - document['global_options'] = fio_jobs_dict['global'] - processed.append(document) - if result['jobname'] != 'All clients': - start_time= (int(end_time) * 1000) - result['job_runtime'] - fio_starttime[result['hostname']] = start_time - if start_time < earliest_starttime: - earliest_starttime = start_time - return processed, fio_starttime, earliest_starttime - -def _log_payload(directory, user, uuid, sample, fio_jobs_dict, fio_version, fio_starttime, list_hosts, job): #pod_details - logs = [] - _current_log_files = deepcopy(_log_files) - job_options = fio_jobs_dict[job] - if 'gtod_reduce' in job_options: - del _current_log_files['slat'] - del _current_log_files['clat'] - del _current_log_files['bw'] - if 'disable_lat' in job_options: - del _current_log_files['lat'] - if 'disable_slat' in job_options: - del _current_log_files['slat'] - if 'disable_clat' in job_options: - del _current_log_files['clat'] - if 'disable_bw' in job_options or 'disable_bw_measurement' in job_options: - del _current_log_files['bw'] - for log in _current_log_files.keys(): - for host in list_hosts: - log_file_prefix_string = 'write_' + str(log) + '_log' - if log in ['clat','slat']: - log_file_prefix_string = 'write_lat_log' - try: - log_file_name = str(job_options[log_file_prefix_string]) + '_' + str(log) + '.1.log.' + str(host) - except KeyError: - try: - log_file_name = str(fio_jobs_dict['global'][log_file_prefix_string]) + '_' + str(log) + '.1.log.' 
+ str(host) - except: - print("Error setting log_file_name") - with open(directory+'/'+str(log_file_name), 'r') as log_file: - for log_line in log_file: - log_line_values = str(log_line).split(", ") - if len(log_line_values) == 4: - log_dict = { - "uuid": uuid, - "user": user, - "host": host, - "fio-version": fio_version, - "job_options": job_options, - "job_name": str(job), - "sample": int(sample), - "log_name": str(log), - "timestamp": int(fio_starttime[host]) + int(log_line_values[0]), #this is in ms - str(_current_log_files[log]['metric']): int(log_line_values[1]), - #"nodeName": pod_details["hostname"], - "data_direction": _data_direction[int(log_line_values[2])], - "offset": int(log_line_values[3]) - } - if 'global' in fio_jobs_dict.keys(): - log_dict['global_options'] = fio_jobs_dict['global'] - logs.append(log_dict) - return logs - -def _histogram_payload(processed_histogram_file, user, uuid, sample, fio_jobs_dict, fio_version, longest_fio_startime, list_hosts, job, numjob=1): #pod_details - logs = [] - with open(processed_histogram_file, 'r') as log_file: - for log_line in log_file: - log_line_values = str(log_line).split(", ") - if len(log_line_values) == 7 and not (any(len(str(x)) <= 0 for x in log_line_values)): - print(log_line_values) - log_dict = { - "uuid": uuid, - "user": user, - "hosts": list_hosts, - "fio-version": fio_version, - "job_options": fio_jobs_dict[job], - "job_name": str(job), - "sample": int(sample), - "log_name": "clat_hist", - "timestamp": int(longest_fio_startime) + int(log_line_values[0]), #this is in ms - "number_samples_histogram": int(log_line_values[1]), - "min": float(log_line_values[2]), - "median": float(log_line_values[3]), - "p95": float(log_line_values[4]), - "p99": float(log_line_values[5]), - "max": float(log_line_values[6]) - } - if 'global' in fio_jobs_dict.keys(): - log_dict['global_options'] = fio_jobs_dict['global'] - logs.append(log_dict) - return logs - -def _index_result(es,document_index_suffix,payload): - index = es['index_prefix'] + '-' + str(document_index_suffix) - es = elasticsearch.Elasticsearch([ - {'host': es['server'],'port': es['port'] }],send_get_body_as='POST') - indexed=True - processed_count = 0 - total_count = 0 - for result in payload: - try: - es.index(index=index, doc_type="result", body=result) - processed_count += 1 - except Exception as e: - print(repr(e) + "occurred for the json document:") - print(str(result)) - indexed=False - total_count += 1 - return indexed, processed_count, total_count - -def _clean_output(fio_output_file): - cmd = ["sed", "-i", "/{/,$!d"] - cmd.append(str(fio_output_file)) - process = subprocess.Popen(cmd, stdout=subprocess.PIPE) - stdout,stderr = process.communicate() - return stdout.strip(), process.returncode - -def _run_fiod(hosts_file, fiojob_file, output_dir, fio_output_file): - cmd = ["fio", "--client=", "path_file" ,"--output-format=json", "--output="] - cmd[1] = "--client=" + str(hosts_file) - cmd[2] = fiojob_file - cmd[4] = "--output=" + str(fio_output_file) - process = subprocess.Popen(cmd, stdout=subprocess.PIPE, cwd=output_dir) - stdout,stderr = process.communicate() - return stdout.strip(), process.returncode - -def _parse_jobfile(job_path): - config = configparser.ConfigParser(allow_no_value=True) - config.read(job_path) - return config +# in order to run need to add parent dir to sys.path +parent_dir = os.path.abspath(os.path.join(__file__ ,"../..")) +sys.path.append(parent_dir) -def _parse_jobs(job_dict, jobs): - job_dicts = {} - if 'global' in job_dict.keys(): - 
job_dicts['global'] = dict(job_dict['global']) - for job in jobs: - job_dicts[job] = dict(job_dict[job]) - return job_dicts +import argparse +import configparser +import elasticsearch +import time, datetime +import logging +import hashlib +import json +from copy import deepcopy +from fio_analyzer import Fio_Analyzer +from utils.py_es_bulk import streaming_bulk +from utils.common_logging import setup_loggers +import trigger_fio -def _process_histogram(job_dict, hosts, job, working_dir, processed_histogram_prefix, histogram_output_file, numjob=1): - histogram_input_file_list = [] - for host in hosts: - input_file = working_dir + '/' + processed_histogram_prefix + '.' + str(numjob) + '.log.' + str(host) - histogram_input_file_list.append(input_file) - print(histogram_input_file_list) - if 'log_hist_msec' not in job_dict[job].keys(): - if 'global' in job_dict.keys() and 'log_hist_msec' not in job_dict['global'].keys(): - print("log_hist_msec, so can't process histogram logs") - exit(1) - else: - _log_hist_msec = job_dict['global']['log_hist_msec'] - else: - _log_hist_msec = job_dict[job]['log_hist_msec'] - compute_percentiles_from_logs(output_csv_file=histogram_output_file,file_list=histogram_input_file_list,log_hist_msec=_log_hist_msec) +logger = logging.getLogger("fio_wrapper") -def _build_fio_job(job_name, job_dict, parent_dir, fio_job_file_name): - config = configparser.ConfigParser() - if 'global' in job_dict.keys(): - config['global'] = job_dict['global'] - config[job_name] = job_dict[job_name] - if os.path.exists(fio_job_file_name): - os.remove(fio_job_file_name) - print("file " + fio_job_file_name + " already exists. overwriting") - with open(fio_job_file_name, 'w') as configfile: - config.write(configfile, space_around_delimiters=False) +es_log = logging.getLogger("elasticsearch") +es_log.setLevel(logging.CRITICAL) +urllib3_log = logging.getLogger("urllib3") +urllib3_log.setLevel(logging.CRITICAL) -def _trigger_fio(fio_jobs, working_dir, fio_jobs_dict, host_file, user, uuid, sample, fio_analyzer_obj, es, indexed=False, numjob=1): - with open(host_file) as f: - hosts = f.read().splitlines() - for job in fio_jobs: - job_dir = working_dir + '/' + str(job) - if not os.path.exists(job_dir): - os.mkdir(job_dir) - fio_output_file = job_dir + '/' + str('fio-result.json') - fio_job_file = job_dir + '/fiojob' - _build_fio_job(job, fio_jobs_dict, job_dir, fio_job_file) - stdout = _run_fiod(host_file, fio_job_file, job_dir, fio_output_file) - if stdout[1] != 0: - print("Fio failed to execute, trying one more time..") - stdout = _run_fiod(host_file, fio_job_file, job_dir, fio_output_file) - if stdout[1] != 0: - print("Fio failed to execute a second time, stopping...") - exit(1) - clean_stdout = _clean_output(fio_output_file) - if clean_stdout[1] != 0: - print("failed to parse the output file") - exit(1) - print("fio has successfully finished sample {} executing for jobname {} and results are in the dir {}\n".format(sample, job, job_dir)) - if indexed: - with open(fio_output_file) as f: - data = json.load(f) - fio_endtime = int(data['timestamp']) # in epoch seconds - fio_version = data["fio version"] - fio_result_documents, fio_starttime, earliest_starttime = _document_payload(data, user, uuid, sample, hosts, fio_endtime, fio_version, fio_jobs_dict) #hosts_metadata - # Add fio result document to fio analyzer object - fio_analyzer_obj.add_fio_result_documents(fio_result_documents, earliest_starttime) - if indexed: - if len(fio_result_documents) > 0: - _status_results, processed_count, total_count = 
_index_result(es, 'results', fio_result_documents) - if _status_results: - print("Succesfully indexed " + str(total_count) + " fio result documents to index {}".format(str(es['index_prefix'])+'-results')) - else: - print(str(processed_count) + "/" + str(total_count) + "succesfully indexed") - try: - if fio_jobs_dict[job]['filename_format'] != 'f.\$jobnum.\$filenum' or int(fio_jobs_dict[job]['numjobs']) != 1: - print("filename_format is not 'f.\$jobnum.\$filenum' and/or numjobs is not 1, so can't process logs") - exit(1) - except KeyError: - try: - if fio_jobs_dict['global']['filename_format'] != 'f.\$jobnum.\$filenum' or int(fio_jobs_dict['global']['numjobs']) != 1: - print("filename_format is not 'f.\$jobnum.\$filenum' and/or numjobs is not 1, so can't process logs") - exit(1) - except: - print("Error getting filename_format") - fio_log_documents = _log_payload(job_dir, user, uuid, sample, fio_jobs_dict, fio_version, fio_starttime, hosts, job) - if indexed: - if len(fio_log_documents) > 0: - _status_results, processed_count, total_count = _index_result(es, 'logs', fio_log_documents) - if _status_results: - print("Succesfully indexed " + str(total_count) + " fio logs to index {}".format(str(es['index_prefix'])+'-logs')) - else: - print(str(processed_count) + "/" + str(total_count) + "succesfully indexed") - try: - processed_histogram_prefix = fio_jobs_dict[job]['write_hist_log'] +'_clat_hist' - except KeyError: - try: - processed_histogram_prefix = fio_jobs_dict['global']['write_hist_log'] +'_clat_hist' - except: - print("Error setting processed_histogram_prefix") - histogram_output_file = job_dir + '/' + processed_histogram_prefix + '_processed.' + str(numjob) - _process_histogram(fio_jobs_dict, hosts, job, job_dir, processed_histogram_prefix, histogram_output_file) - histogram_documents = _histogram_payload(histogram_output_file, user, uuid, sample, fio_jobs_dict, fio_version, earliest_starttime, hosts, job) - if indexed: - if len(histogram_documents) > 0: - _status_results, processed_count, total_count =_index_result(es, 'logs', histogram_documents) - if _status_results: - print("Succesfully indexed " + str(total_count) + " processed histogram logs to index {}".format(str(es['index_prefix'])+'-logs')) - else: - print(str(processed_count) + "/" + str(total_count) + "succesfully indexed") - else: - print("Not indexing\n") +setup_loggers("fio_wrapper", logging.DEBUG) def main(): + + #collect arguments parser = argparse.ArgumentParser(description="fio-d Wrapper script") parser.add_argument( 'hosts', help='Provide host file location') @@ -432,18 +55,63 @@ def main(): '-s', '--sample', type=int, default=1, help='number of times to run benchmark, defaults to 1') parser.add_argument( '-d', '--dir', help='output parent directory', default=os.path.dirname(os.getcwd())) + parser.add_argument( + '-hp', '--histogramprocess', help='Process and index histogram results', default=False) args = parser.parse_args() - uuid = "" - user = "" - server = "" + args.index_results = False + args.prefix = "ripsaw-fio-" es={} - index_results = False if "es" in os.environ: es['server'] = os.environ["es"] es['port'] = os.environ["es_port"] - es['index_prefix'] = os.environ["es_index"] - index_results = True + args.prefix = os.environ["es_index"] + args.index_results = True + + es = elasticsearch.Elasticsearch([ + {'host': es['server'],'port': es['port'] }],send_get_body_as='POST') + + #call py es bulk using a process generator to feed it ES documents + res_beg, res_end, res_suc, res_dup, res_fail, res_retry = 
streaming_bulk(es, process_generator(args)) + + # set up a standard format for time + FMT = '%Y-%m-%dT%H:%M:%SGMT' + start_t = time.strftime('%Y-%m-%dT%H:%M:%SGMT', time.gmtime(res_beg)) + end_t = time.strftime('%Y-%m-%dT%H:%M:%SGMT', time.gmtime(res_end)) + start_t = datetime.datetime.strptime(start_t, FMT) + end_t = datetime.datetime.strptime(end_t, FMT) + + #get time delta for indexing run + tdelta = end_t - start_t + logger.info("Duration of indexing - %s" % tdelta) + logger.info("Indexed results - %s success, %s duplicates, %s failures, with %s retries." % (res_suc, + res_dup, + res_fail, + res_retry)) + +def process_generator(args): + + object_generator = process_data(args) + + for object in object_generator: + for action, index in object.emit_actions(): + + es_valid_document = { "_index": index, + "_type": "result", + "_op_type": "create", + "_source": action, + "_id": "" } + es_valid_document["_id"] = hashlib.md5(str(action).encode()).hexdigest() + #logger.debug(json.dumps(es_valid_document, indent=4)) + yield es_valid_document + +def process_data(args): + uuid = "" + user = "" + server = "" + + index_results = args.index_results + if "uuid" in os.environ: uuid = os.environ["uuid"] if "test_user" in os.environ: @@ -458,14 +126,41 @@ def main(): if 'global' in fio_job_names: fio_job_names.remove('global') fio_jobs_dict = _parse_jobs(_fio_job_dict, fio_job_names) - fio_analyzer_obj = Fio_Analyzer(uuid, user) + document_index_prefix = args.prefix + + fio_analyzer_obj = Fio_Analyzer(uuid, user, document_index_prefix) + #execute fio for X number of samples, yield the trigger_fio_generator for i in range(1, sample + 1): sample_dir = working_dir + '/' + str(i) if not os.path.exists(sample_dir): os.mkdir(sample_dir) - _trigger_fio(fio_job_names, sample_dir, fio_jobs_dict, host_file_path, user, uuid, i, fio_analyzer_obj, es, index_results) - if index_results: - _index_result(es, fio_analyzer_obj.suffix, fio_analyzer_obj.emit_payload()) + trigger_fio_generator = trigger_fio._trigger_fio(fio_job_names, + sample_dir, + fio_jobs_dict, + host_file_path, + user, + uuid, + i, + fio_analyzer_obj, + document_index_prefix, + index_results, + args.histogramprocess) + yield trigger_fio_generator + + yield fio_analyzer_obj + +def _parse_jobs(job_dict, jobs): + job_dicts = {} + if 'global' in job_dict.keys(): + job_dicts['global'] = dict(job_dict['global']) + for job in jobs: + job_dicts[job] = dict(job_dict[job]) + return job_dicts + +def _parse_jobfile(job_path): + config = configparser.ConfigParser(allow_no_value=True) + config.read(job_path) + return config if __name__ == '__main__': sys.exit(main()) diff --git a/fio-wrapper/fio_analyzer.py b/fio-wrapper/fio_analyzer.py new file mode 100644 index 00000000..0d701845 --- /dev/null +++ b/fio-wrapper/fio_analyzer.py @@ -0,0 +1,136 @@ + + +import statistics +import time + + +class Fio_Analyzer: + """ + Fio Analyzer - this class will consume processed fio json results and calculate the average total iops for x number of samples. + results are analyzed based on operation and io size, this is a static evaluation and future enhancements could evalute results based on + other properties dynamically. 
+ """ + def __init__(self, uuid, user, prefix): + self.uuid = uuid + self.user = user + self.prefix = prefix + self.fio_processed_results_list = [] + self.sample_list = [] + self.operation_list = [] + self.io_size_list = [] + self.sumdoc = {} + + def add_fio_result_documents(self, document_list, starttime): + """ + for each new document add it to the results list with its starttime + """ + for document in document_list: + fio_result = {} + fio_result["document"] = document + fio_result["starttime"] = starttime + self.fio_processed_results_list.append(fio_result) + + + def calculate_iops_sum(self): + """ + will loop through all documents and will populate parameter lists and sum total iops across all host + for a specific operation and io size + """ + + for fio_result in self.fio_processed_results_list: + if fio_result['document']['fio']['jobname'] != 'All clients': + sample = fio_result['document']['sample'] + bs = fio_result['document']['global_options']['bs'] + rw = fio_result['document']['fio']['job options']['rw'] + + if sample not in self.sample_list: self.sample_list.append(sample) + if rw not in self.operation_list: self.operation_list.append(rw) + if bs not in self.io_size_list: self.io_size_list.append(bs) + + for sample in self.sample_list: + self.sumdoc[sample] = {} + for rw in self.operation_list: + self.sumdoc[sample][rw] = {} + for bs in self.io_size_list: + self.sumdoc[sample][rw][bs] = {} + + #get measurements + + for fio_result in self.fio_processed_results_list: + if fio_result['document']['fio']['jobname'] != 'All clients': + sample = fio_result['document']['sample'] + bs = fio_result['document']['global_options']['bs'] + rw = fio_result['document']['fio']['job options']['rw'] + + if not self.sumdoc[sample][rw][bs]: + time_s = fio_result['starttime'] / 1000.0 + self.sumdoc[sample][rw][bs]['date'] = time.strftime('%Y-%m-%dT%H:%M:%S.000Z', time.gmtime(time_s)) + self.sumdoc[sample][rw][bs]['write'] = 0 + self.sumdoc[sample][rw][bs]['read'] = 0 + + self.sumdoc[sample][rw][bs]['write'] += int(fio_result['document']['fio']["write"]["iops"]) + self.sumdoc[sample][rw][bs]['read'] += int(fio_result['document']['fio']["read"]["iops"]) + + def emit_actions(self): + """ + Will calculate the average iops across multiple samples and return list containing items for each result based on operation/io size + """ + + importdoc = {"ceph_benchmark_test": {"test_data": {}}, + "uuid": self.uuid, + "user": self.user + } + + self.calculate_iops_sum() + payload_list = [] + + for oper in self.operation_list: + for io_size in self.io_size_list: + average_write_result_list = [] + average_read_result_list = [] + total_ary = [] + tmp_doc = {} + tmp_doc['object_size'] = io_size # set document's object size + tmp_doc['operation'] = oper # set documents operation + firstrecord = True + calcuate_percent_std_dev = False + + for itera in self.sample_list: # + average_write_result_list.append(self.sumdoc[itera][oper][io_size]['write']) + average_read_result_list.append(self.sumdoc[itera][oper][io_size]['read']) + + if firstrecord: + importdoc['date'] = self.sumdoc[itera][oper][io_size]['date'] + firstrecord = True + + read_average = (sum(average_read_result_list)/len(average_read_result_list)) + if read_average > 0.0: + tmp_doc['read-iops'] = read_average + if len(average_read_result_list) > 1: + calcuate_percent_std_dev = True + else: + tmp_doc['read-iops'] = 0 + + write_average = (sum(average_write_result_list)/len(average_write_result_list)) + if write_average > 0.0: + tmp_doc['write-iops'] = 
write_average + if len(average_write_result_list) > 1: + calcuate_percent_std_dev = True + else: + tmp_doc['write-iops'] = 0 + + tmp_doc['total-iops'] = (tmp_doc['write-iops'] + tmp_doc['read-iops']) + + if calcuate_percent_std_dev: + if "read" in oper: + tmp_doc['std-dev-%s' % io_size] = round(((statistics.stdev(average_read_result_list) / read_average) * 100), 3) + elif "write" in oper: + tmp_doc['std-dev-%s' % io_size] = round(((statistics.stdev(average_write_result_list) / write_average) * 100), 3) + elif "randrw" in oper: + tmp_doc['std-dev-%s' % io_size] = round((((statistics.stdev(average_read_result_list) + statistics.stdev(average_write_result_list)) / tmp_doc['total-iops'])* 100), 3) + + importdoc['ceph_benchmark_test']['test_data'] = tmp_doc + #TODO add ID to document + index = self.prefix + "-analyzed_result" + yield importdoc, index + diff --git a/fio-wrapper/trigger_fio.py b/fio-wrapper/trigger_fio.py new file mode 100644 index 00000000..157d6abf --- /dev/null +++ b/fio-wrapper/trigger_fio.py @@ -0,0 +1,288 @@ + + +from datetime import datetime +from copy import deepcopy +from fio_hist_parser import compute_percentiles_from_logs +import re +import os +import sys +import json +import subprocess +import numpy as np +import configparser +import statistics +import time +import logging + +logger = logging.getLogger("fio_wrapper") + +_log_files={'bw':{'metric':'bandwidth'},'iops':{'metric':'iops'},'lat':{'metric':'latency'},'clat':{'metric':'latency'},'slat':{'metric':'latency'}} # ,'clat_hist_processed' +_data_direction={0:'read',1:'write',2:'trim'} + +class _trigger_fio: + """ + Will execute fio with the provided arguments and return normalized results for indexing + """ + def __init__(self, fio_jobs, working_dir, fio_jobs_dict, host_file, user, uuid, sample, fio_analyzer_obj, document_index_prefix, indexed=False, numjob=1, process_histogram=False): + self.fio_jobs = fio_jobs + self.working_dir = working_dir + self.fio_jobs_dict = fio_jobs_dict + self.host_file = host_file + self.user = user + self.uuid = uuid + self.sample = sample + self.fio_analyzer_obj = fio_analyzer_obj + self.document_index_prefix = document_index_prefix + self.indexed = indexed + self.numjob = numjob + self.histogram_process = process_histogram + + + def _document_payload(self, data, user, uuid, sample, list_hosts, end_time, fio_version, fio_jobs_dict): #pod_details, + processed = [] + fio_starttime = {} + earliest_starttime = float('inf') + for result in data["client_stats"] : + document = { + "uuid": uuid, + "user": user, + "hosts": list_hosts, + "fio-version": fio_version, + "timestamp_end": int(end_time)*1000, #this is in ms + #"nodeName": pod_details["hostname"], + "sample": int(sample), + "fio": result + } + if 'global' in fio_jobs_dict.keys(): + document['global_options'] = fio_jobs_dict['global'] + processed.append(document) + if result['jobname'] != 'All clients': + start_time= (int(end_time) * 1000) - result['job_runtime'] + fio_starttime[result['hostname']] = start_time + if start_time < earliest_starttime: + earliest_starttime = start_time + return processed, fio_starttime, earliest_starttime + + def _log_payload(self, directory, user, uuid, sample, fio_jobs_dict, fio_version, fio_starttime, list_hosts, job): #pod_details + logs = [] + _current_log_files = deepcopy(_log_files) + job_options = fio_jobs_dict[job] + if 'gtod_reduce' in job_options: + del _current_log_files['slat'] + del _current_log_files['clat'] + del _current_log_files['bw'] + if 'disable_lat' in job_options: + del 
_current_log_files['lat'] + if 'disable_slat' in job_options: + del _current_log_files['slat'] + if 'disable_clat' in job_options: + del _current_log_files['clat'] + if 'disable_bw' in job_options or 'disable_bw_measurement' in job_options: + del _current_log_files['bw'] + for log in _current_log_files.keys(): + for host in list_hosts: + log_file_prefix_string = 'write_' + str(log) + '_log' + if log in ['clat','slat']: + log_file_prefix_string = 'write_lat_log' + try: + log_file_name = str(job_options[log_file_prefix_string]) + '_' + str(log) + '.1.log.' + str(host) + except KeyError: + try: + log_file_name = str(fio_jobs_dict['global'][log_file_prefix_string]) + '_' + str(log) + '.1.log.' + str(host) + except: + logger.info("Error setting log_file_name") + with open(directory+'/'+str(log_file_name), 'r') as log_file: + for log_line in log_file: + log_line_values = str(log_line).split(", ") + if len(log_line_values) == 4: + timestamp_ms = int(fio_starttime[host]) + int(log_line_values[0]) + newtime = datetime.fromtimestamp(timestamp_ms / 1000.0) + log_dict = { + "uuid": uuid, + "user": user, + "host": host, + "fio-version": fio_version, + "job_options": job_options, + "job_name": str(job), + "sample": int(sample), + "log_name": str(log), + "timestamp": timestamp_ms, #this is in ms + "date": newtime.strftime('%Y-%m-%dT%H:%M:%S.%fZ'), + str(_current_log_files[log]['metric']): int(log_line_values[1]), + #"nodeName": pod_details["hostname"], + "data_direction": _data_direction[int(log_line_values[2])], + "offset": int(log_line_values[3]) + } + if 'global' in fio_jobs_dict.keys(): + log_dict['global_options'] = fio_jobs_dict['global'] + logs.append(log_dict) + return logs + + def _histogram_payload(self, processed_histogram_file, user, uuid, sample, fio_jobs_dict, fio_version, longest_fio_startime, list_hosts, job, numjob=1): #pod_details + logs = [] + with open(processed_histogram_file, 'r') as log_file: + for log_line in log_file: + log_line_values = str(log_line).split(", ") + if len(log_line_values) == 7 and not (any(len(str(x)) <= 0 for x in log_line_values)): + logger.debug(log_line_values) + timestamp_ms = int(longest_fio_startime) + int(log_line_values[0]) + newtime = datetime.fromtimestamp(timestamp_ms / 1000.0) + log_dict = { + "uuid": uuid, + "user": user, + "hosts": list_hosts, + "fio-version": fio_version, + "job_options": fio_jobs_dict[job], + "job_name": str(job), + "sample": int(sample), + "log_name": "clat_hist", + "timestamp": timestamp_ms, #this is in ms + "date": newtime.strftime('%Y-%m-%dT%H:%M:%S.%fZ'), + "number_samples_histogram": int(log_line_values[1]), + "min": float(log_line_values[2]), + "median": float(log_line_values[3]), + "p95": float(log_line_values[4]), + "p99": float(log_line_values[5]), + "max": float(log_line_values[6]) + } + if 'global' in fio_jobs_dict.keys(): + log_dict['global_options'] = fio_jobs_dict['global'] + logs.append(log_dict) + return logs + + def _clean_output(self, fio_output_file): + cmd = ["sed", "-i", "/{/,$!d"] + cmd.append(str(fio_output_file)) + process = subprocess.Popen(cmd, stdout=subprocess.PIPE) + stdout,stderr = process.communicate() + return stdout.strip(), process.returncode + + def _run_fiod(self, hosts_file, fiojob_file, output_dir, fio_output_file): + cmd = ["fio", "--client=", "path_file" ,"--output-format=json", "--output="] + cmd[1] = "--client=" + str(hosts_file) + cmd[2] = fiojob_file + cmd[4] = "--output=" + str(fio_output_file) + #logger.debug(cmd) + process = subprocess.Popen(cmd, stdout=subprocess.PIPE, 
cwd=output_dir) + stdout,stderr = process.communicate() + return stdout.strip(), process.returncode + + def _process_histogram(self, job_dict, hosts, job, working_dir, processed_histogram_prefix, histogram_output_file, numjob=1): + histogram_input_file_list = [] + for host in hosts: + input_file = working_dir + '/' + processed_histogram_prefix + '.' + str(numjob) + '.log.' + str(host) + histogram_input_file_list.append(input_file) + logger.debug(histogram_input_file_list) + if 'log_hist_msec' not in job_dict[job].keys(): + if 'global' in job_dict.keys() and 'log_hist_msec' not in job_dict['global'].keys(): + logger.info("log_hist_msec not found, so can't process histogram logs") + exit(1) + else: + _log_hist_msec = job_dict['global']['log_hist_msec'] + else: + _log_hist_msec = job_dict[job]['log_hist_msec'] + compute_percentiles_from_logs(output_csv_file=histogram_output_file,file_list=histogram_input_file_list,log_hist_msec=_log_hist_msec) + + def _build_fio_job(self, job_name, job_dict, parent_dir, fio_job_file_name): + config = configparser.ConfigParser() + if 'global' in job_dict.keys(): + config['global'] = job_dict['global'] + config[job_name] = job_dict[job_name] + if os.path.exists(fio_job_file_name): + os.remove(fio_job_file_name) + logger.info("file " + fio_job_file_name + " already exists. overwriting") + with open(fio_job_file_name, 'w') as configfile: + config.write(configfile, space_around_delimiters=False) + + def emit_actions(self): + """ + Executes fio test and calls document parsers, if index_data is true will yield normalized data + """ + + #access user specified host file + with open(self.host_file) as f: + hosts = f.read().splitlines() + + #execute for each job in the user specified job file + for job in self.fio_jobs: + + job_dir = self.working_dir + '/' + str(job) + if not os.path.exists(job_dir): + os.mkdir(job_dir) + fio_output_file = job_dir + '/' + str('fio-result.json') + fio_job_file = job_dir + '/fiojob' + + self._build_fio_job(job, self.fio_jobs_dict, job_dir, fio_job_file) + + stdout = self._run_fiod(self.host_file, fio_job_file, job_dir, fio_output_file) + if stdout[1] != 0: + logger.error("Fio failed to execute, trying one more time..") + stdout = self._run_fiod(self.host_file, fio_job_file, job_dir, fio_output_file) + if stdout[1] != 0: + logger.error("Fio failed to execute a second time, stopping...") + logger.error(stdout) + exit(1) + clean_stdout = self._clean_output(fio_output_file) + + if clean_stdout[1] != 0: + logger.error("failed to parse the output file") + exit(1) + logger.info("fio has successfully finished sample {} executing for jobname {} and results are in the dir {}\n".format(self.sample, job, job_dir)) + if self.indexed: + with open(fio_output_file) as f: + data = json.load(f) + fio_endtime = int(data['timestamp']) # in epoch seconds + fio_version = data["fio version"] + + #parse fio json file, return list of normalized documents and structured start times + fio_result_documents, fio_starttime, earliest_starttime = self._document_payload(data, self.user, self.uuid, self.sample, hosts, fio_endtime, fio_version, self.fio_jobs_dict) #hosts_metadata + + # Add fio result document to fio analyzer object + self.fio_analyzer_obj.add_fio_result_documents(fio_result_documents, earliest_starttime) + #if indexing is turned on yield back normalized data + if self.indexed: + #from the returned normalized fio json document yield up for indexing + index = self.document_index_prefix + "-results" + for document in fio_result_documents: + yield document, 
index
+
+                #check to determine if logs can be parsed, if not fail
+                try:
+                    if self.fio_jobs_dict[job]['filename_format'] != 'f.\$jobnum.\$filenum' or int(self.fio_jobs_dict[job]['numjobs']) != 1:
+                        logger.error("filename_format is not 'f.\$jobnum.\$filenum' and/or numjobs is not 1, so can't process logs")
+                        exit(1)
+                except KeyError:
+                    try:
+                        if self.fio_jobs_dict['global']['filename_format'] != 'f.\$jobnum.\$filenum' or int(self.fio_jobs_dict['global']['numjobs']) != 1:
+                            logger.error("filename_format is not 'f.\$jobnum.\$filenum' and/or numjobs is not 1, so can't process logs")
+                            exit(1)
+                    except:
+                        logger.error("Error getting filename_format")
+
+                #parse all fio log files, return list of normalized log documents
+                fio_log_documents = self._log_payload(job_dir, self.user, self.uuid, self.sample, self.fio_jobs_dict, fio_version, fio_starttime, hosts, job)
+
+                #if indexing is turned on yield back normalized data
+                if self.indexed:
+                    index = self.document_index_prefix + "-log"
+                    for document in fio_log_documents:
+                        yield document, index
+                if self.histogram_process:
+                    try:
+                        processed_histogram_prefix = self.fio_jobs_dict[job]['write_hist_log'] +'_clat_hist'
+                    except KeyError:
+                        try:
+                            processed_histogram_prefix = self.fio_jobs_dict['global']['write_hist_log'] +'_clat_hist'
+                        except:
+                            logger.error("Error setting processed_histogram_prefix")
+                    histogram_output_file = job_dir + '/' + processed_histogram_prefix + '_processed.' + str(self.numjob)
+                    self._process_histogram(self.fio_jobs_dict, hosts, job, job_dir, processed_histogram_prefix, histogram_output_file)
+                    histogram_documents = self._histogram_payload(histogram_output_file, self.user, self.uuid, self.sample, self.fio_jobs_dict, fio_version, earliest_starttime, hosts, job)
+                    #if indexing is turned on yield back normalized data
+                    if self.indexed:
+                        index = self.document_index_prefix + "-hist-log"
+                        for document in histogram_documents:
+                            yield document, index
+
+            else:
+                logger.info("Not indexing\n")
diff --git a/utils/__init__.py b/utils/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/utils/common_logging.py b/utils/common_logging.py
new file mode 100644
index 00000000..bf84eceb
--- /dev/null
+++ b/utils/common_logging.py
@@ -0,0 +1,81 @@
+import logging
+import os
+
+has_a_tty = os.isatty(1) # test stdout
+
+def color_me(color):
+    RESET_SEQ = "\033[0m"
+    COLOR_SEQ = "\033[1;%dm"
+
+    color_seq = COLOR_SEQ % (30 + color)
+
+    def closure(msg):
+        return color_seq + msg + RESET_SEQ
+    return closure
+
+
+class ColoredFormatter(logging.Formatter):
+    BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
+
+    colors = {
+        'WARNING': color_me(YELLOW),
+        'DEBUG': color_me(BLUE),
+        'CRITICAL': color_me(RED),
+        'ERROR': color_me(RED),
+        'INFO': color_me(GREEN)
+    }
+
+    def __init__(self, msg, use_color=True, datefmt=None):
+        logging.Formatter.__init__(self, msg, datefmt=datefmt)
+        self.use_color = use_color
+
+    def format(self, record):
+        orig = record.__dict__
+        record.__dict__ = record.__dict__.copy()
+        levelname = record.levelname
+
+        prn_name = levelname + ' ' * (8 - len(levelname))
+        if (levelname in self.colors) and has_a_tty:
+            record.levelname = self.colors[levelname](prn_name)
+        else:
+            record.levelname = prn_name
+
+        # super doesn't work here in 2.6 O_o
+        res = logging.Formatter.format(self, record)
+        # res = super(ColoredFormatter, self).format(record)
+
+        # restore record, as it will be used by other formatters
+        record.__dict__ = orig
+        return res
+
+
+def setup_loggers(logger_name, def_level=logging.DEBUG, log_fname=None):
+    logger = logging.getLogger(logger_name)
+    logger.setLevel(def_level)
+    sh = logging.StreamHandler()
+    sh.setLevel(def_level)
+
+    FMT = "%Y-%m-%dT%H:%M:%SZ"
+
+    log_format = '%(asctime)s - %(levelname)s - %(processName)s - %(module)s: %(message)s'
+    colored_formatter = ColoredFormatter(log_format, datefmt=FMT)
+
+    sh.setFormatter(colored_formatter)
+    logger.addHandler(sh)
+
+    if log_fname is not None:
+        # if a log file with the same name already exists, move the old log to .old;
+        # only a single .old file is preserved.
+        if os.path.exists(log_fname):
+            backup = "%s.old" % log_fname
+            os.rename(log_fname, backup)
+        fh = logging.FileHandler(log_fname)
+        formatter = logging.Formatter(log_format, datefmt=FMT)
+        fh.setFormatter(formatter)
+        fh.setLevel(def_level)
+        logger.addHandler(fh)
+    else:
+        fh = None
+
+
diff --git a/utils/py_es_bulk.py b/utils/py_es_bulk.py
new file mode 100644
index 00000000..50d23e81
--- /dev/null
+++ b/utils/py_es_bulk.py
@@ -0,0 +1,197 @@
+"""
+Opinionated methods for interfacing with Elasticsearch. We provide two
+such opinions for creating templates (put_template()) and bulk indexing
+(streaming_bulk).
+"""
+
+import sys, os, time, json, errno, logging, math
+
+from random import SystemRandom
+from collections import Counter, deque
+from urllib3 import exceptions as ul_excs
+try:
+    from elasticsearch1 import VERSION as es_VERSION, helpers, exceptions as es_excs
+    _es_logger = "elasticsearch1"
+except ImportError:
+    from elasticsearch import VERSION as es_VERSION, helpers, exceptions as es_excs
+    _es_logger = "elasticsearch"
+
+
+# Use the random number generator provided by the host OS to calculate our
+# random backoff.
+_r = SystemRandom()
+# The maximum amount of time (in seconds) we'll wait before retrying an
+# operation.
+_MAX_SLEEP_TIME = 120
+# Always use "create" operations, as we also ensure each JSON document being
+# indexed has an "_id" field, so we can tell when we are indexing duplicate
+# data.
+_op_type = "create"
+# 100,000 minute timeout talking to Elasticsearch; basically we just don't
+# want to timeout waiting for Elasticsearch and then have to retry, as that
+# can add undue burden to the Elasticsearch cluster.
+_request_timeout = 100000*60.0
+
+
+def _tstos(ts=None):
+    return time.strftime("%Y-%m-%dT%H:%M:%S-%Z", time.gmtime(ts))
+
+
+def _calc_backoff_sleep(backoff):
+    global _r
+    b = math.pow(2, backoff)
+    return _r.uniform(0, min(b, _MAX_SLEEP_TIME))
+
+
+def quiet_loggers():
+    """
+    A convenience function to quiet the urllib3 and elasticsearch loggers.
+    """
+    logging.getLogger("urllib3").setLevel(logging.FATAL)
+    logging.getLogger(_es_logger).setLevel(logging.FATAL)
+
+
+def put_template(es, name, body):
+    """
+    put_template(es, name, body)
+    Arguments:
+        es - An Elasticsearch client object already constructed
+        name - The name of the template to use
+        body - The payload body of the template
+    Returns:
+        A tuple with the start and end times of the PUT operation, along
+        with the number of times the operation was retried.
+    Failure modes are raised as exceptions.
+    """
+    retry = True
+    retry_count = 0
+    backoff = 1
+    beg, end = time.time(), None
+    while retry:
+        try:
+            es.indices.put_template(name=name, body=body)
+        except es_excs.ConnectionError as exc:
+            # We retry all connection errors
+            time.sleep(_calc_backoff_sleep(backoff))
+            backoff += 1
+            retry_count += 1
+        except es_excs.TransportError as exc:
+            # Only retry on certain 500 errors
+            if exc.status_code not in [500, 503, 504]:
+                raise
+            time.sleep(_calc_backoff_sleep(backoff))
+            backoff += 1
+            retry_count += 1
+        else:
+            retry = False
+    end = time.time()
+    return beg, end, retry_count
+
+
+def streaming_bulk(es, actions):
+    """
+    streaming_bulk(es, actions)
+    Arguments:
+        es - An Elasticsearch client object already constructed
+        actions - An iterable for the documents to be indexed
+    Returns:
+        A tuple with the start and end times, the # of successfully indexed,
+        duplicate, and failed documents, along with number of times a bulk
+        request was retried.
+    """
+
+    # These need to be defined before the closure below. These work because
+    # a closure remembers the binding of a name to an object. If integer
+    # objects were used, the name would be bound to that integer value only
+    # so for the retries, incrementing the integer would change the outer
+    # scope's view of the name. By using a Counter object, the name to
+    # object binding is maintained, but the object contents are changed.
+    actions_deque = deque()
+    actions_retry_deque = deque()
+    retries_tracker = Counter()
+
+    def actions_tracking_closure(cl_actions):
+        for cl_action in cl_actions:
+            assert '_id' in cl_action
+            assert '_index' in cl_action
+            assert '_type' in cl_action
+            assert _op_type == cl_action['_op_type']
+
+            actions_deque.append((0, cl_action))  # Append to the right side ...
+            yield cl_action
+            # if after yielding an action some actions appear on the retry deque
+            # start yielding those actions until we drain the retry queue.
+            backoff = 1
+            while len(actions_retry_deque) > 0:
+                time.sleep(_calc_backoff_sleep(backoff))
+                retries_tracker['retries'] += 1
+                retry_actions = []
+                # First drain the retry deque entirely so that we know when we
+                # have cycled through the entire list to be retried.
+                while len(actions_retry_deque) > 0:
+                    retry_actions.append(actions_retry_deque.popleft())
+                for retry_count, retry_action in retry_actions:
+                    actions_deque.append((retry_count, retry_action))  # Append to the right side ...
+                    yield retry_action
+                # if after yielding all the actions to be retried, some show up
+                # on the retry deque again, we extend our sleep backoff to avoid
+                # pounding on the ES instance.
+                backoff += 1
+
+    beg, end = time.time(), None
+    successes = 0
+    duplicates = 0
+    failures = 0
+
+    # Create the generator that closes over the external generator, "actions"
+    generator = actions_tracking_closure(actions)
+
+    streaming_bulk_generator = helpers.streaming_bulk(
+        es, generator, raise_on_error=False,
+        raise_on_exception=False, request_timeout=_request_timeout)
+
+    for ok, resp_payload in streaming_bulk_generator:
+        retry_count, action = actions_deque.popleft()
+        try:
+            resp = resp_payload[_op_type]
+            status = resp['status']
+        except KeyError as e:
+            assert not ok
+            # resp is not of expected form
+            print(resp)
+            status = 999
+        else:
+            assert action['_id'] == resp['_id']
+        if ok:
+            successes += 1
+        else:
+            if status == 409:
+                if retry_count == 0:
+                    # Only count duplicates if the retry count is 0 ...
+                    duplicates += 1
+                else:
+                    # ... otherwise consider it successful.
+                    successes += 1
+            elif status == 400:
+                doc = {
+                    "action": action,
+                    "ok": ok,
+                    "resp": resp,
+                    "retry_count": retry_count,
+                    "timestamp": _tstos(time.time())
+                }
+                jsonstr = json.dumps(doc, indent=4, sort_keys=True)
+                print(jsonstr)
+                #errorsfp.flush()
+                failures += 1
+            else:
+                # Retry all other errors
+                print(resp)
+                actions_retry_deque.append((retry_count + 1, action))
+
+    end = time.time()
+
+    assert len(actions_deque) == 0
+    assert len(actions_retry_deque) == 0
+
+    return (beg, end, successes, duplicates, failures, retries_tracker['retries'])
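
Usage sketch for the new bulk indexing helper: the snippet below shows how an action generator is expected to feed utils/py_es_bulk.streaming_bulk(), mirroring the documents that process_generator() builds in fio-wrapper.py. It is illustrative only; the host, port, index name, and document fields are placeholders, not values defined by this change, and it assumes a reachable Elasticsearch instance.

    import hashlib
    import elasticsearch

    from utils.py_es_bulk import streaming_bulk

    def example_actions():
        # Each action carries _index, _type, _op_type="create", and a stable
        # md5-derived _id so that re-submitted documents surface as duplicates.
        doc = {"uuid": "example-uuid", "user": "example-user", "sample": 1}
        yield {
            "_index": "ripsaw-fio-results",  # illustrative index name
            "_type": "result",
            "_op_type": "create",
            "_id": hashlib.md5(str(doc).encode()).hexdigest(),
            "_source": doc,
        }

    # streaming_bulk() returns start/end timestamps plus success, duplicate,
    # failure, and retry counts, which main() turns into a summary log line.
    es = elasticsearch.Elasticsearch([{"host": "localhost", "port": 9200}],
                                     send_get_body_as="POST")
    beg, end, ok, dup, failed, retries = streaming_bulk(es, example_actions())
    print("indexed %d, duplicates %d, failures %d, retries %d" % (ok, dup, failed, retries))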