Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return empty list instead of InconsistencyError when exchange table is empty #1404

Merged
merged 1 commit into from
Oct 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions kombu/transport/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,6 @@
'connection_errors', 'channel_errors',
))

NO_ROUTE_ERROR = """
Cannot route message for exchange {0!r}: Table empty or key no longer exists.
Probably the key ({1!r}) has been removed from the Redis database.
"""

# This implementation may seem overly complex, but I assure you there is
# a good reason for doing it this way.
Expand Down Expand Up @@ -998,7 +994,9 @@ def get_table(self, exchange):
with self.conn_or_acquire() as client:
values = client.smembers(key)
if not values:
raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key))
# table does not exists since all queues bound to the exchange
# were deleted. We need just return empty list.
return []
return [tuple(bytes_to_str(val).split(self.sep)) for val in values]

def _purge(self, queue):
Expand Down
22 changes: 19 additions & 3 deletions t/integration/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,19 +144,19 @@ def _consume(self, connection, queue):
with consumer:
connection.drain_events(timeout=1)

def _publish(self, channel, exchange, queues, routing_key=None):
def _publish(self, channel, exchange, queues=None, routing_key=None):
producer = kombu.Producer(channel, exchange=exchange)
if routing_key:
producer.publish(
{'hello': 'world'},
declare=list(queues),
declare=list(queues) if queues else None,
serializer='pickle',
routing_key=routing_key
)
else:
producer.publish(
{'hello': 'world'},
declare=list(queues),
declare=list(queues) if queues else None,
serializer='pickle'
)

Expand All @@ -181,6 +181,13 @@ def test_direct_routing_keys(self, connection):
# direct2 queue should not have data
with pytest.raises(socket.timeout):
self._consume(conn, test_queue2)
# test that publishing using key which is not used results in
# discarted message.
self._publish(channel, ex, [test_queue1, test_queue2], 'd3')
with pytest.raises(socket.timeout):
self._consume(conn, test_queue1)
with pytest.raises(socket.timeout):
self._consume(conn, test_queue2)

def test_fanout(self, connection):
ex = kombu.Exchange('test_fanout', type='fanout')
Expand Down Expand Up @@ -213,6 +220,15 @@ def test_topic(self, connection):
# topic3 queue should not have data
self._consume(conn, test_queue3)

def test_publish_empty_exchange(self, connection):
ex = kombu.Exchange('test_empty_exchange', type='topic')
with connection as conn:
with conn.channel() as channel:
self._publish(
channel, ex,
routing_key='t.1'
)


class BaseTimeToLive:
def test_publish_consume(self, connection):
Expand Down
11 changes: 5 additions & 6 deletions t/unit/transport/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import pytest

from kombu import Connection, Consumer, Exchange, Producer, Queue
from kombu.exceptions import InconsistencyError, VersionMismatch
from kombu.exceptions import VersionMismatch
from kombu.transport import virtual
from kombu.utils import eventio # patch poll
from kombu.utils.json import dumps
Expand Down Expand Up @@ -907,13 +907,12 @@ def test_empty_queues_key(self):
('celery', '', 'celery'),
]

# ... then for some reason, the _kombu.binding.celery key gets lost
# Remove one last queue from exchange. After this call no queue
# is in bound to exchange.
channel.client.srem(key)

# which raises a channel error so that the consumer/publisher
# can recover by redeclaring the required entities.
with pytest.raises(InconsistencyError):
self.channel.get_table('celery')
# get_table() should return empty list of queues
assert self.channel.get_table('celery') == []

def test_socket_connection(self):
with patch('kombu.transport.redis.Channel._create_client'):
Expand Down