Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restructure modules and class names #25

Merged
merged 10 commits into from
Mar 1, 2024
Merged
19 changes: 12 additions & 7 deletions .github/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,27 @@
# For more details on the configuration please see:
# https://github.com/marketplace/actions/labeler

# TODO(cookiecutter): Add different parts of the source
# For example:
#
# "part:module":
# - "src/frequenz/lib/client_base/module/**"
# - changed-files:
# - any-glob-to-any-file:
# - "src/frequenz/lib/client_microgrid/module/**"
#
# "part:other":
# - "src/frequenz/lib/client_base/other/**"
# - changed-files:
# - any-glob-to-any-file:
# - "src/frequenz/lib/client_microgrid/other/**"
#
# # For excluding some files (in this example, label "part:complicated"
# # everything inside src/ with a .py suffix, except for src/__init__.py)
# "part:complicated":
# - any:
# - "src/**/*.py"
# - all:
# - "!src/__init__.py"
# - changed-files:
# - any-glob-to-any-file:
# - "src/**/*.py"
# - all-glob-to-all-file:
# - "!src/__init__.py"
#
# Please have in mind that that the part:xxx labels need to
# be created in the GitHub repository.
Expand Down Expand Up @@ -51,7 +56,7 @@
- ".editorconfig"
- ".git*"
- ".git*/**"
- "docs/*.py"
- "docs/_scripts/**"
- CODEOWNERS
- MANIFEST.in
- noxfile.py
12 changes: 11 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,17 @@

## Upgrading

<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
The project structure was updated to use more consistent and shorter modules and class names.

* `frequenz.client.base.grpc_streaming_helper` was renamed to `frequenz.client.base.streaming`.

- The `GrpcStreamingHelper` class was renamed to `GrpcStreamBroadcaster`.

+ The constructor argument `retry_spec` was renamed to `retry_strategy`.

* `frequenz.client.base.retry_strategy` was renamed to `frequenz.client.base.retry`.

- The `RetryStrategy` class was renamed to `Strategy`.

## New Features

Expand Down
13 changes: 5 additions & 8 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ classifiers = [
requires-python = ">= 3.11, < 4"
# TODO(cookiecutter): Remove and add more dependencies if appropriate
dependencies = [
"typing-extensions >= 4.5.0, < 5",
"grpcio >= 1.54.2, < 2",
"grpcio-tools >= 1.54.2, < 2",
"frequenz-channels >= v1.0.0-beta.2, < 2",
"grpcio >= 1.54.2, < 2",
"protobuf >= 4.25.3, < 5",
"typing-extensions >= 4.5.0, < 5",
]
dynamic = ["version"]

Expand Down Expand Up @@ -64,14 +64,11 @@ dev-mypy = [
"mypy == 1.8.0",
"types-Markdown == 3.5.0.20240129",
"types-protobuf == 4.24.0.20240129",
"grpc-stubs == 1.53.0.5", # This dependency introduces breaking changes in patch releases
"grpc-stubs == 1.53.0.5", # This dependency introduces breaking changes in patch releases
# For checking the noxfile, docs/ script, and tests
"frequenz-client-base[dev-mkdocs,dev-noxfile,dev-pytest]",
]
dev-noxfile = [
"nox == 2023.4.22",
"frequenz-repo-config[lib] == 0.8.0",
]
dev-noxfile = ["nox == 2023.4.22", "frequenz-repo-config[lib] == 0.8.0"]
dev-pylint = [
"pylint == 3.0.3",
# For checking the noxfile, docs/ script, and tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"""Default retry jitter, in seconds."""


class RetryStrategy(ABC):
class Strategy(ABC):
"""Interface for implementing retry strategies."""

_limit: int | None
Expand Down Expand Up @@ -74,7 +74,7 @@ def __iter__(self) -> Iterator[float]:
yield interval


class LinearBackoff(RetryStrategy):
class LinearBackoff(Strategy):
"""Provides methods for calculating the interval between retries."""

def __init__(
Expand Down Expand Up @@ -112,7 +112,7 @@ def next_interval(self) -> float | None:
return self._interval + random.uniform(0.0, self._jitter)


class ExponentialBackoff(RetryStrategy):
class ExponentialBackoff(Strategy):
"""Provides methods for calculating the exponential interval between retries."""

DEFAULT_INTERVAL = DEFAULT_RETRY_INTERVAL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@
import logging
from typing import Any, Callable, Generic, TypeVar

import grpc
from grpc.aio import UnaryStreamCall
import grpc.aio

from frequenz import channels

from . import retry_strategy
from . import retry

_logger = logging.getLogger(__name__)

Expand All @@ -24,15 +23,15 @@
"""The output type of the stream."""


class GrpcStreamingHelper(Generic[InputT, OutputT]):
class GrpcStreamBroadcaster(Generic[InputT, OutputT]):
"""Helper class to handle grpc streaming methods."""
llucax marked this conversation as resolved.
Show resolved Hide resolved

def __init__(
self,
stream_name: str,
stream_method: Callable[[], UnaryStreamCall[Any, InputT]],
stream_method: Callable[[], grpc.aio.UnaryStreamCall[Any, InputT]],
transform: Callable[[InputT], OutputT],
retry_spec: retry_strategy.RetryStrategy | None = None,
retry_strategy: retry.Strategy | None = None,
):
"""Initialize the streaming helper.

Expand All @@ -41,18 +40,18 @@ def __init__(
stream_method: A function that returns the grpc stream. This function is
called everytime the connection is lost and we want to retry.
transform: A function to transform the input type to the output type.
retry_spec: The retry strategy to use, when the connection is lost. Defaults
retry_strategy: The retry strategy to use, when the connection is lost. Defaults
to retries every 3 seconds, with a jitter of 1 second, indefinitely.
"""
self._stream_name = stream_name
self._stream_method = stream_method
self._transform = transform
self._retry_spec = (
retry_strategy.LinearBackoff() if retry_spec is None else retry_spec.copy()
self._retry_strategy = (
retry.LinearBackoff() if retry_strategy is None else retry_strategy.copy()
)

self._channel: channels.Broadcast[OutputT] = channels.Broadcast(
f"GrpcStreamingHelper-{stream_name}"
f"GrpcStreamBroadcaster-{stream_name}"
)
self._task = asyncio.create_task(self._run())

Expand Down Expand Up @@ -93,19 +92,19 @@ async def _run(self) -> None:
_logger.exception(
"Error in grpc streaming method: %s", self._stream_name
)
if interval := self._retry_spec.next_interval():
if interval := self._retry_strategy.next_interval():
_logger.warning(
"`%s`, connection ended, retrying %s in %0.3f seconds.",
self._stream_name,
self._retry_spec.get_progress(),
self._retry_strategy.get_progress(),
interval,
)
await asyncio.sleep(interval)
else:
_logger.warning(
"`%s`, connection ended, retry limit exceeded %s.",
self._stream_name,
self._retry_spec.get_progress(),
self._retry_strategy.get_progress(),
)
await self._channel.close()
break
Loading
Loading