Service Bus WebSocket connection breaks after a minute or so #31067
I have been working with @hchandola via chat to resolve another issue when this one popped up.
We still need to reproduce it locally ourselves, but this error originates from the aiohttp WebSocket library. My initial hunch is that the heartbeat sent by the library fails and the underlying transport is set to closing. Looking at the initial logs that were sent in, I didn't see any detaches come in from the service.
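To make this hypothesis concrete, here is a toy asyncio model. It is not aiohttp's implementation: `ToyWebSocket`, its `timeout`, and `handle_message` are hypothetical names. The assumed behavior is that the socket is marked as closing if nothing is read within `timeout` seconds, after which any write fails with the same message that appears in the logs.

```python
from __future__ import annotations

import asyncio


class ToyWebSocket:
    """Hypothetical model of a heartbeat-monitored socket; not aiohttp's real class."""

    def __init__(self, timeout: float) -> None:
        self.timeout = timeout
        self.closing = False
        self._timer: asyncio.TimerHandle | None = None

    def _arm(self) -> None:
        # (Re)start the countdown; if it expires, the heartbeat is treated as failed.
        if self._timer is not None:
            self._timer.cancel()
        loop = asyncio.get_running_loop()
        self._timer = loop.call_later(self.timeout, self._heartbeat_failed)

    def _heartbeat_failed(self) -> None:
        self.closing = True  # the underlying transport is now "closing"

    def open(self) -> None:
        self._arm()

    def close(self) -> None:
        if self._timer is not None:
            self._timer.cancel()

    def read(self) -> None:
        # Reading processes pending frames (including the pong) and resets the countdown.
        if not self.closing:
            self._arm()

    def write(self, data: bytes) -> None:
        if self.closing:
            raise ConnectionResetError("Cannot write to closing transport")
        # A real socket would send `data` here.


async def handle_message(work_steps: int, step: float, read_while_working: bool) -> str:
    ws = ToyWebSocket(timeout=0.15)
    ws.open()
    try:
        for _ in range(work_steps):
            await asyncio.sleep(step)  # simulated message processing
            if read_while_working:
                ws.read()
        ws.write(b"complete_message")  # settling the message needs a write
        return "completed"
    except ConnectionResetError as exc:
        return f"failed: {exc}"
    finally:
        ws.close()
```

In this model, a handler that works for 0.3 s without reading fails only when it tries to settle the message, which mirrors the `complete_message` failure; the same handler completes if it reads every 0.05 s.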
Just stumbled upon this as well. This is quite easy to replicate:

```python
import asyncio
import logging
import logging.config
import os

from azure.servicebus import TransportType
from azure.servicebus.aio import AutoLockRenewer, ServiceBusClient

logging.config.dictConfig(
    {
        "version": 1,
        "disable_existing_loggers": False,
        "root": {
            "level": "NOTSET",
            "handlers": ["console", "file"],
        },
        "handlers": {
            "console": {
                "formatter": "simple",
                "level": "INFO",
                "class": "logging.StreamHandler",
                "stream": "ext://sys.stdout",
            },
            "file": {
                "formatter": "detailed",
                "level": "DEBUG",
                "class": "logging.FileHandler",
                "filename": "sb.log",
                "mode": "w",
            },
        },
        "formatters": {
            "simple": {"format": "%(levelname)-8s | %(name)s | %(message)s"},
            "detailed": {
                "format": "[%(asctime)s][%(levelname)-8s][%(name)s] %(message)s"
            },
        },
    }
)
logging.captureWarnings(True)
logger = logging.getLogger("main")

connstr = os.environ["SERVICE_BUS_CONNECTION_STR"]
queue_name = os.environ["SERVICE_BUS_QUEUE_NAME"]


async def main() -> None:
    # CHANGE ME
    transport_type = TransportType.AmqpOverWebsocket
    async with ServiceBusClient.from_connection_string(
        conn_str=connstr,
        transport_type=transport_type,
    ) as sb_client:
        receiver = sb_client.get_queue_receiver(queue_name=queue_name)
        renewer = AutoLockRenewer()
        async with receiver, renewer:
            # CHANGE ME
            counter = 14
            while True:
                messages = await receiver.receive_messages(max_message_count=1)
                logger.info(f"Received {len(messages)} messages")
                if not messages:
                    break
                for msg in messages:
                    renewer.register(receiver, msg)
                    # Each message is "handled" for one second longer than the last.
                    sleep_duration = float(counter)
                    logger.info(
                        f"Handling message {counter}: sleeping for {sleep_duration} ..."
                    )
                    await asyncio.sleep(sleep_duration)
                    logger.info(f"Handling message {counter} DONE")
                    await receiver.complete_message(msg)
                    counter += 1


asyncio.run(main())
```

Errors start happening right after the 15-second mark.
The problem is that aiohttp sends a heartbeat, but:
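Where does the magic "15 second" figure come from? azure sets `heartbeat=10`, and aiohttp closes the connection if no pong is received within `heartbeat / 2` seconds of sending a ping, so the deadline is 10 + 5 = 15 seconds.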
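The 15-second arithmetic can be checked with a small model. This is a sketch under stated assumptions, not aiohttp's code: `drop_time` is a hypothetical helper, and it assumes a ping goes out `heartbeat` seconds after the last read and that the connection is dropped unless something is read within a further `heartbeat / 2` seconds.

```python
from __future__ import annotations


def drop_time(heartbeat: float, read_times: list[float], horizon: float) -> float | None:
    """Return when the modeled connection is dropped, or None if it survives to `horizon`.

    Assumed model: ping at last_read + heartbeat, then a pong deadline heartbeat / 2 later.
    """
    grace = heartbeat + heartbeat / 2  # 10 + 5 = 15 seconds for heartbeat=10
    last_read = 0.0
    for t in sorted(read_times):
        if t >= last_read + grace:
            break  # the deadline passed before this read happened
        last_read = t
    dropped_at = last_read + grace
    return dropped_at if dropped_at <= horizon else None
```

Under this model, a handler that reads nothing is cut off at 15 s (`drop_time(10, [], 60)` gives `15.0`), which matches the repro, while traffic every 10 s keeps pushing the deadline out so the connection survives a 60 s window.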
I was able to work around the issue by renewing message locks more frequently (every 10 seconds), which forces communication over the connection:

```python
import asyncio
import contextlib
import logging
from typing import Any, Self  # `Self` requires Python 3.11+

from azure.servicebus import ServiceBusReceivedMessage
from azure.servicebus.aio import ServiceBusReceiver
from azure.servicebus.exceptions import MessageAlreadySettled

logger = logging.getLogger(__name__)


class LockRenewer:
    def __init__(self) -> None:
        self._stop_evt = asyncio.Event()
        # This value should be lower than `DEFAULT_WEBSOCKET_HEARTBEAT_SECONDS * 1.5`.
        self._renew_every = 10
        self._tasks: list[asyncio.Task] = []

    async def __aenter__(self) -> Self:
        self._stop_evt.clear()
        return self

    async def __aexit__(self, *_args: Any) -> None:
        await self._close()

    async def _close(self) -> None:
        # Signal every renewal task to stop, then wait for all of them to finish.
        self._stop_evt.set()
        if len(self._tasks) != 0:
            await asyncio.wait(self._tasks)
        self._tasks.clear()

    def register(
        self, receiver: ServiceBusReceiver, msg: ServiceBusReceivedMessage
    ) -> None:
        task = asyncio.create_task(self._auto_renew(receiver, msg))
        self._tasks.append(task)

    async def _auto_renew(
        self, receiver: ServiceBusReceiver, msg: ServiceBusReceivedMessage
    ) -> None:
        while True:
            # Wait up to `_renew_every` seconds for the stop signal.
            with contextlib.suppress(asyncio.TimeoutError):
                await asyncio.wait_for(self._stop_evt.wait(), self._renew_every)
                # If the above succeeds, then `_stop_evt` was set.
                break
            logger.debug("Renewing...")
            try:
                await receiver.renew_message_lock(msg)
            except MessageAlreadySettled:
                # The message was completed or abandoned; nothing left to renew.
                break
            except Exception:
                logger.error("Failed to auto-renew message", exc_info=True)
                break
            else:
                logger.debug("Renewing DONE")
```

It may be possible to do this with azure's provided

It may also be possible to work around the issue by monkeypatching
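The core of this workaround is a cancellable periodic loop: `asyncio.wait_for` on a stop event doubles as the sleep between renewals. Below is a stdlib-only sketch of that pattern; `renew_periodically`, `demo`, and `fake_renew` are hypothetical names, and `fake_renew` merely stands in for `receiver.renew_message_lock(msg)`.

```python
import asyncio
import contextlib
from typing import Awaitable, Callable


async def renew_periodically(
    renew: Callable[[], Awaitable[None]],
    stop_evt: asyncio.Event,
    every: float,
) -> int:
    """Await `renew()` every `every` seconds until `stop_evt` is set; return the call count."""
    calls = 0
    while True:
        # wait_for raises TimeoutError if the event is not set within `every` seconds.
        with contextlib.suppress(asyncio.TimeoutError):
            await asyncio.wait_for(stop_evt.wait(), every)
            break  # only reached when the stop event was set
        await renew()
        calls += 1
    return calls


async def demo() -> int:
    stop_evt = asyncio.Event()

    async def fake_renew() -> None:
        # Hypothetical stand-in for a lock renewal round trip.
        await asyncio.sleep(0)

    task = asyncio.create_task(renew_periodically(fake_renew, stop_evt, every=0.05))
    await asyncio.sleep(0.28)  # simulated long-running message handler
    stop_evt.set()
    return await task
```

Compared with a plain `asyncio.sleep(every)`, waiting on the event lets `_close` stop every renewal task immediately instead of waiting out the remainder of the interval.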
I have the following code that gives errors on `complete_message` whenever the function `one_minute_work` takes a minute or so. The error I see in the logs is: `Cannot write to closing transport`