Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Issue #1087 redis fix" #1106

Merged
merged 1 commit into from
Nov 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ Marcin Lulek (ergo) <info@webreactor.eu>
Marcin Puhacz <marcin.puhacz@gmail.com>
Mark Lavin <mlavin@caktusgroup.com>
markow <markow@red-sky.pl>
Matt Davis <matteius@gmail.com>
Matt Wise <wise@wiredgeek.net>
Maxime Rouyrre <rouyrre+git@gmail.com>
mdk <luc.mdk@gmail.com>
Expand Down
93 changes: 31 additions & 62 deletions kombu/transport/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@

import numbers
import socket
import warnings

from bisect import bisect
from collections import namedtuple
from contextlib import contextmanager
from time import time

from vine import promise

from kombu.exceptions import InconsistencyError, VersionMismatch
Expand All @@ -25,7 +25,6 @@
from kombu.utils.compat import _detect_environment

from . import virtual
from .virtual.base import UndeliverableWarning, UNDELIVERABLE_FMT

try:
import redis
Expand Down Expand Up @@ -147,8 +146,12 @@ def __init__(self, *args, **kwargs):
def append(self, message, delivery_tag):
delivery = message.delivery_info
EX, RK = delivery['exchange'], delivery['routing_key']
# Redis-py changed the format of zadd args in v3.0.0 to be like this
zadd_args = [{delivery_tag: time()}]
# TODO: Remove this once we soley on Redis-py 3.0.0+
if redis.VERSION[0] >= 3:
thedrow marked this conversation as resolved.
Show resolved Hide resolved
# Redis-py changed the format of zadd args in v3.0.0
zadd_args = [{delivery_tag: time()}]
else:
zadd_args = [time(), delivery_tag]

with self.pipe_or_acquire() as pipe:
pipe.zadd(self.unacked_index_key, *zadd_args) \
Expand Down Expand Up @@ -710,8 +713,7 @@ def _brpop_start(self, timeout=1):
queues = self._queue_cycle.consume(len(self.active_queues))
if not queues:
return
_q_for_pri = self._queue_for_priority
keys = [_q_for_pri(queue, pri) for pri in self.priority_steps
keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps
for queue in queues] + [timeout or 0]
self._in_poll = self.client.connection
self.client.connection.send_command('BRPOP', *keys)
Expand Down Expand Up @@ -747,8 +749,7 @@ def _poll_error(self, type, **options):
def _get(self, queue):
with self.conn_or_acquire() as client:
for pri in self.priority_steps:
queue_name = self._queue_for_priority(queue, pri)
item = client.rpop(queue_name)
item = client.rpop(self._q_for_pri(queue, pri))
if item:
return loads(bytes_to_str(item))
raise Empty()
Expand All @@ -757,21 +758,14 @@ def _size(self, queue):
with self.conn_or_acquire() as client:
with client.pipeline() as pipe:
for pri in self.priority_steps:
queue_name = self._queue_for_priority(queue, pri)
pipe = pipe.llen(queue_name)
pipe = pipe.llen(self._q_for_pri(queue, pri))
sizes = pipe.execute()
size = sum(size for size in sizes
return sum(size for size in sizes
if isinstance(size, numbers.Integral))
return size

def _queue_for_priority(self, queue, pri):
def _q_for_pri(self, queue, pri):
pri = self.priority(pri)
if pri:
queue_args = (queue, self.sep, pri)
else:
queue_args = (queue, '', '')
priority_queue_name = '%s%s%s' % queue_args
return priority_queue_name
return '%s%s%s' % ((queue, self.sep, pri) if pri else (queue, '', ''))

def priority(self, n):
steps = self.priority_steps
Expand All @@ -782,7 +776,7 @@ def _put(self, queue, message, **kwargs):
pri = self._get_message_priority(message, reverse=False)

with self.conn_or_acquire() as client:
client.lpush(self._queue_for_priority(queue, pri), dumps(message))
client.lpush(self._q_for_pri(queue, pri), dumps(message))

def _put_fanout(self, exchange, message, routing_key, **kwargs):
"""Deliver fanout message."""
Expand Down Expand Up @@ -817,14 +811,14 @@ def _delete(self, queue, exchange, routing_key, pattern, *args, **kwargs):
queue or '']))
with client.pipeline() as pipe:
for pri in self.priority_steps:
pipe = pipe.delete(self._queue_for_priority(queue, pri))
pipe = pipe.delete(self._q_for_pri(queue, pri))
pipe.execute()

