Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fs drift run snafu #43

Merged
merged 5 commits into from
Sep 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added fs_drift_wrapper/__init__.py
Empty file.
73 changes: 73 additions & 0 deletions fs_drift_wrapper/fs_drift_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
import os
import argparse
import configparser
import logging

import trigger_fs_drift

logger = logging.getLogger("snafu")

class fs_drift_wrapper():
    """CLI front-end for the fs-drift benchmark.

    Collects fs-drift-specific command-line options from the shared snafu
    argument parser, picks up run metadata (cluster name, uuid, user) from
    environment variables, and yields one _trigger_fs_drift generator per
    requested sample.
    """

    def __init__(self, parser):
        # collect arguments

        # it is assumed that the parser was created using argparse and already knows
        # about the --tool option
        parser.add_argument(
            '-s', '--samples',
            type=int,
            help='number of times to run benchmark, defaults to 1',
            default=1)
        parser.add_argument(
            '-T', '--top',
            help='directory to access files in')
        parser.add_argument(
            '-d', '--dir',
            help='output parent directory',
            default=os.path.dirname(os.getcwd()))
        parser.add_argument(
            '-y', '--yaml-input-file',
            help='fs-drift parameters passed via YAML input file')
        self.args = parser.parse_args()

        self.server = ""

        # run metadata is optionally injected through environment variables;
        # fall back to sensible defaults when not set
        self.cluster_name = os.environ.get("clustername", "mycluster")
        self.uuid = os.environ.get("uuid", "")
        self.user = os.environ.get("test_user", "")

        if not self.args.top:
            # BUGFIX: the original raised SnafuSmfException, which is not
            # defined anywhere in this module, so the error path itself died
            # with a NameError; raise the wrapper's own exception type instead
            # (also fixes the "flies" typo in the message)
            raise trigger_fs_drift.FsDriftWrapperException(
                'must supply directory where you access files')
        self.samples = self.args.samples
        self.working_dir = self.args.top
        self.result_dir = self.args.dir
        self.yaml_input_file = self.args.yaml_input_file

    def run(self):
        """Yield one _trigger_fs_drift generator per sample.

        Creates the per-sample result directories (numbered 1..samples)
        under self.result_dir before handing each off to the trigger.
        """
        if not os.path.exists(self.result_dir):
            os.mkdir(self.result_dir)
        for sample in range(1, self.samples + 1):
            sample_dir = os.path.join(self.result_dir, str(sample))
            if not os.path.exists(sample_dir):
                os.mkdir(sample_dir)
            yield trigger_fs_drift._trigger_fs_drift(
                logger, self.yaml_input_file, self.cluster_name,
                self.working_dir, sample_dir, self.user, self.uuid, sample)


117 changes: 117 additions & 0 deletions fs_drift_wrapper/trigger_fs_drift.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from copy import deepcopy
import time
import os
import sys
import json
import subprocess
import logging


class FsDriftWrapperException(Exception):
    """Raised when an fs-drift helper subprocess exits with a non-zero status."""
    pass

