Skip to content

Commit

Permalink
Backend feature: restart task (#205)
Browse files Browse the repository at this point in the history
  • Loading branch information
psrok1 authored Feb 22, 2023
1 parent e146436 commit 0279c09
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
2 changes: 1 addition & 1 deletion karton/core/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "5.0.1"
__version__ = "5.1.0"
23 changes: 23 additions & 0 deletions karton/core/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,29 @@ def consume_routed_task(self, identity: str, timeout: int = 5) -> Optional[Task]
queue, data = item
return self.get_task(data)

def restart_task(self, task: Task) -> Task:
"""
Requeues consumed task back to the consumer queue.
New task is created with new uid and can be consumed by any active replica.
Original task is marked as finished.
:param task: Task to be restarted
:return: Restarted task object
"""
new_task = task.fork_task()
# Preserve orig_uid to point at unrouted task
new_task.orig_uid = task.orig_uid
new_task.status = TaskState.SPAWNED

p = self.make_pipeline()
self.register_task(new_task, pipe=p)
self.produce_routed_task(new_task.headers["receiver"], new_task, pipe=p)
self.set_task_status(task, status=TaskState.FINISHED, pipe=p)
p.execute()
return new_task

@staticmethod
def _log_channel(logger_name: Optional[str], level: Optional[str]) -> str:
return ".".join(
Expand Down

0 comments on commit 0279c09

Please sign in to comment.