Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for nested resources #186

Merged
merged 10 commits into from
Jul 25, 2022
22 changes: 22 additions & 0 deletions docs/task_headers_payloads.rst
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,28 @@ Each resource has its own metadata store where we can provide additional informa
"sha256": hashlib.sha256(sample_content).hexdigest()
})


Starting from v5.0.0, resources can be nested in other objects like lists or dictionaries.

.. code-block:: python

task = Task(
headers={
"type": "analysis",
"kind": "artifacts"
},
payload={
"artifacts": [
Resource("file1", content=file1),
Resource("file2", content=file2),
Resource("file3", content=file3)
]
"parent": sample # Reference to original (packed) sample
}
)
self.send_task(task)


More information about resources can be found in API documentation.

Directory resource objects
Expand Down
4 changes: 2 additions & 2 deletions karton/core/karton.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ def send_task(self, task: Task) -> bool:
task.headers.update({"origin": self.identity})

# Ensure all local resources have good buckets
for _, resource in task.iterate_resources():
for resource in task.iterate_resources():
if isinstance(resource, LocalResource) and not resource.bucket:
resource.bucket = self.backend.default_bucket_name

# Register new task
self.backend.register_task(task)

# Upload local resources
for _, resource in task.iterate_resources():
for resource in task.iterate_resources():
if isinstance(resource, LocalResource):
resource.upload(self.backend)

Expand Down
63 changes: 42 additions & 21 deletions karton/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Tuple, Union

from .resource import RemoteResource, ResourceBase
from .utils import recursive_iter, recursive_iter_with_keys, recursive_map

if TYPE_CHECKING:
from .backend import KartonBackend # noqa
Expand Down Expand Up @@ -41,8 +42,9 @@ class Task(object):
propagated from initial task like `payload_persistent`
:param parent_uid: Id of a routed task that has created this task by a karton with \
:py:meth:`.send_task`
:param root_uid: Id of a unrouted task that is the root of this tasks analysis tree
:param orig_uid: Id of a unrouted (or crashed routed) task that was forked to \
:param root_uid: Id of an unrouted task that is the root of this \
task's analysis tree
:param orig_uid: Id of an unrouted (or crashed routed) task that was forked to \
create this task
:param uid: This tasks unique identifier
:param error: Traceback of a exception that happened while performing this task
Expand Down Expand Up @@ -261,32 +263,42 @@ def walk_payload_bags(self) -> Iterator[Tuple[Dict[str, Any], str, Any]]:
for key, value in payload_bag.items():
yield payload_bag, key, value

def iterate_resources(self) -> Iterator[Tuple[str, ResourceBase]]:
def walk_payload_values(self) -> Iterator[Tuple[str, Any]]:
psrok1 marked this conversation as resolved.
Show resolved Hide resolved
"""
Get list of resource objects bound to Task
Iterate over all payload values

Generates tuples (key, value)
Generates tuples (path, value).

:return: An iterator over all task resources
:return: An iterator over all task payload values
"""
for _, key, value in self.walk_payload_bags():
if isinstance(value, ResourceBase):
yield key, value
yield from recursive_iter_with_keys(self.payload, "payload")
yield from recursive_iter_with_keys(
self.payload_persistent, "payload_persistent"
)

def unserialize_resources(self, backend: Optional["KartonBackend"]) -> None:
def transform_payload_bags(self, func):
psrok1 marked this conversation as resolved.
Show resolved Hide resolved
"""
Transforms __karton_resource__ serialized entries into
RemoteResource object instances

:param backend: KartonBackend to use while unserializing resources
Recursively transform contents of all payload bags and payloads
contained in them

:meta private:
"""
for payload_bag, key, value in self.walk_payload_bags():
if isinstance(value, dict) and "__karton_resource__" in value:
payload_bag[key] = RemoteResource.from_dict(
value["__karton_resource__"], backend
)
self.payload, self.payload_persistent = recursive_map(
func, [self.payload, self.payload_persistent]
)

