Service bus websocket connection break after a minute or so #31067

Open
hchandola opened this issue Jul 11, 2023 · 3 comments
Labels
Client This issue points to a problem in the data-plane of the library. Messaging Messaging crew Service Bus

Comments

@hchandola
Member

I have the following code that gives errors on complete_message whenever the function one_minute_work takes a minute or so. The error I see in the logs is: Cannot write to closing transport

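from azure.servicebus import ServiceBusReceiveMode, TransportType
from azure.servicebus.aio import AutoLockRenewer, ServiceBusClient

service_bus_client = ServiceBusClient.from_connection_string(
    SERVICE_BUS_CONNECTION_STRING,
    transport_type=TransportType.AmqpOverWebsocket,
)

async with AutoLockRenewer(…) as renewer:
    # Messages received here are registered with `renewer` automatically.
    receiver = service_bus_client.get_subscription_receiver(
        topic,
        subscription,
        receive_mode=ServiceBusReceiveMode.PEEK_LOCK,
        auto_lock_renewer=renewer,
    )
    async with receiver:
        async for message in receiver:
            await one_minute_work(message)
            # Fails with "Cannot write to closing transport" after long work.
            await receiver.complete_message(message)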
@github-actions github-actions bot added Client This issue points to a problem in the data-plane of the library. needs-team-triage Workflow: This issue needs the team to triage. Service Bus labels Jul 11, 2023
@kashifkhan kashifkhan added the Messaging Messaging crew label Jul 11, 2023
@kashifkhan
Member

I have been working with @hchandola via chat to resolve another issue when this one popped up.

  • The setup is a node in a Kubernetes cluster with ports open for AMQP over WebSockets. The user can also reproduce the issue locally.
  • Switching to plain AMQP resolved the issue locally, but they need WebSockets, and it should work with WebSockets.

We still need to reproduce it ourselves, but the error originates from the aiohttp websocket library. My initial hunch is that the heartbeat sent by the library fails, so the underlying transport is marked as closing.

Looking at the initial logs sent in, I didn't see any detaches come in from the service.

@kashifkhan kashifkhan removed the needs-team-triage Workflow: This issue needs the team to triage. label Jul 11, 2023
@selimb

selimb commented May 31, 2024

Just stumbled upon this as well. This is quite easy to replicate:

import asyncio
import logging
import logging.config
import os

from azure.servicebus import TransportType
from azure.servicebus.aio import AutoLockRenewer, ServiceBusClient

logging.config.dictConfig(
    {
        "version": 1,
        "disable_existing_loggers": False,
        "root": {
            "level": "NOTSET",
            "handlers": ["console", "file"],
        },
        "handlers": {
            "console": {
                "formatter": "simple",
                "level": "INFO",
                "class": "logging.StreamHandler",
                "stream": "ext://sys.stdout",
            },
            "file": {
                "formatter": "detailed",
                "level": "DEBUG",
                "class": "logging.FileHandler",
                "filename": "sb.log",
                "mode": "w",
            },
        },
        "formatters": {
            "simple": {"format": "%(levelname)-8s | %(name)s | %(message)s"},
            "detailed": {
                "format": "[%(asctime)s][%(levelname)-8s][%(name)s] %(message)s"
            },
        },
    }
)
logging.captureWarnings(True)
logger = logging.getLogger("main")

connstr = os.environ["SERVICE_BUS_CONNECTION_STR"]
queue_name = os.environ["SERVICE_BUS_QUEUE_NAME"]


async def main() -> None:
    # CHANGE ME
    transport_type = TransportType.AmqpOverWebsocket
    async with ServiceBusClient.from_connection_string(
        conn_str=connstr,
        transport_type=transport_type,
    ) as sb_client:
        receiver = sb_client.get_queue_receiver(queue_name=queue_name)
        renewer = AutoLockRenewer()
        async with receiver, renewer:
            # CHANGE ME
            counter = 14
            while True:
                messages = await receiver.receive_messages(max_message_count=1)
                logger.info(f"Received {len(messages)} messages")
                if not messages:
                    break

                for msg in messages:
                    renewer.register(receiver, msg)
                    sleep_duration = float(counter)
                    logger.info(
                        f"Handling message {counter}: sleeping for {sleep_duration} ..."
                    )
                    await asyncio.sleep(sleep_duration)
                    logger.info(f"Handling message {counter} DONE")
                    await receiver.complete_message(msg)
                    counter += 1

asyncio.run(main())

Errors start happening right after the 15 second mark:

INFO     | main | Received 1 messages
INFO     | main | Handling message 14: sleeping for 14.0 ...
INFO     | main | Handling message 14 DONE
INFO     | main | Received 1 messages
INFO     | main | Handling message 15: sleeping for 15.0 ...
INFO     | main | Handling message 15 DONE
INFO     | main | Received 1 messages
INFO     | main | Handling message 16: sleeping for 16.0 ...
INFO     | main | Handling message 16 DONE
INFO     | azure.servicebus.aio._base_handler_async | AMQP error occurred: (AMQPConnectionError('Error condition: ErrorCondition.SocketError\n Error Description: Can not send frame out due to exception: Cannot write to closing transport')), condition: (<ErrorCondition.SocketError: b'amqp:socket-error'>), description: ('Can not send frame out due to exception: Cannot write to closing transport').

The problem is that aiohttp sends a heartbeat, but:

  • nothing ever calls .receive()
  • calling .receive() is the only way that the heartbeat can be "ACK'd"

Where does the magic "15 seconds" come from? azure sets heartbeat=10, and aiohttp:

  • automatically schedules a heartbeat after heartbeat seconds (10). This timer is cancelled as soon as .receive() is called, but that never happens.
  • once the heartbeat is sent, expects a response within heartbeat/2 seconds (5). This timer is also cancelled as soon as .receive() is called, but again that never happens.

10 + 5 = 15 QED.
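
The arithmetic above can be sketched as a small model. This is a simplification based only on the behaviour described in this comment, not aiohttp's actual implementation; `first_close_time`, `receive_times` and `horizon` are hypothetical names introduced for illustration. It assumes every `.receive()` call resets both timers, so the transport closes at the first gap of `heartbeat + heartbeat/2` seconds without a `.receive()`.

```python
from typing import Iterable, Optional


def first_close_time(
    receive_times: Iterable[float],
    heartbeat: float = 10.0,
    horizon: float = 60.0,
) -> Optional[float]:
    """Return when the transport would start closing, or None if it survives.

    Simplified model (an assumption, not aiohttp's code): a ping is sent
    `heartbeat` seconds after the last reset, a pong is expected within
    `heartbeat / 2` seconds after that, and each .receive() call resets both.
    """
    pong_timeout = heartbeat / 2
    last_reset = 0.0
    for t in sorted(receive_times):
        deadline = last_reset + heartbeat + pong_timeout
        if t >= deadline:
            # No .receive() happened before the pong deadline expired.
            return deadline
        last_reset = t
    deadline = last_reset + heartbeat + pong_timeout
    return deadline if deadline <= horizon else None


print(first_close_time([]))                    # 15.0
print(first_close_time([10, 20, 30, 40, 50]))  # None
```

Under this model, any traffic that triggers a `.receive()` at least once per 15 seconds keeps the connection open for the whole 60-second window.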

@selimb

selimb commented May 31, 2024

I was able to work around the issue by renewing message locks more frequently (every 10 seconds), which forces communication and thus forces a .receive() call.

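import asyncio
import contextlib
import logging
from typing import Any

from azure.servicebus import ServiceBusReceivedMessage
from azure.servicebus.aio import ServiceBusReceiver
from azure.servicebus.exceptions import MessageAlreadySettled
from typing_extensions import Self  # `from typing import Self` on Python 3.11+

logger = logging.getLogger(__name__)


class LockRenewer: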
    def __init__(self) -> None:
        self._stop_evt = asyncio.Event()
        # This value should be lower than `DEFAULT_WEBSOCKET_HEARTBEAT_SECONDS * 1.5`.
        self._renew_every = 10
        self._tasks: list[asyncio.Task] = []

    async def __aenter__(self) -> Self:
        self._stop_evt.clear()
        return self

    async def __aexit__(self, *_args: Any) -> None:
        await self._close()

    async def _close(self) -> None:
        self._stop_evt.set()
        if len(self._tasks) != 0:
            await asyncio.wait(self._tasks)
            self._tasks.clear()

    def register(
        self, receiver: ServiceBusReceiver, msg: ServiceBusReceivedMessage
    ) -> None:
        task = asyncio.create_task(self._auto_renew(receiver, msg))
        self._tasks.append(task)

    async def _auto_renew(
        self, receiver: ServiceBusReceiver, msg: ServiceBusReceivedMessage
    ) -> None:
        while True:
            with contextlib.suppress(asyncio.TimeoutError):
                await asyncio.wait_for(self._stop_evt.wait(), self._renew_every)
                # If the above succeeds, then `_stop_evt` was set.
                break

            logger.debug("Renewing...")
            try:
                await receiver.renew_message_lock(msg)
            except MessageAlreadySettled:
                break
            except Exception:
                logger.error("Failed to auto-renew message", exc_info=True)
                break
            else:
                logger.debug("Renewing DONE")
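
The stop-event loop used in `_auto_renew` can be tried without a Service Bus connection. The following is a minimal sketch under stated assumptions: `FakeReceiver`, `renew_until_stopped` and `demo` are hypothetical names, the fake receiver only counts calls, and the intervals are shortened to fractions of a second so the demo finishes quickly.

```python
import asyncio
import contextlib


class FakeReceiver:
    """Hypothetical stand-in for ServiceBusReceiver; it only counts renew calls."""

    def __init__(self) -> None:
        self.renew_calls = 0

    async def renew_message_lock(self, msg: object) -> None:
        self.renew_calls += 1


async def renew_until_stopped(
    receiver: FakeReceiver, msg: object, stop_evt: asyncio.Event, renew_every: float
) -> None:
    while True:
        with contextlib.suppress(asyncio.TimeoutError):
            # Wait for the stop signal, but give up after `renew_every` seconds.
            await asyncio.wait_for(stop_evt.wait(), renew_every)
            # Reached only when the wait did not time out, so stop was requested.
            break
        await receiver.renew_message_lock(msg)


async def demo(work_seconds: float = 0.2, renew_every: float = 0.05) -> int:
    receiver = FakeReceiver()
    stop_evt = asyncio.Event()
    task = asyncio.create_task(
        renew_until_stopped(receiver, "msg", stop_evt, renew_every)
    )
    await asyncio.sleep(work_seconds)  # stands in for the long-running handler
    stop_evt.set()
    await task
    return receiver.renew_calls


if __name__ == "__main__":
    print(f"renewed {asyncio.run(demo())} times")
```

Waiting on the event with a timeout, rather than sleeping, lets the loop exit promptly when the renewer is closed instead of finishing a full interval first.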

It may be possible to do this with azure's provided AutoLockRenewer instead, but I think that requires patching private attributes.

It may also be possible to work around the issue by monkeypatching DEFAULT_WEBSOCKET_HEARTBEAT_SECONDS, either by increasing it or setting it to None, but this code comment suggests that this would not be wise.

5 participants