Make GrpcStreamBroadcaster compatible with both grpcio and grpclib #49

Merged — 5 commits, May 8, 2024
4 changes: 2 additions & 2 deletions RELEASE_NOTES.md
@@ -6,11 +6,11 @@

## Upgrading

<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
- You should now install the dependency using `frequenz-client-base[grpcio]` (or `frequenz-client-base[grpclib]` if you want to migrate to `grpclib`).

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
- `GrpcStreamBroadcaster` is now compatible with both the `grpcio` and `grpclib` implementations of gRPC. Install `frequenz-client-base[grpcio]` or `frequenz-client-base[grpclib]` to pick the desired implementation, and everything should work as expected.

## Bug Fixes

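Not part of the diff, just a usage sketch to go with these notes: whichever extra is installed, the broadcaster is constructed the same way. The `stub` and `stream_reports` names below are hypothetical stand-ins for a real generated gRPC stub.

```python
# pip install "frequenz-client-base[grpcio]"    # or: "frequenz-client-base[grpclib]"
# Hypothetical stub/method names; only the construction pattern matters here.
from frequenz.client.base import streaming


def make_broadcaster(stub) -> streaming.GrpcStreamBroadcaster:
    return streaming.GrpcStreamBroadcaster(
        stream_name="reports",  # only used for logging
        stream_method=lambda: stub.stream_reports(),  # must return an async iterator
        transform=lambda msg: msg,  # convert each wire message as needed
    )
```

Errors raised by either implementation are handled inside the broadcaster's retry loop, so the calling code does not change when switching libraries.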
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -118,6 +118,7 @@ plugins:
- https://frequenz-floss.github.io/frequenz-channels-python/v1.0-pre/objects.inv
- https://googleapis.dev/python/protobuf/latest/objects.inv
- https://grpc.github.io/grpc/python/objects.inv
- https://grpclib.readthedocs.io/en/latest/objects.inv
- https://typing-extensions.readthedocs.io/en/stable/objects.inv
# Note this plugin must be loaded after mkdocstrings to be able to use macros
# inside docstrings. See the comment in `docs/_scripts/macros.py` for more
8 changes: 6 additions & 2 deletions pyproject.toml
@@ -27,7 +27,6 @@ classifiers = [
requires-python = ">= 3.11, < 4"
dependencies = [
"frequenz-channels >= v1.0.0-rc1, < 2",
"grpcio >= 1.54.2, < 2",
"protobuf >= 4.21.6, < 6",
"typing-extensions >= 4.5.0, < 5",
]
@@ -38,12 +37,15 @@ name = "Frequenz Energy-as-a-Service GmbH"
email = "floss@frequenz.com"

[project.optional-dependencies]
grpcio = ["grpcio >= 1.54.2, < 2"]
grpclib = ["grpclib >= 0.4.0, < 0.5"]
dev-flake8 = [
"flake8 == 7.0.0",
"flake8-docstrings == 1.7.0",
"flake8-pyproject == 1.2.3", # For reading the flake8 config from pyproject.toml
"flake8-pyproject == 1.2.3", # For reading the flake8 config from pyproject.toml
"pydoclint == 0.4.1",
"pydocstyle == 6.3.0",
"frequenz-client-base[grpclib,grpcio]",
]
dev-formatting = ["black == 24.4.2", "isort == 5.13.2"]
dev-mkdocs = [
@@ -56,6 +58,7 @@ dev-mkdocs = [
"mkdocs-material == 9.5.20",
"mkdocstrings[python] == 0.25.0",
"frequenz-repo-config[lib] == 0.9.2",
"frequenz-client-base[grpclib,grpcio]",
]
dev-mypy = [
"mypy == 1.10.0",
@@ -78,6 +81,7 @@ dev-pytest = [
"pytest-asyncio == 0.23.6",
"async-solipsism == 0.6",
"hypothesis == 6.100.2",
"frequenz-client-base[grpclib,grpcio]",
]
dev = [
"frequenz-client-base[dev-mkdocs,dev-flake8,dev-formatting,dev-mkdocs,dev-mypy,dev-noxfile,dev-pylint,dev-pytest]",
46 changes: 46 additions & 0 deletions src/frequenz/client/base/_grpchacks.py
@@ -0,0 +1,46 @@
# License: MIT
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""Hacks to deal with multiple grpc libraries.

This module conditionally imports the base exceptions from the `grpclib` and `grpcio`
libraries, assigning them a new name:

- [`GrpclibError`][] for [`grpclib.GRPCError`][]
- [`GrpcioError`][] for [`grpc.aio.AioRpcError`][]

If the libraries are not installed, the module defines dummy classes with the same names
to avoid import errors.

This way, exceptions can be caught from both libraries regardless of which one is
used; the unused library simply never raises any.
"""


try:
from grpclib import GRPCError as GrpclibError
except ImportError:

class GrpclibError(Exception): # type: ignore[no-redef]
"""A dummy class to avoid import errors.

This class will never be actually used, as it is only used for catching
exceptions from the grpclib library. If the grpclib library is not installed,
this class will never be instantiated.
"""


try:
from grpc.aio import AioRpcError as GrpcioError
except ImportError:

class GrpcioError(Exception): # type: ignore[no-redef]
"""A dummy class to avoid import errors.

This class will never be actually used, as it is only used for catching
exceptions from the grpc library. If the grpc library is not installed,
this class will never be instantiated.
"""


__all__ = ["GrpclibError", "GrpcioError"]
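A minimal sketch of the pattern this private module enables, mirroring how `streaming.py` uses it (not a documented public API): both names are always importable, and a single `except` clause covers whichever library is actually installed.

```python
# Sketch only: `_grpchacks` is a private module; this mirrors its use in
# `streaming.py` rather than a public API.
from frequenz.client.base._grpchacks import GrpcioError, GrpclibError


async def drain(stream) -> list:
    """Collect messages, treating errors from either gRPC library uniformly."""
    items = []
    try:
        async for msg in stream:
            items.append(msg)
    except (GrpcioError, GrpclibError) as err:
        # The dummy class for the library that is not installed never matches.
        print(f"stream ended with error: {err}")
    return items
```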
10 changes: 5 additions & 5 deletions src/frequenz/client/base/streaming.py
@@ -5,13 +5,13 @@

import asyncio
import logging
from typing import Any, Callable, Generic, TypeVar

import grpc.aio
from collections.abc import AsyncIterator, Callable
from typing import Generic, TypeVar

from frequenz import channels

from . import retry
from ._grpchacks import GrpcioError, GrpclibError

_logger = logging.getLogger(__name__)

@@ -29,7 +29,7 @@ class GrpcStreamBroadcaster(Generic[InputT, OutputT]):
def __init__(
self,
stream_name: str,
stream_method: Callable[[], grpc.aio.UnaryStreamCall[Any, InputT]],
stream_method: Callable[[], AsyncIterator[InputT]],
transform: Callable[[InputT], OutputT],
retry_strategy: retry.Strategy | None = None,
):
@@ -88,7 +88,7 @@ async def _run(self) -> None:
call = self._stream_method()
async for msg in call:
await sender.send(self._transform(msg))
except grpc.aio.AioRpcError as err:
except (GrpcioError, GrpclibError) as err:
error = err
error_str = f"Error: {error}" if error else "Stream exhausted"
if interval := self._retry_strategy.next_interval():
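With the signature change above, `stream_method` only needs to return an async iterator of messages. A `grpc.aio` unary-stream call object already is one; for `grpclib`, a small async generator can adapt the call. The sketch below uses hypothetical stub and request names and assumes grpclib's documented stream API (`open()`, `send_message()`, `recv_message()`).

```python
# Sketch: satisfying the new Callable[[], AsyncIterator[...]] contract with
# either gRPC implementation. Stub and request names are hypothetical.
from collections.abc import AsyncIterator


def grpcio_stream(stub, request) -> AsyncIterator:
    # grpc.aio's UnaryStreamCall supports `async for` directly, so the call
    # object itself can be returned from `stream_method`.
    return stub.ListComponents(request)


async def grpclib_stream(stub, request) -> AsyncIterator:
    # grpclib exposes an explicit stream object; wrap it in an async generator
    # so the broadcaster sees a plain async iterator of messages.
    async with stub.ListComponents.open() as call:
        await call.send_message(request, end=True)
        while (msg := await call.recv_message()) is not None:
            yield msg


# Either adapter can then be passed to the broadcaster, e.g.:
#     streaming.GrpcStreamBroadcaster(
#         stream_name="components",
#         stream_method=lambda: grpclib_stream(stub, request),
#         transform=lambda msg: msg,
#     )
```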
198 changes: 198 additions & 0 deletions tests/streaming/test_grpc_stream_broadcaster.py
@@ -0,0 +1,198 @@
# License: MIT
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""Tests for GrpcStreamBroadcaster class."""

import asyncio
import logging
from collections.abc import AsyncIterator
from contextlib import AsyncExitStack
from unittest.mock import MagicMock

import grpc.aio
import grpclib
import pytest

from frequenz.client.base import retry, streaming


def _transformer(x: int) -> str:
"""Mock transformer."""
return f"transformed_{x}"


@pytest.fixture
def receiver_ready_event() -> asyncio.Event:
"""Fixture for receiver ready event."""
return asyncio.Event()


@pytest.fixture
def no_retry() -> MagicMock:
"""Fixture for mocked, non-retrying retry strategy."""
mock_retry = MagicMock(spec=retry.Strategy)
mock_retry.next_interval.return_value = None
mock_retry.copy.return_value = mock_retry
mock_retry.get_progress.return_value = "mock progress"
return mock_retry


@pytest.fixture
async def ok_helper(
no_retry: MagicMock, # pylint: disable=redefined-outer-name
receiver_ready_event: asyncio.Event, # pylint: disable=redefined-outer-name
) -> AsyncIterator[streaming.GrpcStreamBroadcaster[int, str]]:
"""Fixture for GrpcStreamBroadcaster."""

async def asynciter(ready_event: asyncio.Event) -> AsyncIterator[int]:
"""Mock async iterator."""
await ready_event.wait()
for i in range(5):
yield i
await asyncio.sleep(0) # Yield control to the event loop

helper = streaming.GrpcStreamBroadcaster(
stream_name="test_helper",
stream_method=lambda: asynciter(receiver_ready_event),
transform=_transformer,
retry_strategy=no_retry,
)
yield helper
await helper.stop()


class _ErroringAsyncIter(AsyncIterator[int]):
"""Async iterator that raises an error after a certain number of successes."""

def __init__(
self, error: Exception, ready_event: asyncio.Event, num_successes: int = 0
):
self._error = error
self._ready_event = ready_event
self._num_successes = num_successes
self._current = -1

async def __anext__(self) -> int:
self._current += 1
await self._ready_event.wait()
if self._current >= self._num_successes:
raise self._error
return self._current


async def test_streaming_success(
ok_helper: streaming.GrpcStreamBroadcaster[
int, str
], # pylint: disable=redefined-outer-name
no_retry: MagicMock, # pylint: disable=redefined-outer-name
receiver_ready_event: asyncio.Event, # pylint: disable=redefined-outer-name
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test streaming success."""
caplog.set_level(logging.INFO)
items: list[str] = []
async with asyncio.timeout(1):
receiver = ok_helper.new_receiver()
receiver_ready_event.set()
async for item in receiver:
items.append(item)
no_retry.next_interval.assert_called_once_with()
assert items == [
"transformed_0",
"transformed_1",
"transformed_2",
"transformed_3",
"transformed_4",
]
assert caplog.record_tuples == [
(
"frequenz.client.base.streaming",
logging.ERROR,
"test_helper: connection ended, retry limit exceeded (mock progress), "
"giving up. Stream exhausted.",
)
]


class _NamedMagicMock(MagicMock):
"""Mock with a name."""

def __str__(self) -> str:
return self._mock_name # type: ignore

def __repr__(self) -> str:
return self._mock_name # type: ignore


@pytest.mark.parametrize("successes", [0, 1, 5])
@pytest.mark.parametrize(
"error_spec",
[
(
grpc.aio.AioRpcError(
code=_NamedMagicMock(name="mock grpcio code"),
initial_metadata=MagicMock(),
trailing_metadata=MagicMock(),
details="mock details",
debug_error_string="mock debug_error_string",
),
"<AioRpcError of RPC that terminated with:\n"
"\tstatus = mock grpcio code\n"
'\tdetails = "mock details"\n'
'\tdebug_error_string = "mock debug_error_string"\n'
">",
),
(
grpclib.GRPCError(
status=_NamedMagicMock(name="mock grpclib status"),
message="mock grpclib error",
details="mock grpclib details",
),
"(mock grpclib status, 'mock grpclib error', 'mock grpclib details')",
),
],
ids=["grpcio", "grpclib"],
)
async def test_streaming_error( # pylint: disable=too-many-arguments
successes: int,
error_spec: tuple[Exception, str],
no_retry: MagicMock, # pylint: disable=redefined-outer-name
receiver_ready_event: asyncio.Event, # pylint: disable=redefined-outer-name
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test streaming errors."""
caplog.set_level(logging.INFO)
error, expected_error_str = error_spec
helper = streaming.GrpcStreamBroadcaster(
stream_name="test_helper",
stream_method=lambda: _ErroringAsyncIter(
error, receiver_ready_event, num_successes=successes
),
transform=_transformer,
retry_strategy=no_retry,
)

items: list[str] = []
async with AsyncExitStack() as stack:
stack.push_async_callback(helper.stop)

receiver = helper.new_receiver()
receiver_ready_event.set()
async for item in receiver:
items.append(item)

no_retry.next_interval.assert_called_once_with()
assert items == [f"transformed_{i}" for i in range(successes)]
assert caplog.record_tuples == [
(
"frequenz.client.base.streaming",
logging.INFO,
"test_helper: starting to stream",
),
(
"frequenz.client.base.streaming",
logging.ERROR,
"test_helper: connection ended, retry limit exceeded (mock progress), "
f"giving up. Error: {expected_error_str}.",
),
]