Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix TaskTimeoutException regression #251

Merged
merged 9 commits into from
Mar 26, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions karton/core/karton.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from .backend import KartonBackend, KartonBind, KartonMetrics
from .base import KartonBase, KartonServiceBase
from .config import Config
from .exceptions import TaskTimeoutError
from .resource import LocalResource
from .task import Task, TaskState
from .utils import timeout
Expand Down Expand Up @@ -129,7 +130,10 @@ def __init__(
self.current_task: Optional[Task] = None
self._pre_hooks: List[Tuple[Optional[str], Callable[[Task], None]]] = []
self._post_hooks: List[
Tuple[Optional[str], Callable[[Task, Optional[Exception]], None]]
Tuple[
Optional[str],
Callable[[Task, Optional[BaseException]], None],
]
] = []

@abc.abstractmethod
Expand Down Expand Up @@ -179,14 +183,14 @@ def internal_process(self, task: Task) -> None:
self.process(self.current_task)
else:
self.process(self.current_task)
except Exception as exc:
except (Exception, TaskTimeoutError) as exc:
saved_exception = exc
raise
finally:
self._run_post_hooks(saved_exception)

self.log.info("Task done - %s", self.current_task.uid)
except Exception:
except (Exception, TaskTimeoutError):
exc_info = sys.exc_info()
exception_str = traceback.format_exception(*exc_info)

Expand Down Expand Up @@ -260,7 +264,7 @@ def add_pre_hook(

def add_post_hook(
self,
callback: Callable[[Task, Optional[Exception]], None],
callback: Callable[[Task, Optional[BaseException]], None],
name: Optional[str] = None,
) -> None:
"""
Expand Down Expand Up @@ -289,7 +293,7 @@ def _run_pre_hooks(self) -> None:
else:
self.log.exception("Pre-hook failed")

def _run_post_hooks(self, exception: Optional[Exception]) -> None:
def _run_post_hooks(self, exception: Optional[BaseException]) -> None:
"""
Run registered postprocessing hooks
Expand Down Expand Up @@ -431,7 +435,7 @@ def _send_signaling_status_task_begin(self, task: Task) -> None:
self._send_signaling_status_task("task_begin")

def _send_signaling_status_task_end(
self, task: Task, ex: Optional[Exception]
self, task: Task, ex: Optional[BaseException]
) -> None:
"""Send a begin status signaling task.
Expand Down
Loading