Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extended service information for non-consumer services #206

Merged
merged 7 commits into from
Feb 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 130 additions & 8 deletions karton/core/backend.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import dataclasses
import enum
import json
import logging
import time
import urllib.parse
import warnings
from collections import defaultdict, namedtuple
from typing import IO, Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple, Union
Expand All @@ -10,6 +13,7 @@
from redis.client import Pipeline
from urllib3.response import HTTPResponse

from .exceptions import InvalidIdentityError
from .task import Task, TaskPriority, TaskState
from .utils import chunks

Expand All @@ -20,14 +24,14 @@
KARTON_TASK_NAMESPACE = "karton.task"
KARTON_OUTPUTS_NAMESPACE = "karton.outputs"


KartonBind = namedtuple(
"KartonBind",
["identity", "info", "version", "persistent", "filters", "service_version"],
)


KartonOutputs = namedtuple("KartonOutputs", ["identity", "outputs"])
logger = logging.getLogger("karton.core.backend")


class KartonMetrics(enum.Enum):
Expand All @@ -38,33 +42,118 @@ class KartonMetrics(enum.Enum):
TASK_GARBAGE_COLLECTED = "karton.metrics.garbage-collected"


@dataclasses.dataclass(frozen=True, order=True)
class KartonServiceInfo:
"""
Extended Karton service information.

Instances of this dataclass are meant to be aggregated to count service replicas
in Karton Dashboard. They're considered equal if identity and versions strings
are the same.
"""

identity: str = dataclasses.field(metadata={"serializable": False})
karton_version: str
service_version: Optional[str] = None
# Extra information about Redis client
redis_client_info: Optional[Dict[str, str]] = dataclasses.field(
default=None, hash=False, compare=False, metadata={"serializable": False}
)

def make_client_name(self) -> str:
included_keys = [
field.name
for field in dataclasses.fields(self)
if field.metadata.get("serializable", True)
]
params = {
k: v
for k, v in dataclasses.asdict(self).items()
psrok1 marked this conversation as resolved.
Show resolved Hide resolved
if k in included_keys and v is not None
}
return f"{self.identity}?{urllib.parse.urlencode(params)}"

@classmethod
def parse_client_name(
cls, client_name: str, redis_client_info: Optional[Dict[str, str]] = None
) -> "KartonServiceInfo":
included_keys = [
field.name
for field in dataclasses.fields(cls)
if field.metadata.get("serializable", True)
]
identity, params_string = client_name.split("?", 1)
# Filter out unknown params to not get crashed by future extensions
params = dict(
[
(key, value)
for key, value in urllib.parse.parse_qsl(params_string)
if key in included_keys
]
)
return KartonServiceInfo(
identity, redis_client_info=redis_client_info, **params
)


class KartonBackend:
def __init__(self, config, identity: Optional[str] = None) -> None:
def __init__(
self,
config,
identity: Optional[str] = None,
service_info: Optional[KartonServiceInfo] = None,
) -> None:
self.config = config

if identity is not None:
self._validate_identity(identity)
self.identity = identity
self.redis = self.make_redis(config, identity=identity)

self.service_info = service_info
self.redis = self.make_redis(
config, identity=identity, service_info=service_info
)
self.s3 = boto3.client(
"s3",
endpoint_url=config["s3"]["address"],
aws_access_key_id=config["s3"]["access_key"],
aws_secret_access_key=config["s3"]["secret_key"],
)

def make_redis(self, config, identity: Optional[str] = None) -> StrictRedis:
@staticmethod
def _validate_identity(identity: str):
disallowed_chars = [" ", "?"]
if any(disallowed_char in identity for disallowed_char in disallowed_chars):
raise InvalidIdentityError(
f"Karton identity should not contain {disallowed_chars}"
)

@staticmethod
def make_redis(
config,
identity: Optional[str] = None,
service_info: Optional[KartonServiceInfo] = None,
) -> StrictRedis:
"""
Create and test a Redis connection.

:param config: The karton configuration
:param identity: Karton service identity
:param service_info: Additional service identity metadata
:return: Redis connection
"""
if service_info is not None:
client_name: Optional[str] = service_info.make_client_name()
else:
client_name = identity

redis_args = {
"host": config["redis"]["host"],
"port": config.getint("redis", "port", 6379),
"db": config.getint("redis", "db", 0),
"username": config.get("redis", "username"),
"password": config.get("redis", "password"),
"client_name": identity,
"client_name": client_name,
# set socket_timeout to None if set to 0
"socket_timeout": config.getint("redis", "socket_timeout", 30) or None,
"decode_responses": True,
Expand Down Expand Up @@ -232,17 +321,50 @@ def set_consumer_identity(self, _: str) -> None:
DeprecationWarning,
)

def get_online_consumers(self) -> Dict[str, List[str]]:
def get_online_consumers(self) -> Dict[str, List[Dict[str, str]]]:
"""
Gets all online consumer identities
Gets all online identities.

Actually this method returns all services having an identity,
so the list is not limited to consumers.

:return: Dictionary {identity: [list of clients]}
"""
bound_identities = defaultdict(list)
for client in self.redis.client_list():
bound_identities[client["name"]].append(client)
name = client["name"]
# Strip extra service information from client name
if "?" in name:
name, _ = name.split("?", 1)
bound_identities[name].append(client)
return bound_identities