def _has_queue(self, queue, **kwargs):
with self.conn_or_acquire() as client:
with client.pipeline() as pipe:
for pri in self.priority_steps:
pipe = pipe.exists(self._queue_for_priority(queue, pri))
pipe = pipe.exists(self._q_for_pri(queue, pri))
return any(pipe.execute())

def get_table(self, exchange):
Expand All @@ -835,55 +829,30 @@ def get_table(self, exchange):
raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key))
return [tuple(bytes_to_str(val).split(self.sep)) for val in values]

def _lookup(self, exchange, routing_key, default=None):
"""Find all queues matching `routing_key` for the given `exchange`.
def _lookup_direct(self, exchange, routing_key):
if not exchange:
return [routing_key]

Returns:
str: queue name -- must return the string `default`
if no queues matched.
"""
key = self.keyprefix_queue % exchange
pattern = ''
result = []
if default is None:
default = self.deadletter_queue
if not exchange: # anon exchange
return [routing_key or default]

queue = routing_key
redis_key = self.keyprefix_queue % exchange
try:
queue_bind = self.sep.join([
routing_key or '',
pattern,
queue or '',
])
with self.conn_or_acquire() as client:
if not client.scard(redis_key):
pass # Do not check if its a member because set is empty
elif client.sismember(redis_key, queue_bind):
result = [queue]
except KeyError:
pass

if not result:
if default is not None:
warnings.warn(UndeliverableWarning(UNDELIVERABLE_FMT.format(
exchange=exchange, routing_key=routing_key)),
)
self._new_queue(default)
result = [default]
else:
raise InconsistencyError(NO_ROUTE_ERROR.format(
exchange, redis_key))
queue_bind = self.sep.join([
routing_key or '',
pattern,
queue or '',
])
with self.conn_or_acquire() as client:
if client.sismember(key, queue_bind):
return [queue]

return result
return []

def _purge(self, queue):
with self.conn_or_acquire() as client:
with client.pipeline() as pipe:
for pri in self.priority_steps:
priority_queue = self._queue_for_priority(queue, pri)
pipe = pipe.llen(priority_queue).delete(priority_queue)
priq = self._q_for_pri(queue, pri)
pipe = pipe.llen(priq).delete(priq)
sizes = pipe.execute()
return sum(sizes[::2])

Expand Down
26 changes: 21 additions & 5 deletions kombu/transport/virtual/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -710,20 +710,36 @@ def _lookup(self, exchange, routing_key, default=None):
return [routing_key or default]

try:
result = self.typeof(exchange).lookup(
R = self.typeof(exchange).lookup(
self.get_table(exchange),
exchange, routing_key, default,
)
except KeyError:
result = []
R = []

if not result and default is not None:
if not R and default is not None:
warnings.warn(UndeliverableWarning(UNDELIVERABLE_FMT.format(
exchange=exchange, routing_key=routing_key)),
)
self._new_queue(default)
result = [default]
return result
R = [default]
return R

def _lookup_direct(self, exchange, routing_key):
"""Find queue matching `routing_key` for given direct `exchange`.

Returns:
str: queue name
"""
if not exchange:
return [routing_key]

return self.exchange_types['direct'].lookup(
table=self.get_table(exchange),
exchange=exchange,
routing_key=routing_key,
default=None,
)

def _restore(self, message):
"""Redeliver message to its original destination."""
Expand Down
2 changes: 1 addition & 1 deletion kombu/transport/virtual/exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def lookup(self, table, exchange, routing_key, default):
}

def deliver(self, message, exchange, routing_key, **kwargs):
_lookup = self.channel._lookup
_lookup = self.channel._lookup_direct
_put = self.channel._put
for queue in _lookup(exchange, routing_key):
_put(queue, message, **kwargs)
Expand Down
1 change: 0 additions & 1 deletion requirements/test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,3 @@ case>=1.5.2
pytest
pytest-sugar
Pyro4
fakeredis==1.0.4
Loading