Added functionality for creating and processing archive file
acalhounRH committed Jun 2, 2021
1 parent 3981522 commit ff296a9
Showing 3 changed files with 167 additions and 30 deletions.
89 changes: 70 additions & 19 deletions README.md
@@ -1,12 +1,74 @@
# Benchmark-Wrapper aka SNAFU - Situation Normal: All F'ed Up

Benchmark-wrapper provides a convenient mechanism for launching, processing, and storing data produced by a suite of performance benchmarks. Users can run Benchmark-wrapper in a traditional bare-metal environment or, with [benchmark-operator](https://github.com/cloud-bulldozer/benchmark-operator), in a containerized environment such as Kubernetes.

Note: Benchmark-wrapper does not depend on Kubernetes, so you can use run_snafu.py on a bare-metal or VM cluster without relying on Kubernetes to start/stop pods. If you need your benchmark to collect data for both Kubernetes and non-Kubernetes environments, incorporate your benchmark into benchmark-wrapper and then write a benchmark-operator benchmark to integrate with Kubernetes.

**Why should I use Benchmark-wrapper?**

Traditionally, benchmark tools have presented results as ad hoc raw standard output, which limits detailed statistical analysis, leaves no way to preserve results for long-term archiving, and makes the tools difficult to use in a platform-agnostic way. Benchmark-wrapper aims to solve all of these issues and provide users with an integrated, streamlined interface.

# How to Run


It is suggested to use a virtual environment to install and run snafu.

```
python3 -m venv /path/to/new/virtual/environment
source /path/to/new/virtual/environment/bin/activate
git clone https://github.com/cloud-bulldozer/snafu
python setup.py develop
run_snafu --tool Your_Benchmark ...
```
## Install

```
git clone https://github.com/cloud-bulldozer/benchmark-wrapper.git
sudo pip3 install /path-to-benchmark-wrapper/benchmark-wrapper
```

## Configure

Benchmark-wrapper uses several environment variables to provide user context and to interface with Elasticsearch.

```
export uuid=<RFC4122 Version 4 uuid>
export test_user=<test user name>
export clustername=<platform or cluster descriptive name>
export es=<http://es_address:es_port>
```
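
As a concrete sketch, a session might be configured as follows. The values here are hypothetical — substitute your own user, cluster name, and Elasticsearch endpoint:

```shell
# Hypothetical example values -- adjust for your environment.
# Generate an RFC4122 version-4 UUID for this run.
export uuid=$(python3 -c 'import uuid; print(uuid.uuid4())')
export test_user=alice
export clustername=dev-cluster
export es=http://localhost:9200
echo "uuid=$uuid"
```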

## Run

```
python3.7 ./snafu/run_snafu.py --tool <tool name> <tool-dependent parameters>
```

For example:

```
python3.7 ./snafu/run_snafu.py --tool sysbench -f example__cpu_test.conf
```

## Archiving data

Benchmark-wrapper can capture data in two ways. The first and preferred method is writing data directly to Elasticsearch; users need to set the **es** environment variable to enable this. The second method is writing to a local archive file, intended for when Elasticsearch is not available for direct indexing or as a backup of indexed results. Both methods can be enabled at the same time and are independent of each other.

To enable writing to an archive file, pass --create-archive. If the file must be named or located in a specific place, also pass --archive-file <file name>.

For example:

```
python3.7 ./snafu/run_snafu.py --tool sysbench -f example__cpu_test.conf --create-archive --archive-file /tmp/my_sysbench_data.archive
```

To index from an archive file, users can invoke run_snafu as follows:

```
python3.7 ./snafu/run_snafu.py --tool archive --archive-file /tmp/my_sysbench_data.archive
```

**Note**: The archive file contains one Elasticsearch-friendly document per line and is intended for future indexing, so users are not expected to evaluate or review it manually.
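
Since the archive format is simply one JSON document per line (newline-delimited JSON), reading it back programmatically is straightforward. A minimal sketch — `count_archive_docs` is a hypothetical helper for illustration, not part of benchmark-wrapper:

```python
import json

def count_archive_docs(path):
    """Count the Elasticsearch-friendly documents in an archive file.

    Assumes the archive format described above: one standalone JSON
    document per line.
    """
    count = 0
    with open(path) as f:
        for line in f:
            line = line.strip()
            if line:
                json.loads(line)  # raises ValueError if a line is not valid JSON
                count += 1
    return count
```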

## What workloads do we support?

@@ -27,26 +89,15 @@ environments, develop in SNAFU and then write benchmark-operator benchmark to in
| Image Pull | Time to copy a container image from a repo | Working |
| sysbench | CPU,Memory,Mutex,Threads,Fileio | Working |

## What backend data storage do we support?

| Storage | Status |
| -------------- | -------- |
| Elasticsearch | Working |
| Prom | Planned |




# How do I develop a snafu extension for my benchmark?

In what follows, your benchmark's name should be substituted for the name "Your_Benchmark". Use alphanumerics and
underscores only in your benchmark name.
@@ -107,7 +158,7 @@ server that is viewable with Kibana and Grafana!

Look at some of the other benchmarks for examples of how this works.

## How do I prepare results for Elasticsearch indexing from my wrapper?

Every snafu benchmark will use Elasticsearch index name of the form **orchestrator-benchmark-doctype**, consisting of the 3
components:
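
For illustration, the three components can be joined as in the following sketch. The component values and the `index_name` helper are made up for this example, not part of benchmark-wrapper:

```python
def index_name(orchestrator, benchmark, doctype):
    """Assemble an index name of the form orchestrator-benchmark-doctype."""
    return "-".join((orchestrator, benchmark, doctype))

# e.g. a hypothetical uperf run driven by ripsaw producing result documents
print(index_name("ripsaw", "uperf", "results"))  # → ripsaw-uperf-results
```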
81 changes: 75 additions & 6 deletions snafu/run_snafu.py
@@ -55,6 +55,15 @@ def main():
)
    parser.add_argument("-t", "--tool", help="Provide tool name", required=True)
    parser.add_argument("--run-id", help="Run ID to unify benchmark results in ES", nargs="?", default="NA")
    parser.add_argument("--archive-file", help="Archive file that will be indexed into ES")
    parser.add_argument(
        "--create-archive",
        action="store_const",
        dest="createarchive",
        const=True,
        default=False,
        help="enables creation of archive file",
    )
    index_args, unknown = parser.parse_known_args()
    index_args.index_results = False
    index_args.prefix = "snafu-%s" % index_args.tool
@@ -85,7 +94,7 @@ def main():
        ssl_ctx.check_hostname = False
        ssl_ctx.verify_mode = ssl.CERT_NONE
        es = elasticsearch.Elasticsearch(
            [es_settings["server"]], send_get_body_as="POST", ssl_context=ssl_ctx, use_ssl=False
        )
    else:
        es = elasticsearch.Elasticsearch([es_settings["server"]], send_get_body_as="POST")
@@ -99,9 +108,23 @@ def main():
    # call py es bulk using a process generator to feed it ES documents
    if index_args.index_results:
        parallel_setting = strtobool(os.environ.get("parallel", "false"))

        if "archive" in index_args.tool:
            if index_args.archive_file:
                # if processing an archive file, use the process_archive_file function
                res_beg, res_end, res_suc, res_dup, res_fail, res_retry = streaming_bulk(
                    es, process_archive_file(index_args), parallel_setting
                )
            else:
                logger.error(
                    "Attempted to index archive without specifying a file, use --archive-file=<file>"
                )
                exit(1)
        else:
            # otherwise run a test and process the new result documents
            res_beg, res_end, res_suc, res_dup, res_fail, res_retry = streaming_bulk(
                es, process_generator(index_args, parser), parallel_setting
            )

    logger.info(
        "Indexed results - %s success, %s duplicates, %s failures, with %s retries."
@@ -112,11 +135,23 @@
        end_t = time.strftime("%Y-%m-%dT%H:%M:%SGMT", time.gmtime(res_end))

    else:
        logger.info("Not connected to Elasticsearch")
        start_t = time.strftime("%Y-%m-%dT%H:%M:%SGMT", time.gmtime())
        if "archive" in index_args.tool:
            if index_args.archive_file:
                logger.info("Processing archive file, but not indexing results...")
                for es_friendly_doc in process_archive_file(index_args):
                    pass
            else:
                logger.error(
                    "Attempted to index archive without specifying a file, use --archive-file=<file>"
                )
                exit(1)
        else:
            # loop through the generator and consume all yields;
            # this executes all jobs without Elasticsearch
            for i in process_generator(index_args, parser):
                pass
        end_t = time.strftime("%Y-%m-%dT%H:%M:%SGMT", time.gmtime())

    start_t = datetime.datetime.strptime(start_t, FMT)
@@ -176,6 +211,8 @@ def get_valid_es_document(action, index, index_args):
    logger.debug("document size is: %s" % document_size_bytes)
    logger.debug(json.dumps(es_valid_document, indent=4, default=str))

    if index_args.createarchive:
        write_to_archive_file(index_args, es_valid_document)
    return es_valid_document


@@ -240,5 +277,37 @@ def get_prometheus_generator(index_args, action):
    logger.info("Prometheus indexing duration of execution - %s" % tdelta)


def process_archive_file(index_args):

    if os.path.isfile(index_args.archive_file):
        with open(index_args.archive_file) as f:
            for line in f:
                es_friendly_document = json.loads(line)
                document_size_bytes = sys.getsizeof(es_friendly_document)
                index_args.document_size_capacity_bytes += document_size_bytes
                yield es_friendly_document
    else:
        logger.error("%s not found" % index_args.archive_file)
        exit(1)


def write_to_archive_file(index_args, es_friendly_document):

    if index_args.archive_file:
        archive_filename = index_args.archive_file
    else:
        # assumes that all documents have the same structure
        user = es_friendly_document["_source"]["user"]
        clustername = es_friendly_document["_source"]["clustername"]
        uuid = es_friendly_document["_source"]["uuid"]
        # create the archive file as user_clustername_uuid.archive in the cwd
        archive_filename = user + "_" + clustername + "_" + uuid + ".archive"

    # write each ES-friendly document on one line; this makes re-indexing easier later
    with open(archive_filename, "a") as f:
        json.dump(es_friendly_document, f)
        f.write(os.linesep)


if __name__ == "__main__":
    sys.exit(main())
27 changes: 22 additions & 5 deletions snafu/sysbench/trigger_sysbench.py
@@ -14,6 +14,7 @@
import subprocess
import logging
import os
import re
from datetime import datetime

logger = logging.getLogger("snafu")
@@ -99,6 +100,16 @@ def emit_actions(self):
        # loop through each line in stdout and capture results
        for line in nospace_stdout.splitlines():
            # all result fields have : so targeting those lines

            if "transferred" in line and "memory" in self.test_config["test"]:
                mem_total_transferred, mem_total_transferredpersecond = line.split("transferred")
                mem_total_transferred = float(mem_total_transferred.replace("MiB", ""))

                mem_total_transferredpersecond = re.sub("[()]", "", mem_total_transferredpersecond)
                mem_total_transferredpersecond = float(mem_total_transferredpersecond.replace("MiB/sec", ""))

                test_results["transferred(MiB)"] = mem_total_transferred
                test_results["transferredpersec(MiB/sec)"] = mem_total_transferredpersecond
            if ":" in line:
                # break the line into a key-value pair on the first colon
                key, value = line.split(":", 1)
@@ -115,21 +126,27 @@
                    # create a nested dict
                    test_results[section] = {}
                elif section is None:
                    test_results[key] = float(value)
                else:
                    # there are fields with two values; we need to identify them and break
                    # them down into two sub-fields
                    if "(avg/stddev)" in key:
                        key = key.replace("(avg/stddev)", "")
                        avg, stddev = value.split("/")
                        test_results[section][key] = {}
                        test_results[section][key]["avg"] = float(avg)
                        test_results[section][key]["stddev"] = float(stddev)
                    elif "Totaloperations" in key and "persecond" in value:
                        totaloperations, totaloperationspersecond = value.split("(")
                        totaloperationspersecond = totaloperationspersecond.replace("persecond)", "")
                        test_results[section]["Totaloperations"] = float(totaloperations)
                        test_results[section]["Totaloperationspersecond"] = float(totaloperationspersecond)
                    elif "totaltime" in key:
                        key = key + "(seconds)"
                        value = value.replace("s", "")
                        test_results[section][key] = float(value)
                    elif "option" not in section:
                        test_results[section][key] = float(value)
                    else:
                        # store the key-value pair in the appropriate section
                        test_results[section][key] = value
