updated fio wrapper to trigger prom collection
acalhounRH authored and dry923 committed Oct 27, 2020
1 parent ce9723e commit ab94b45
Showing 6 changed files with 608 additions and 1,170 deletions.
19 changes: 18 additions & 1 deletion snafu/fio_wrapper/trigger_fio.py
@@ -261,7 +261,11 @@ def emit_actions(self):
fio_output_file = os.path.join(job_dir, "fio-result.json")
fio_job_file = os.path.join(job_dir, "fiojob")
self._build_fio_job(job, job_dir, fio_job_file)

# capture sample start time, used for prom data collection
sample_starttime = datetime.utcnow().strftime('%s')
stdout, stderr, rc = self._run_fiod(fio_job_file, job_dir, fio_output_file)

if rc != 0:
logger.error("Fio failed to execute")
with open(fio_output_file, "r") as output:
@@ -276,6 +280,8 @@ def emit_actions(self):
"are in the dir {}\n".format(
self.sample, job, job_dir))

# capture sample end time, used for prom data collection
sample_endtime = datetime.utcnow().strftime('%s')
with open(fio_output_file) as f:
data = json.load(f)
fio_endtime = int(data['timestamp']) # in epoch seconds
@@ -328,7 +334,18 @@ def emit_actions(self):
self._process_histogram(job, job_dir, processed_histogram_prefix, histogram_output_file)
histogram_documents = self._histogram_payload(histogram_output_file, earliest_starttime, job)
# if indexing is turned on yield back normalized data

index = "hist-log"
for document in histogram_documents:
yield document, index
# trigger collection of prom data
sample_info_dict = {"uuid": self.uuid,
"user": self.user,
"cluster_name": self.cluster_name,
"starttime": sample_starttime,
"endtime": sample_endtime,
"sample": self.sample,
"tool": "fio",
"test_config": self.fio_jobs_dict
}

yield sample_info_dict, "get_prometheus_trigger"
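The start and end timestamps above are captured as epoch-second strings and only converted back into datetime objects later, in get_prometheus_data.py, to bound the Prometheus query range. A minimal sketch of that round trip follows; the values are illustrative and strftime('%s') is assumed to behave as on Linux/glibc:

from datetime import datetime

# wrapper side: epoch seconds as strings, as trigger_fio.py records them
sample_starttime = datetime.utcnow().strftime('%s')   # e.g. "1603824000"
sample_endtime = datetime.utcnow().strftime('%s')

# collector side: get_prometheus_data.py turns the strings back into datetime objects
start = datetime.fromtimestamp(int(sample_starttime))
end = datetime.fromtimestamp(int(sample_endtime))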
51 changes: 43 additions & 8 deletions snafu/run_snafu.py
@@ -87,8 +87,9 @@ def main():
ssl_context=ssl_ctx, use_ssl=True)
else:
es = elasticsearch.Elasticsearch([_es_connection_string], send_get_body_as='POST')
logger.info("Connected to the elasticsearch cluster with info as follows:{0}".format(
str(es.info())))
logger.info("Connected to the elasticsearch cluster with info as follows:")
logger.info(json.dumps(es.info(), indent=4))

except Exception as e:
logger.warn("Elasticsearch connection caused an exception : %s" % e)
index_args.index_results = False
@@ -130,7 +131,6 @@ def main():
total_capacity_bytes = index_args.document_size_capacity_bytes
logger.info("Duration of execution - %s, with total size of %s bytes" % (tdelta, total_capacity_bytes))


def process_generator(index_args, parser):
benchmark_wrapper_object_generator = generate_wrapper_object(index_args, parser)

@@ -146,19 +146,20 @@ def process_generator(index_args, parser):
"uuid": <uuid>
"user": <user>
"clustername": <clustername>
"sample": <int>
"starttime": <datetime> datetime.utcnow().strftime('%s')
"endtime": <datetime>
test_config: {...}
}
"""

index_prom_data(index_args, action)
else:
es_valid_document = get_valid_es_document(action,
index,
index_args)
yield es_valid_document


def generate_wrapper_object(index_args, parser):
benchmark_wrapper_object = wrapper_factory(index_args.tool, parser)

@@ -181,35 +182,69 @@ def get_valid_es_document(action, index, index_args):

return es_valid_document

def index_prom_data(prometheus_doc, index_args, action):
def index_prom_data(index_args, action):

# definition of prometheus data getter, will yield back prom doc
def get_prometheus_generator(index_args, action):
prometheus_doc_generator = get_prometheus_data(action)
for prometheus_doc in prometheus_doc_generator.get_all_metrics():

es_valid_document = get_valid_es_document(prometheus_doc,
"prometheus_data",
index_args)

yield es_valid_document

es = {}
if "prom_es" in os.environ:
es = {}
if os.environ["prom_es"] != "":
es['server'] = os.environ["prom_es"]
logger.info("Using Prometheus elasticsearch server with host: %s" % es['server'])
if os.environ["prom_port"] != "":
es['port'] = os.environ["prom_port"]
logger.info("Using Prometheus elasticsearch server with port: %s" % es['port'])
es_verify_cert = os.getenv("es_verify_cert", "true")
if len(es.keys()) == 2:
if os.environ["es_index"] != "":
index_args.prefix = os.environ["es_index"]
logger.info("Using index prefix for ES:" + index_args.prefix)
index_args.index_results = True
try:
_es_connection_string = str(es['server']) + ':' + str(es['port'])
if es_verify_cert == "false":
logger.info("Turning off TLS certificate verification")
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
ssl_ctx = ssl.create_default_context()
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
es = elasticsearch.Elasticsearch([_es_connection_string], send_get_body_as='POST',
ssl_context=ssl_ctx, use_ssl=True)
else:
es = elasticsearch.Elasticsearch([_es_connection_string], send_get_body_as='POST')
logger.info("Connected to the elasticsearch cluster with info as follows:")
logger.info(json.dumps(es.info(), indent=4))

if index_args.index_results:
except Exception as e:
logger.warn("Elasticsearch connection caused an exception : %s" % e)
index_args.index_results = False

# check that we want to index and that prom_es exists.
if index_args.index_results and "prom_es" in os.environ:
logger.info("initializing prometheus indexing")
parallel_setting = strtobool(os.environ.get('parallel', "false"))
res_beg, res_end, res_suc, res_dup, res_fail, res_retry = streaming_bulk(es,
get_prometheus_generator(
prometheus_doc,
index_args,
action),
parallel_setting)

logger.info(
"Prometheus indexed results - %s success, %s duplicates, %s failures, with %s retries." % (
res_suc,
res_dup,
res_fail,
res_retry))
start_t = time.strftime('%Y-%m-%dT%H:%M:%SGMT', time.gmtime(res_beg))
end_t = time.strftime('%Y-%m-%dT%H:%M:%SGMT', time.gmtime(res_end))
# set up a standard format for time
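For reference, index_prom_data() is driven entirely by environment variables. A hedged sketch of the knobs it reads follows; the host, port and index values here are placeholders, not repository defaults:

import os

os.environ["prom_es"] = "search.example.com"   # Elasticsearch host that receives the prometheus documents
os.environ["prom_port"] = "9200"               # Elasticsearch port
os.environ["es_index"] = "ripsaw-prometheus"   # index prefix (hypothetical value)
os.environ["es_verify_cert"] = "false"         # optional: disable TLS certificate verification
os.environ["parallel"] = "false"               # optional: use parallel streaming_bulk indexing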
85 changes: 46 additions & 39 deletions snafu/utils/get_prometheus_data.py
@@ -2,9 +2,8 @@
import json
import logging
import urllib3
from datetime import datetime, timedelta
from datetime import datetime
import time
import sys
from prometheus_api_client import PrometheusConnect
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

@@ -13,23 +12,27 @@
class get_prometheus_data():
def __init__(self, action):

self.sample_info_dict = action
self.uuid = action["uuid"]
self.user = action["user"]
self.cluster_name = action["cluster_name"]
self.test_config = action["test_config"]

# change datetime in seconds string to datetime object
starttime = datetime.fromtimestamp(int(action["starttime"]))
self.start = starttime.datetime()
starttime = datetime.fromtimestamp(int(self.sample_info_dict["starttime"]))
self.start = starttime

# change datetime in seconds string to datetime object
endtime = datetime.fromtimestamp(int(action["endtime"]))
# add 120s buffer to end time
endtime = endtime + timedelta(seconds=120)
self.end = endtime.datetime
endtime = datetime.fromtimestamp(int(self.sample_info_dict["endtime"]))
self.end = endtime

# step value to be used in prometheus query
self.T_Delta = 30
# default is 30 seconds(openshift default scraping interval)
# but can be overridden with env
if "prom_step" in os.environ:
self.T_Delta = os.environ["prom_step"]
else:
self.T_Delta = 30

self.get_data = False
if "prom_token" in os.environ and "prom_url" in os.environ:
@@ -49,41 +52,49 @@ def get_all_metrics(self):
if self.get_data:
start_time = time.time()

filename = os.path.join(sys.path[0], 'utils/prometheus_labels/included_labels.json')
# resolve the directory of the tool include file
dirname = os.path.dirname(__file__)
include_file_dir = os.path.join(dirname, '/utils/prometheus_labels/')
tool_include_file = include_file_dir + self.sample_info_dict["tool"] + "_included_labels.json"

# check if tools include file is there
# if not use the default include file
if os.path.isfile(tool_include_file):
filename = tool_include_file
else:
filename = os.path.join(include_file_dir, 'included_labels.json')

# open tools include file and loop through all
with open(filename, 'r') as f:
datastore = json.load(f)

# for label in self.get_label_list():
for label in datastore["data"]:
for metric_name in datastore["data"]:

query_item = datastore["data"][metric_name]
query = query_item["query"]
label = query_item["label"]

# query_start_time = time.time()
query = "irate(%s[1m])" % label
"""
If there are additional queries need we should create a list or dict that can be iterated on
"""
step = str(self.T_Delta) + "s"
try:
# response = self.api_call(query)
# Execute custom query to pull the desired labels between X and Y time.
response = self.pc.custom_query_range(query,
self.start,
self.end,
step,
None)

except Exception as e:
logger.info(query)
logger.warn("failure to get metric results %s" % e)

results = response['result']

# results is a list of all hits
"""
TODO: update with proper parsing of response document
"""

for result in results:
for result in response:
# clean up name key from __name__ to name
result["metric"]["name"] = ""
result["metric"]["name"] = result["metric"]["__name__"]
del result["metric"]["__name__"]
if "__name__" in result["metric"]:
result["metric"]["name"] = result["metric"]["__name__"]
del result["metric"]["__name__"]
else:
result["metric"]["name"] = label
# each result has a list, we must flatten it out in order to send to ES
for value in result["values"]:
# first index is the time stamp
@@ -94,18 +105,14 @@ def get_all_metrics(self):
else:
metric_value = float(value[1])

flat_doc = {"uuid": self.uuid,
"user": self.user,
"cluster_name": self.cluster_name,
"metric": result["metric"],
"Date": timestamp,
"value": metric_value,
"test_config": self.test_config
}
flat_doc = {
"metric": result["metric"],
"Date": timestamp,
"value": metric_value,
"metric_name": metric_name
}

flat_doc.update(self.sample_info_dict)
yield flat_doc
else:
pass
# logger.debug("Not exporting data for %s" % label)

logger.debug("Total Time --- %s seconds ---" % (time.time() - start_time))