Skip to content

Commit

Permalink
Convert to betterproto (#37)
Browse files Browse the repository at this point in the history
This PR migrated the Microgrid API to use `betterproto` instead of
`grpc`.

A couple of bugs were found on the way and fixed, one of them directly
related to the lack of proper type hints in `grpc`.

The changes are fairly small.

> [!NOTE]
> This PR is just a draft, tests need to be adapted still and I don't
plan to merge this to `v0.x.x` directly, as it would be an unnecessary
breaking change for the branch used by the SDK. This is mainly to show
what changes are needed to migrate a project to `betterproto`.

This PR is using an unreleased repository that hosts the files generated
by betterproto from the protobuf files. It will be published soon ™️
  • Loading branch information
llucax committed May 23, 2024
2 parents 8f4238a + 78b08fa commit c46c680
Show file tree
Hide file tree
Showing 12 changed files with 818 additions and 1,535 deletions.
3 changes: 2 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
## Upgrading

- The client is now using [`grpclib`](https://pypi.org/project/grpclib/) to connect to the server instead of [`grpcio`](https://pypi.org/project/grpcio/). You might need to adapt the way you connect to the server in your code, using `grpcio.client.Channel`.
- The client now doesn't raise `grpc.aio.RpcError` exceptions anymore. Instead, it raises `ClientError` exceptions that have the `grpc.aio.RpcError` as their `__cause__`. You might need to adapt your error handling code to catch `ClientError` exceptions instead of `grpc.aio.RpcError` exceptions.
- The client now doesn't raise `grpc.aio.RpcError` exceptions anymore. Instead, it raises `ClientError` exceptions that have the `grpclib.GRPCError` as their `__cause__`. You might need to adapt your error handling code to catch `ClientError` exceptions instead of `grpc.aio.RpcError` exceptions.
- The client now uses protobuf/grpc bindings generated [betterproto](https://github.com/danielgtaylor/python-betterproto) instead of [grpcio](https://pypi.org/project/grpcio/). If you were using the bindings directly, you might need to do some minor adjustments to your code.

## New Features

Expand Down
11 changes: 3 additions & 8 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

[build-system]
requires = [
"setuptools == 68.1.0",
"setuptools_scm[toml] == 7.1.0",
"frequenz-repo-config[lib] == 0.9.1",
]
requires = ["setuptools == 68.1.0", "setuptools_scm[toml] == 7.1.0"]
build-backend = "setuptools.build_meta"

[project]
Expand Down Expand Up @@ -36,11 +32,10 @@ classifiers = [
]
requires-python = ">= 3.11, < 4"
dependencies = [
"frequenz-api-microgrid >= 0.15.3, < 0.16.0",
"betterproto == 2.0.0b6",
"frequenz-channels >= 1.0.0-rc1, < 2.0.0",
"frequenz-client-base[grpclib] >= 0.4.0, < 0.5",
"grpcio >= 1.54.2, < 2",
"protobuf >= 4.21.6, < 6",
"frequenz-microgrid-betterproto >= 0.15.3.1, < 0.16",
"timezonefinder >= 6.2.0, < 7",
"typing-extensions >= 4.5.0, < 5",
]
Expand Down
145 changes: 57 additions & 88 deletions src/frequenz/client/microgrid/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,19 @@

import asyncio
import logging
from collections.abc import AsyncIterator, Awaitable, Callable, Iterable
from typing import Any, TypeVar, cast

import grpc.aio

# pylint: disable=no-name-in-module
from frequenz.api.common.components_pb2 import ComponentCategory as PbComponentCategory
from frequenz.api.common.metrics_pb2 import Bounds as PbBounds
from frequenz.api.microgrid.microgrid_pb2 import ComponentData as PbComponentData
from frequenz.api.microgrid.microgrid_pb2 import ComponentFilter as PbComponentFilter
from frequenz.api.microgrid.microgrid_pb2 import ComponentIdParam as PbComponentIdParam
from frequenz.api.microgrid.microgrid_pb2 import ComponentList as PbComponentList
from frequenz.api.microgrid.microgrid_pb2 import ConnectionFilter as PbConnectionFilter
from frequenz.api.microgrid.microgrid_pb2 import ConnectionList as PbConnectionList
from frequenz.api.microgrid.microgrid_pb2 import (
MicrogridMetadata as PbMicrogridMetadata,
)
from frequenz.api.microgrid.microgrid_pb2 import SetBoundsParam as PbSetBoundsParam
from frequenz.api.microgrid.microgrid_pb2 import (
SetPowerActiveParam as PbSetPowerActiveParam,
)
from frequenz.api.microgrid.microgrid_pb2_grpc import MicrogridStub
from collections.abc import Callable, Iterable, Set
from typing import Any, TypeVar

# pylint: enable=no-name-in-module
import grpclib
import grpclib.client
from betterproto.lib.google import protobuf as pb_google
from frequenz.channels import Receiver
from frequenz.client.base import retry, streaming
from google.protobuf.empty_pb2 import Empty # pylint: disable=no-name-in-module
from google.protobuf.timestamp_pb2 import Timestamp # pylint: disable=no-name-in-module
from frequenz.microgrid.betterproto.frequenz.api import microgrid as pb_microgrid
from frequenz.microgrid.betterproto.frequenz.api.common import (
components as pb_components,
)
from frequenz.microgrid.betterproto.frequenz.api.common import metrics as pb_metrics

from ._component import (
Component,
Expand Down Expand Up @@ -67,7 +52,7 @@ class ApiClient:

def __init__(
self,
grpc_channel: grpc.aio.Channel,
grpc_channel: grpclib.client.Channel,
target: str,
retry_strategy: retry.Strategy | None = None,
) -> None:
Expand All @@ -84,7 +69,7 @@ def __init__(
self.target = target
"""The location (as "host:port") of the microgrid API gRPC server."""

self.api = MicrogridStub(grpc_channel)
self.api = pb_microgrid.MicrogridStub(grpc_channel)
"""The gRPC stub for the microgrid API."""

self._broadcasters: dict[int, streaming.GrpcStreamBroadcaster[Any, Any]] = {}
Expand All @@ -101,22 +86,19 @@ async def components(self) -> Iterable[Component]:
when the api call exceeded the timeout.
"""
try:
# grpc.aio is missing types and mypy thinks this is not awaitable,
# but it is
component_list = await cast(
Awaitable[PbComponentList],
self.api.ListComponents(
PbComponentFilter(),
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
),
component_list = await self.api.list_components(
pb_microgrid.ComponentFilter(),
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
)

except grpc.aio.AioRpcError as err:
except grpclib.GRPCError as err:
raise ClientError(
f"Failed to list components. Microgrid API: {self.target}. Err: {err.details()}"
f"Failed to list components. Microgrid API: {self.target}. Err: {err}"
) from err

components_only = filter(
lambda c: c.category is not PbComponentCategory.COMPONENT_CATEGORY_SENSOR,
lambda c: c.category
is not pb_components.ComponentCategory.COMPONENT_CATEGORY_SENSOR,
component_list.components,
)
result: Iterable[Component] = map(
Expand All @@ -140,16 +122,13 @@ async def metadata(self) -> Metadata:
Returns:
the microgrid metadata.
"""
microgrid_metadata: PbMicrogridMetadata | None = None
microgrid_metadata: pb_microgrid.MicrogridMetadata | None = None
try:
microgrid_metadata = await cast(
Awaitable[PbMicrogridMetadata],
self.api.GetMicrogridMetadata(
Empty(),
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
),
microgrid_metadata = await self.api.get_microgrid_metadata(
pb_google.Empty(),
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
)
except grpc.aio.AioRpcError:
except grpclib.GRPCError:
_logger.exception("The microgrid metadata is not available.")

if not microgrid_metadata:
Expand All @@ -166,8 +145,8 @@ async def metadata(self) -> Metadata:

async def connections(
self,
starts: set[int] | None = None,
ends: set[int] | None = None,
starts: Set[int] = frozenset(),
ends: Set[int] = frozenset(),
) -> Iterable[Connection]:
"""Fetch the connections between components in the microgrid.
Expand All @@ -184,23 +163,20 @@ async def connections(
ClientError: If the connection to the Microgrid API cannot be established or
when the api call exceeded the timeout.
"""
connection_filter = PbConnectionFilter(starts=starts, ends=ends)
connection_filter = pb_microgrid.ConnectionFilter(
starts=list(starts), ends=list(ends)
)
try:
valid_components, all_connections = await asyncio.gather(
self.components(),
# grpc.aio is missing types and mypy thinks this is not
# awaitable, but it is
cast(
Awaitable[PbConnectionList],
self.api.ListConnections(
connection_filter,
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
),
self.api.list_connections(
connection_filter,
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
),
)
except grpc.aio.AioRpcError as err:
except grpclib.GRPCError as err:
raise ClientError(
f"Failed to list connections. Microgrid API: {self.target}. Err: {err.details()}"
f"Failed to list connections. Microgrid API: {self.target}. Err: {err}"
) from err
# Filter out the components filtered in `components` method.
# id=0 is an exception indicating grid component.
Expand All @@ -223,7 +199,7 @@ async def _new_component_data_receiver(
*,
component_id: int,
expected_category: ComponentCategory,
transform: Callable[[PbComponentData], _ComponentDataT],
transform: Callable[[pb_microgrid.ComponentData], _ComponentDataT],
maxsize: int,
) -> Receiver[_ComponentDataT]:
"""Return a new broadcaster receiver for a given `component_id`.
Expand All @@ -250,13 +226,8 @@ async def _new_component_data_receiver(
if broadcaster is None:
broadcaster = streaming.GrpcStreamBroadcaster(
f"raw-component-data-{component_id}",
# We need to cast here because grpc says StreamComponentData is
# a grpc.CallIterator[PbComponentData] which is not an AsyncIterator,
# but it is a grpc.aio.UnaryStreamCall[..., PbComponentData], which it
# is.
lambda: cast(
AsyncIterator[PbComponentData],
self.api.StreamComponentData(PbComponentIdParam(id=component_id)),
lambda: self.api.stream_component_data(
pb_microgrid.ComponentIdParam(id=component_id)
),
transform,
retry_strategy=self._retry_strategy,
Expand Down Expand Up @@ -409,16 +380,15 @@ async def set_power(self, component_id: int, power_w: float) -> None:
when the api call exceeded the timeout.
"""
try:
await cast(
Awaitable[Empty],
self.api.SetPowerActive(
PbSetPowerActiveParam(component_id=component_id, power=power_w),
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
await self.api.set_power_active(
pb_microgrid.SetPowerActiveParam(
component_id=component_id, power=power_w
),
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
)
except grpc.aio.AioRpcError as err:
except grpclib.GRPCError as err:
raise ClientError(
f"Failed to set power. Microgrid API: {self.target}. Err: {err.details()}"
f"Failed to set power. Microgrid API: {self.target}. Err: {err}"
) from err

async def set_bounds(
Expand All @@ -427,7 +397,7 @@ async def set_bounds(
lower: float,
upper: float,
) -> None:
"""Send `PbSetBoundsParam`s received from a channel to the Microgrid service.
"""Send `SetBoundsParam`s received from a channel to the Microgrid service.
Args:
component_id: ID of the component to set bounds for.
Expand All @@ -446,28 +416,27 @@ async def set_bounds(
if lower > 0:
raise ValueError(f"Lower bound {lower} must be less than or equal to 0.")

target_metric = PbSetBoundsParam.TargetMetric.TARGET_METRIC_POWER_ACTIVE
target_metric = (
pb_microgrid.SetBoundsParamTargetMetric.TARGET_METRIC_POWER_ACTIVE
)
try:
await cast(
Awaitable[Timestamp],
self.api.AddInclusionBounds(
PbSetBoundsParam(
component_id=component_id,
target_metric=target_metric,
bounds=PbBounds(lower=lower, upper=upper),
),
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
await self.api.add_inclusion_bounds(
pb_microgrid.SetBoundsParam(
component_id=component_id,
target_metric=target_metric,
bounds=pb_metrics.Bounds(lower=lower, upper=upper),
),
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
)
except grpc.aio.AioRpcError as err:
except grpclib.GRPCError as err:
_logger.error(
"set_bounds write failed: %s, for message: %s, api: %s. Err: %s",
err,
next,
api_details,
err.details(),
err,
)
raise ClientError(
f"Failed to set inclusion bounds. Microgrid API: {self.target}. "
f"Err: {err.details()}"
f"Err: {err}"
) from err
Loading

0 comments on commit c46c680

Please sign in to comment.