Remove redis coordination

dry923 committed Mar 29, 2021
1 parent 680f77b commit 04de932
Showing 4 changed files with 22 additions and 50 deletions.
2 changes: 1 addition & 1 deletion setup.cfg
@@ -21,7 +21,7 @@ zip_safe = False
packages = find:
include_package_data = True
# Add here dependencies of your project (semicolon/line-separated), e.g.
install_requires = configparser; elasticsearch>=6.0.0,<=7.0.2; statistics; numpy; pyyaml; requests; redis; python-dateutil>=2.7.3; prometheus_api_client; scipy; openshift==0.11; kubernetes==11; setuptools>=40.3.0; boto3; flent
install_requires = configparser; elasticsearch>=6.0.0,<=7.0.2; statistics; numpy; pyyaml; requests; redis; python-dateutil>=2.7.3; prometheus_api_client; scipy; openshift==0.11; kubernetes==11; setuptools>=40.3.0; boto3; flent; importlib_metadata
# tests_require = pytest; pytest-cov
# Require a specific Python version, e.g. Python 2.7 or >= 3.4
python_requires = >=3.6
11 changes: 11 additions & 0 deletions snafu/log_generator_wrapper/ci_test.sh
@@ -0,0 +1,11 @@
#!/bin/bash

set -x

source ci/common.sh

# Build image for ci
image_spec=$SNAFU_WRAPPER_IMAGE_PREFIX/log_generator:$SNAFU_IMAGE_TAG
build_and_push snafu/log_generator_wrapper/Dockerfile $image_spec
pushd ripsaw
source tests/test_log_generator.sh
15 changes: 0 additions & 15 deletions snafu/log_generator_wrapper/log_generator_wrapper.py
@@ -54,14 +54,6 @@ def __init__(self, parent_parser):
            type=int,
            default=1,
            help='Total number of log generator pods to run')
        parser.add_argument(
            '--redisip',
            help='IP address for redis server')
        parser.add_argument(
            '--redisport',
            type=int,
            default=6379,
            help='Port for the redis server')
        parser.add_argument(
            '--pod-name',
            default=None,
@@ -102,13 +94,6 @@ def __init__(self, parent_parser):
            type=str,
            default="app*",
            help='The ES index to search for the messages')
        parser.add_argument(
            '--incluster',
            default="false",
            help='Is this running from a pod within the cluster [true|false]')
        parser.add_argument(
            '--kubeconfig',
            help='Optional kubeconfig location. Incluster cannot be true')

        self.args = parser_object.parse_args()

44 changes: 10 additions & 34 deletions snafu/log_generator_wrapper/trigger_log_generator.py
@@ -18,7 +18,6 @@
import random
import string
import logging
import redis
import boto3
import json
import subprocess
@@ -36,10 +35,7 @@ def __init__(self, args):
        self.messages_per_second = args.messages_per_second
        self.duration = args.duration
        self.pod_count = args.pod_count
        self.redisip = args.redisip
        self.redisport = args.redisport
        self.pod_name = args.pod_name
        self.namespace = args.namespace
        self.timeout = args.timeout
        self.cloudwatch_log_group = args.cloudwatch_log_group
        self.aws_access_key = args.aws_access_key
@@ -118,17 +114,6 @@ def _run_log_test(self):
            count += self.messages_per_second
        return count

    def _wait_for_pods(self,my_redis,channel):
        subchannel = my_redis.pubsub()
        subchannel.subscribe(channel)
        subscribers = dict(my_redis.pubsub_numsub(channel))[channel]
        while subscribers < self.pod_count:
            sleep(1)
            subscribers = dict(my_redis.pubsub_numsub(channel))[channel]
        # Sleep 10 to ensure all pods are ready and allow any lingering messages to clear
        sleep(10)
        subchannel.unsubscribe(channel)

    def _check_cloudwatch(self,start_time,end_time):
        logger.info("Checking CloudWatch for expected messages")
        if self.aws_access_key is not None and self.aws_secret_key is not None:
@@ -166,7 +151,8 @@ def _check_cloudwatch(self,start_time,end_time):
    def _check_es(self,start_time,end_time):
        logger.info("Checking ElasticSearch for expected messages")
        header_json = 'Content-Type: application/json'
        header_auth = "Authorization: Bearer " + self.es_token
        if self.es_token:
            header_auth = "Authorization: Bearer " + self.es_token
        s_time = datetime.datetime.fromtimestamp(start_time - 60).strftime("%Y-%m-%dT%H:%M:%S")
        e_time = datetime.datetime.fromtimestamp(end_time + 60).strftime("%Y-%m-%dT%H:%M:%S")
        data = {
@@ -196,9 +182,14 @@ def _check_es(self,start_time,end_time):
        es_url = self.es_url + "/" + self.es_index + "/_count"

        try:
            response = subprocess.check_output(['curl','--insecure','--header',header_auth,
                                                '--header',header_json,es_url,'-d',json.dumps(data),
                                                '-s']).decode("utf-8")
            if self.es_token:
                response = subprocess.check_output(['curl','--insecure','--header',header_auth,
                                                    '--header',header_json,es_url,'-d',json.dumps(data),
                                                    '-s']).decode("utf-8")
            else:
                response = subprocess.check_output(['curl','--insecure','--header',
                                                    header_json,es_url,'-d',json.dumps(data),
                                                    '-s']).decode("utf-8")
        except Exception as err:
            logging.info("ElasticSearch query failed")
            logging.info(err)
@@ -215,21 +206,6 @@ def emit_actions(self):
                    (self.size, self.duration, self.messages_per_second))
        logger.info("Test UUID is %s on cluster %s" % (self.uuid, self.cluster_name))

logger.info("Waiting for all pods to be ready")
# Check redis for all running pods
if self.redisip is not None\
and self.redisport is not None\
and self.pod_name is not None\
and self.uuid is not None\
and self.pod_count != 1:
my_redis = redis.StrictRedis(
host=self.redisip,
port=self.redisport,
charset="utf-8",
decode_responses=True)
channel = "log-generator-" + self.uuid
self._wait_for_pods(my_redis,channel)

        timestamp = time.strftime("%Y-%m-%dT%H:%M:%S")
        start_time = time.time()
        message_count = self._run_log_test()
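Note: the reworked _check_es above only attaches the Authorization header when an ES bearer token is supplied. The same conditional can be expressed more compactly by building the curl argument list once; this is a hypothetical refactor, not part of the commit, and query_es_count and its parameters are illustrative:

import json
import subprocess

def query_es_count(es_url, data, es_token=None):
    # POST a _count query via curl; add the bearer token only when one exists.
    cmd = ['curl', '--insecure', '--header', 'Content-Type: application/json']
    if es_token:
        cmd += ['--header', 'Authorization: Bearer ' + es_token]
    cmd += [es_url, '-d', json.dumps(data), '-s']
    return subprocess.check_output(cmd).decode('utf-8')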
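For reference, the coordination deleted above used Redis pub/sub as a startup barrier: each pod subscribed to a per-run channel named log-generator-<uuid>, and the wrapper polled pubsub_numsub until the subscriber count reached pod_count before starting the test. A minimal, self-contained sketch of that removed pattern follows (host, channel name, and pod count are illustrative assumptions; the deprecated charset kwarg from the original is dropped):

import time
import redis

def wait_for_pods(client, channel, pod_count):
    # Subscribe ourselves, then poll until every pod has joined the channel.
    subchannel = client.pubsub()
    subchannel.subscribe(channel)
    subscribers = dict(client.pubsub_numsub(channel))[channel]
    while subscribers < pod_count:
        time.sleep(1)
        subscribers = dict(client.pubsub_numsub(channel))[channel]
    # Grace period so all pods are ready and lingering messages clear.
    time.sleep(10)
    subchannel.unsubscribe(channel)

# Illustrative single-pod run; in the benchmark each pod would pass the real pod count.
client = redis.StrictRedis(host='localhost', port=6379, decode_responses=True)
wait_for_pods(client, 'log-generator-' + 'example-uuid', 1)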
