Skip to content

Commit

Permalink
fix: fully reconnect to redis if master is not found (#354)
Browse files Browse the repository at this point in the history
Co-authored-by: Ralf Grubenmann <ralf.grubenmann@gmail.com>
  • Loading branch information
Panaetius and Ralf Grubenmann authored Sep 2, 2024
1 parent 83947bf commit 7bfb4b5
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 1 deletion.
4 changes: 4 additions & 0 deletions components/renku_data_services/message_queue/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,7 @@ def redis_connection(self) -> redis.Redis:
health_check_interval=60,
)
return self._connection

def reset_redis_connection(self) -> None:
"""Forces a full reconnect to redis."""
self._connection = None
7 changes: 6 additions & 1 deletion components/renku_data_services/message_queue/redis_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Concatenate, ParamSpec, Protocol, TypeVar

from dataclasses_avroschema.schema_generator import AvroModel
from redis.asyncio.sentinel import MasterNotFoundError
from sqlalchemy.ext.asyncio import AsyncSession

from renku_data_services.errors import errors
Expand Down Expand Up @@ -82,4 +83,8 @@ async def send_message(self, event: Event) -> None:
"""Send a message on a channel."""
message = copy.copy(event.serialize())

await self.config.redis_connection.xadd(event.queue, message)
try:
await self.config.redis_connection.xadd(event.queue, message)
except MasterNotFoundError:
self.config.reset_redis_connection() # force redis reconnection
await self.config.redis_connection.xadd(event.queue, message)

0 comments on commit 7bfb4b5

Please sign in to comment.