From 0279c092561d262ce532d4e34075e5602be42e88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Srokosz?= Date: Wed, 22 Feb 2023 12:53:46 +0100 Subject: [PATCH] Backend feature: restart task (#205) --- karton/core/__version__.py | 2 +- karton/core/backend.py | 23 +++++++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/karton/core/__version__.py b/karton/core/__version__.py index 2fe5fde1..0d72820f 100644 --- a/karton/core/__version__.py +++ b/karton/core/__version__.py @@ -1 +1 @@ -__version__ = "5.0.1" +__version__ = "5.1.0" diff --git a/karton/core/backend.py b/karton/core/backend.py index bfb5db55..7184e249 100644 --- a/karton/core/backend.py +++ b/karton/core/backend.py @@ -444,6 +444,29 @@ def consume_routed_task(self, identity: str, timeout: int = 5) -> Optional[Task] queue, data = item return self.get_task(data) + def restart_task(self, task: Task) -> Task: + """ + Requeues consumed task back to the consumer queue. + + New task is created with new uid and can be consumed by any active replica. + + Original task is marked as finished. + + :param task: Task to be restarted + :return: Restarted task object + """ + new_task = task.fork_task() + # Preserve orig_uid to point at unrouted task + new_task.orig_uid = task.orig_uid + new_task.status = TaskState.SPAWNED + + p = self.make_pipeline() + self.register_task(new_task, pipe=p) + self.produce_routed_task(new_task.headers["receiver"], new_task, pipe=p) + self.set_task_status(task, status=TaskState.FINISHED, pipe=p) + p.execute() + return new_task + @staticmethod def _log_channel(logger_name: Optional[str], level: Optional[str]) -> str: return ".".join(