3 new routines for each type of result
have to run each one as an iterator because of yield
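
A minimal sketch of the pattern this message refers to (illustrative names only, not the repository's code): a method that contains yield returns a generator object when called, so merely calling the new routines would run nothing; emit_actions has to iterate over each one (or delegate with yield from) so their documents are re-yielded to the caller.

class Emitter:
    def emit_actions(self):
        # Each helper is a generator; it must be iterated for its body to run.
        for doc, index in self.process_result():
            yield doc, index
        for doc, index in self.process_rsptimes():
            yield doc, index
        # "yield from" is the equivalent shorthand for the same delegation.
        yield from self.process_per_thread_counters()

    # Index names below are placeholders for illustration.
    def process_result(self):
        yield {"kind": "result"}, "results"

    def process_rsptimes(self):
        yield {"kind": "rsptime"}, "rsptimes"

    def process_per_thread_counters(self):
        yield {"kind": "rates"}, "rates-over-time"

for doc, index in Emitter().emit_actions():
    print(index, doc)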
bengland2 committed Mar 16, 2022
1 parent 7d20903 commit 7a13528
Showing 1 changed file with 41 additions and 33 deletions.
74 changes: 41 additions & 33 deletions snafu/fs_drift_wrapper/trigger_fs_drift.py
@@ -26,6 +26,7 @@ def __init__(self, logger, yaml_input_file, cluster_name, working_dir, result_di
self.working_dir = working_dir
self.network_shared_dir = os.path.join(self.working_dir, "network-shared")
self.result_dir = result_dir
self.json_output_file = os.path.join(self.result_dir, "fs-drift.json")
self.user = user
self.uuid = uuid
self.sample = sample
@@ -53,13 +54,12 @@ def emit_actions(self):
elif c.startswith("counters"):
os.unlink(os.path.join(self.network_shared_dir, c))

json_output_file = os.path.join(self.result_dir, "fs-drift.json")
cmd = [
"fs-drift.py",
"--top",
self.working_dir,
"--output-json",
json_output_file,
self.json_output_file,
"--response-times",
"Y",
"--input-yaml",
@@ -72,47 +72,55 @@ def emit_actions(self):
except subprocess.CalledProcessError as e:
self.logger.exception(e)
raise FsDriftWrapperException("fs-drift.py non-zero process return code %d" % e.returncode)
self.logger.info("completed sample {} , results in {}".format(self.sample, json_output_file))
self.logger.info("completed sample {} , results in {}".format(self.sample, self.json_output_file))

fsdict = get_vfs_stat_dict(self.working_dir)
with open(json_output_file) as f:
with open(self.json_output_file) as f:
data = json.load(f)
params = data["parameters"]
timestamp = data["results"]["date"]
threads = data["results"]["in-thread"]
for tid in threads.keys():
thrd = threads[tid]
thrd["fsdict"] = fsdict
thrd["date"] = timestamp
thrd["thr-id"] = tid
thrd["host"] = self.host
thrd["sample"] = self.sample
thrd["cluster_name"] = self.cluster_name
thrd["uuid"] = self.uuid
thrd["user"] = self.user
thrd["params"] = params
yield thrd, "results"

#self.process_per_thread_counters()

# comment out for now until we can debug
#self.process_rsptimes()


def process_rsptimes(self):
for thrd, index in self.process_result(data, fsdict):
yield thrd, index
elapsed_time = float(data["results"]["elapsed"])
start_time = data["results"]["start-time"]
self.logger.info('elapsed time = %f start_time = %d' % (elapsed_time, start_time))
for rsptime_interval, rsptime_index in self.process_rsptimes(start_time, elapsed_time):
yield rsptime_interval, rsptime_index
for rates_interval, rates_index in self.process_per_thread_counters(start_time):
yield rates_interval, rates_index

def process_result(self, data, filesys):
params = data["parameters"]
timestamp = data["results"]["date"]
threads = data["results"]["in-thread"]
for tid in threads.keys():
thrd = threads[tid]
thrd["fsdict"] = filesys
thrd["date"] = timestamp
thrd["thr-id"] = tid
thrd["host"] = self.host
thrd["sample"] = self.sample
thrd["cluster_name"] = self.cluster_name
thrd["uuid"] = self.uuid
thrd["user"] = self.user
thrd["params"] = params
yield thrd, "results"

def process_rsptimes(self, start_time, elapsed_time):
"""
convert response time logs to stats as a function of time
"""
rsptime_file = os.path.join(self.network_shared_dir, "stats-rsptimes.csv")
elapsed_time = float(data["results"]["elapsed"])
start_time = data["results"]["start-time"]
sampling_interval = max(int(elapsed_time / 120.0), 1)
cmd = ["rsptime_stats.py", "--time-interval", str(sampling_interval), rsptime_dir]
self.logger.info('sampling_interval %d' % sampling_interval)
if sampling_interval <= 1:
self.logger.info('not enough duration to calculate response time stats, skipping')
return
cmd = ["rsptime_stats.py", "--time-interval", str(sampling_interval), self.network_shared_dir]
self.logger.info("process response times with: %s" % " ".join(cmd))
try:
process = subprocess.check_call(cmd, stderr=subprocess.STDOUT) # noqa
except subprocess.CalledProcessError as e:
self.logger.exception(e)
raise FsDriftWrapperException("rsptime_stats return code %d" % e.returncode)
raise FsDriftWrapperException("rsptime_stats failed, see exception in log")
self.logger.info("response time result {}".format(rsptime_file))
with open(rsptime_file) as rf:
lines = [line.strip() for line in rf.readlines()]
@@ -154,7 +162,7 @@ def process_rsptimes(self):
yield interval, "rsptimes"


def process_per_thread_counters(self):
def process_per_thread_counters(self, start_time):
"""
reads in JSON per-thread counters, converts counters to rates
"""
@@ -168,7 +176,7 @@ def process_per_thread_counters(self):
thread_id = matched.group(1)
with open(pathnm, "r") as f:
thread_counters = json.load(f)
self.logger.info("process records from rates-over-time file %s " % (len(records), fn))
self.logger.info("process %d intervals from rates-over-time file %s " % (len(thread_counters), fn))
for snapshot in thread_counters:

# compute timestamp from start of test and time since start
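The tail of process_per_thread_counters is truncated in the diff above; per its docstring ("reads in JSON per-thread counters, converts counters to rates") and the final comment, a rough, hypothetical sketch of that conversion follows. The "elapsed-time" field and the counter keys are assumptions, not fs-drift's actual output schema.

def counters_to_rates(thread_counters, start_time):
    # Hypothetical helper, not the repository's code: turn cumulative per-thread
    # counter snapshots into per-interval rates with absolute timestamps.
    prev_elapsed = 0.0
    prev_counters = {}
    for snapshot in thread_counters:
        elapsed = float(snapshot["elapsed-time"])     # placeholder field name
        timestamp = start_time + elapsed              # absolute time of this snapshot
        interval = (elapsed - prev_elapsed) or 1.0    # avoid a zero-length interval
        rates = {
            name: (value - prev_counters.get(name, 0)) / interval
            for name, value in snapshot.items()
            if name != "elapsed-time"
        }
        prev_elapsed, prev_counters = elapsed, dict(snapshot)
        yield {"date": timestamp, "rates": rates}, "rates-over-time"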
