From bf8330db4f1146547b074e5753a07a9e6276197a Mon Sep 17 00:00:00 2001 From: Stanislav Rakovsky Date: Wed, 20 Sep 2023 12:48:40 +0300 Subject: [PATCH] Init --- .gitignore | 160 ++++++++++++++++++++++ README.md | 63 +++++++++ karton/memory_watcher/__init__.py | 6 + karton/memory_watcher/watcher.py | 145 ++++++++++++++++++++ pyproject.toml | 24 ++++ tests/tests.py | 215 ++++++++++++++++++++++++++++++ 6 files changed, 613 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 karton/memory_watcher/__init__.py create mode 100644 karton/memory_watcher/watcher.py create mode 100644 pyproject.toml create mode 100644 tests/tests.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b0b6f3a --- /dev/null +++ b/.gitignore @@ -0,0 +1,160 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# pytype static type analyzer
+.pytype/
+
+# Cython debug symbols
+cython_debug/
+
+# PyCharm
+#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
+#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
+#  and can be added to the global gitignore or merged into this file.  For a more nuclear
+#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
+.idea/
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..b396386
--- /dev/null
+++ b/README.md
@@ -0,0 +1,63 @@
+# karton memory watcher
+
+When a karton consumer runs for several months in a row, its tasks gradually become more and more
+significant in terms of RAM consumption, and sometimes even forcing a garbage collector run doesn't help.
+This is especially noticeable when working with libraries that parse certain file formats -
+at the start the process takes up 60 megabytes of RAM, and after a couple of months - 400.
+
+This library is a watcher that monitors the gradual increase in the service's RAM usage and shuts it
+down upon reaching a set threshold, so that your process manager can start a fresh instance.
+
+### Which ways of running a karton consumer is it suitable for?
+
+Any way that relies on automatic restarts. Here are at least three of them:
+
+1. systemd service restart policy: `Restart=on-failure` / `Restart=always`
+2. Docker restart policy: `on-failure` / `always` / `unless-stopped`
+3. An infinite loop in `screen`: `while true; do your_app; sleep 10; done`
+
+
+### How to use it
+
+The simple way:
+```python
+from karton.core import Consumer
+from karton.memory_watcher import implant_watcher, RestartBehavior, RestartRule
+
+...
+class YourConsumer(Consumer):
+    pass
+
+if __name__ == "__main__":
+    foobar = YourConsumer()
+    implant_watcher(
+        foobar,
+        RestartRule(
+            extra_consumed_memory_percent=80,
+            # call_before_exit=(close_db_connections, )
+        )
+    )
+    foobar.loop()
+
+```
+
+`RestartRule` modes:
+1. `proceed_tasks`: restart after this many processed tasks
+2. `elapsed_time`: restart after this many seconds have passed
+3. `extra_consumed_memory_percent`: extra memory used in % (100% means twice the memory compared to the point just before the first task started). Percentages are calculated from megabyte values.
+4. `extra_consumed_megabytes`: extra memory used in megabytes (e.g. your service uses 60 MB at start and you want to kill it once it consumes an extra 500+ MB)
+
+You can see usage examples in the [tests](./tests).
+
+### Alternatives?
+
+#### systemd
+
+As of January 2023 there is a nice feature request about a MemoryMax-based restart policy:
+https://github.com/systemd/systemd/issues/25966
+
+You could also write your own agent that tracks the service's memory usage and restarts it via systemctl (but be careful not to kill ongoing tasks!).
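+
+A minimal sketch of such an agent is shown below (assuming `psutil` is installed; the unit name,
+memory threshold and polling interval are placeholders, and it restarts the unit immediately, so an
+in-flight task may still be interrupted - treat it as a starting point rather than a finished tool):
+
+```python
+#!/usr/bin/env python3
+"""Naive external watchdog: restart a systemd unit when its RSS grows too large."""
+import subprocess
+import time
+
+import psutil
+
+UNIT = "your-karton-consumer.service"  # placeholder unit name
+LIMIT_MB = 500                         # placeholder memory threshold
+CHECK_INTERVAL = 60                    # seconds between checks
+
+
+def main_pid(unit: str) -> int:
+    # `systemctl show --value -p MainPID <unit>` prints the main PID (0 if not running)
+    out = subprocess.check_output(
+        ["systemctl", "show", "--value", "-p", "MainPID", unit], text=True
+    )
+    return int(out.strip())
+
+
+while True:
+    pid = main_pid(UNIT)
+    if pid > 0:
+        rss_mb = psutil.Process(pid).memory_info().rss // 1024 // 1024
+        if rss_mb > LIMIT_MB:
+            subprocess.run(["systemctl", "restart", UNIT], check=False)
+    time.sleep(CHECK_INTERVAL)
+```
+
+Running the watchdog as its own small unit (or from cron) keeps it independent of the consumer it restarts.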
+
+#### Docker
+
+Work in progress - feel free to suggest how to gracefully restart a container so that the current task can finish normally.
\ No newline at end of file
diff --git a/karton/memory_watcher/__init__.py b/karton/memory_watcher/__init__.py
new file mode 100644
index 0000000..94a2a95
--- /dev/null
+++ b/karton/memory_watcher/__init__.py
@@ -0,0 +1,6 @@
+from .watcher import (
+    MemoryWatcherExitException,
+    RestartBehavior,
+    RestartRule,
+    implant_watcher,
+)
diff --git a/karton/memory_watcher/watcher.py b/karton/memory_watcher/watcher.py
new file mode 100644
index 0000000..c053171
--- /dev/null
+++ b/karton/memory_watcher/watcher.py
@@ -0,0 +1,145 @@
+import sys
+import time
+from enum import Enum
+from typing import Callable, Optional, Tuple
+
+import karton.core
+import psutil
+
+
+class RestartBehavior(Enum):
+    # We cannot exit by raising an exception - there is an exception handler in karton:
+    # https://github.com/CERT-Polska/karton/blob/2d10bd432928354a2030a0ee8aa976b64f4acb63/karton/core/karton.py#L301
+    # RAISE_EXCEPTION = -1
+    EXIT_0 = 1
+    EXIT_1 = 2
+
+
+class MemoryWatcherExitException(Exception):
+    pass
+
+
+class RestartRule:
+    def __init__(
+        self,
+        call_before_exit: Tuple[Callable, ...] = tuple(),
+        proceed_tasks: Optional[int] = None,
+        elapsed_time: Optional[int] = None,
+        extra_consumed_memory_percent: Optional[int] = None,
+        extra_consumed_megabytes: Optional[int] = None,
+        restart_behavior: RestartBehavior = RestartBehavior.EXIT_0,
+    ):
+        """
+        Describes when the consumer process should be restarted.
+        At least one of the rule parameters must be set:
+
+        :param call_before_exit: one or more functions to call right before exiting
+        :param proceed_tasks: restart after this many processed tasks
+        :param elapsed_time: restart after this many seconds have passed
+        :param extra_consumed_memory_percent: extra memory used in % (100% means twice the memory compared to the point just before the first task started)
+        :param extra_consumed_megabytes: extra memory used in megabytes (e.g.
your service at start uses 60MB and you need to kill it if it consumes extra 500+ MB)
+        :param restart_behavior: how the process should exit once a rule triggers
+
+        Note that the rules are evaluated in the post-hook, i.e. only after a task has been processed.
+        """
+        if all(
+            a is None
+            for a in (
+                proceed_tasks,
+                elapsed_time,
+                extra_consumed_memory_percent,
+                extra_consumed_megabytes,
+            )
+        ):
+            raise ValueError("You should set at least one rule parameter!")
+
+        self.call_before_exit = call_before_exit
+
+        self.rule_proceed_tasks = proceed_tasks
+        self.rule_elapsed_time = elapsed_time
+        self.rule_extra_consumed_memory_percent = extra_consumed_memory_percent
+        self.rule_extra_consumed_megabytes = extra_consumed_megabytes
+
+        self.restart_behavior = restart_behavior
+
+        self.prehooked = False
+        self.tasks_counter = None
+        self.start_time = None
+        self.start_memory_usage = None
+
+    @staticmethod
+    def get_process_memory_usage() -> int:
+        # Resident set size of the current process, in megabytes
+        return psutil.Process().memory_info().rss // 1024 // 1024
+
+    def pre_hook_behavior(self, *args, **kwargs):
+        """
+        Initialize the baseline values before the first task is processed
+        """
+        if not self.prehooked:
+            self.tasks_counter = 0
+            self.start_memory_usage = self.get_process_memory_usage()
+            self.start_time = time.time()
+            self.prehooked = True
+
+    def post_hook_behavior(self, *args, **kwargs):
+        """
+        Check the configured rules after each task and restart if any of them triggers
+        """
+        self.tasks_counter += 1
+
+        # Trigger on the number of processed tasks
+        if self.rule_proceed_tasks is not None:
+            if self.tasks_counter >= self.rule_proceed_tasks:
+                self.execute_restart_behavior(
+                    f"rule_proceed_tasks: {self.tasks_counter} >= {self.rule_proceed_tasks}"
+                )
+
+        # Trigger on elapsed time
+        if self.rule_elapsed_time is not None:
+            current_time = time.time()
+            if current_time - self.start_time >= self.rule_elapsed_time:
+                self.execute_restart_behavior(
+                    f"rule_elapsed_time: {current_time} - {self.start_time} >= {self.rule_elapsed_time}"
+                )
+
+        # Trigger on memory usage
+        if (
+            self.rule_extra_consumed_megabytes is not None
+            or self.rule_extra_consumed_memory_percent is not None
+        ):
+            memory_usage = self.get_process_memory_usage()
+
+            if self.rule_extra_consumed_megabytes is not None:
+                if (
+                    memory_usage - self.start_memory_usage
+                    >= self.rule_extra_consumed_megabytes
+                ):
+                    self.execute_restart_behavior(
+                        f"rule_extra_consumed_megabytes: {memory_usage} - {self.start_memory_usage} >= {self.rule_extra_consumed_megabytes}"
+                    )
+
+            if self.rule_extra_consumed_memory_percent is not None:
+                if (
+                    memory_usage / self.start_memory_usage - 1
+                    >= self.rule_extra_consumed_memory_percent / 100
+                ):
+                    self.execute_restart_behavior(
+                        f"extra_consumed_memory_percent: {memory_usage} / {self.start_memory_usage} - 1 >= {self.rule_extra_consumed_memory_percent} / 100"
+                    )
+
+    def execute_restart_behavior(self, reason: str):
+        print(f"Restart policy triggered, reason: {reason}")
+
+        for call in self.call_before_exit:
+            call()
+
+        if self.restart_behavior == RestartBehavior.EXIT_0:
+            sys.exit(0)
+
+        if self.restart_behavior == RestartBehavior.EXIT_1:
+            sys.exit(1)
+
+
+def implant_watcher(consumer: karton.core.Consumer, rule: RestartRule) -> karton.core.Consumer:
+    consumer.add_pre_hook(rule.pre_hook_behavior, "Karton Memory Watcher pre-hook")
+    consumer.add_post_hook(rule.post_hook_behavior, "Karton Memory Watcher post-hook")
+    # returned just in case someone calls `my_consumer = implant_watcher(my_consumer, ...)`
+    return consumer
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..33b73c2
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,24 @@
+[project]
+name = "karton_memory_watcher"
+version = "1.0.0"
+authors = [
{ name="Rakovsky Stanislav", email="iam@disasm.me" }, +] +description = "Memory watcher for karton consumers" +readme = "README.md" +requires-python = ">=3.7" +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", +] + +dependencies = ["karton-core>=5.0.0"] + +[project.urls] +"Homepage" = "https://github.com/rakovskij-stanislav/karton_memory_watcher" + + +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" \ No newline at end of file diff --git a/tests/tests.py b/tests/tests.py new file mode 100644 index 0000000..5bd9022 --- /dev/null +++ b/tests/tests.py @@ -0,0 +1,215 @@ +import logging +import time +from typing import Callable, Optional, cast + +import psutil +import pytest +from karton.core import Task + +from karton.memory_watcher import ( + MemoryWatcherExitException, + RestartBehavior, + RestartRule, + implant_watcher, +) + + +class DummyConsumer: + """ + Oversimplified karton.core.Consumer class that represents situation with pre-hooks and post-hooks + """ + + def __init__(self): + self._pre_hooks = [] + self._post_hooks = [] + + self.log = logging.Logger("DummyConsumer") + + def process(self, task) -> None: + return None + + def internal_process(self, task) -> None: + self.current_task = task + + self._run_pre_hooks() + + self.process(self.current_task) + saved_exception = None + self._run_post_hooks(saved_exception) + + def add_pre_hook( + self, callback: Callable[[Task], None], name: Optional[str] = None + ) -> None: + """ + Add a function to be called before processing each task. + + :param callback: Function of the form ``callback(task)`` where ``task`` + is a :class:`karton.Task` + :param name: Name of the pre-hook + """ + self._pre_hooks.append((name, callback)) + + def add_post_hook( + self, + callback: Callable[[Task, Optional[Exception]], None], + name: Optional[str] = None, + ) -> None: + """ + Add a function to be called after processing each task. + + :param callback: Function of the form ``callback(task, exception)`` + where ``task`` is a :class:`karton.Task` and ``exception`` is + an exception thrown by the :meth:`karton.Consumer.process` function + or ``None``. 
+        :param name: Name of the post-hook
+        """
+        self._post_hooks.append((name, callback))
+
+    def _run_pre_hooks(self) -> None:
+        """
+        Run registered preprocessing hooks
+
+        :meta private:
+        """
+        for name, callback in self._pre_hooks:
+            try:
+                callback(cast(Task, self.current_task))
+            except Exception:
+                if name:
+                    self.log.exception("Pre-hook (%s) failed", name)
+                else:
+                    self.log.exception("Pre-hook failed")
+
+    def _run_post_hooks(self, exception: Optional[Exception]) -> None:
+        """
+        Run registered postprocessing hooks
+
+        :param exception: Exception object that was caught while processing the task
+
+        :meta private:
+        """
+        for name, callback in self._post_hooks:
+            try:
+                callback(cast(Task, self.current_task), exception)
+            except Exception:
+                if name:
+                    self.log.exception("Post-hook (%s) failed", name)
+                else:
+                    self.log.exception("Post-hook failed")
+
+
+def test_init():
+    foo_service = DummyConsumer()
+    implant_watcher(foo_service, RestartRule(extra_consumed_memory_percent=100))
+    foo_service.process(None)
+
+
+def test_policy_proceed_tasks():
+    foo_service = DummyConsumer()
+    implant_watcher(
+        foo_service,
+        RestartRule(proceed_tasks=100, restart_behavior=RestartBehavior.EXIT_0),
+    )
+
+    for i in range(99):
+        foo_service.internal_process(None)
+
+    with pytest.raises(SystemExit) as exc:
+        foo_service.internal_process(None)
+    print(exc.value.code)
+    assert exc.value.code == 0
+
+
+def test_policy_elapsed_time():
+    foo_service = DummyConsumer()
+    implant_watcher(
+        foo_service,
+        RestartRule(elapsed_time=5, restart_behavior=RestartBehavior.EXIT_0),
+    )
+
+    foo_service.internal_process(None)
+    time.sleep(2)
+    foo_service.internal_process(None)
+    time.sleep(4)
+
+    with pytest.raises(SystemExit) as exc:
+        foo_service.internal_process(None)
+    assert exc.value.code == 0
+
+
+def test_policy_extra_consumed_memory_percent():
+    foo_service = DummyConsumer()
+    implant_watcher(
+        foo_service,
+        RestartRule(
+            extra_consumed_memory_percent=15, restart_behavior=RestartBehavior.EXIT_0
+        ),
+    )
+
+    current_memory_usage = psutil.Process().memory_info().rss // 1024 // 1024
+    limit_to_reach = int(current_memory_usage * 0.15) + 1
+
+    print("MB to reach", limit_to_reach)
+
+    foo_service.internal_process(None)
+    # ~70% of the limit worth of "A" characters - still below the threshold
+    runtime_string = "A" * 1024 * 1024 * int(limit_to_reach * 0.7)
+    print("Runtime string len", len(runtime_string))
+    foo_service.internal_process(None)
+    # another ~70% of the limit - total growth now exceeds the threshold
+    runtime_string += "A" * 1024 * 1024 * int(limit_to_reach * 0.7)
+
+    with pytest.raises(SystemExit) as exc:
+        foo_service.internal_process(None)
+    assert exc.value.code == 0
+
+
+def test_policy_extra_consumed_megabytes():
+    foo_service = DummyConsumer()
+    implant_watcher(
+        foo_service,
+        RestartRule(
+            extra_consumed_megabytes=15, restart_behavior=RestartBehavior.EXIT_0
+        ),
+    )
+
+    foo_service.internal_process(None)
+    # 10 megabytes of "A" - still below the 15 MB threshold
+    runtime_string = "A" * 10 * 1024 * 1024
+    foo_service.internal_process(None)
+    # extra 10 megabytes of "A" - now above the 15 MB threshold
+    runtime_string += "A" * 10 * 1024 * 1024
+
+    with pytest.raises(SystemExit) as exc:
+        foo_service.internal_process(None)
+    assert exc.value.code == 0
+
+
+def test_exit_via_SystemExit0():
+    foo_service = DummyConsumer()
+    implant_watcher(
+        foo_service,
+        RestartRule(proceed_tasks=5, restart_behavior=RestartBehavior.EXIT_0),
+    )
+
+    for i in range(4):
+        foo_service.internal_process(None)
+
+    with pytest.raises(SystemExit) as exc:
+        foo_service.internal_process(None)
+    assert exc.value.code == 0
+
+
+def test_exit_via_SystemExit1():
+    foo_service = DummyConsumer()
+    implant_watcher(
+        foo_service,
+        RestartRule(proceed_tasks=5, restart_behavior=RestartBehavior.EXIT_1),
+    )
+
+    for i in range(4):
+        foo_service.internal_process(None)
+
+    with pytest.raises(SystemExit) as exc:
+        foo_service.internal_process(None)
+    assert exc.value.code == 1