Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correctly handle SIGINT/SIGTERM for graceful exit #181

Merged
merged 7 commits into from
Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions karton/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
import argparse
import logging
import textwrap
from contextlib import contextmanager
from typing import Optional, Union, cast

from .backend import KartonBackend
from .config import Config
from .logger import KartonLogHandler
from .task import Task
from .utils import GracefulKiller, StrictClassMethod
from .utils import HardShutdownInterrupt, StrictClassMethod, graceful_killer


class KartonBase(abc.ABC):
Expand Down Expand Up @@ -191,12 +192,24 @@ def __init__(
) -> None:
super().__init__(config=config, identity=identity, backend=backend)
self.setup_logger()
self.shutdown = False
self.killer = GracefulKiller(self.graceful_shutdown)
self._shutdown = False

def graceful_shutdown(self) -> None:
self.log.info("Gracefully shutting down!")
self.shutdown = True
def _do_shutdown(self) -> None:
self.log.info("Got signal, shutting down...")
self._shutdown = True

@property
def shutdown(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a chance that some users try to shut down the service on their own by setting this to True?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't seen that pattern, but we can expose it in more controlled way in a future.

return self._shutdown

@contextmanager
def graceful_killer(self):
try:
with graceful_killer(self._do_shutdown):
yield
except HardShutdownInterrupt:
self.log.info("Hard shutting down!")
raise

# Base class for Karton services
@abc.abstractmethod
Expand Down
4 changes: 4 additions & 0 deletions karton/core/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
class TaskTimeoutError(Exception):
pass


class HardShutdownInterrupt(BaseException):
pass
43 changes: 20 additions & 23 deletions karton/core/karton.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,18 +331,14 @@ def loop(self) -> None:
for task_filter in self.filters:
self.log.info("Binding on: %s", task_filter)

try:
with self.graceful_killer():
while not self.shutdown:
if self.backend.get_bind(self.identity) != self._bind:
self.log.info("Binds changed, shutting down.")
break

task = self.backend.consume_routed_task(self.identity)
if task:
self.internal_process(task)
except KeyboardInterrupt as e:
self.log.info("Hard shutting down!")
raise e


class LogConsumer(KartonServiceBase):
Expand Down Expand Up @@ -391,24 +387,25 @@ def loop(self) -> None:
"""
self.log.info("Logger %s started", self.identity)

for log in self.backend.consume_log(
logger_filter=self.logger_filter, level=self.level
):
if self.shutdown:
# Consumer shutdown has been requested
break
if not log:
# No log record received until timeout, try again.
continue
try:
self.process_log(log)
except Exception:
"""
This is log handler exception, so DO NOT USE self.log HERE!
"""
import traceback

traceback.print_exc()
with self.graceful_killer():
for log in self.backend.consume_log(
logger_filter=self.logger_filter, level=self.level
):
if self.shutdown:
# Consumer shutdown has been requested
break
if not log:
# No log record received until timeout, try again.
continue
try:
self.process_log(log)
except Exception:
"""
This is log handler exception, so DO NOT USE self.log HERE!
"""
import traceback

traceback.print_exc()


class Karton(Consumer, Producer):
Expand Down
2 changes: 1 addition & 1 deletion karton/core/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def process(self, task):
pass

karton = KartonDummy(config=config, identity=karton_name)
karton.shutdown = True
karton._shutdown = True
karton.loop()


Expand Down
34 changes: 23 additions & 11 deletions karton/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from contextlib import contextmanager
from typing import Any, Callable, Iterator, Sequence, TypeVar

from .exceptions import TaskTimeoutError
from .exceptions import HardShutdownInterrupt, TaskTimeoutError

T = TypeVar("T")

Expand All @@ -26,18 +26,30 @@ def throw_timeout(signum: int, frame: Any) -> None:
signal.signal(signal.SIGALRM, original_handler)


class GracefulKiller:
def __init__(self, handle_func: Callable) -> None:
self.handle_func = handle_func
self.original_sigint_handler = signal.signal(
signal.SIGINT, self.exit_gracefully
)
signal.signal(signal.SIGTERM, self.exit_gracefully)
@contextmanager
def graceful_killer(handler: Callable[[], None]):
"""
Graceful killer for Karton consumers.
"""
first_try = True

def exit_gracefully(self, signum: int, frame: Any) -> None:
self.handle_func()
def signal_handler(signum: int, frame: Any) -> None:
nonlocal first_try
handler()
if signum == signal.SIGINT:
signal.signal(signal.SIGINT, self.original_sigint_handler)
# Sometimes SIGTERM can be prematurely repeated.
# Forced but still clean shutdown is implemented only for SIGINT.
if not first_try:
raise HardShutdownInterrupt()
first_try = False

original_sigint_handler = signal.signal(signal.SIGINT, signal_handler)
original_sigterm_handler = signal.signal(signal.SIGTERM, signal_handler)
try:
yield
finally:
signal.signal(signal.SIGINT, original_sigint_handler)
signal.signal(signal.SIGTERM, original_sigterm_handler)


class StrictClassMethod:
Expand Down
19 changes: 10 additions & 9 deletions karton/system/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,15 +251,16 @@ def process_routing(self) -> None:
def loop(self) -> None:
self.log.info("Manager %s started", self.identity)

while not self.shutdown:
if self.enable_router:
# This will wait for up to 5s, so this is not a busy loop
self.process_routing()
if self.enable_gc:
# This will only do anything once every self.gc_interval seconds
self.gc_collect()
if not self.enable_router:
time.sleep(1) # Avoid a busy loop
with self.graceful_killer():
while not self.shutdown:
if self.enable_router:
# This will wait for up to 5s, so this is not a busy loop
self.process_routing()
if self.enable_gc:
# This will only do anything once every self.gc_interval seconds
self.gc_collect()
if not self.enable_router:
time.sleep(1) # Avoid a busy loop

@classmethod
def args_parser(cls) -> argparse.ArgumentParser:
Expand Down