Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support for nested resources #189

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 10 additions & 32 deletions karton/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Tuple, Union

from .resource import RemoteResource, ResourceBase
from .utils import walk_resources

if TYPE_CHECKING:
from .backend import KartonBackend # noqa
Expand Down Expand Up @@ -249,18 +250,6 @@ def default(kself, obj: Any):
sort_keys=True,
)

def walk_payload_bags(self) -> Iterator[Tuple[Dict[str, Any], str, Any]]:
    """
    Walk every entry stored in the task's payload bags

    The regular payload is visited first, followed by the persistent
    payload. Each entry is reported as a (payload_bag, key, value) tuple,
    where payload_bag is the very dict object that holds the entry.

    :return: An iterator over (payload_bag, key, value) tuples
    """
    for bag in (self.payload, self.payload_persistent):
        yield from ((bag, name, item) for name, item in bag.items())

def iterate_resources(self) -> Iterator[Tuple[str, ResourceBase]]:
"""
Get list of resource objects bound to Task
Expand All @@ -269,24 +258,8 @@ def iterate_resources(self) -> Iterator[Tuple[str, ResourceBase]]:

:return: An iterator over all task resources
"""
for _, key, value in self.walk_payload_bags():
if isinstance(value, ResourceBase):
yield key, value

def unserialize_resources(self, backend: Optional["KartonBackend"]) -> None:
    """
    Replace serialized resource entries found directly in the payload bags
    with RemoteResource instances

    An entry counts as a serialized resource when it is a dict that holds
    the "__karton_resource__" key; the value under that key is handed to
    RemoteResource.from_dict. All other entries are left untouched.

    :param backend: KartonBackend passed on to RemoteResource.from_dict

    :meta private:
    """
    marker = "__karton_resource__"
    for bag, name, entry in self.walk_payload_bags():
        if not isinstance(entry, dict):
            continue
        if marker not in entry:
            continue
        # Overwrite in place: the bag is the task's own payload dict.
        bag[name] = RemoteResource.from_dict(entry[marker], backend)
for payload_bag in [self.payload, self.payload_persistent]:
yield from walk_resources(payload_bag)

@staticmethod
def unserialize(
Expand All @@ -301,10 +274,16 @@ def unserialize(

:meta private:
"""

def unserialize_resources(value):
    # Hook applied to decoded JSON values: a dict carrying the
    # "__karton_resource__" key is turned into a RemoteResource built from
    # the value under that key; anything else is returned unchanged.
    is_resource = isinstance(value, dict) and "__karton_resource__" in value
    if not is_resource:
        return value
    return RemoteResource.from_dict(value["__karton_resource__"], backend)

if not isinstance(data, str):
data = data.decode("utf8")

task_data = json.loads(data)
task_data = json.loads(data, object_hook=unserialize_resources)

task = Task(task_data["headers"])
task.uid = task_data["uid"]
Expand All @@ -324,7 +303,6 @@ def unserialize(
task.last_update = task_data.get("last_update", None)
task.payload = task_data["payload"]
task.payload_persistent = task_data["payload_persistent"]
task.unserialize_resources(backend)
return task

def __repr__(self) -> str:
Expand Down
44 changes: 25 additions & 19 deletions karton/core/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from .config import Config
from .resource import LocalResource, RemoteResource, ResourceBase
from .task import Task, TaskState
from .utils import walk_resources

__all__ = ["KartonTestCase", "mock"]

Expand Down Expand Up @@ -228,25 +229,30 @@ def _process_task(self, incoming_task: Task):
task = incoming_task.fork_task()
task.status = TaskState.STARTED
task.headers.update({"receiver": self.karton.identity})
for payload_bag, key, resource in task.walk_payload_bags():
if not isinstance(resource, ResourceBase):
continue
if not isinstance(resource, LocalResource):
raise ValueError("Test task must contain only LocalResource objects")
backend = cast(KartonBackend, self.backend)
resource.bucket = backend.default_bucket_name
resource.upload(backend)
remote_resource = RemoteResource(
name=resource.name,
bucket=resource.bucket,
metadata=resource.metadata,
uid=resource.uid,
size=resource.size,
backend=backend,
sha256=resource.sha256,
_flags=resource._flags,
)
payload_bag[key] = remote_resource

for payload_bag in [task.payload, task.payload_persistent]:
for key, resource in walk_resources(payload_bag):

if not isinstance(resource, ResourceBase):
continue
if not isinstance(resource, LocalResource):
raise ValueError(
"Test task must contain only LocalResource objects"
)
backend = cast(KartonBackend, self.backend)
resource.bucket = backend.default_bucket_name
resource.upload(backend)
remote_resource = RemoteResource(
name=resource.name,
bucket=resource.bucket,
metadata=resource.metadata,
uid=resource.uid,
size=resource.size,
backend=backend,
sha256=resource.sha256,
_flags=resource._flags,
)
payload_bag[key] = remote_resource
return task

def run_task(self, task: Task) -> List[Task]:
Expand Down
17 changes: 16 additions & 1 deletion karton/core/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import functools
import signal
from contextlib import contextmanager
from typing import Any, Callable, Iterator, Sequence, TypeVar
from typing import Any, Callable, Iterator, Sequence, Tuple, TypeVar

from .exceptions import HardShutdownInterrupt, TaskTimeoutError
from .resource import ResourceBase

T = TypeVar("T")

Expand All @@ -12,6 +13,20 @@ def chunks(seq: Sequence[T], size: int) -> Iterator[Sequence[T]]:
return (seq[pos : pos + size] for pos in range(0, len(seq), size))


def walk_resources(obj: Any) -> Iterator[Tuple[str, ResourceBase]]:
    """
    Iterate over all resources nested anywhere inside ``obj``

    Lists, tuples, sets and dicts are descended into; any other value that
    is not a resource is ignored. The traversal uses an explicit stack
    instead of recursion, so nesting depth is not limited by the
    interpreter's recursion limit.

    Each resource is reported together with the key of the innermost dict
    that contains it; items of a list, tuple or set inherit the name under
    which that container was reached. A resource that is not located inside
    any dict (``obj`` itself, or an item of a top-level list) is reported
    with ``None`` as its name.

    Resources are yielded depth-first in the order in which they appear in
    the containers (arbitrary for sets).

    :param obj: Arbitrarily nested structure to search for resources
    :return: An iterator over (name, resource) tuples
    """
    # Explicit stack of (name, value) pairs that still have to be visited.
    pending = [(None, obj)]
    while pending:
        name, value = pending.pop()
        if isinstance(value, ResourceBase):
            yield name, value

        elif isinstance(value, (list, tuple, set)):
            # pop() takes from the end, so children are pushed in reverse to
            # be visited in their original order.
            pending.extend((name, item) for item in reversed(list(value)))

        elif isinstance(value, dict):
            pending.extend(reversed(list(value.items())))


@contextmanager
def timeout(wait_for: int):
def throw_timeout(signum: int, frame: Any) -> None:
Expand Down