diff --git a/components/renku_data_services/message_queue/config.py b/components/renku_data_services/message_queue/config.py
index 27aa60a7c..09f25a864 100644
--- a/components/renku_data_services/message_queue/config.py
+++ b/components/renku_data_services/message_queue/config.py
@@ -73,3 +73,7 @@ def redis_connection(self) -> redis.Redis:
                     health_check_interval=60,
                 )
         return self._connection
+
+    def reset_redis_connection(self) -> None:
+        """Forget the cached connection by setting it to None, forcing a full reconnect to redis."""
+        self._connection = None
diff --git a/components/renku_data_services/message_queue/redis_queue.py b/components/renku_data_services/message_queue/redis_queue.py
index 9c344e325..03a371e10 100644
--- a/components/renku_data_services/message_queue/redis_queue.py
+++ b/components/renku_data_services/message_queue/redis_queue.py
@@ -7,6 +7,7 @@
 from typing import Concatenate, ParamSpec, Protocol, TypeVar
 
 from dataclasses_avroschema.schema_generator import AvroModel
+from redis.asyncio.sentinel import MasterNotFoundError
 from sqlalchemy.ext.asyncio import AsyncSession
 
 from renku_data_services.errors import errors
@@ -82,4 +83,8 @@ async def send_message(self, event: Event) -> None:
         """Send a message on a channel."""
         message = copy.copy(event.serialize())
 
-        await self.config.redis_connection.xadd(event.queue, message)
+        try:
+            await self.config.redis_connection.xadd(event.queue, message)
+        except MasterNotFoundError:
+            self.config.reset_redis_connection()  # master not found: drop cached connection, retry once
+            await self.config.redis_connection.xadd(event.queue, message)