Add get prometheus data to run snafu #173

Merged
merged 17 commits into from Aug 17, 2020
1 change: 1 addition & 0 deletions requirements.txt
@@ -6,3 +6,4 @@ pyyaml
requests
redis
python-dateutil
prometheus_api_client
rsevilla87 marked this conversation as resolved.
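The new prometheus_api_client dependency provides the PrometheusConnect client that src/utils/get_prometheus_data.py below relies on. As a quick, hedged connectivity check, something like the following can be used; the URL and token are placeholders, not values shipped with snafu:

from prometheus_api_client import PrometheusConnect

# Placeholders only; snafu reads the real values from the prom_url and
# prom_token environment variables.
pc = PrometheusConnect(
    url="https://prometheus-k8s.example.com",
    headers={"Authorization": "Bearer example-token"},
    disable_ssl=True,
)
print(pc.check_prometheus_connection())  # True if the endpoint answers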
2 changes: 1 addition & 1 deletion setup.cfg
@@ -21,7 +21,7 @@ include_package_data = True
package_dir =
=src
# Add here dependencies of your project (semicolon/line-separated), e.g.
install_requires = configparser; elasticsearch>=6.0.0,<=7.0.2; statistics; numpy; pyyaml; requests; redis
install_requires = configparser; elasticsearch>=6.0.0,<=7.0.2; statistics; numpy; pyyaml; requests; redis; python-dateutil>=2.7.3; prometheus_api_client; scipy
# tests_require = pytest; pytest-cov
# Require a specific Python version, e.g. Python 2.7 or >= 3.4
# python_requires = >=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*
2 changes: 1 addition & 1 deletion src/fio_wrapper/Dockerfile
@@ -6,7 +6,7 @@ RUN pushd fio-fio-3.19 && ./configure --disable-native && make -j2

FROM registry.access.redhat.com/ubi8:latest
COPY --from=builder /fio-fio-3.19/fio /usr/local/bin/fio
RUN dnf install -y --nodocs git python3-pip python3-requests python3-numpy libaio zlib procps-ng iproute net-tools ethtool nmap iputils && dnf clean all
RUN dnf install -y --nodocs git python3-pip libaio zlib procps-ng iproute net-tools ethtool nmap iputils && dnf clean all
RUN ln -s /usr/bin/python3 /usr/bin/python
COPY . /opt/snafu/
RUN pip3 install -e /opt/snafu/
2 changes: 1 addition & 1 deletion src/fs_drift_wrapper/Dockerfile
@@ -1,6 +1,6 @@
FROM registry.access.redhat.com/ubi8:latest

RUN dnf install -y --nodocs git python3-pip python3-numpy python3-requests python3-scipy
RUN dnf install -y --nodocs git python3-pip
RUN dnf install -y --nodocs procps-ng iproute net-tools ethtool nmap iputils
RUN ln -s /usr/bin/python3 /usr/bin/python
COPY . /opt/snafu/
2 changes: 1 addition & 1 deletion src/hammerdb/Dockerfile
@@ -1,7 +1,7 @@
FROM registry.access.redhat.com/ubi8:latest

# install requirements
RUN dnf install -y --nodocs tcl unixODBC python3-pip python3-requests
RUN dnf install -y --nodocs tcl unixODBC python3-pip
RUN dnf install -y --nodocs procps-ng iproute net-tools ethtool nmap iputils

RUN curl https://packages.microsoft.com/config/rhel/8/prod.repo -o /etc/yum.repos.d/mssql-release.repo
2 changes: 1 addition & 1 deletion src/linpack_wrapper/Dockerfile
@@ -1,6 +1,6 @@
FROM registry.access.redhat.com/ubi8:latest

RUN dnf install -y --nodocs python3-pip python3-numpy python3-requests python3-numpy
RUN dnf install -y --nodocs python3-pip
RUN ln -s /usr/bin/python3 /usr/bin/python
RUN mkdir -p /opt/snafu/
COPY . /opt/snafu/
2 changes: 1 addition & 1 deletion src/pgbench_wrapper/Dockerfile
@@ -1,7 +1,7 @@
FROM registry.access.redhat.com/ubi8:latest

RUN dnf install -y --nodocs https://download.postgresql.org/pub/repos/yum/reporpms/EL-8-x86_64/pgdg-redhat-repo-latest.noarch.rpm
RUN dnf install -y --nodocs python3-pip python3-numpy python3-requests postgresql11
RUN dnf install -y --nodocs python3-pip postgresql11
COPY src/image_resources/centos8-appstream.repo /etc/yum.repos.d/centos8-appstream.repo
RUN dnf install -y --nodocs redis --enablerepo=centos8-appstream
RUN dnf install -y --nodocs procps-ng iproute net-tools ethtool nmap iputils
92 changes: 79 additions & 13 deletions src/run_snafu.py
@@ -24,9 +24,11 @@
import hashlib
import json
import ssl
from distutils.util import strtobool
from utils.py_es_bulk import streaming_bulk
from utils.common_logging import setup_loggers
from utils.wrapper_factory import wrapper_factory
from utils.get_prometheus_data import get_prometheus_data
from utils.request_cache_drop import drop_cache

logger = logging.getLogger("snafu")
@@ -88,16 +90,19 @@ def main():
logger.info("Connected to the elasticsearch cluster with info as follows:{0}".format(
str(es.info())))
except Exception as e:
logger.warning("Elasticsearch connection caused an exception : %s" % e)
logger.warn("Elasticsearch connection caused an exception : %s" % e)
index_args.index_results = False

index_args.document_size_capacity_bytes = 0
if index_args.index_results:
# call py es bulk using a process generator to feed it ES documents

parallel_setting = strtobool(os.environ.get('parallel', "false"))
res_beg, res_end, res_suc, res_dup, res_fail, res_retry = streaming_bulk(es,
process_generator(
index_args,
parser))
parser),
parallel_setting)

logger.info(
"Indexed results - %s success, %s duplicates, %s failures, with %s retries." % (
@@ -134,24 +139,85 @@ def process_generator(index_args, parser):
# drop cache after every sample
drop_cache()
for action, index in data_object.emit_actions():
es_index = index_args.prefix + '-' + index
es_valid_document = {"_index": es_index,
"_op_type": "create",
"_source": action,
"_id": ""}
es_valid_document["_id"] = hashlib.sha256(str(action).encode()).hexdigest()
document_size_bytes = sys.getsizeof(es_valid_document)
index_args.document_size_capacity_bytes += document_size_bytes
logger.debug("document size is: %s" % document_size_bytes)
logger.debug(json.dumps(es_valid_document, indent=4, default=str))
yield es_valid_document
if "get_prometheus_trigger" in index:
# The action will contain the following fields (a hedged example of emitting
# this trigger is sketched after this file's diff)
"""
action: {
"uuid": <uuid>
"user": <user>
"cluster_name": <cluster_name>
"starttime": <datetime> datetime.utcnow().strftime('%s')
"endtime": <datetime>
"test_config": {...}
}
"""
index_prom_data(index_args, action)
else:
es_valid_document = get_valid_es_document(action,
index,
index_args)
yield es_valid_document


def generate_wrapper_object(index_args, parser):
benchmark_wrapper_object = wrapper_factory(index_args.tool, parser)

yield benchmark_wrapper_object

def get_valid_es_document(action, index, index_args):
es_index = index_args.prefix + '-' + index
es_valid_document = {"_index": es_index,
"_op_type": "create",
"_source": action,
"_id": ""}
es_valid_document["_id"] = hashlib.sha256(str(action).encode()).hexdigest()
document_size_bytes = sys.getsizeof(es_valid_document)
index_args.document_size_capacity_bytes += document_size_bytes
logger.debug("document size is: %s" % document_size_bytes)
logger.debug(json.dumps(es_valid_document, indent=4, default=str))

return es_valid_document

def index_prom_data(index_args, action):

# generator over get_prometheus_data(): yields an ES-ready document for each Prometheus metric sample
def get_prometheus_generator(index_args, action):
prometheus_doc_generator = get_prometheus_data(action)
for prometheus_doc in prometheus_doc_generator.get_all_metrics():
es_valid_document = get_valid_es_document(prometheus_doc,
"prometheus_data",
index_args)
yield es_valid_document

if "prom_es" in os.environ:
es = {}
if os.environ["prom_es"] != "":
es['server'] = os.environ["prom_es"]
logger.info("Using Prometheus elasticsearch server with host: %s" % es['server'])
if os.environ["prom_port"] != "":
es['port'] = os.environ["prom_port"]
logger.info("Using Prometheus elasticsearch server with port: %s" % es['port'])

if index_args.index_results:
parallel_setting = strtobool(os.environ.get('parallel', "false"))
res_beg, res_end, res_suc, res_dup, res_fail, res_retry = streaming_bulk(es,
get_prometheus_generator(
index_args,
action),
parallel_setting)

start_t = time.strftime('%Y-%m-%dT%H:%M:%SGMT', time.gmtime(res_beg))
end_t = time.strftime('%Y-%m-%dT%H:%M:%SGMT', time.gmtime(res_end))
# set up a standard format for time
FMT = '%Y-%m-%dT%H:%M:%SGMT'
start_t = datetime.datetime.strptime(start_t, FMT)
end_t = datetime.datetime.strptime(end_t, FMT)

# get time delta for indexing run
tdelta = end_t - start_t
logger.info("Prometheus indexing duration of execution - %s" % tdelta)


if __name__ == "__main__":
sys.exit(main())
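For context, a wrapper opts into Prometheus collection by emitting one extra action whose index name contains "get_prometheus_trigger"; process_generator() then routes that action to index_prom_data() instead of indexing it directly. A minimal sketch of such an emission, with hypothetical field values (real wrappers build these from their own CLI arguments), might look like this:

import datetime

# Hedged sketch of a wrapper's emit_actions(); all values are placeholders.
def emit_actions():
    # ... the benchmark's regular result documents are yielded first ...
    trigger_doc = {
        "uuid": "1234-abcd",                                     # run identifier
        "user": "snafu",                                         # user label carried into every document
        "cluster_name": "mycluster",                             # cluster the benchmark ran on
        "starttime": datetime.datetime.utcnow().strftime('%s'),  # epoch seconds, as a string
        "endtime": datetime.datetime.utcnow().strftime('%s'),
        "test_config": {"pairs": 1},                             # wrapper-specific settings
    }
    # The index name carries the keyword that process_generator() checks for.
    yield trigger_doc, "get_prometheus_trigger"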
2 changes: 1 addition & 1 deletion src/smallfile_wrapper/Dockerfile
@@ -1,6 +1,6 @@
FROM registry.access.redhat.com/ubi8:latest

RUN dnf install -y --nodocs git python3-pip python3-numpy python3-requests python3-scipy
RUN dnf install -y --nodocs git python3-pip
RUN dnf install -y --nodocs procps-ng iproute net-tools ethtool nmap iputils
RUN ln -s /usr/bin/python3 /usr/bin/python
ADD https://api.github.com/repos/distributed-system-analysis/smallfile/git/refs/heads/master /tmp/bustcache
1 change: 1 addition & 0 deletions src/uperf_wrapper/trigger_uperf.py
@@ -24,6 +24,7 @@ class Trigger_uperf():
def __init__(self, args):
self.uuid = args.uuid
self.user = args.user

self.clientips = args.clientips
self.remoteip = args.remoteip
self.hostnetwork = args.hostnetwork
111 changes: 111 additions & 0 deletions src/utils/get_prometheus_data.py
@@ -0,0 +1,111 @@
import os
import json
import logging
import urllib3
from datetime import datetime, timedelta
import time
import sys
from prometheus_api_client import PrometheusConnect
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

logger = logging.getLogger("snafu")

class get_prometheus_data():
def __init__(self, action):

self.uuid = action["uuid"]
self.user = action["user"]
self.cluster_name = action["cluster_name"]
self.test_config = action["test_config"]

# convert the epoch-seconds string to a datetime object
starttime = datetime.fromtimestamp(int(action["starttime"]))
self.start = starttime

# convert the epoch-seconds string to a datetime object
endtime = datetime.fromtimestamp(int(action["endtime"]))
# add a 120s buffer to the end time
endtime = endtime + timedelta(seconds=120)
self.end = endtime

# step value to be used in prometheus query
self.T_Delta = 30

self.get_data = False
if "prom_token" in os.environ and "prom_url" in os.environ:
self.get_data = True
token = os.environ["prom_token"]
self.url = os.environ["prom_url"]
bearer = "Bearer " + token
self.headers = {'Authorization': bearer}
self.pc = PrometheusConnect(url=self.url, headers=self.headers, disable_ssl=True)
else:
logger.warn("""snafu service account token and prometheus url not set \n
No Prometheus data will be indexed""")

def get_all_metrics(self):

# check the get_data flag; if False, skip all processing
if self.get_data:
start_time = time.time()

filename = os.path.join(sys.path[0], 'utils/prometheus_labels/included_labels.json')
with open(filename, 'r') as f:
datastore = json.load(f)

# for label in self.get_label_list():
for label in datastore["data"]:

# query_start_time = time.time()
query = "irate(%s[1m])" % label
"""
If additional queries are needed, we should create a list or dict that can be iterated over
"""
step = str(self.T_Delta) + "s"
try:
# response = self.api_call(query)
response = self.pc.custom_query_range(query,
self.start,
self.end,
step,
None)
except Exception as e:
logger.warn("failure to get metric results %s" % e)
continue

results = response['result']

# results is a list of all hits
"""
TODO: update with proper parsing of response document
"""

for result in results:
# rename the __name__ key to name
result["metric"]["name"] = result["metric"]["__name__"]
del result["metric"]["__name__"]
# each result has a list, we must flatten it out in order to send to ES
for value in result["values"]:
# first index is the timestamp
timestamp = datetime.utcfromtimestamp(value[0]).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
# second index is value of metric
if "NaN" in value[1]: # need to handle values that are NaN, Inf, or -Inf
metric_value = 0
else:
metric_value = float(value[1])

flat_doc = {"uuid": self.uuid,
"user": self.user,
"cluster_name": self.cluster_name,
"metric": result["metric"],
"Date": timestamp,
"value": metric_value,
"test_config": self.test_config
}

yield flat_doc
else:
pass
# logger.debug("Not exporting data for %s" % label)

logger.debug("Total Time --- %s seconds ---" % (time.time() - start_time))