Skip to content

Commit

Permalink
Revert "Port of redis code improvements from prior revision (#1132)"
Browse files Browse the repository at this point in the history
This reverts commit 753f4ec.
  • Loading branch information
matusvalo committed Mar 6, 2021
1 parent 753f4ec commit 2974f01
Show file tree
Hide file tree
Showing 4 changed files with 260 additions and 581 deletions.
1 change: 0 additions & 1 deletion AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ Marcin Lulek (ergo) <info@webreactor.eu>
Marcin Puhacz <marcin.puhacz@gmail.com>
Mark Lavin <mlavin@caktusgroup.com>
markow <markow@red-sky.pl>
Matt Davis <matteius@gmail.com>
Matt Wise <wise@wiredgeek.net>
Maxime Rouyrre <rouyrre+git@gmail.com>
mdk <luc.mdk@gmail.com>
Expand Down
29 changes: 13 additions & 16 deletions kombu/transport/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,8 +766,7 @@ def _brpop_start(self, timeout=1):
queues = self._queue_cycle.consume(len(self.active_queues))
if not queues:
return
_q_for_pri = self._queue_for_priority
keys = [_q_for_pri(queue, pri) for pri in self.priority_steps
keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps
for queue in queues] + [timeout or 0]
self._in_poll = self.client.connection
self.client.connection.send_command('BRPOP', *keys)
Expand Down Expand Up @@ -803,8 +802,7 @@ def _poll_error(self, type, **options):
def _get(self, queue):
with self.conn_or_acquire() as client:
for pri in self.priority_steps:
queue_name = self._queue_for_priority(queue, pri)
item = client.rpop(queue_name)
item = client.rpop(self._q_for_pri(queue, pri))
if item:
return loads(bytes_to_str(item))
raise Empty()
Expand All @@ -813,15 +811,14 @@ def _size(self, queue):
with self.conn_or_acquire() as client:
with client.pipeline() as pipe:
for pri in self.priority_steps:
queue_name = self._queue_for_priority(queue, pri)
pipe = pipe.llen(queue_name)
pipe = pipe.llen(self._q_for_pri(queue, pri))
sizes = pipe.execute()
return sum(s for s in sizes
if isinstance(s, numbers.Integral))
return sum(size for size in sizes
if isinstance(size, numbers.Integral))

def _queue_for_priority(self, queue, pri):
queue_priority = self.priority(pri)
if queue_priority:
def _q_for_pri(self, queue, pri):
pri = self.priority(pri)
if pri:
return f"{queue}{self.sep}{pri}"
return queue

Expand All @@ -834,7 +831,7 @@ def _put(self, queue, message, **kwargs):
pri = self._get_message_priority(message, reverse=False)

with self.conn_or_acquire() as client:
client.lpush(self._queue_for_priority(queue, pri), dumps(message))
client.lpush(self._q_for_pri(queue, pri), dumps(message))

def _put_fanout(self, exchange, message, routing_key, **kwargs):
"""Deliver fanout message."""
Expand Down Expand Up @@ -869,14 +866,14 @@ def _delete(self, queue, exchange, routing_key, pattern, *args, **kwargs):
queue or '']))
with client.pipeline() as pipe:
for pri in self.priority_steps:
pipe = pipe.delete(self._queue_for_priority(queue, pri))
pipe = pipe.delete(self._q_for_pri(queue, pri))
pipe.execute()

def _has_queue(self, queue, **kwargs):
with self.conn_or_acquire() as client:
with client.pipeline() as pipe:
for pri in self.priority_steps:
pipe = pipe.exists(self._queue_for_priority(queue, pri))
pipe = pipe.exists(self._q_for_pri(queue, pri))
return any(pipe.execute())

def get_table(self, exchange):
Expand All @@ -891,8 +888,8 @@ def _purge(self, queue):
with self.conn_or_acquire() as client:
with client.pipeline() as pipe:
for pri in self.priority_steps:
priority_queue = self._queue_for_priority(queue, pri)
pipe = pipe.llen(priority_queue).delete(priority_queue)
priq = self._q_for_pri(queue, pri)
pipe = pipe.llen(priq).delete(priq)
sizes = pipe.execute()
return sum(sizes[::2])

Expand Down
1 change: 0 additions & 1 deletion requirements/test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,3 @@ case>=1.5.2
pytest<=5.3.5
pytest-sugar
Pyro4
fakeredis==1.1.0
Loading

0 comments on commit 2974f01

Please sign in to comment.