From 04de93239cf3e91be57cc22ca482e812dfb0b1c0 Mon Sep 17 00:00:00 2001
From: dry923
Date: Fri, 26 Mar 2021 10:25:38 -0400
Subject: [PATCH] Remove redis coordination

---
 setup.cfg                              |  2 +-
 snafu/log_generator_wrapper/ci_test.sh | 11 +++++
 .../log_generator_wrapper.py           | 15 -------
 .../trigger_log_generator.py           | 44 +++++--------
 4 files changed, 22 insertions(+), 50 deletions(-)
 create mode 100755 snafu/log_generator_wrapper/ci_test.sh

diff --git a/setup.cfg b/setup.cfg
index bb2fa834a..0ccb7c1d5 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -21,7 +21,7 @@ zip_safe = False
 packages = find:
 include_package_data = True
 # Add here dependencies of your project (semicolon/line-separated), e.g.
-install_requires = configparser; elasticsearch>=6.0.0,<=7.0.2; statistics; numpy; pyyaml; requests; redis; python-dateutil>=2.7.3; prometheus_api_client; scipy; openshift==0.11; kubernetes==11; setuptools>=40.3.0; boto3; flent
+install_requires = configparser; elasticsearch>=6.0.0,<=7.0.2; statistics; numpy; pyyaml; requests; redis; python-dateutil>=2.7.3; prometheus_api_client; scipy; openshift==0.11; kubernetes==11; setuptools>=40.3.0; boto3; flent; importlib_metadata
 # tests_require = pytest; pytest-cov
 # Require a specific Python version, e.g. Python 2.7 or >= 3.4
 python_requires = >=3.6
diff --git a/snafu/log_generator_wrapper/ci_test.sh b/snafu/log_generator_wrapper/ci_test.sh
new file mode 100755
index 000000000..f1f780658
--- /dev/null
+++ b/snafu/log_generator_wrapper/ci_test.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+
+set -x
+
+source ci/common.sh
+
+# Build image for ci
+image_spec=$SNAFU_WRAPPER_IMAGE_PREFIX/log_generator:$SNAFU_IMAGE_TAG
+build_and_push snafu/log_generator_wrapper/Dockerfile $image_spec
+pushd ripsaw
+source tests/test_log_generator.sh
diff --git a/snafu/log_generator_wrapper/log_generator_wrapper.py b/snafu/log_generator_wrapper/log_generator_wrapper.py
index a7b4d749e..ca67d06ac 100755
--- a/snafu/log_generator_wrapper/log_generator_wrapper.py
+++ b/snafu/log_generator_wrapper/log_generator_wrapper.py
@@ -54,14 +54,6 @@ def __init__(self, parent_parser):
             type=int,
             default=1,
             help='Total number of log generator pods to run')
-        parser.add_argument(
-            '--redisip',
-            help='IP address for redis server')
-        parser.add_argument(
-            '--redisport',
-            type=int,
-            default=6379,
-            help='Port for the redis server')
         parser.add_argument(
             '--pod-name',
             default=None,
@@ -102,13 +94,6 @@ def __init__(self, parent_parser):
             type=str,
             default="app*",
             help='The ES index to search for the messages')
-        parser.add_argument(
-            '--incluster',
-            default="false",
-            help='Is this running from a pod within the cluster [true|false]')
-        parser.add_argument(
-            '--kubeconfig',
-            help='Optional kubeconfig location. Incluster cannot be true')
 
         self.args = parser_object.parse_args()
 
diff --git a/snafu/log_generator_wrapper/trigger_log_generator.py b/snafu/log_generator_wrapper/trigger_log_generator.py
index 1ece95001..47a16fa8b 100755
--- a/snafu/log_generator_wrapper/trigger_log_generator.py
+++ b/snafu/log_generator_wrapper/trigger_log_generator.py
@@ -18,7 +18,6 @@
 import random
 import string
 import logging
-import redis
 import boto3
 import json
 import subprocess
@@ -36,10 +35,7 @@ def __init__(self, args):
         self.messages_per_second = args.messages_per_second
         self.duration = args.duration
         self.pod_count = args.pod_count
-        self.redisip = args.redisip
-        self.redisport = args.redisport
         self.pod_name = args.pod_name
-        self.namespace = args.namespace
         self.timeout = args.timeout
         self.cloudwatch_log_group = args.cloudwatch_log_group
         self.aws_access_key = args.aws_access_key
@@ -118,17 +114,6 @@ def _run_log_test(self):
             count += self.messages_per_second
         return count
 
-    def _wait_for_pods(self,my_redis,channel):
-        subchannel = my_redis.pubsub()
-        subchannel.subscribe(channel)
-        subscribers = dict(my_redis.pubsub_numsub(channel))[channel]
-        while subscribers < self.pod_count:
-            sleep(1)
-            subscribers = dict(my_redis.pubsub_numsub(channel))[channel]
-        # Sleep 10 to ensure all pods are ready and allow any lingering messages to clear
-        sleep(10)
-        subchannel.unsubscribe(channel)
-
     def _check_cloudwatch(self,start_time,end_time):
         logger.info("Checking CloudWatch for expected messages")
         if self.aws_access_key is not None and self.aws_secret_key is not None:
@@ -166,7 +151,8 @@ def _check_cloudwatch(self,start_time,end_time):
     def _check_es(self,start_time,end_time):
         logger.info("Checking ElasticSearch for expected messages")
         header_json = 'Content-Type: application/json'
-        header_auth = "Authorization: Bearer " + self.es_token
+        if self.es_token:
+            header_auth = "Authorization: Bearer " + self.es_token
         s_time = datetime.datetime.fromtimestamp(start_time - 60).strftime("%Y-%m-%dT%H:%M:%S")
         e_time = datetime.datetime.fromtimestamp(end_time + 60).strftime("%Y-%m-%dT%H:%M:%S")
         data = {
@@ -196,9 +182,14 @@ def _check_es(self,start_time,end_time):
         es_url = self.es_url + "/" + self.es_index + "/_count"
 
         try:
-            response = subprocess.check_output(['curl','--insecure','--header',header_auth,
-                                                '--header',header_json,es_url,'-d',json.dumps(data),
-                                                '-s']).decode("utf-8")
+            if self.es_token:
+                response = subprocess.check_output(['curl','--insecure','--header',header_auth,
+                                                    '--header',header_json,es_url,'-d',json.dumps(data),
+                                                    '-s']).decode("utf-8")
+            else:
+                response = subprocess.check_output(['curl','--insecure','--header',
+                                                    header_json,es_url,'-d',json.dumps(data),
+                                                    '-s']).decode("utf-8")
         except Exception as err:
             logging.info("ElasticSearch query failed")
             logging.info(err)
@@ -215,21 +206,6 @@ def emit_actions(self):
                     (self.size, self.duration, self.messages_per_second))
         logger.info("Test UUID is %s on cluster %s" % (self.uuid, self.cluster_name))
 
-        logger.info("Waiting for all pods to be ready")
-        # Check redis for all running pods
-        if self.redisip is not None\
-           and self.redisport is not None\
-           and self.pod_name is not None\
-           and self.uuid is not None\
-           and self.pod_count != 1:
-            my_redis = redis.StrictRedis(
-                host=self.redisip,
-                port=self.redisport,
-                charset="utf-8",
-                decode_responses=True)
-            channel = "log-generator-" + self.uuid
-            self._wait_for_pods(my_redis,channel)
-
         timestamp = time.strftime("%Y-%m-%dT%H:%M:%S")
         start_time = time.time()
         message_count = self._run_log_test()
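
A note on the token-optional ElasticSearch check in _check_es above: the two subprocess.check_output branches differ only in whether the Authorization header is passed, so the same behavior can be sketched by building the curl argument list conditionally. The sketch below is illustrative only and not part of this patch; the function name query_es_count and its standalone parameters are hypothetical.

    import json
    import subprocess

    def query_es_count(es_url, es_index, data, es_token=None):
        # Build the curl command; add the bearer-token header only when a token is supplied.
        cmd = ['curl', '--insecure', '--header', 'Content-Type: application/json']
        if es_token:
            cmd += ['--header', 'Authorization: Bearer ' + es_token]
        # Query the _count endpoint with the JSON query body, silencing curl progress output.
        cmd += [es_url + '/' + es_index + '/_count', '-d', json.dumps(data), '-s']
        return subprocess.check_output(cmd).decode('utf-8')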