Skip to content

Commit

Permalink
Fix TaskTimeoutException regression: crashing whole service instead o…
Browse files Browse the repository at this point in the history
…f single task (#251)
  • Loading branch information
nazywam authored Mar 26, 2024
1 parent 81d0bb9 commit 1c2bc13
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions karton/core/karton.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from .backend import KartonBackend, KartonBind, KartonMetrics
from .base import KartonBase, KartonServiceBase
from .config import Config
from .exceptions import TaskTimeoutError
from .resource import LocalResource
from .task import Task, TaskState
from .utils import timeout
Expand Down Expand Up @@ -129,7 +130,10 @@ def __init__(
self.current_task: Optional[Task] = None
self._pre_hooks: List[Tuple[Optional[str], Callable[[Task], None]]] = []
self._post_hooks: List[
Tuple[Optional[str], Callable[[Task, Optional[Exception]], None]]
Tuple[
Optional[str],
Callable[[Task, Optional[BaseException]], None],
]
] = []

@abc.abstractmethod
Expand Down Expand Up @@ -179,14 +183,14 @@ def internal_process(self, task: Task) -> None:
self.process(self.current_task)
else:
self.process(self.current_task)
except Exception as exc:
except (Exception, TaskTimeoutError) as exc:
saved_exception = exc
raise
finally:
self._run_post_hooks(saved_exception)

self.log.info("Task done - %s", self.current_task.uid)
except Exception:
except (Exception, TaskTimeoutError):
exc_info = sys.exc_info()
exception_str = traceback.format_exception(*exc_info)

Expand Down Expand Up @@ -260,7 +264,7 @@ def add_pre_hook(

def add_post_hook(
self,
callback: Callable[[Task, Optional[Exception]], None],
callback: Callable[[Task, Optional[BaseException]], None],
name: Optional[str] = None,
) -> None:
"""
Expand Down Expand Up @@ -289,7 +293,7 @@ def _run_pre_hooks(self) -> None:
else:
self.log.exception("Pre-hook failed")

def _run_post_hooks(self, exception: Optional[Exception]) -> None:
def _run_post_hooks(self, exception: Optional[BaseException]) -> None:
"""
Run registered postprocessing hooks
Expand Down Expand Up @@ -431,7 +435,7 @@ def _send_signaling_status_task_begin(self, task: Task) -> None:
self._send_signaling_status_task("task_begin")

def _send_signaling_status_task_end(
self, task: Task, ex: Optional[Exception]
self, task: Task, ex: Optional[BaseException]
) -> None:
"""Send a begin status signaling task.
Expand Down

0 comments on commit 1c2bc13

Please sign in to comment.