Skip to content

Commit

Permalink
Return empty list instead of InconsistencyError when exchange table i…
Browse files Browse the repository at this point in the history
…s empty

Missing redis key containg set of queues bound to queue is caused
by removal all queues from the exchange. Hence, we should not raise an
exception but just return empty list of queues instead. This commit
fixes e.g. case publishing message against empty exchange:

import kombu
conn = kombu.Connection('redis://')
exchange = kombu.Exchange('name', type='direct')
exchange.declare(channel=conn.default_channel)

producer = conn.Producer()
producer.publish(
     {'hello': 'world'},
     exchange=exchange,
     routing_key='queue1'
)

But it also fixes the case when last queue is unbound from exchange and
after publishing to this exchange:

import kombu
conn = kombu.Connection('redis://')
exchange = kombu.Exchange('name', type='direct')
queue1 = kombu.Queue('queue1', exchange=exchange, routing_key='queue1')
exchange.declare(channel=conn.default_channel)
queue1 = queue1.bind(channel=conn.default_channel)
queue1.declare()

producer = conn.Producer()
producer.publish(
     {'hello': 'world'},
     exchange=exchange,
     routing_key='queue1'
)

queue1.delete()

producer.publish(
     {'hello': 'world'},
     exchange=exchange,
     routing_key='queue1'
)
  • Loading branch information
matusvalo committed Oct 24, 2021
1 parent 5fefeb8 commit c1bb06f
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 14 deletions.
8 changes: 3 additions & 5 deletions kombu/transport/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,6 @@
'connection_errors', 'channel_errors',
))

NO_ROUTE_ERROR = """
Cannot route message for exchange {0!r}: Table empty or key no longer exists.
Probably the key ({1!r}) has been removed from the Redis database.
"""

# This implementation may seem overly complex, but I assure you there is
# a good reason for doing it this way.
Expand Down Expand Up @@ -998,7 +994,9 @@ def get_table(self, exchange):
with self.conn_or_acquire() as client:
values = client.smembers(key)
if not values:
raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key))
# table does not exists since all queues bound to the exchange
# were deleted. We need just return empty list.
return []
return [tuple(bytes_to_str(val).split(self.sep)) for val in values]

def _purge(self, queue):
Expand Down
22 changes: 19 additions & 3 deletions t/integration/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,19 +144,19 @@ def _consume(self, connection, queue):
with consumer:
connection.drain_events(timeout=1)

def _publish(self, channel, exchange, queues, routing_key=None):
def _publish(self, channel, exchange, queues=None, routing_key=None):
producer = kombu.Producer(channel, exchange=exchange)
if routing_key:
producer.publish(
{'hello': 'world'},
declare=list(queues),
declare=list(queues) if queues else None,
serializer='pickle',
routing_key=routing_key
)
else:
producer.publish(
{'hello': 'world'},
declare=list(queues),
declare=list(queues) if queues else None,
serializer='pickle'
)

Expand All @@ -181,6 +181,13 @@ def test_direct_routing_keys(self, connection):
# direct2 queue should not have data
with pytest.raises(socket.timeout):
self._consume(conn, test_queue2)
# test that publishing using key which is not used results in
# discarted message.
self._publish(channel, ex, [test_queue1, test_queue2], 'd3')
with pytest.raises(socket.timeout):
self._consume(conn, test_queue1)
with pytest.raises(socket.timeout):
self._consume(conn, test_queue2)

def test_fanout(self, connection):
ex = kombu.Exchange('test_fanout', type='fanout')
Expand Down Expand Up @@ -213,6 +220,15 @@ def test_topic(self, connection):
# topic3 queue should not have data
self._consume(conn, test_queue3)

def test_publish_empty_exchange(self, connection):
ex = kombu.Exchange('test_empty_exchange', type='topic')
with connection as conn:
with conn.channel() as channel:
self._publish(
channel, ex,
routing_key='t.1'
)


class BaseTimeToLive:
def test_publish_consume(self, connection):
Expand Down
11 changes: 5 additions & 6 deletions t/unit/transport/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import pytest

from kombu import Connection, Consumer, Exchange, Producer, Queue
from kombu.exceptions import InconsistencyError, VersionMismatch
from kombu.exceptions import VersionMismatch
from kombu.transport import virtual
from kombu.utils import eventio # patch poll
from kombu.utils.json import dumps
Expand Down Expand Up @@ -907,13 +907,12 @@ def test_empty_queues_key(self):
('celery', '', 'celery'),
]

# ... then for some reason, the _kombu.binding.celery key gets lost
# Remove one last queue from exchange. After this call no queue
# is in bound to exchange.
channel.client.srem(key)

# which raises a channel error so that the consumer/publisher
# can recover by redeclaring the required entities.
with pytest.raises(InconsistencyError):
self.channel.get_table('celery')
# get_table() should return empty list of queues
assert self.channel.get_table('celery') == []

def test_socket_connection(self):
with patch('kombu.transport.redis.Channel._create_client'):
Expand Down

0 comments on commit c1bb06f

Please sign in to comment.