Skip to content

Commit

Permalink
Extended service information for non-consumer services (#206)
Browse files Browse the repository at this point in the history
  • Loading branch information
psrok1 authored Feb 28, 2023
1 parent f8727ef commit 46e39ef
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 24 deletions.
138 changes: 130 additions & 8 deletions karton/core/backend.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import dataclasses
import enum
import json
import logging
import time
import urllib.parse
import warnings
from collections import defaultdict, namedtuple
from typing import IO, Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple, Union
Expand All @@ -10,6 +13,7 @@
from redis.client import Pipeline
from urllib3.response import HTTPResponse

from .exceptions import InvalidIdentityError
from .task import Task, TaskPriority, TaskState
from .utils import chunks

Expand All @@ -20,14 +24,14 @@
KARTON_TASK_NAMESPACE = "karton.task"
KARTON_OUTPUTS_NAMESPACE = "karton.outputs"


KartonBind = namedtuple(
"KartonBind",
["identity", "info", "version", "persistent", "filters", "service_version"],
)


KartonOutputs = namedtuple("KartonOutputs", ["identity", "outputs"])
logger = logging.getLogger("karton.core.backend")


class KartonMetrics(enum.Enum):
Expand All @@ -38,33 +42,118 @@ class KartonMetrics(enum.Enum):
TASK_GARBAGE_COLLECTED = "karton.metrics.garbage-collected"


@dataclasses.dataclass(frozen=True, order=True)
class KartonServiceInfo:
    """
    Extended Karton service information.

    Instances of this dataclass are meant to be aggregated to count service replicas
    in Karton Dashboard. They're considered equal if identity and version strings
    are the same.

    The information is carried in a client name of the form
    ``<identity>?<urlencoded params>``, where params are the fields that are not
    marked with ``serializable=False`` metadata.
    """

    # Emitted as the part of the client name before "?", not as a query parameter
    identity: str = dataclasses.field(metadata={"serializable": False})
    karton_version: str
    service_version: Optional[str] = None
    # Extra information about Redis client; excluded from hashing and comparison
    redis_client_info: Optional[Dict[str, str]] = dataclasses.field(
        default=None, hash=False, compare=False, metadata={"serializable": False}
    )

    @classmethod
    def _serializable_keys(cls) -> List[str]:
        """Return names of the fields that are encoded as client name params."""
        return [
            field.name
            for field in dataclasses.fields(cls)
            if field.metadata.get("serializable", True)
        ]

    def make_client_name(self) -> str:
        """
        Encode this service information as a client name.

        Fields set to None are omitted from the query string.

        :return: String in ``<identity>?<urlencoded params>`` form
        """
        # Read attributes directly instead of dataclasses.asdict(), which would
        # deep-copy every field (including redis_client_info) just to drop it.
        params = {}
        for key in self._serializable_keys():
            value = getattr(self, key)
            if value is not None:
                params[key] = value
        return f"{self.identity}?{urllib.parse.urlencode(params)}"

    @classmethod
    def parse_client_name(
        cls, client_name: str, redis_client_info: Optional[Dict[str, str]] = None
    ) -> "KartonServiceInfo":
        """
        Decode a client name produced by :py:meth:`make_client_name`.

        :param client_name: String in ``<identity>?<urlencoded params>`` form
        :param redis_client_info: Extra client information to attach to the result
        :return: Instance of the class this method was called on
        :raises ValueError: If client_name doesn't contain "?"
        :raises TypeError: If a required parameter (karton_version) is missing
        """
        included_keys = cls._serializable_keys()
        identity, params_string = client_name.split("?", 1)
        # Filter out unknown params to not get crashed by future extensions
        params = {
            key: value
            for key, value in urllib.parse.parse_qsl(params_string)
            if key in included_keys
        }
        # Build via cls so that subclasses get instances of their own type
        return cls(identity, redis_client_info=redis_client_info, **params)


class KartonBackend:
def __init__(self, config, identity: Optional[str] = None) -> None:
def __init__(
self,
config,
identity: Optional[str] = None,
service_info: Optional[KartonServiceInfo] = None,
) -> None:
self.config = config

if identity is not None:
self._validate_identity(identity)
self.identity = identity
self.redis = self.make_redis(config, identity=identity)

self.service_info = service_info
self.redis = self.make_redis(
config, identity=identity, service_info=service_info
)
self.s3 = boto3.client(
"s3",
endpoint_url=config["s3"]["address"],
aws_access_key_id=config["s3"]["access_key"],
aws_secret_access_key=config["s3"]["secret_key"],
)

def make_redis(self, config, identity: Optional[str] = None) -> StrictRedis:
@staticmethod
def _validate_identity(identity: str):
    """
    Reject identities containing a space or "?".

    "?" separates the identity from the extended service information in the
    client name built by KartonServiceInfo.make_client_name.

    :param identity: Karton service identity to check
    :raises InvalidIdentityError: If identity contains a disallowed character
    """
    disallowed_chars = [" ", "?"]
    for char in disallowed_chars:
        if char in identity:
            raise InvalidIdentityError(
                f"Karton identity should not contain {disallowed_chars}"
            )

@staticmethod
def make_redis(
config,
identity: Optional[str] = None,
service_info: Optional[KartonServiceInfo] = None,
) -> StrictRedis:
"""
Create and test a Redis connection.
:param config: The karton configuration
:param identity: Karton service identity
:param service_info: Additional service identity metadata
:return: Redis connection
"""
if service_info is not None:
client_name: Optional[str] = service_info.make_client_name()
else:
client_name = identity

redis_args = {
"host": config["redis"]["host"],
"port": config.getint("redis", "port", 6379),
"db": config.getint("redis", "db", 0),
"username": config.get("redis", "username"),
"password": config.get("redis", "password"),
"client_name": identity,
"client_name": client_name,
# set socket_timeout to None if set to 0
"socket_timeout": config.getint("redis", "socket_timeout", 30) or None,
"decode_responses": True,
Expand Down Expand Up @@ -232,17 +321,50 @@ def set_consumer_identity(self, _: str) -> None:
DeprecationWarning,
)

def get_online_consumers(self) -> Dict[str, List[str]]:
def get_online_consumers(self) -> Dict[str, List[Dict[str, str]]]:
"""
Gets all online consumer identities
Gets all online identities.
Actually this method returns all services having an identity,
so the list is not limited to consumers.
:return: Dictionary {identity: [list of clients]}
"""
bound_identities = defaultdict(list)
for client in self.redis.client_list():
bound_identities[client["name"]].append(client)
name = client["name"]
# Strip extra service information from client name
if "?" in name:
name, _ = name.split("?", 1)
bound_identities[name].append(client)
return bound_identities

def get_online_services(self) -> List[KartonServiceInfo]:
    """
    Gets all online services providing extended service information.

    Consumers by default don't provide that information and it's included in binds
    instead. If you want to get information about all services, use
    :py:meth:`KartonBackend.get_online_consumers`.

    .. versionadded:: 5.1.0

    :return: List of KartonServiceInfo objects
    """
    services = []
    for redis_client in self.redis.client_list():
        client_name = redis_client["name"]
        # Names without "?" carry no extended service information
        if "?" not in client_name:
            continue
        try:
            parsed = KartonServiceInfo.parse_client_name(
                client_name, redis_client_info=redis_client
            )
        except Exception:
            # Best effort: a malformed client name is logged and skipped
            # so it can't break listing of the remaining services
            logger.exception(
                "Fatal error while parsing client name: %s", client_name
            )
        else:
            services.append(parsed)
    return services

def get_task(self, task_uid: str) -> Optional[Task]:
"""
Get task object with given identifier
Expand Down
27 changes: 23 additions & 4 deletions karton/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from contextlib import contextmanager
from typing import Optional, Union, cast

from .backend import KartonBackend
from .__version__ import __version__
from .backend import KartonBackend, KartonServiceInfo
from .config import Config
from .logger import KartonLogHandler
from .task import Task
Expand All @@ -20,8 +21,12 @@ class KartonBase(abc.ABC):
attribute.
"""

identity = ""
#: Karton service identity
identity: str = ""
#: Karton service version
version: Optional[str] = None
#: Include extended service information for non-consumer services
with_service_info: bool = False

def __init__(
self,
Expand All @@ -39,7 +44,17 @@ def __init__(
if self.config.has_option("karton", "identity"):
self.identity = self.config.get("karton", "identity")

self.backend = backend or KartonBackend(self.config, identity=self.identity)
self.service_info = None
if self.identity is not None and self.with_service_info:
self.service_info = KartonServiceInfo(
identity=self.identity,
karton_version=__version__,
service_version=self.version,
)

self.backend = backend or KartonBackend(
self.config, identity=self.identity, service_info=self.service_info
)

self._log_handler = KartonLogHandler(
backend=self.backend, channel=self.identity
Expand Down Expand Up @@ -190,7 +205,11 @@ def __init__(
identity: Optional[str] = None,
backend: Optional[KartonBackend] = None,
) -> None:
super().__init__(config=config, identity=identity, backend=backend)
super().__init__(
config=config,
identity=identity,
backend=backend,
)
self.setup_logger()
self._shutdown = False

Expand Down
4 changes: 4 additions & 0 deletions karton/core/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
class InvalidIdentityError(Exception):
    """Raised when a Karton identity contains a disallowed character."""


# NOTE(review): raise sites are outside this excerpt; presumably signals that a
# task exceeded its time limit - confirm against the callers.
class TaskTimeoutError(Exception):
    pass

Expand Down
15 changes: 5 additions & 10 deletions karton/core/karton.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ def __init__(
identity: Optional[str] = None,
backend: Optional[KartonBackend] = None,
) -> None:
super(Producer, self).__init__(
config=config, identity=identity, backend=backend
)
super().__init__(config=config, identity=identity, backend=backend)

def send_task(self, task: Task) -> bool:
"""
Expand Down Expand Up @@ -117,9 +115,7 @@ def __init__(
identity: Optional[str] = None,
backend: Optional[KartonBackend] = None,
) -> None:
super(Consumer, self).__init__(
config=config, identity=identity, backend=backend
)
super().__init__(config=config, identity=identity, backend=backend)

if self.filters is None:
raise ValueError("Cannot bind consumer on Empty binds")
Expand Down Expand Up @@ -358,16 +354,15 @@ class LogConsumer(KartonServiceBase):

logger_filter: Optional[str] = None
level: Optional[str] = None
with_service_info = True

def __init__(
self,
config: Optional[Config] = None,
identity: Optional[str] = None,
backend: Optional[KartonBackend] = None,
) -> None:
super(LogConsumer, self).__init__(
config=config, identity=identity, backend=backend
)
super().__init__(config=config, identity=identity, backend=backend)

@abc.abstractmethod
def process_log(self, event: Dict[str, Any]) -> None:
Expand Down Expand Up @@ -419,7 +414,7 @@ def __init__(
identity: Optional[str] = None,
backend: Optional[KartonBackend] = None,
) -> None:
super(Karton, self).__init__(config=config, identity=identity, backend=backend)
super().__init__(config=config, identity=identity, backend=backend)

if self.config.getboolean("signaling", "status", fallback=False):
self.log.info("Using status signaling")
Expand Down
4 changes: 2 additions & 2 deletions karton/system/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ class SystemService(KartonServiceBase):

identity = "karton.system"
version = __version__
with_service_info = True

GC_INTERVAL = 3 * 60
TASK_DISPATCHED_TIMEOUT = 24 * 3600
TASK_STARTED_TIMEOUT = 24 * 3600
TASK_CRASHED_TIMEOUT = 3 * 24 * 3600

def __init__(self, config: Optional[Config]) -> None:
super(SystemService, self).__init__(config=config)

super().__init__(config=config)
self.gc_interval = self.config.getint("system", "gc_interval", self.GC_INTERVAL)
self.task_dispatched_timeout = self.config.getint(
"system", "task_dispatched_timeout", self.TASK_DISPATCHED_TIMEOUT
Expand Down

0 comments on commit 46e39ef

Please sign in to comment.