Task tree and iterators: backend performance improvements #207

Closed · wants to merge 22 commits
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -17,7 +17,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
minor: [7, 8, 9]
minor: [8, 9, 10, 11]
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v1
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,4 +1,4 @@
FROM python:3.7
FROM python:3.11

WORKDIR /app/service
COPY ./requirements.txt ./requirements.txt
16 changes: 16 additions & 0 deletions docs/karton_api.rst
@@ -52,3 +52,19 @@ karton.core.Config

.. autoclass:: karton.core.config.Config
:members:


Low-level Karton access
-----------------------

.. autoclass:: karton.core.backend.KartonBackend
:members:

.. autoclass:: karton.core.inspect.KartonState
:members:

.. autoclass:: karton.core.inspect.KartonAnalysis
:members:

.. autoclass:: karton.core.inspect.KartonQueue
:members:
190 changes: 163 additions & 27 deletions karton/core/backend.py
@@ -15,7 +15,7 @@

from .exceptions import InvalidIdentityError
from .task import Task, TaskPriority, TaskState
from .utils import chunks
from .utils import chunks, chunks_iter

KARTON_TASKS_QUEUE = "karton.tasks"
KARTON_OPERATIONS_QUEUE = "karton.operations"
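
The new chunks_iter helper is imported above but not defined in this diff. A minimal sketch of a lazy chunker consistent with how it is used below (the name and call shape come from this file; the body is an assumption):

from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def chunks_iter(iterable: Iterable[T], size: int) -> Iterator[List[T]]:
    # Lazily yield lists of at most `size` items, so the caller never has
    # to materialize the whole key set (unlike the list-based `chunks`).
    it = iter(iterable)
    while chunk := list(islice(it, size)):
        yield chunk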
@@ -365,51 +365,154 @@ def get_online_services(self) -> List[KartonServiceInfo]:
continue
return bound_services

def get_task(self, task_uid: str) -> Optional[Task]:
def get_task(self, task_fquid: str) -> Optional[Task]:
"""
Get task object with given identifier

:param task_uid: Task identifier
:param task_fquid: Task fully-qualified identifier
:return: Task object
"""
task_data = self.redis.get(f"{KARTON_TASK_NAMESPACE}:{task_uid}")
task_data = self.redis.get(f"{KARTON_TASK_NAMESPACE}:{task_fquid}")
if not task_data:
return None
return Task.unserialize(task_data, backend=self)

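A minimal usage sketch of the renamed parameter; the backend construction and the identifier value are illustrative, and the fquid composition follows the karton.task:<root_uid>:<task_uid> key format described under iter_task_tree below:

from karton.core.backend import KartonBackend
from karton.core.config import Config

backend = KartonBackend(config=Config())  # assumes default karton.ini discovery
# Hypothetical fquid: "<root_uid>:<task_uid>" of an existing task
task = backend.get_task("example-root-uid:example-task-uid")
if task is not None:
    print(task.headers)
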
def get_tasks(self, task_uid_list: List[str], chunk_size: int = 1000) -> List[Task]:
def get_tasks(
self,
task_fquid_list: List[str],
chunk_size: int = 1000,
parse_resources: bool = True,
) -> List[Task]:
"""
Get multiple tasks for given identifier list

:param task_uid_list: List of task identifiers
:param task_fquid_list: List of task fully-qualified identifiers
:param chunk_size: Size of chunks passed to the Redis MGET command
:param parse_resources: If set to False, resources are not parsed.
It speeds up deserialization. Read :py:meth:`Task.unserialize`
documentation to learn more.
:return: List of task objects
"""
keys = chunks(
[f"{KARTON_TASK_NAMESPACE}:{task_uid}" for task_uid in task_uid_list],
[f"{KARTON_TASK_NAMESPACE}:{task_fquid}" for task_fquid in task_fquid_list],
chunk_size,
)
return [
Task.unserialize(task_data, backend=self)
if parse_resources
else Task.unserialize(task_data, parse_resources=False)
for chunk in keys
for task_data in self.redis.mget(chunk)
if task_data is not None
]

def get_all_tasks(self, chunk_size: int = 1000) -> List[Task]:
def _iter_tasks(
self,
task_keys: Iterator[str],
chunk_size: int = 1000,
parse_resources: bool = True,
) -> Iterator[Task]:
for chunk in chunks_iter(task_keys, chunk_size):
yield from (
Task.unserialize(task_data, backend=self)
if parse_resources
else Task.unserialize(task_data, parse_resources=False)
for task_data in self.redis.mget(chunk)
if task_data is not None
)

def iter_tasks(
self,
task_fquid_list: Iterable[str],
chunk_size: int = 1000,
parse_resources: bool = True,
) -> Iterator[Task]:
"""
Get all tasks registered in Redis
Get multiple tasks for given identifier list as an iterator

:param task_fquid_list: List of task fully-qualified identifiers
:param chunk_size: Size of chunks passed to the Redis MGET command
:param parse_resources: If set to False, resources are not parsed.
It speeds up deserialization. Read :py:meth:`Task.unserialize` documentation
to learn more.
:return: Iterator with task objects
"""
return self._iter_tasks(
map(
lambda task_fquid: f"{KARTON_TASK_NAMESPACE}:{task_fquid}",
task_fquid_list,
),
chunk_size=chunk_size,
parse_resources=parse_resources,
)

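For example, dumping a consumer queue without paying the resource-parsing cost could look like this (reusing `backend` from the sketch above; the queue name is illustrative):

fquids = backend.get_task_ids_from_queue("karton.queue:normal:karton.classifier")
for task in backend.iter_tasks(fquids, parse_resources=False):
    # Resources are left unparsed, so this stays cheap even for large queues.
    print(task.uid, task.headers.get("kind"))
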
def iter_task_tree(
self, root_uid: str, chunk_size: int = 1000, parse_resources: bool = True
) -> Iterator[Task]:
"""
Iterates all tasks that belong to the same analysis task tree
and have the same root_uid

:param root_uid: Root identifier of task tree
:param chunk_size: Size of chunks passed to the Redis SCAN and MGET command
:param parse_resources: If set to False, resources are not parsed.
It speeds up deserialization. Read :py:meth:`Task.unserialize` documentation
to learn more.
:return: Iterator with task objects

.. note::
This method processes only those tasks that are stored under the
karton.task:<root_uid>:<task_uid> key format, which is the fully-qualified
identifier layout introduced in Karton 5.2.0.

Requires karton-system to be upgraded to Karton 5.2.0.
Unrouted tasks produced by older Karton versions won't be returned.
"""
task_keys = self.redis.scan_iter(
match=f"{KARTON_TASK_NAMESPACE}:{root_uid}:*", count=chunk_size
)
return self._iter_tasks(
task_keys, chunk_size=chunk_size, parse_resources=parse_resources
)

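A sketch of walking a single analysis tree without loading it wholesale (the root_uid value is made up; `backend` as above):

from karton.core.task import TaskState

root_uid = "example-root-uid"  # hypothetical analysis root
finished = sum(
    1
    for task in backend.iter_task_tree(root_uid, parse_resources=False)
    if task.status == TaskState.FINISHED
)
print(f"{finished} finished tasks in tree {root_uid}")
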
def iter_all_tasks(
self, chunk_size: int = 1000, parse_resources: bool = True
) -> Iterator[Task]:
"""
Iterates all tasks registered in Redis

:param chunk_size: Size of chunks passed to the Redis SCAN and MGET command
:param parse_resources: If set to False, resources are not parsed.
It speeds up deserialization. Read :py:meth:`Task.unserialize` documentation
to learn more.
:return: Iterator with Task objects
"""
task_keys = self.redis.scan_iter(
match=f"{KARTON_TASK_NAMESPACE}:*", count=chunk_size
)
return self._iter_tasks(
task_keys, chunk_size=chunk_size, parse_resources=parse_resources
)

def get_all_tasks(
self, chunk_size: int = 1000, parse_resources: bool = True
) -> List[Task]:
"""
Get all tasks registered in Redis

.. warning::
This method loads all tasks into memory.
Use :py:meth:`iter_all_tasks` instead.

:param chunk_size: Size of chunks passed to the Redis SCAN and MGET command
:param parse_resources: If set to False, resources are not parsed.
It speeds up deserialization. Read :py:meth:`Task.unserialize` documentation
to learn more.
:return: List with Task objects
"""
tasks = self.redis.keys(f"{KARTON_TASK_NAMESPACE}:*")
return [
Task.unserialize(task_data)
for chunk in chunks(tasks, chunk_size)
for task_data in self.redis.mget(chunk)
if task_data is not None
]
return list(
self.iter_all_tasks(chunk_size=chunk_size, parse_resources=parse_resources)
)

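Since get_all_tasks now simply materializes the iterator, the remaining difference is memory; a short sketch contrasting the two forms (`backend` as above):

# Streams tasks in chunk_size batches; roughly constant memory.
count = sum(1 for _ in backend.iter_all_tasks(parse_resources=False))

# Same data, but every task object is held in memory at once.
tasks = backend.get_all_tasks(parse_resources=False)
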
def register_task(self, task: Task, pipe: Optional[Pipeline] = None) -> None:
"""
@@ -419,15 +522,15 @@ def register_task(self, task: Task, pipe: Optional[Pipeline] = None) -> None:
:param pipe: Optional pipeline object if operation is a part of pipeline
"""
rs = pipe or self.redis
rs.set(f"{KARTON_TASK_NAMESPACE}:{task.uid}", task.serialize())
rs.set(f"{KARTON_TASK_NAMESPACE}:{task.fquid}", task.serialize())

def register_tasks(self, tasks: List[Task]) -> None:
"""
Register or update multiple tasks in Redis.
:param tasks: List of task objects
"""
taskmap = {
f"{KARTON_TASK_NAMESPACE}:{task.uid}": task.serialize() for task in tasks
f"{KARTON_TASK_NAMESPACE}:{task.fquid}": task.serialize() for task in tasks
}
self.redis.mset(taskmap)

@@ -451,18 +554,28 @@ def delete_task(self, task: Task) -> None:
"""
Remove task from Redis

.. warning::

Used internally by karton.system. This method doesn't properly
unassign routed tasks, so it shouldn't be used without care.
If you want to cancel a task: mark it as finished and let it be deleted
by karton.system.

:param task: Task object
"""
self.redis.delete(f"{KARTON_TASK_NAMESPACE}:{task.uid}")
self.redis.delete(f"{KARTON_TASK_NAMESPACE}:{task.fquid}")

def delete_tasks(self, tasks: Iterable[Task], chunk_size: int = 1000) -> None:
"""
Remove multiple tasks from Redis

.. warning::
Before use, read warning in :py:meth:`delete_task` method documentation

:param tasks: List of Task objects
:param chunk_size: Size of chunks passed to the Redis DELETE command
"""
keys = [f"{KARTON_TASK_NAMESPACE}:{task.uid}" for task in tasks]
keys = [f"{KARTON_TASK_NAMESPACE}:{task.fquid}" for task in tasks]
for chunk in chunks(keys, chunk_size):
self.redis.delete(*chunk)

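Given the warning above, a cancellation sketch would flip tasks to a finished state and let karton.system collect them; set_task_status exists elsewhere in this backend, but treating it as the sanctioned cancellation path is an assumption:

from karton.core.task import TaskState

for task in backend.iter_task_tree("example-root-uid", parse_resources=False):
    if task.status not in (TaskState.FINISHED, TaskState.CRASHED):
        # karton.system garbage-collects tasks once they reach FINISHED.
        backend.set_task_status(task, TaskState.FINISHED)
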
@@ -473,18 +586,26 @@ def get_task_queue(self, queue: str) -> List[Task]:
:param queue: Queue name
:return: List with Task objects contained in queue
"""
task_uids = self.redis.lrange(queue, 0, -1)
return self.get_tasks(task_uids)
task_fquids = self.redis.lrange(queue, 0, -1)
return self.get_tasks(task_fquids)

def get_task_ids_from_queue(self, queue: str) -> List[str]:
"""
Return all task UIDs in a queue
Return all task fquids in a queue

:param queue: Queue name
:return: List with task identifiers contained in queue
"""
return self.redis.lrange(queue, 0, -1)

def delete_consumer_queues(self, identity: str) -> None:
"""
Deletes consumer queues for given identity

:param identity: Consumer identity
"""
self.redis.delete(*self.get_queue_names(identity))

def remove_task_queue(self, queue: str) -> List[Task]:
"""
Remove task queue with all contained tasks
@@ -505,7 +626,7 @@ def produce_unrouted_task(self, task: Task) -> None:

:param task: Task object
"""
self.redis.rpush(KARTON_TASKS_QUEUE, task.uid)
self.redis.rpush(KARTON_TASKS_QUEUE, task.fquid)

def produce_routed_task(
self, identity: str, task: Task, pipe: Optional[Pipeline] = None
@@ -520,7 +641,7 @@ def produce_routed_task(
:param pipe: Optional pipeline object if operation is a part of pipeline
"""
rs = pipe or self.redis
rs.rpush(self.get_queue_name(identity, task.priority), task.uid)
rs.rpush(self.get_queue_name(identity, task.priority), task.fquid)

def consume_queues(
self, queues: Union[str, List[str]], timeout: int = 0
@@ -679,6 +800,21 @@ def increment_metrics(
rs = pipe or self.redis
rs.hincrby(metric.value, identity, 1)

def increment_multiple_metrics(
self, metric: KartonMetrics, increments: Dict[str, int]
) -> None:
"""
Increments metrics for multiple identities by the given values
via a single pipeline

:param metric: Operation metric type
:param increments: Dictionary mapping Karton service identities to the
value to add to the metric
"""
p = self.redis.pipeline()
for identity, increment in increments.items():
p.hincrby(metric.value, identity, increment)
p.execute()

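For example, karton.system could batch its per-consumer counters and flush them in one round-trip (the aggregation shown is illustrative, not part of this diff):

from collections import Counter
from karton.core.backend import KartonMetrics

# Identities that just received routed tasks (illustrative data)
routed_to = ["karton.classifier", "karton.classifier", "karton.mwdb-reporter"]
backend.increment_multiple_metrics(
    KartonMetrics.TASK_ASSIGNED, dict(Counter(routed_to))
)
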
def increment_metrics_list(
self, metric: KartonMetrics, identities: List[str]
) -> None:
@@ -817,8 +953,8 @@ def check_bucket_exists(self, bucket: str, create: bool = False) -> bool:

def log_identity_output(self, identity: str, headers: Dict[str, Any]) -> None:
"""
Store the type of task outputted for given producer to
be used in tracking karton service connections.
Store the type of task outputted for given producer
to be used in tracking karton service connections.

:param identity: producer identity
:param headers: outputted headers