Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a function to call gRPC stubs and wrap errors #58

Merged
merged 6 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- Add a `exception` module to provide client exceptions, including gRPC errors with one subclass per gRPC error status code.
- `channel.parse_grpc_uri()` can now be used with `grpcio` too.
- A new `BaseApiClient` class is introduced to provide a base class for API clients. It is strongly recommended to use this class as a base class for all API clients.
- A new `call_stub_method()` function to simplify calling stub methods, converting gRPC errors to `ApiClientError`s, checking if the client is connected and optionally wrapping the response.

## Bug Fixes

Expand Down
183 changes: 176 additions & 7 deletions src/frequenz/client/base/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
"""Base class for API clients."""

import abc
from collections.abc import Callable
from typing import Any, Generic, Self, TypeVar
import inspect
from collections.abc import Awaitable, Callable
from typing import Any, Generic, Self, TypeVar, overload

from . import _grpchacks
from .channel import ChannelT, parse_grpc_uri
from .exception import ClientNotConnected
from .exception import ApiClientError, ClientNotConnected

StubT = TypeVar("StubT")
"""The type of the gRPC stub."""


class BaseApiClient(abc.ABC, Generic[StubT, ChannelT]):
Expand All @@ -19,6 +22,95 @@ class BaseApiClient(abc.ABC, Generic[StubT, ChannelT]):
This class provides a common interface for API clients that communicate with a API
server. It is designed to be subclassed by specific API clients that provide a more
specific interface.

Some extra tools are provided to make it easier to write API clients:

- [call_stub_method()][frequenz.client.base.client.call_stub_method] is a function
that calls a gRPC stub method and translates errors to API client errors.
- [GrpcStreamBroadcaster][frequenz.client.base.streaming.GrpcStreamBroadcaster] is
a class that helps sending messages from a gRPC stream to
a [Broadcast][frequenz.channels.Broadcast] channel.

Example:
This example illustrates how to create a simple API client that connects to a
gRPC server and calls a method on a stub.

```python
from collections.abc import AsyncIterable
import grpc
from frequenz.client.base.client import BaseApiClient, call_stub_method
from frequenz.client.base.streaming import GrpcStreamBroadcaster
from frequenz.channels import Receiver

# These classes are normally generated by protoc
class ExampleRequest:
int_value: int
str_value: str

class ExampleResponse:
float_value: float

class ExampleStub:
async def example_method(
self,
request: ExampleRequest # pylint: disable=unused-argument
) -> ExampleResponse:
...

def example_stream(self) -> AsyncIterable[ExampleResponse]:
...
# End of generated classes

class ExampleResponseWrapper:
def __init__(self, response: ExampleResponse):
self.transformed_value = f"{response.float_value:.2f}"

class MyApiClient(BaseApiClient[ExampleStub, grpc.Channel]):
def __init__(self, server_url: str, *, connect: bool = True):
super().__init__(
server_url, ExampleStub, grpc.Channel, connect=connect
)
self._broadcaster = GrpcStreamBroadcaster(
"stream",
lambda: self.stub.example_stream(ExampleRequest()),
ExampleResponseWrapper,
)

async def example_method(
self, int_value: int, str_value: str
) -> ExampleResponseWrapper:
return await call_stub_method(
self,
lambda: self.stub.example_method(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to use explicit parameter names here to avoid having ahem the reader accidentally thinking it is a parameter name 😇

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haha, too much rust going on? 😛

Will do.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the code I'm not so convinced now. The function is called "call stub method", isn't it pretty obvious that what you are passing is... ahem, the stub method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thinks forcing to use:

call_stub_method(self, stub_method=lambda: self.stub.example_method())

Might be a bit too verbose.

This comment was marked as off-topic.

ExampleRequest(int_value=int_value, str_value=str_value)
),
transform=ExampleResponseWrapper,
)

def example_stream(self) -> Receiver[ExampleResponseWrapper]:
return self._broadcaster.new_receiver()


async def main():
client = MyApiClient("grpc://localhost")
response = await client.example_method(42, "hello")
print(response.transformed_value)
count = 0
async for response in client.example_stream():
print(response.transformed_value)
count += 1
if count >= 5:
break
```