class _trigger_fs_drift:
    """
    Will execute with the provided arguments and return normalized results for indexing
    """

    def __init__(self, logger, yaml_input_file, cluster_name, working_dir, result_dir, user, uuid, sample):
        # logger: shared snafu logger instance
        # yaml_input_file: fs-drift parameter file forwarded to fs-drift.py
        # working_dir: the --top directory that fs-drift exercises
        # result_dir: per-sample directory where fs-drift.json is written
        # sample: 1-based sample index, recorded in every emitted document
        self.logger = logger
        self.yaml_input_file = yaml_input_file
        self.working_dir = working_dir
        self.result_dir = result_dir
        self.user = user
        self.uuid = uuid
        self.sample = sample
        self.cluster_name = cluster_name

    def ensure_dir_exists(self, directory):
        """Create directory (and any missing parents) if it does not exist."""
        # makedirs with exist_ok avoids the TOCTOU race between an exists()
        # check and mkdir, and also creates intermediate parent directories
        os.makedirs(directory, exist_ok=True)

    def emit_actions(self):
        """
        Executes test and calls document parsers, if index_data is true will yield normalized data
        """

        self.ensure_dir_exists(self.working_dir)
        rsptime_dir = os.path.join(self.working_dir, 'network-shared')

        # clear out any unconsumed response time files in this directory
        if os.path.exists(rsptime_dir):
            for entry in os.listdir(rsptime_dir):
                if entry.endswith('.csv'):
                    os.unlink(os.path.join(rsptime_dir, entry))

        json_output_file = os.path.join(self.result_dir, 'fs-drift.json')
        network_shared_dir = os.path.join(self.working_dir, 'network-shared')
        rsptime_file = os.path.join(network_shared_dir, 'stats-rsptimes.csv')
        cmd = ["python", "fs-drift.py",
               "--top", self.working_dir,
               "--output-json", json_output_file,
               "--response-times", "Y",
               "--input-yaml", self.yaml_input_file]
        self.logger.info('running:' + ' '.join(cmd))
        self.logger.info('from current directory %s' % os.getcwd())
        try:
            # check_call raises on non-zero exit and returns 0 otherwise,
            # so the return value (formerly bound to an unused variable)
            # is deliberately discarded
            subprocess.check_call(cmd, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as e:
            self.logger.exception(e)
            raise FsDriftWrapperException(
                'fs-drift.py non-zero process return code %d' % e.returncode)
        self.logger.info("completed sample {} , results in {}".format(
            self.sample, json_output_file))

        # tag the JSON summary emitted by fs-drift with run metadata
        # so it can be indexed alongside results from other samples
        with open(json_output_file) as f:
            data = json.load(f)
            data['cluster_name'] = self.cluster_name
            data['uuid'] = self.uuid
            data['user'] = self.user
            data['sample'] = self.sample
            yield data, '-results'

        # process response time data

        elapsed_time = float(data['results']['elapsed'])
        start_time = data['results']['start-time']
        # aim for roughly 120 intervals across the run, at least 1 second each
        sampling_interval = max(int(elapsed_time / 120.0), 1)
        cmd = ["python", "rsptime_stats.py",
               "--time-interval", str(sampling_interval),
               rsptime_dir]
        self.logger.info("process response times with: %s" % ' '.join(cmd))
        try:
            subprocess.check_call(cmd, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as e:
            self.logger.exception(e)
            raise FsDriftWrapperException(
                'rsptime_stats return code %d' % e.returncode)
        self.logger.info("response time result {}".format(rsptime_file))

        with open(rsptime_file) as rf:
            lines = [line.strip() for line in rf.readlines()]
        start_grabbing = False
        for line in lines:
            if line.startswith('time-since-start'):
                # the header row marks the start of the per-interval table
                start_grabbing = True
            elif start_grabbing:
                if line == '':
                    continue
                flds = line.split(',')
                interval = {}
                interval['cluster_name'] = self.cluster_name
                interval['uuid'] = self.uuid
                interval['user'] = self.user
                interval['sample'] = self.sample
                # flds[0] is seconds since start of run; convert to UTC timestamp
                rsptime_date = start_time + int(flds[0])
                rsptime_date_str = time.strftime('%Y-%m-%dT%H:%M:%S.000Z', time.gmtime(rsptime_date))
                interval['date'] = rsptime_date_str
                # number of fs-drift file operations in this interval
                interval['op-count'] = int(flds[2])
                # file operations per second in this interval
                interval['file-ops-per-sec'] = float(flds[2]) / sampling_interval
                # response-time statistics columns; column 6 is intentionally
                # skipped (presumably a stat this indexer does not record —
                # TODO confirm against rsptime_stats.py output format)
                interval['min'] = float(flds[3])
                interval['max'] = float(flds[4])
                interval['mean'] = float(flds[5])
                interval['50%'] = float(flds[7])
                interval['90%'] = float(flds[8])
                interval['95%'] = float(flds[9])
                interval['99%'] = float(flds[10])
                yield interval, '-rsptimes'
7 changes: 6 additions & 1 deletion utils/wrapper_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@
from smallfile_wrapper.smallfile_wrapper import smallfile_wrapper
#from pgbench_wrapper.pgbench_wrapper import pgbench_wrapper
#from uperf_wrapper.uperf_wrapper import uperf_wrapper
from fs_drift_wrapper.fs_drift_wrapper import fs_drift_wrapper

import logging
logger = logging.getLogger("snafu")

wrapper_dict = {"fio": fio_wrapper, "smallfile": smallfile_wrapper}
wrapper_dict = {
"fio": fio_wrapper,
"smallfile": smallfile_wrapper,
"fs-drift": fs_drift_wrapper
}
# "backpack": pgbench_wrapper,
# "fio": fio_wrapper,
# "pgbench": pgbench_wrapper,
Expand Down