Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add FastStream integration auto_inject option #128

Merged
merged 10 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions docs/integrations/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ With some frameworks we provide an option to inject dependencies in handlers wit

from dishka.integrations.flask import FromDishka, setup_dishka

@app.get("/"
@app.get("/")
def index(
*,
interactor: FromDishka[Interactor],
Expand All @@ -91,7 +91,7 @@ With some frameworks we provide an option to inject dependencies in handlers wit

.. code-block:: python

from dishka.integrations.fastapi import FromDishka, setup_dishka
from dishka.integrations.fastapi import FromDishka, DishkaRoute, setup_dishka

router = APIRouter(route_class=DishkaRoute)

Expand All @@ -105,6 +105,27 @@ With some frameworks we provide an option to inject dependencies in handlers wit

setup_dishka(container, app)

* For **FasStream** (**0.5.0** version and higher) you need to provide ``auto_inject=True`` when calling ``setup_dishka``. It is important here to call it before registering any subscribers or router include:

.. code-block:: python

from faststream import FastStream
from faststream.nats import NatsBroker, NatsMessage
from dishka import make_async_container
from dishka.integrations.faststream import FastStreamProvider, FromDishka, setup_dishka

broker = NatsBroker()
app = FastStream(broker)
setup_dishka(make_async_container(..., FastStreamProvider), app, auto_inject=True)

@broker.subscriber("/")
def index(
*,
message: FromDishka[NatsMessage],
) -> str:
await message.ack()
return message.body


Context data
====================
Expand All @@ -121,7 +142,7 @@ This objects are passed to context:
* Aiogram - ``aiogram.types.TelegramObject``
* pyTelegramBotAPI - actual type of event (like ``Message``) is used.
* Arq - no objects
* FastStream - no objects
* FastStream - ``faststream.broker.message.StreamMessage`` or ``faststream.[broker].[Broker]Message``, ``faststream.utils.ContextRepo``
* TaskIq - no objects

To use such objects you need to declare them in your provider using :ref:`from-context` and then they will be available as factories params.
Expand Down
13 changes: 6 additions & 7 deletions examples/integrations/faststream_app.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
from typing import Annotated

from faststream import FastStream
from faststream.nats import NatsBroker
from faststream import ContextRepo, FastStream
from faststream.nats import NatsBroker, NatsMessage

from dishka import Provider, Scope, make_async_container, provide
from dishka.integrations.faststream import FromDishka, inject, setup_dishka
from dishka.integrations.faststream import FromDishka, setup_dishka


class A:
Expand Down Expand Up @@ -32,15 +30,16 @@ def get_b(self, a: A) -> B:

broker = NatsBroker()
app = FastStream(broker)
setup_dishka(container, app)
setup_dishka(container, app, auto_inject=True)


@broker.subscriber("test")
@inject
async def handler(
msg: str,
a: FromDishka[A],
b: FromDishka[B],
raw_message: FromDishka[NatsMessage],
faststream_context: FromDishka[ContextRepo],
):
print(msg, a, b)

Expand Down
3 changes: 0 additions & 3 deletions requirements/faststream-044.txt

This file was deleted.

3 changes: 3 additions & 0 deletions requirements/faststream-047.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-r test.txt

faststream[nats]==0.4.7
3 changes: 3 additions & 0 deletions requirements/faststream-050.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-r test.txt

faststream[nats]==0.5.0rc2
111 changes: 86 additions & 25 deletions src/dishka/integrations/faststream.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,124 @@
__all__ = (
"FromDishka",
"inject",
"FastStreamProvider",
"setup_dishka",
)

import warnings
from collections.abc import AsyncIterator, Awaitable, Callable
from contextlib import asynccontextmanager
from typing import Any, TypeVar
from typing import Any

from faststream import BaseMiddleware, FastStream, context
from faststream.__about__ import __version__
from faststream.broker.message import StreamMessage
from faststream.types import DecodedMessage
from faststream.utils.context import ContextRepo

from dishka import AsyncContainer, FromDishka
from dishka import AsyncContainer, FromDishka, Provider, Scope, from_context
from dishka.integrations.base import wrap_injection

T = TypeVar("T")

class FastStreamProvider(Provider):
context = from_context(provides=ContextRepo, scope=Scope.REQUEST)
message = from_context(provides=StreamMessage, scope=Scope.REQUEST)

def inject(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
return wrap_injection(
func=func,
container_getter=lambda *_: context.get_local("dishka"),
is_async=True,
remove_depends=True,
)

FASTSTREAM_OLD_MIDDLEWARES = __version__ < "0.5"

class DishkaMiddleware(BaseMiddleware):

class _DishkaBaseMiddleware(BaseMiddleware):
def __init__(self, container: AsyncContainer) -> None:
self.container = container

def __call__(self, msg: Any | None = None) -> "DishkaMiddleware":
def __call__(self, msg: Any | None = None) -> "_DishkaBaseMiddleware":
self.msg = msg
return self

@asynccontextmanager
async def consume_scope(
self,
*args: Any,
**kwargs: Any,
) -> AsyncIterator[DecodedMessage]:
async with self.container() as request_container:
with context.scope("dishka", request_container):
async with super().consume_scope(*args, **kwargs) as result:
yield result

if FASTSTREAM_OLD_MIDDLEWARES:

class DishkaMiddleware(_DishkaBaseMiddleware):
@asynccontextmanager
async def consume_scope(
self,
*args: Any,
**kwargs: Any,
) -> AsyncIterator[DecodedMessage]:
async with self.container() as request_container:
with context.scope("dishka", request_container):
async with super().consume_scope(
*args,
**kwargs,
) as result:
yield result

else:

class DishkaMiddleware(_DishkaBaseMiddleware):
async def consume_scope(
self,
call_next: Callable[[Any], Awaitable[Any]],
msg: StreamMessage[any],
) -> AsyncIterator[DecodedMessage]:
async with self.container(
{
StreamMessage: msg,
type(msg): msg,
ContextRepo: context,
},
) as request_container:
with context.scope("dishka", request_container):
return await call_next(msg)


def setup_dishka(
container: AsyncContainer,
app: FastStream,
*,
finalize_container: bool = True,
auto_inject: bool = False,
) -> None:
assert app.broker, "You can't patch FastStream application without broker" # noqa: S101

if finalize_container:
app.after_shutdown(container.close)

app.broker.middlewares = (
*app.broker.middlewares,
DishkaMiddleware(container),
if FASTSTREAM_OLD_MIDDLEWARES:
app.broker.middlewares = (
DishkaMiddleware(container),
*app.broker.middlewares,
)

if auto_inject:
warnings.warn(
"""
Auto injection is not supported for FastStream version less than 0.5.0
Please, update your FastStream installation
or use @inject at each subscriber manually.
""",
category=RuntimeWarning,
stacklevel=1,
)

else:
app.broker._middlewares = ( # noqa: SLF001
DishkaMiddleware(container),
*app.broker._middlewares, # noqa: SLF001
)

if auto_inject:
app.broker._call_decorators = ( # noqa: SLF001
inject,
*app.broker._call_decorators, # noqa: SLF001
)


def inject(func):
return wrap_injection(
func=func,
container_getter=lambda *_: context.get_local("dishka"),
is_async=True,
remove_depends=True,
)
3 changes: 3 additions & 0 deletions tests/integrations/faststream/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("faststream")
6 changes: 4 additions & 2 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ env_list =
aiogram-330,
telebot-415,
starlette-0270,
faststream-044,
faststream-047,
faststream-050,
arq-0250
taskiq-0110

Expand All @@ -36,7 +37,8 @@ deps =
starlette-latest: -r requirements/starlette-latest.txt
starlette-0270: -r requirements/starlette-0270.txt
faststream-latest: -r requirements/faststream-latest.txt
faststream-044: -r requirements/faststream-044.txt
faststream-047: -r requirements/faststream-047.txt
faststream-050: -r requirements/faststream-050.txt
arq-latest: -r requirements/arq-latest.txt
arq-0250: -r requirements/arq-0250.txt
taskiq-latest: -r requirements/taskiq-latest.txt
Expand Down
Loading