Skip to content

Commit

Permalink
Task tree: backend performance improvements (#255)
Browse files Browse the repository at this point in the history
  • Loading branch information
psrok1 authored May 16, 2024
1 parent 13857a0 commit c7d56b3
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 36 deletions.
2 changes: 1 addition & 1 deletion dev/logger.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.7
FROM python:3.11

COPY requirements.txt /karton/
COPY setup.py /karton/
Expand Down
2 changes: 1 addition & 1 deletion karton/core/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "5.3.4"
__version__ = "5.4.0"
52 changes: 52 additions & 0 deletions karton/core/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,58 @@ def get_all_tasks(
self.iter_all_tasks(chunk_size=chunk_size, parse_resources=parse_resources)
)

def _iter_legacy_task_tree(
    self, root_uid: str, chunk_size: int = 1000, parse_resources: bool = True
) -> Iterator[Task]:
    """
    Yield legacy-format tasks that belong to the given task tree.

    Legacy tasks are the ones made by <5.4.0 (unrouted from <5.4.0 producers
    or existing before upgrade). Used internally by iter_task_tree.

    :param root_uid: Root identifier of task tree
    :param chunk_size: Size of chunks passed to the Redis SCAN and MGET command
    :param parse_resources: Passed through to :py:meth:`Task.unserialize`
    :return: Iterator with task objects whose root_uid equals ``root_uid``
    """
    # Only keys whose first character after the namespace prefix is not "{"
    # are scanned, i.e. keys that do not match the new task id format.
    legacy_pattern = f"{KARTON_TASK_NAMESPACE}:[^{{]*"
    legacy_keys = self.redis.scan_iter(match=legacy_pattern, count=chunk_size)

    for key_chunk in chunks_iter(legacy_keys, chunk_size):
        for serialized in self.redis.mget(key_chunk):
            # MGET returned no value for this key
            # (presumably it vanished between SCAN and MGET).
            if serialized is None:
                continue
            candidate = Task.unserialize(
                serialized, backend=self, parse_resources=parse_resources
            )
            # The root_uid is not part of a legacy key, so every task has to
            # be deserialized before it can be matched against the tree.
            if candidate.root_uid == root_uid:
                yield candidate

def iter_task_tree(
    self, root_uid: str, chunk_size: int = 1000, parse_resources: bool = True
) -> Iterator[Task]:
    """
    Iterates all tasks that belong to the same analysis task tree
    and have the same root_uid

    Legacy (<5.4.0) tasks are yielded first, followed by tasks stored under
    the new-style ``{root_uid}:...`` keys.

    :param root_uid: Root identifier of task tree
    :param chunk_size: Size of chunks passed to the Redis SCAN and MGET command
    :param parse_resources: If set to False, resources are not parsed.
        It speeds up deserialization. Read :py:meth:`Task.unserialize`
        documentation to learn more.
    :return: Iterator with task objects
    """
    # <5.4.0 tasks (unrouted from <5.4.0 producers or existing before upgrade)
    legacy_tasks = self._iter_legacy_task_tree(
        root_uid, chunk_size=chunk_size, parse_resources=parse_resources
    )
    yield from legacy_tasks

    # >=5.4.0 tasks: the root_uid is embedded in the key, so the SCAN pattern
    # selects the keys of this tree directly.
    tree_pattern = f"{KARTON_TASK_NAMESPACE}:{{{root_uid}}}:*"
    tree_keys = self.redis.scan_iter(match=tree_pattern, count=chunk_size)
    yield from self._iter_tasks(
        tree_keys, chunk_size=chunk_size, parse_resources=parse_resources
    )

def register_task(self, task: Task, pipe: Optional[Pipeline] = None) -> None:
"""
Register or update task in Redis.
Expand Down
94 changes: 66 additions & 28 deletions karton/core/inspect.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from collections import defaultdict
from typing import Dict, List
from typing import Dict, List, Optional

from .backend import KartonBackend, KartonBind
from .task import Task, TaskState
Expand All @@ -9,9 +9,9 @@ class KartonQueue:
"""
View object representing a Karton queue
:param bind: :py:meth:`KartonBind` object representing the queue bind
:param bind: :class:`KartonBind` object representing the queue bind
:param tasks: List of tasks currently in queue
:param state: :py:meth:`KartonBackend` object to be used
:param state: :class:`KartonState` object to be used
"""

def __init__(
Expand Down Expand Up @@ -48,7 +48,7 @@ class KartonAnalysis:
:param root_uid: Analysis root task uid
:param tasks: List of tasks
:param state: :py:meth:`KartonBackend` object to be used
:param state: :class:`KartonState` object to be used
"""

def __init__(self, root_uid: str, tasks: List[Task], state: "KartonState") -> None:
Expand Down Expand Up @@ -89,7 +89,7 @@ def get_queues_for_tasks(
Group task objects by their queue name
:param tasks: Task objects to group
:param state: :py:meth:`KartonBackend` to bind to created queues
:param state: :class:`KartonState` object to be used
:return: A dictionary containing the queue names and lists of tasks
"""
tasks_per_queue = defaultdict(list)
Expand Down Expand Up @@ -119,30 +119,68 @@ class KartonState:
:param backend: :class:`KartonBackend` object to use for data fetching
"""

def __init__(self, backend: KartonBackend) -> None:
def __init__(self, backend: KartonBackend, parse_resources: bool = False) -> None:
self.backend = backend
self.binds = {bind.identity: bind for bind in backend.get_binds()}
self.replicas = backend.get_online_consumers()
self.tasks = backend.get_all_tasks()
self.pending_tasks = [
task for task in self.tasks if task.status != TaskState.FINISHED
]

# Tasks grouped by root_uid
tasks_per_analysis = defaultdict(list)

for task in self.pending_tasks:
tasks_per_analysis[task.root_uid].append(task)

self.analyses = {
root_uid: KartonAnalysis(root_uid=root_uid, tasks=tasks, state=self)
for root_uid, tasks in tasks_per_analysis.items()
}
queues = get_queues_for_tasks(self.pending_tasks, self)
# Present registered queues without tasks
for bind_name, bind in self.binds.items():
if bind_name not in queues:
queues[bind_name] = KartonQueue(
bind=self.binds[bind_name], tasks=[], state=self
self.parse_resources = parse_resources

self._tasks: Optional[List[Task]] = None
self._pending_tasks: Optional[List[Task]] = None
self._analyses: Optional[Dict[str, KartonAnalysis]] = None
self._queues: Optional[Dict[str, KartonQueue]] = None

@property
def tasks(self) -> List[Task]:
    """
    All tasks known to the backend.

    Fetched with ``backend.get_all_tasks`` on first access (honouring
    ``self.parse_resources``) and cached in ``self._tasks`` afterwards.
    """
    cached = self._tasks
    if cached is not None:
        return cached
    cached = self.backend.get_all_tasks(parse_resources=self.parse_resources)
    self._tasks = cached
    return cached

@property
def pending_tasks(self) -> List[Task]:
    """
    Tasks from ``self.tasks`` whose status is not ``TaskState.FINISHED``.

    Computed on first access and cached in ``self._pending_tasks``.
    """
    if self._pending_tasks is not None:
        return self._pending_tasks
    unfinished = [
        candidate
        for candidate in self.tasks
        if candidate.status != TaskState.FINISHED
    ]
    self._pending_tasks = unfinished
    return unfinished

@property
def analyses(self) -> Dict[str, KartonAnalysis]:
    """
    Pending tasks grouped into :class:`KartonAnalysis` objects, keyed by root_uid.

    Built from ``self.pending_tasks`` on first access and cached in
    ``self._analyses``.
    """
    if self._analyses is not None:
        return self._analyses

    # Tasks grouped by root_uid
    grouped = defaultdict(list)
    for pending in self.pending_tasks:
        grouped[pending.root_uid].append(pending)

    by_root = {}
    for root_uid, tasks in grouped.items():
        by_root[root_uid] = KartonAnalysis(
            root_uid=root_uid, tasks=tasks, state=self
        )
    self._analyses = by_root
    return by_root

@property
def queues(self) -> Dict[str, KartonQueue]:
    """
    Queues keyed by bind name.

    Built on first access from ``get_queues_for_tasks(self.pending_tasks, self)``
    and cached in ``self._queues``. Every bind in ``self.binds`` that has no
    pending tasks is added as an empty :class:`KartonQueue`.
    """
    if self._queues is None:
        queues = get_queues_for_tasks(self.pending_tasks, self)
        # Present registered queues without tasks
        for bind_name, bind in self.binds.items():
            if bind_name not in queues:
                # Use the bind unpacked by the loop instead of looking it up
                # in self.binds a second time.
                queues[bind_name] = KartonQueue(bind=bind, tasks=[], state=self)
        self._queues = queues
    return self._queues

def get_analysis(self, root_uid: str) -> KartonAnalysis:
return KartonAnalysis(
root_uid=root_uid,
tasks=list(
self.backend.iter_task_tree(
root_uid, parse_resources=self.parse_resources
)
self.queues = queues
),
state=self,
)
30 changes: 25 additions & 5 deletions karton/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,18 @@ def __init__(
raise ValueError("Persistent headers should be an instance of a dict")

if uid is None:
self.uid = str(uuid.uuid4())
task_uid = str(uuid.uuid4())
if root_uid is None:
self.root_uid = task_uid
else:
self.root_uid = root_uid
# New-style UID format introduced in v5.4.0
# {12345678-1234-1234-1234-12345678abcd}:12345678-1234-1234-1234-12345678abcd
self.uid = f"{{{self.root_uid}}}:{task_uid}"
else:
self.uid = uid

if root_uid is None:
self.root_uid = self.uid
else:
if root_uid is None:
raise ValueError("root_uid cannot be None when uid is not None")
self.root_uid = root_uid

self.orig_uid = orig_uid
Expand All @@ -137,6 +142,21 @@ def headers_persistent(self) -> Dict[str, Any]:
def receiver(self) -> Optional[str]:
return self.headers.get("receiver")

@property
def task_uid(self) -> str:
    """
    Task uid obtained by passing ``self.uid`` through :py:meth:`fquid_to_uid`.
    """
    full_uid = self.uid
    return self.fquid_to_uid(full_uid)

@staticmethod
def fquid_to_uid(fquid: str) -> str:
    """
    Gets task uid from a fully-qualified uid ({root_uid}:task_uid)

    A value without any ":" is returned unchanged.

    :param fquid: Fully-qualified task uid, or a plain uid without ":"
    :return: Task uid (the part after the last ":")
    """
    # rpartition gives ("", "", fquid) when there is no ":" at all, so the
    # last element is correct for both uid formats.
    _, _, uid = fquid.rpartition(":")
    return uid

def fork_task(self) -> "Task":
"""
Fork task to transfer single task to many queues (but use different UID).
Expand Down
2 changes: 1 addition & 1 deletion karton/system/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def gc_collect(self) -> None:

def route_task(self, task: Task, binds: List[KartonBind]) -> None:
# Performs routing of task
self.log.info("[%s] Processing task %s", task.root_uid, task.uid)
self.log.info("[%s] Processing task %s", task.root_uid, task.task_uid)
# store the producer-task relationship in redis for task tracking
self.backend.log_identity_output(
task.headers.get("origin", "unknown"), task.headers
Expand Down

0 comments on commit c7d56b3

Please sign in to comment.