def get_online_services(self) -> List[KartonServiceInfo]:
"""
Gets all online services providing extended service information.

Consumers by default don't provide that information and it's included in binds
instead. If you want to get information about all services, use
:py:meth:`KartonBackend.get_online_consumers`.

.. versionadded:: 5.1.0

:return: List of KartonServiceInfo objects
"""
bound_services = []
for client in self.redis.client_list():
name = client["name"]
if "?" in name:
try:
service_info = KartonServiceInfo.parse_client_name(
name, redis_client_info=client
)
bound_services.append(service_info)
except Exception:
logger.exception("Fatal error while parsing client name: %s", name)
continue
return bound_services

def get_task(self, task_uid: str) -> Optional[Task]:
"""
Get task object with given identifier
Expand Down
27 changes: 23 additions & 4 deletions karton/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from contextlib import contextmanager
from typing import Optional, Union, cast

from .backend import KartonBackend
from .__version__ import __version__
from .backend import KartonBackend, KartonServiceInfo
from .config import Config
from .logger import KartonLogHandler
from .task import Task
Expand All @@ -20,8 +21,12 @@ class KartonBase(abc.ABC):
attribute.
"""

identity = ""
#: Karton service identity
identity: str = ""
#: Karton service version
version: Optional[str] = None
#: Include extended service information for non-consumer services
with_service_info: bool = False

def __init__(
self,
Expand All @@ -39,7 +44,17 @@ def __init__(
if self.config.has_option("karton", "identity"):
self.identity = self.config.get("karton", "identity")

self.backend = backend or KartonBackend(self.config, identity=self.identity)
self.service_info = None
if self.identity is not None and self.with_service_info:
self.service_info = KartonServiceInfo(
identity=self.identity,
karton_version=__version__,
service_version=self.version,
)

self.backend = backend or KartonBackend(
self.config, identity=self.identity, service_info=self.service_info
)

self._log_handler = KartonLogHandler(
backend=self.backend, channel=self.identity
Expand Down Expand Up @@ -190,7 +205,11 @@ def __init__(
identity: Optional[str] = None,
backend: Optional[KartonBackend] = None,
) -> None:
super().__init__(config=config, identity=identity, backend=backend)
super().__init__(
config=config,
identity=identity,
backend=backend,
)
self.setup_logger()
self._shutdown = False

Expand Down
4 changes: 4 additions & 0 deletions karton/core/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
class InvalidIdentityError(Exception):
pass


class TaskTimeoutError(Exception):
pass

Expand Down
15 changes: 5 additions & 10 deletions karton/core/karton.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ def __init__(
identity: Optional[str] = None,
backend: Optional[KartonBackend] = None,
) -> None:
super(Producer, self).__init__(
config=config, identity=identity, backend=backend
)
super().__init__(config=config, identity=identity, backend=backend)

def send_task(self, task: Task) -> bool:
"""
Expand Down Expand Up @@ -117,9 +115,7 @@ def __init__(
identity: Optional[str] = None,
backend: Optional[KartonBackend] = None,
) -> None:
super(Consumer, self).__init__(
config=config, identity=identity, backend=backend
)
super().__init__(config=config, identity=identity, backend=backend)

if self.filters is None:
raise ValueError("Cannot bind consumer on Empty binds")
Expand Down Expand Up @@ -358,16 +354,15 @@ class LogConsumer(KartonServiceBase):

logger_filter: Optional[str] = None
level: Optional[str] = None
with_service_info = True

def __init__(
self,
config: Optional[Config] = None,
identity: Optional[str] = None,
backend: Optional[KartonBackend] = None,
) -> None:
super(LogConsumer, self).__init__(
config=config, identity=identity, backend=backend
)
super().__init__(config=config, identity=identity, backend=backend)

@abc.abstractmethod
def process_log(self, event: Dict[str, Any]) -> None:
Expand Down Expand Up @@ -419,7 +414,7 @@ def __init__(
identity: Optional[str] = None,
backend: Optional[KartonBackend] = None,
) -> None:
super(Karton, self).__init__(config=config, identity=identity, backend=backend)
super().__init__(config=config, identity=identity, backend=backend)

if self.config.getboolean("signaling", "status", fallback=False):
self.log.info("Using status signaling")
Expand Down
4 changes: 2 additions & 2 deletions karton/system/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ class SystemService(KartonServiceBase):

identity = "karton.system"
version = __version__
with_service_info = True

GC_INTERVAL = 3 * 60
TASK_DISPATCHED_TIMEOUT = 24 * 3600
TASK_STARTED_TIMEOUT = 24 * 3600
TASK_CRASHED_TIMEOUT = 3 * 24 * 3600

def __init__(self, config: Optional[Config]) -> None:
super(SystemService, self).__init__(config=config)

super().__init__(config=config)
self.gc_interval = self.config.getint("system", "gc_interval", self.GC_INTERVAL)
self.task_dispatched_timeout = self.config.getint(
"system", "task_dispatched_timeout", self.TASK_DISPATCHED_TIMEOUT
Expand Down