def iterate_resources(self) -> Iterator[ResourceBase]:
"""
Get list of resource objects bound to Task

.. versionchanged: 5.0.0
Returns Resource values instead of tuples (key, value)

:return: An iterator over all task resources
"""
for element in recursive_iter([self.payload, self.payload_persistent]):
if isinstance(element, ResourceBase):
yield element

@staticmethod
def unserialize(
Expand All @@ -301,10 +313,20 @@ def unserialize(

:meta private:
"""

def unserialize_resources(value):
psrok1 marked this conversation as resolved.
Show resolved Hide resolved
"""
Transforms __karton_resource__ serialized entries into
RemoteResource object instances
"""
if isinstance(value, dict) and "__karton_resource__" in value:
return RemoteResource.from_dict(value["__karton_resource__"], backend)
return value

if not isinstance(data, str):
data = data.decode("utf8")

task_data = json.loads(data)
task_data = json.loads(data, object_hook=unserialize_resources)

task = Task(task_data["headers"])
task.uid = task_data["uid"]
Expand All @@ -324,7 +346,6 @@ def unserialize(
task.last_update = task_data.get("last_update", None)
task.payload = task_data["payload"]
task.payload_persistent = task_data["payload_persistent"]
task.unserialize_resources(backend)
return task

def __repr__(self) -> str:
Expand Down
69 changes: 32 additions & 37 deletions karton/core/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,45 +169,37 @@ def assertResourceEqual(
"Resource content mismatch in {}".format(resource_name),
)

def assertPayloadBagEqual(
self, payload: Dict[str, Any], expected: Dict[str, Any], payload_bag_name: str
) -> None:
"""
Assert that two payload bags are equal

:param payload: Result payload bag
:param expected: Expected payload bag
:param payload_bag_name: Bag name
"""
self.assertSetEqual(
set(payload.keys()),
set(expected.keys()),
"Incorrect fields set in {}".format(payload_bag_name),
)
for key, value in payload.items():
other_value = expected[key]
path = "{}.{}".format(payload_bag_name, key)
if not isinstance(value, ResourceBase):
self.assertEqual(
value,
other_value,
"Incorrect value of {}".format(path),
)
else:
self.assertResourceEqual(value, other_value, path)

def assertTaskEqual(self, task: Task, expected: Task) -> None:
"""
Assert that two tasks objects are equal
Assert that two task objects are equal

:param task: Result task
:param expected: Expected task
"""
self.assertDictEqual(task.headers, expected.headers, "Headers mismatch")
self.assertPayloadBagEqual(task.payload, expected.payload, "payload")
self.assertPayloadBagEqual(
task.payload_persistent, expected.payload_persistent, "payload_persistent"
# Get paths and values sorted by path
task_payload_values = sorted(task.walk_payload_values(), key=lambda el: el[0])
expected_payload_values = sorted(
expected.walk_payload_values(), key=lambda el: el[0]
)
self.assertSetEqual(
set(path for path, _ in task_payload_values),
set(path for path, _ in expected_payload_values),
)
for task_payload, expected_payload in zip(
task_payload_values, expected_payload_values
):
task_key, task_value = task_payload
expected_key, expected_value = expected_payload
assert (
task_key == expected_key
) # If not true, there is something wrong with this test routine
if not isinstance(task_value, ResourceBase):
self.assertEqual(
task_value, expected_value, "Incorrect value of {}".format(task_key)
)
else:
self.assertResourceEqual(task_value, expected_value, task_key)

def assertTasksEqual(self, tasks: List[Task], expected: List[Task]) -> None:
"""
Expand All @@ -228,15 +220,17 @@ def _process_task(self, incoming_task: Task):
task = incoming_task.fork_task()
task.status = TaskState.STARTED
task.headers.update({"receiver": self.karton.identity})
for payload_bag, key, resource in task.walk_payload_bags():
if not isinstance(resource, ResourceBase):
continue
if not isinstance(resource, LocalResource):

def local_resource_to_remote(obj):
if not isinstance(obj, ResourceBase):
return obj
if not isinstance(obj, LocalResource):
raise ValueError("Test task must contain only LocalResource objects")
backend = cast(KartonBackend, self.backend)
resource = cast(LocalResource, obj)
resource.bucket = backend.default_bucket_name
resource.upload(backend)
remote_resource = RemoteResource(
return RemoteResource(
name=resource.name,
bucket=resource.bucket,
metadata=resource.metadata,
Expand All @@ -246,7 +240,8 @@ def _process_task(self, incoming_task: Task):
sha256=resource.sha256,
_flags=resource._flags,
)
payload_bag[key] = remote_resource

task.transform_payload_bags(local_resource_to_remote)
return task

def run_task(self, task: Task) -> List[Task]:
Expand Down
51 changes: 50 additions & 1 deletion karton/core/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import functools
import signal
from contextlib import contextmanager
from typing import Any, Callable, Iterator, Sequence, TypeVar
from typing import Any, Callable, Iterator, Sequence, Tuple, TypeVar

from .exceptions import HardShutdownInterrupt, TaskTimeoutError

Expand All @@ -12,6 +12,55 @@ def chunks(seq: Sequence[T], size: int) -> Iterator[Sequence[T]]:
return (seq[pos : pos + size] for pos in range(0, len(seq), size))


def recursive_iter(obj) -> Iterator[Any]:
psrok1 marked this conversation as resolved.
Show resolved Hide resolved
"""
Yields all values recursively from nested list/dict structures

:param obj: Object to iterate over
"""
if isinstance(obj, (list, tuple, set)):
psrok1 marked this conversation as resolved.
Show resolved Hide resolved
for elem in obj:
yield from recursive_iter(elem)
elif isinstance(obj, dict):
for elem in obj.values():
yield from recursive_iter(elem)
else:
yield obj


def recursive_iter_with_keys(obj, name="") -> Iterator[Tuple[str, Any]]:
psrok1 marked this conversation as resolved.
Show resolved Hide resolved
"""
Yields (path, value) tuples recursively from nested list/dict structures

:param obj: Object to iterate over
:param name: Object name
"""
if isinstance(obj, (list, tuple, set)):
for idx, elem in enumerate(obj):
yield from recursive_iter_with_keys(elem, name=f"{name}.{idx}")
elif isinstance(obj, dict):
for key, elem in obj.items():
yield from recursive_iter_with_keys(elem, name=f"{name}.{key}")
else:
yield name, obj


def recursive_map(func: Callable[[Any], Any], obj: Any) -> Any:
"""
Returns copy of collection with recursively mapped elements

:param func: Mapping function
:param obj: Object to iterate over
"""
mapped_obj = func(obj)
if isinstance(mapped_obj, (list, tuple, set)):
return [recursive_map(func, elem) for elem in mapped_obj]
elif isinstance(mapped_obj, dict):
return {key: recursive_map(func, elem) for key, elem in mapped_obj.items()}
else:
return mapped_obj


@contextmanager
def timeout(wait_for: int):
def throw_timeout(signum: int, frame: Any) -> None:
Expand Down
2 changes: 1 addition & 1 deletion karton/system/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def gc_collect_resources(self) -> None:
# Task is created before resource upload to lock the reference to the resource.
tasks = self.backend.get_all_tasks()
for task in tasks:
for _, resource in task.iterate_resources():
for resource in task.iterate_resources():
# If resource is referenced by task: remove it from set
if (
resource.bucket == karton_bucket
Expand Down
Empty file.
26 changes: 26 additions & 0 deletions tests/nested_resources_test_case/multi_reverser_karton.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from karton.core import Karton, Task, Resource


class MultiReverserKarton(Karton):
identity = "karton.multi-reverser"
filters = [
{
"type": "multi-reverse-task"
}
]

def process(self, task: Task):
input_files = task.get_payload("files")
output_files = [
Resource(file.name, content=file.content[::-1])
for file in input_files
]
result = Task(
headers={
"type": "multi-reverse-result"
},
payload={
"files": output_files
}
)
self.send_task(result)
Loading