Task tree and iterators: backend performance improvements #207

Closed · wants to merge 22 commits
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -17,7 +17,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
-        minor: [7, 8, 9]
+        minor: [8, 9, 10, 11]
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v1
16 changes: 16 additions & 0 deletions docs/karton_api.rst
@@ -52,3 +52,19 @@ karton.core.Config

.. autoclass:: karton.core.config.Config
   :members:
+
+
+Low-level Karton access
+-----------------------
+
+.. autoclass:: karton.core.backend.KartonBackend
+   :members:
+
+.. autoclass:: karton.core.inspect.KartonState
+   :members:
+
+.. autoclass:: karton.core.inspect.KartonAnalysis
+   :members:
+
+.. autoclass:: karton.core.inspect.KartonQueue
+   :members:
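
For orientation, the newly documented low-level classes can be driven directly. A minimal sketch, assuming a standard karton.ini that `Config()` can locate on its default search path:

```python
from karton.core.backend import KartonBackend
from karton.core.config import Config
from karton.core.inspect import KartonState

config = Config()  # assumes a discoverable karton.ini
backend = KartonBackend(config)
state = KartonState(backend)

# KartonState groups pending tasks by root_uid into KartonAnalysis objects
for root_uid, analysis in state.analyses.items():
    print(root_uid, len(analysis.tasks))  # tasks: pending tasks of this tree
```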
102 changes: 66 additions & 36 deletions karton/core/backend.py
@@ -365,36 +365,36 @@ def get_online_services(self) -> List[KartonServiceInfo]:
continue
return bound_services

-    def get_task(self, task_uid: str) -> Optional[Task]:
+    def get_task(self, task_fquid: str) -> Optional[Task]:
"""
Get task object with given identifier

-        :param task_uid: Task identifier
+        :param task_fquid: Task fully-qualified identifier
:return: Task object
"""
-        task_data = self.redis.get(f"{KARTON_TASK_NAMESPACE}:{task_uid}")
+        task_data = self.redis.get(f"{KARTON_TASK_NAMESPACE}:{task_fquid}")
if not task_data:
return None
return Task.unserialize(task_data, backend=self)
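
To illustrate the task_uid to task_fquid change: a fully-qualified identifier joins the tree's root_uid with the task's own uid, matching the `karton.task:<root_uid>:<task_uid>` key layout described under `iter_task_tree` below. A sketch reusing the backend object from the earlier example; both identifiers are hypothetical placeholders:

```python
root_uid = "00000000-0000-0000-0000-000000000001"  # analysis (task tree) root
task_uid = "00000000-0000-0000-0000-000000000002"  # individual task
task_fquid = f"{root_uid}:{task_uid}"              # fully-qualified identifier

task = backend.get_task(task_fquid)  # reads karton.task:<task_fquid> from Redis
if task is not None:
    print(task.status)
```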

def get_tasks(
self,
-        task_uid_list: List[str],
+        task_fquid_list: List[str],
chunk_size: int = 1000,
parse_resources: bool = True,
) -> List[Task]:
"""
Get multiple tasks for given identifier list

-        :param task_uid_list: List of task identifiers
+        :param task_fquid_list: List of task fully-qualified identifiers
:param chunk_size: Size of chunks passed to the Redis MGET command
:param parse_resources: If set to False, resources are not parsed.
It speeds up deserialization. Read :py:meth:`Task.unserialize`
documentation to learn more.
:return: List of task objects
"""
keys = chunks(
[f"{KARTON_TASK_NAMESPACE}:{task_uid}" for task_uid in task_uid_list],
[f"{KARTON_TASK_NAMESPACE}:{task_fquid}" for task_fquid in task_fquid_list],
chunk_size,
)
return [
@@ -423,13 +423,13 @@ def _iter_tasks(

def iter_tasks(
self,
-        task_uid_list: Iterable[str],
+        task_fquid_list: Iterable[str],
chunk_size: int = 1000,
parse_resources: bool = True,
) -> Iterator[Task]:
"""
Get multiple tasks for given identifier list as an iterator
-        :param task_uid_list: List of task fully-qualified identifiers
+        :param task_fquid_list: List of task fully-qualified identifiers
:param chunk_size: Size of chunks passed to the Redis MGET command
:param parse_resources: If set to False, resources are not parsed.
It speeds up deserialization. Read :py:meth:`Task.unserialize` documentation
@@ -438,8 +438,8 @@ def iter_tasks(
"""
return self._iter_tasks(
map(
-                lambda task_uid: f"{KARTON_TASK_NAMESPACE}:{task_uid}",
-                task_uid_list,
+                lambda task_fquid: f"{KARTON_TASK_NAMESPACE}:{task_fquid}",
+                task_fquid_list,
),
chunk_size=chunk_size,
parse_resources=parse_resources,
@@ -463,11 +463,40 @@ def iter_all_tasks(
task_keys, chunk_size=chunk_size, parse_resources=parse_resources
)

+    def iter_task_tree(
+        self, root_uid: str, chunk_size: int = 1000, parse_resources: bool = True
+    ) -> Iterator[Task]:
+        """
+        Iterates all tasks that belong to the same analysis task tree
+        and have the same root_uid
+
+        :param root_uid: Root identifier of task tree
+        :param chunk_size: Size of chunks passed to the Redis SCAN and MGET commands
+        :param parse_resources: If set to False, resources are not parsed.
+            It speeds up deserialization. Read :py:meth:`Task.unserialize` documentation
+            to learn more.
+        :return: Iterator with task objects
+
+        .. note::
+            This method processes only those tasks that are stored under the
+            karton.task:<root_uid>:<task_uid> key format, the fully-qualified
+            identifier format introduced in Karton 5.2.0.
+
+            Requires karton-system to be upgraded to Karton 5.2.0.
+            Unrouted tasks produced by older Karton versions won't be returned.
+        """
+        task_keys = self.redis.scan_iter(
+            match=f"{KARTON_TASK_NAMESPACE}:{root_uid}:*", count=chunk_size
+        )
+        return self._iter_tasks(
+            task_keys, chunk_size=chunk_size, parse_resources=parse_resources
+        )
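
A usage sketch for the new iterator (backend construction as in the earlier examples; the root_uid is a placeholder). Skipping resource parsing is the main performance lever when only headers or statuses are needed:

```python
from karton.core.backend import KartonBackend
from karton.core.config import Config

backend = KartonBackend(Config())  # assumes a discoverable karton.ini

# Stream one analysis' tasks without materializing the whole task list
for task in backend.iter_task_tree(
    "00000000-0000-0000-0000-000000000001",  # hypothetical root_uid
    parse_resources=False,                   # skip resource deserialization
):
    print(task.uid, task.status)
```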

def get_all_tasks(
self, chunk_size: int = 1000, parse_resources: bool = True
) -> List[Task]:
"""
-        Get all tasks registered in Redis
+        Get multiple tasks for given identifier list as an iterator

.. warning::
This method loads all tasks into memory.
@@ -491,15 +520,15 @@ def register_task(self, task: Task, pipe: Optional[Pipeline] = None) -> None:
:param pipe: Optional pipeline object if operation is a part of pipeline
"""
rs = pipe or self.redis
rs.set(f"{KARTON_TASK_NAMESPACE}:{task.uid}", task.serialize())
rs.set(f"{KARTON_TASK_NAMESPACE}:{task.fquid}", task.serialize())

def register_tasks(self, tasks: List[Task]) -> None:
"""
Register or update multiple tasks in Redis.
:param tasks: List of task objects
"""
taskmap = {
f"{KARTON_TASK_NAMESPACE}:{task.uid}": task.serialize() for task in tasks
f"{KARTON_TASK_NAMESPACE}:{task.fquid}": task.serialize() for task in tasks
}
self.redis.mset(taskmap)

@@ -530,7 +559,7 @@ def delete_task(self, task: Task) -> None:

:param task: Task object
"""
-        self.redis.delete(f"{KARTON_TASK_NAMESPACE}:{task.uid}")
+        self.redis.delete(f"{KARTON_TASK_NAMESPACE}:{task.fquid}")

def delete_tasks(self, tasks: Iterable[Task], chunk_size: int = 1000) -> None:
"""
@@ -544,7 +573,7 @@ def delete_tasks(self, tasks: Iterable[Task], chunk_size: int = 1000) -> None:
:param tasks: List of Task objects
:param chunk_size: Size of chunks passed to the Redis DELETE command
"""
keys = [f"{KARTON_TASK_NAMESPACE}:{task.uid}" for task in tasks]
keys = [f"{KARTON_TASK_NAMESPACE}:{task.fquid}" for task in tasks]
for chunk in chunks(keys, chunk_size):
self.redis.delete(*chunk)

@@ -555,12 +584,12 @@ def get_task_queue(self, queue: str) -> List[Task]:
:param queue: Queue name
:return: List with Task objects contained in queue
"""
-        task_uids = self.redis.lrange(queue, 0, -1)
-        return self.get_tasks(task_uids)
+        task_fquids = self.redis.lrange(queue, 0, -1)
+        return self.get_tasks(task_fquids)

def get_task_ids_from_queue(self, queue: str) -> List[str]:
"""
-        Return all task UIDs in a queue
+        Return all task fquids in a queue

:param queue: Queue name
:return: List with task identifiers contained in queue
@@ -595,7 +624,7 @@ def produce_unrouted_task(self, task: Task) -> None:

:param task: Task object
"""
-        self.redis.rpush(KARTON_TASKS_QUEUE, task.uid)
+        self.redis.rpush(KARTON_TASKS_QUEUE, task.fquid)

def produce_routed_task(
self, identity: str, task: Task, pipe: Optional[Pipeline] = None
@@ -610,7 +639,7 @@ def produce_routed_task(
:param pipe: Optional pipeline object if operation is a part of pipeline
"""
rs = pipe or self.redis
-        rs.rpush(self.get_queue_name(identity, task.priority), task.uid)
+        rs.rpush(self.get_queue_name(identity, task.priority), task.fquid)

def consume_queues(
self, queues: Union[str, List[str]], timeout: int = 0
@@ -625,20 +654,6 @@ def consume_queues(
"""
return self.redis.blpop(queues, timeout=timeout)

-    def increment_multiple_metrics(
-        self, metric: KartonMetrics, increments: Dict[str, int]
-    ) -> None:
-        """
-        Increments metrics for multiple identities by given value via single pipeline
-        :param metric: Operation metric type
-        :param increments: Dictionary of Karton service identities and value
-            to add to the metric
-        """
-        p = self.redis.pipeline()
-        for identity, increment in increments.items():
-            p.hincrby(metric.value, identity, increment)
-        p.execute()

def consume_queues_batch(self, queue: str, max_count: int) -> List[str]:
"""
Get a batch of items from the queue
@@ -783,6 +798,21 @@ def increment_metrics(
rs = pipe or self.redis
rs.hincrby(metric.value, identity, 1)

+    def increment_multiple_metrics(
+        self, metric: KartonMetrics, increments: Dict[str, int]
+    ) -> None:
+        """
+        Increments metrics for multiple identities by a given value via a single pipeline
+
+        :param metric: Operation metric type
+        :param increments: Dictionary of Karton service identities and value
+            to add to the metric
+        """
+        p = self.redis.pipeline()
+        for identity, increment in increments.items():
+            p.hincrby(metric.value, identity, increment)
+        p.execute()
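
A sketch of how a service might batch its metric updates; the identities are hypothetical, and `KartonMetrics.TASK_CONSUMED` is assumed as the metric type. One pipelined round trip replaces a HINCRBY call per identity:

```python
from karton.core.backend import KartonBackend, KartonMetrics
from karton.core.config import Config

backend = KartonBackend(Config())  # assumes a discoverable karton.ini

# Single pipelined round trip instead of one HINCRBY per identity
backend.increment_multiple_metrics(
    KartonMetrics.TASK_CONSUMED,
    {"karton.classifier": 10, "karton.reporter": 3},  # hypothetical identities
)
```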

def increment_metrics_list(
self, metric: KartonMetrics, identities: List[str]
) -> None:
@@ -921,8 +951,8 @@ def check_bucket_exists(self, bucket: str, create: bool = False) -> bool:

def log_identity_output(self, identity: str, headers: Dict[str, Any]) -> None:
"""
-        Store the type of task outputted for given producer to
-        be used in tracking karton service connections.
+        Store the type of task outputted for given producer
+        to be used in tracking karton service connections.

:param identity: producer identity
:param headers: outputted headers
86 changes: 62 additions & 24 deletions karton/core/inspect.py
@@ -1,5 +1,5 @@
from collections import defaultdict
-from typing import Dict, List
+from typing import Dict, List, Optional

from .backend import KartonBackend, KartonBind
from .task import Task, TaskState
@@ -119,30 +119,68 @@ class KartonState:
:param backend: :py:meth:`KartonBackend` object to use for data fetching
"""

-    def __init__(self, backend: KartonBackend) -> None:
+    def __init__(self, backend: KartonBackend, parse_resources: bool = True) -> None:
self.backend = backend
self.binds = {bind.identity: bind for bind in backend.get_binds()}
self.replicas = backend.get_online_consumers()
-        self.tasks = backend.get_all_tasks()
-        self.pending_tasks = [
-            task for task in self.tasks if task.status != TaskState.FINISHED
-        ]
-
-        # Tasks grouped by root_uid
-        tasks_per_analysis = defaultdict(list)
-
-        for task in self.pending_tasks:
-            tasks_per_analysis[task.root_uid].append(task)
-
-        self.analyses = {
-            root_uid: KartonAnalysis(root_uid=root_uid, tasks=tasks, state=self)
-            for root_uid, tasks in tasks_per_analysis.items()
-        }
-        queues = get_queues_for_tasks(self.pending_tasks, self)
-        # Present registered queues without tasks
-        for bind_name, bind in self.binds.items():
-            if bind_name not in queues:
-                queues[bind_name] = KartonQueue(
-                    bind=self.binds[bind_name], tasks=[], state=self
-                )
-        self.queues = queues
+        self.parse_resources = parse_resources
+
+        self._tasks: Optional[List[Task]] = None
+        self._pending_tasks: Optional[List[Task]] = None
+        self._analyses: Optional[Dict[str, KartonAnalysis]] = None
+        self._queues: Optional[Dict[str, KartonQueue]] = None
+
+    @property
+    def tasks(self) -> List[Task]:
+        if self._tasks is None:
+            self._tasks = self.backend.get_all_tasks(
+                parse_resources=self.parse_resources
+            )
+        return self._tasks
+
+    @property
+    def pending_tasks(self) -> List[Task]:
+        if self._pending_tasks is None:
+            self._pending_tasks = [
+                task for task in self.tasks if task.status != TaskState.FINISHED
+            ]
+        return self._pending_tasks
+
+    @property
+    def analyses(self) -> Dict[str, KartonAnalysis]:
+        if self._analyses is None:
+            # Tasks grouped by root_uid
+            tasks_per_analysis = defaultdict(list)
+
+            for task in self.pending_tasks:
+                tasks_per_analysis[task.root_uid].append(task)
+
+            self._analyses = {
+                root_uid: KartonAnalysis(root_uid=root_uid, tasks=tasks, state=self)
+                for root_uid, tasks in tasks_per_analysis.items()
+            }
+        return self._analyses
+
+    @property
+    def queues(self) -> Dict[str, KartonQueue]:
+        if self._queues is None:
+            queues = get_queues_for_tasks(self.pending_tasks, self)
+            # Present registered queues without tasks
+            for bind_name, bind in self.binds.items():
+                if bind_name not in queues:
+                    queues[bind_name] = KartonQueue(
+                        bind=self.binds[bind_name], tasks=[], state=self
+                    )
+            self._queues = queues
+        return self._queues

+    def get_analysis(self, root_uid: str) -> KartonAnalysis:
+        return KartonAnalysis(
+            root_uid=root_uid,
+            tasks=list(
+                self.backend.iter_task_tree(
+                    root_uid, parse_resources=self.parse_resources
+                )
+            ),
+            state=self,
+        )
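
Putting the inspect.py changes together, a monitoring-style sketch (root_uid is a placeholder). Properties fetch lazily and cache their results, and `get_analysis` now pulls a single tree via `iter_task_tree` instead of loading every task:

```python
from karton.core.backend import KartonBackend
from karton.core.config import Config
from karton.core.inspect import KartonState

# parse_resources=False trades resource objects for faster deserialization
state = KartonState(KartonBackend(Config()), parse_resources=False)

print(len(state.queues))         # first access fetches and caches all tasks
print(len(state.pending_tasks))  # reuses the cached task list

# Inspect one analysis without touching the rest of Redis
analysis = state.get_analysis("00000000-0000-0000-0000-000000000001")
print(len(analysis.tasks))
```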