Skip to content

Commit

Permalink
Added get_outputs method (#152)
Browse files Browse the repository at this point in the history
  • Loading branch information
yankovs authored Apr 4, 2022
1 parent acc3aa8 commit 79ffabd
Showing 1 changed file with 31 additions and 1 deletion.
32 changes: 31 additions & 1 deletion karton/core/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
from collections import defaultdict, namedtuple
from io import BytesIO
from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Tuple, Union
from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Set, Tuple, Union

from minio import Minio
from minio.deleteobjects import DeleteObject
Expand All @@ -27,6 +27,9 @@
)


KartonOutputs = namedtuple("KartonOutputs", ["identity", "outputs"])


class KartonMetrics(enum.Enum):
TASK_PRODUCED = "karton.metrics.produced"
TASK_CONSUMED = "karton.metrics.consumed"
Expand Down Expand Up @@ -151,6 +154,18 @@ def unserialize_bind(identity: str, bind_data: str) -> KartonBind:
service_version=bind.get("service_version"),
)

@staticmethod
def unserialize_output(identity: str, output_data: Set[str]) -> KartonOutputs:
"""
Deserialize KartonOutputs object for given identity.
:param identity: Karton service identity
:param output_data: Serialized output data
:return: KartonOutputs object with outputs definition
"""
output = [json.loads(output_type) for output_type in output_data]
return KartonOutputs(identity=identity, outputs=output)

def get_bind(self, identity: str) -> KartonBind:
"""
Get bind object for given identity
Expand Down Expand Up @@ -643,5 +658,20 @@ def log_identity_output(self, identity: str, headers: Dict[str, Any]) -> None:
self.redis.sadd(f"{KARTON_OUTPUTS_NAMESPACE}:{identity}", json.dumps(headers))
self.redis.expire(f"{KARTON_OUTPUTS_NAMESPACE}:{identity}", 60 * 60)

def get_outputs(self) -> List[KartonOutputs]:
"""
Get a list of the output types for each karton.
:return: List of KartonOutputs
"""

output_keys = self.redis.keys(f"{KARTON_OUTPUTS_NAMESPACE}:*")
return [
self.unserialize_output(
identity.split(":")[1], self.redis.smembers(identity)
)
for identity in output_keys
]

def make_pipeline(self, transaction: bool = False) -> Pipeline:
return self.redis.pipeline(transaction=transaction)

0 comments on commit 79ffabd

Please sign in to comment.