Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop cache #196

Merged
merged 10 commits into from
Aug 12, 2020
3 changes: 3 additions & 0 deletions src/run_snafu.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from utils.py_es_bulk import streaming_bulk
from utils.common_logging import setup_loggers
from utils.wrapper_factory import wrapper_factory
from utils.request_cache_drop import drop_cache

logger = logging.getLogger("snafu")

Expand Down Expand Up @@ -130,6 +131,8 @@ def process_generator(index_args, parser):

for wrapper_object in benchmark_wrapper_object_generator:
for data_object in wrapper_object.run():
# drop cache after every sample
drop_cache()
for action, index in data_object.emit_actions():
es_index = index_args.prefix + '-' + index
es_valid_document = {"_index": es_index,
Expand Down
83 changes: 83 additions & 0 deletions src/utils/request_cache_drop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# This module reads environment variables that control cache-dropping
# behavior.
# Cache dropping itself is implemented by privileged pods that are separate
# from the benchmark pods; they are invoked from here with an HTTP GET.
# If none of these environment variables are defined, no cache drop is
# requested.

import os
import http.client
import logging


def getPortNum(envVar, defaultPort):
    """Return the port number configured through an environment variable.

    Args:
        envVar: name of the environment variable to read.
        defaultPort: value returned when the variable is unset or blank.

    Returns:
        The variable's value converted with ``int()``, or ``defaultPort``.

    Raises:
        ValueError: if the variable is set to text that is not an integer;
            the message names the offending variable.
    """
    portStr = os.getenv(envVar)
    # A variable that is defined but blank is treated like an unset one
    # instead of failing on int('').
    if portStr is None or not portStr.strip():
        return defaultPort
    try:
        return int(portStr)
    except ValueError as err:
        # Name the variable: the bare int() message only echoes the bad
        # literal, which is hard to trace back to its source.
        raise ValueError('environment variable %s is not a valid port '
                         'number: %r' % (envVar, portStr)) from err


class RunSnafuCacheDropException(Exception):
    """Raised when a cache-drop pod answers with a non-OK HTTP status."""


# ports used to reach the cache-drop pods; each can be overridden
# through its environment variable
dropKernelCachePort = getPortNum('KCACHE_DROP_PORT_NUM', 9435)
dropCephCachePort = getPortNum('CEPH_CACHE_DROP_PORT_NUM', 9437)

# reuse the "snafu" logger so messages go wherever that logger is configured
logger = logging.getLogger("snafu")

# merely defining DROP_CACHE_DEBUG_LEVEL (with any value) switches the
# logger to DEBUG
dbgLevel = os.environ.get('DROP_CACHE_DEBUG_LEVEL')
if dbgLevel is not None:
    logger.setLevel(logging.DEBUG)
    logger.info('drop_cache debug log level')

# passed to HTTPConnection.set_debuglevel() when greater than zero
http_debug_level = int(os.environ.get('HTTP_DEBUG_LEVEL', 0))

# timeout handed to every HTTPConnection (seconds, per http.client)
http_timeout = 10


# drop Ceph OSD cache and kernel cache if requested to

def drop_cache():
    """Ask the configured cache-drop pods to drop their caches.

    Two environment variables select what is done:

    * ``ceph_drop_pod_ip``: address of the pod that drops the Ceph OSD
      cache; it receives ``GET /DropCephCache``.
    * ``kcache_drop_pod_ips``: whitespace-separated addresses of the pods
      that drop the kernel cache; each receives ``GET /DropKernelCache``.

    A variable that is unset or blank means that kind of cache drop is not
    requested.

    Raises:
        RunSnafuCacheDropException: if a pod answers with a status other
            than 200 OK.  Connection-level errors raised by ``http.client``
            are not caught here.
    """
    ceph_cache_drop_pod_ip = os.getenv('ceph_drop_pod_ip')
    logger.info('ceph OSD cache drop pod: %s', ceph_cache_drop_pod_ip)
    # A blank value is skipped, like the kernel branch below where a blank
    # string splits into an empty list.
    if ceph_cache_drop_pod_ip is not None and ceph_cache_drop_pod_ip.strip():
        ceph_cache_drop_pod_ip = ceph_cache_drop_pod_ip.strip()
        logger.info('requesting ceph to drop cache via %s:%d',
                    ceph_cache_drop_pod_ip, dropCephCachePort)
        status, reason = _http_get_status(ceph_cache_drop_pod_ip,
                                          dropCephCachePort,
                                          '/DropCephCache')
        if status != http.client.OK:
            logger.error('HTTP ERROR %d: %s', status, reason)
            raise RunSnafuCacheDropException('Ceph OSD cache drop %s:%d' %
                                             (ceph_cache_drop_pod_ip,
                                              dropCephCachePort))

    # drop kernel cache if requested to

    kernel_cache_drop_pod_ips = os.getenv('kcache_drop_pod_ips')
    logger.info('kernel cache drop pods: %s', kernel_cache_drop_pod_ips)
    if kernel_cache_drop_pod_ips is not None:
        for ip in kernel_cache_drop_pod_ips.split():
            logger.info('requesting kernel to drop cache via %s:%d',
                        ip, dropKernelCachePort)
            status, reason = _http_get_status(ip,
                                              dropKernelCachePort,
                                              '/DropKernelCache')
            if status != http.client.OK:
                logger.error('HTTP code %d: %s', status, reason)
                raise RunSnafuCacheDropException('kernel cache drop %s:%d' %
                                                 (ip, dropKernelCachePort))


def _http_get_status(host, port, path):
    """Send ``GET path`` to ``host:port`` and return ``(status, reason)``.

    The connection is always closed before returning or raising, so calling
    this once per sample does not accumulate open sockets.
    """
    conn = http.client.HTTPConnection(host, port=port, timeout=http_timeout)
    try:
        if http_debug_level > 0:
            conn.set_debuglevel(http_debug_level)
        conn.request('GET', path)
        rsp = conn.getresponse()
        return rsp.status, rsp.reason
    finally:
        conn.close()


# running this file directly performs a single cache drop
if __name__ == "__main__":
    drop_cache()