Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix 1063 on 4.x branch #1262

Closed
wants to merge 10 commits into from
18 changes: 17 additions & 1 deletion kombu/pidbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from . import Exchange, Queue, Consumer, Producer
from .clocks import LamportClock
from .common import maybe_declare, oid_from
from .exceptions import InconsistencyError
from .exceptions import InconsistencyError, OperationalError
from .five import range, string_t
from .log import get_logger
from .utils.functional import maybe_evaluate, reprcall
Expand Down Expand Up @@ -151,6 +151,14 @@ def reply(self, data, exchange, routing_key, ticket, **kwargs):
serializer=self.mailbox.serializer)


def is_no_route_error_for_reply_celery_pidbox(exc_str):
if "Cannot route message for exchange" in exc_str \
and "Table empty or key no longer exists" in exc_str \
and "reply.celery.pidbox" in exc_str:
return True
return False


class Mailbox(object):
"""Process Mailbox."""

Expand Down Expand Up @@ -284,6 +292,14 @@ def _publish_reply(self, reply, exchange, routing_key, ticket,
}, retry=True,
**opts
)
except OperationalError as exc:
# Fixes https://github.com/celery/kombu/issues/1063

if not exc.args and not is_no_route_error_for_reply_celery_pidbox(exc.args[0]):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So what exactly happens here?
Why reporting this error is good enough?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're not raising the error if it's got a certain message.

raise

error('NO_ROUTE_ERROR caught: %r', exc, exc_info=1)

except InconsistencyError:
# queue probably deleted and no one is expecting a reply.
pass
Expand Down
4 changes: 2 additions & 2 deletions requirements/dev.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
https://github.com/celery/py-amqp/zipball/master
https://github.com/celery/vine/zipball/master
https://github.com/celery/py-amqp/zipball/v2.6
hsophie-sf marked this conversation as resolved.
Show resolved Hide resolved
https://github.com/celery/vine/zipball/1.0
2 changes: 1 addition & 1 deletion requirements/extras/brotli.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
brotlipy>=0.7.0;platform_python_implementation=="PyPy"
brotli>=1.0.0;platform_python_implementation=="CPython"
brotli==1.0.7;platform_python_implementation=="CPython"
hsophie-sf marked this conversation as resolved.
Show resolved Hide resolved
24 changes: 23 additions & 1 deletion t/unit/test_pidbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@

from case import Mock, patch

import kombu
from kombu import Connection
from kombu import pidbox
from kombu.exceptions import ContentDisallowed, InconsistencyError
from kombu.exceptions import ContentDisallowed, InconsistencyError, OperationalError
from kombu.transport import redis
from kombu.utils.uuid import uuid


Expand Down Expand Up @@ -58,6 +60,26 @@ def test_publish_reply_ignores_InconsistencyError(self):
)
producer.publish.assert_called()

def test_publish_reply_handles_redis_OperationalError_wth_no_route_error_msg(self):
mailbox = pidbox.Mailbox('reply.celery')(self.connection)
exchange = mailbox.reply_exchange.name
channel = self.connection.channel()
mailbox.reply_queue(channel).declare()
ticket = uuid()
# Using Channel._lookup as a proxy (in absence of a redis integration test) to mock the actual
# redis.get_table call which produces the OperationalError, since _lookup is just a level above
# redis.get_table on the stack trace.
with patch.object(kombu.transport.virtual.Channel, '_lookup') as simulate_redis_get_table_err:
# raise the actual redis error
simulate_redis_get_table_err.side_effect = OperationalError(
redis.NO_ROUTE_ERROR.format(exchange, mailbox.oid))
try:
hsophie-sf marked this conversation as resolved.
Show resolved Hide resolved
mailbox._publish_reply({'foo': 'bar'}, exchange, mailbox.oid, ticket)
except OperationalError as exc:
if pidbox.is_no_route_error_for_reply_celery_pidbox(exc.args[0]):
pytest.fail("NO_ROUTE_ERROR with specific message should have been caught")


def test_reply__collect(self):
mailbox = pidbox.Mailbox('test_reply__collect')(self.connection)
exchange = mailbox.reply_exchange.name
Expand Down