!!! Note
* This example uses `grpcio` as the underlaying gRPC library, but `grpclib`
can be used as well.
* In this case a very simple `GrpcStreamBroadcaster` is used, asuming that
each call to `example_stream` will stream the same data. If the request
is more complex, you will probably need to have some kind of map from
a key based on the stream method request parameters to broadcaster
instances.
"""

def __init__(
Expand All @@ -27,16 +119,16 @@ def __init__(
create_stub: Callable[[ChannelT], StubT],
channel_type: type[ChannelT],
*,
auto_connect: bool = True,
connect: bool = True,
) -> None:
"""Create an instance and connect to the server.

Args:
server_url: The URL of the server to connect to.
create_stub: A function that creates a stub from a channel.
channel_type: The type of channel to use.
auto_connect: Whether to automatically connect to the server. If `False`, the
client will not connect to the server until
connect: Whether to connect to the server as soon as a client instance is
created. If `False`, the client will not connect to the server until
[connect()][frequenz.client.base.client.BaseApiClient.connect] is
called.
"""
Expand All @@ -45,7 +137,7 @@ def __init__(
self._channel_type: type[ChannelT] = channel_type
self._channel: ChannelT | None = None
self._stub: StubT | None = None
if auto_connect:
if connect:
self.connect(server_url)

@property
Expand Down Expand Up @@ -141,3 +233,80 @@ async def __aexit__(
self._channel = None
self._stub = None
return result


StubOutT = TypeVar("StubOutT")
"""The type of the response from a gRPC stub method."""

TransformOutT_co = TypeVar("TransformOutT_co", covariant=True)
"""The type of the transformed response from a gRPC stub method."""


@overload
async def call_stub_method(
client: BaseApiClient[StubT, ChannelT],
stub_method: Callable[[], Awaitable[StubOutT]],
*,
method_name: str | None = None,
transform: Callable[[StubOutT], TransformOutT_co],
) -> TransformOutT_co: ...


@overload
async def call_stub_method(
client: BaseApiClient[StubT, ChannelT],
stub_method: Callable[[], Awaitable[StubOutT]],
*,
method_name: str | None = None,
transform: None = None,
) -> StubOutT: ...


async def call_stub_method(
client: BaseApiClient[StubT, ChannelT],
stub_method: Callable[[], Awaitable[StubOutT]],
*,
method_name: str | None = None,
transform: Callable[[StubOutT], TransformOutT_co] | None = None,
) -> StubOutT | TransformOutT_co:
"""Call a gRPC stub method and translate errors to API client errors.

This function is a convenience wrapper around calling a gRPC stub method. It
translates gRPC errors to API client errors and optionally transforms the response
using a provided function.

This function is designed to be used with API clients that subclass
[BaseApiClient][frequenz.client.base.client.BaseApiClient].

Args:
client: The API client to use.
stub_method: The gRPC stub method to call.
method_name: The name of the method being called. If not provided, the name of
the calling function is used.
transform: A function that transforms the response from the gRPC stub method.

Returns:
The response from the gRPC stub method, possibly transformed by the `transform`
function if provided.

Raises:
ClientNotConnected: If the client is not connected to the server.
GrpcError: If a gRPC error occurs.
"""
if method_name is None:
# Get the name of the calling function
method_name = inspect.stack()[1][3]

if not client.is_connected:
raise ClientNotConnected(server_url=client.server_url, operation=method_name)

try:
response = await stub_method()
except (_grpchacks.GrpclibError, _grpchacks.GrpcioError) as grpclib_error:
raise ApiClientError.from_grpc_error(
server_url=client.server_url,
operation=method_name,
grpc_error=grpclib_error,
) from grpclib_error

return response if transform is None else transform(response)
Loading
Loading