Skip to content

Commit

Permalink
Init
Browse files Browse the repository at this point in the history
  • Loading branch information
Stanislav Rakovsky committed Sep 20, 2023
0 parents commit bf8330d
Show file tree
Hide file tree
Showing 6 changed files with 613 additions and 0 deletions.
160 changes: 160 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
63 changes: 63 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# karton memory watcher

Working for several months in a row, tasks gradually become more and more significant in terms of RAM consumption.
And sometimes forcing a call to the garbage collector doesn't help.
This is especially felt when working with libraries that help to parse certain file formats -
at the start, the process takes up 60 megabytes of RAM, and after a couple of months - 400.

This library is a watcher that monitors the gradual increase in the service's RAM usage and shuts
it upon reaching a set threshold.

### For what ways to run karton consumer is it suitable?

For every way that uses autorestart. For example, I can suggest at least three of them:

1. systemctl service restart policy: `on-failure` / `always`
2. docker service restart policy: `on-failure` / `always` / `unless-stopped`
3. Screen infinite loop: `while true; do your_app; sleep 10; done`


### How to use it

Simple way:
```python
from karton.core import Consumer
from karton.memory_watcher import implant_watcher, RestartBehavior, RestartRule

...
class YourConsumer(Consumer):
pass

if __name__ == "__main__":
foobar = YourConsumer()
implant_watcher(
foobar,
RestartRule(
extra_consumed_memory_percent=80,
# call_before_exit=(close_db_connections, )
)
)
foobar.loop()

```

RestartRule modes:
1. `proceed_tasks`: count of tasks to proceed for restart
2. `elapsed_time`: how many seconds should pass
3. `extra_consumed_memory_percent`: extra memory used in % (100% means twice of memory compared to point before starting first task). It uses megabytes to calculate percents.
4. `extra_consumed_megabytes`: extra memory used in megabytes (e.g. your service at start uses 60MB and you need to kill it if it consumes extra 500+ MB)

You can see usage cases in [tests](./tests).

### Alternatives?

#### Systemctl:

In January 2023 there is a nice feature request about MemoryMax policy:
https://github.com/systemd/systemd/issues/25966

Also writing your own agent that will track memory leaks and send systemctl service to restart (but be careful not to kill ongoing tasks!).

#### Docker

wip, feel free to suggest how to gently restart container to allow task normally proceed
6 changes: 6 additions & 0 deletions karton/memory_watcher/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .watcher import (
MemoryWatcherExitException,
RestartBehavior,
RestartRule,
implant_watcher,
)
145 changes: 145 additions & 0 deletions karton/memory_watcher/watcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import time
from enum import Enum
from typing import Callable

import karton.core
import psutil


class RestartBehavior(Enum):
# We cannot exit by raising exception - there is an exception handler:
# https://github.com/CERT-Polska/karton/blob/2d10bd432928354a2030a0ee8aa976b64f4acb63/karton/core/karton.py#L301
# RAISE_EXCEPTION = -1
EXIT_0 = 1
EXIT_1 = 2


class MemoryWatcherExitException(Exception):
pass


class RestartRule:
def __init__(
self,
call_before_exit: tuple[Callable] = tuple(),
proceed_tasks: int = None,
elapsed_time: int = None,
extra_consumed_memory_percent: int = None,
extra_consumed_megabytes: int = None,
restart_behavior: RestartBehavior = RestartBehavior.EXIT_0,
):
"""
Body of restart routine
At least one of parameters should be set:
:param call_before_exit: one or more functions that need to call before exit
:param proceed_tasks: count of tasks to proceed for restart
:param elapsed_time: how many seconds should pass
:param extra_consumed_memory_percent: extra memory used in % (100% means twice of memory compared to point before starting first task)
:param extra_consumed_megabytes: extra memory used in megabytes (e.g. your service at start uses 60MB and you need to kill it if it consumes extra 500+ MB)
Need to mention that the rule will be
"""
if all(
a is None
for a in (
proceed_tasks,
elapsed_time,
extra_consumed_memory_percent,
extra_consumed_megabytes,
)
):
raise Exception("You should set at least one parameter!")

self.call_before_exit = call_before_exit

self.rule_proceed_tasks = proceed_tasks
self.rule_elapsed_time = elapsed_time
self.rule_extra_consumed_memory_percent = extra_consumed_memory_percent
self.rule_extra_consumed_megabytes = extra_consumed_megabytes

self.restart_behavior = restart_behavior

self.prehooked = False
self.tasks_counter = None
self.start_time = None
self.start_memory_usage = None

@staticmethod
def get_process_memory_usage():
return psutil.Process().memory_info().rss // 1024 // 1024

def pre_hook_behavior(self, *args, **kwargs):
"""
Init starting variables
"""
if not self.prehooked:
self.tasks_counter = 0
self.start_memory_usage = self.get_process_memory_usage()
self.start_time = time.time()
self.prehooked = True

def post_hook_behavior(self, *args, **kwargs):
"""
Check that everything is okay, so we don't need to restart
"""
self.tasks_counter += 1

# Trigger on count of proceed tasks (decrease call-based errors)
if self.rule_proceed_tasks is not None:
if self.tasks_counter >= self.rule_proceed_tasks:
self.execute_restart_behavior(
f"rule_proceed_tasks: {self.tasks_counter} >= {self.rule_proceed_tasks}"
)

# Trigger on elapsed time (decrease time-based errors)
if self.rule_elapsed_time is not None:
current_time = time.time()
if current_time - self.start_time >= self.rule_elapsed_time:
self.execute_restart_behavior(
f"rule_elapsed_time: {current_time} - {self.start_time} >= {self.rule_elapsed_time}"
)

# Trigger on memory usage
if (
self.rule_extra_consumed_megabytes is not None
or self.rule_extra_consumed_memory_percent is not None
):
memory_usage = self.get_process_memory_usage()

if self.rule_extra_consumed_megabytes is not None:
if (
memory_usage - self.start_memory_usage
>= self.rule_extra_consumed_megabytes
):
self.execute_restart_behavior(
f"rule_extra_consumed_megabytes: {memory_usage} - {self.start_memory_usage} >= {self.rule_extra_consumed_megabytes}"
)

if self.rule_extra_consumed_memory_percent is not None:
if (
memory_usage / self.start_memory_usage - 1
>= self.rule_extra_consumed_memory_percent / 100
):
self.execute_restart_behavior(
f"extra_consumed_memory_percent: {memory_usage} / {self.start_memory_usage} - 1 >= {self.rule_extra_consumed_memory_percent} / 100"
)

def execute_restart_behavior(self, reason):
print(f"Restart policy triggered, reason: {reason}")

for call in self.call_before_exit:
call()

if self.restart_behavior == RestartBehavior.EXIT_0:
exit(0)

if self.restart_behavior == RestartBehavior.EXIT_1:
exit(1)


def implant_watcher(consumer: karton.core.Consumer, rule: RestartRule):
consumer.add_pre_hook(rule.pre_hook_behavior, "Karton Memory Watcher pre-hook")
consumer.add_post_hook(rule.post_hook_behavior, "Karton Memory Watcher post-hook")
# just in case if someone will call `my_consumer = implant_watcher(my_consumer, ...)`
return consumer
Loading

0 comments on commit bf8330d

Please sign in to comment.