Skip to content

Commit

Permalink
add accept parameter to SimpleQueue class (#1140)
Browse files Browse the repository at this point in the history
* add accept parameter to SimpleQueue class

* Fixed missing accept for get_nowait() and added unittests

* Remove unused **kwargs from SimpleQueue.__init__

* Use self.consumer.accept instead of new attribute in SimpleQueue

* Add tests for simple interface when accept=[]

Co-authored-by: Matus Valo <matusvalo@gmail.com>
  • Loading branch information
lsaavedr and matusvalo committed May 22, 2021
1 parent a920bb0 commit e6d76d1
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 7 deletions.
8 changes: 4 additions & 4 deletions kombu/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def get(self, block=True, timeout=None):
remaining = timeout - elapsed

def get_nowait(self):
m = self.queue.get(no_ack=self.no_ack)
m = self.queue.get(no_ack=self.no_ack, accept=self.consumer.accept)
if not m:
raise self.Empty()
return m
Expand Down Expand Up @@ -118,7 +118,7 @@ class SimpleQueue(SimpleBase):

def __init__(self, channel, name, no_ack=None, queue_opts=None,
queue_args=None, exchange_opts=None, serializer=None,
compression=None, **kwargs):
compression=None, accept=None):
queue = name
queue_opts = dict(self.queue_opts, **queue_opts or {})
queue_args = dict(self.queue_args, **queue_args or {})
Expand All @@ -134,13 +134,13 @@ def __init__(self, channel, name, no_ack=None, queue_opts=None,
else:
exchange = queue.exchange
routing_key = queue.routing_key
consumer = messaging.Consumer(channel, queue)
consumer = messaging.Consumer(channel, queue, accept=accept)
producer = messaging.Producer(channel, exchange,
serializer=serializer,
routing_key=routing_key,
compression=compression)
super().__init__(channel, producer,
consumer, no_ack, **kwargs)
consumer, no_ack)


class SimpleBuffer(SimpleQueue):
Expand Down
36 changes: 33 additions & 3 deletions t/unit/test_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from unittest.mock import Mock

from kombu import Connection, Exchange, Queue
from kombu.exceptions import ContentDisallowed


class SimpleBase:
Expand All @@ -21,13 +22,10 @@ def _Queue(self, *args, **kwargs):
def setup(self):
self.connection = Connection(transport='memory')
self.connection.default_channel.exchange_declare('amq.direct')
self.q = self.Queue(None, no_ack=True)

def teardown(self):
self.q.close()
self.connection.close()
self.connection = None
self.q = None

def test_produce__consume(self):
q = self.Queue('test_produce__consume', no_ack=True)
Expand All @@ -50,6 +48,38 @@ def test_produce__basic_get(self):
with pytest.raises(q.Empty):
q.get(block=False)

def test_get_nowait_accept(self):
q = self.Queue('test_accept', serializer='pickle', accept=['json'])
q.put({'hello': 'SimpleSync'})
with pytest.raises(ContentDisallowed):
q.get_nowait().payload

q = self.Queue('test_accept1', serializer='json', accept=[])
q.put({'hello': 'SimpleSync'})
with pytest.raises(ContentDisallowed):
q.get_nowait().payload

q = self.Queue(
'test_accept2', serializer='pickle', accept=['json', 'pickle'])
q.put({'hello': 'SimpleSync'})
assert q.get_nowait().payload == {'hello': 'SimpleSync'}

def test_get_accept(self):
q = self.Queue('test_accept', serializer='pickle', accept=['json'])
q.put({'hello': 'SimpleSync'})
with pytest.raises(ContentDisallowed):
q.get().payload

q = self.Queue('test_accept1', serializer='pickle', accept=[])
q.put({'hello': 'SimpleSync'})
with pytest.raises(ContentDisallowed):
q.get().payload

q = self.Queue(
'test_accept2', serializer='pickle', accept=['json', 'pickle'])
q.put({'hello': 'SimpleSync'})
assert q.get().payload == {'hello': 'SimpleSync'}

def test_clear(self):
q = self.Queue('test_clear', no_ack=True)

Expand Down

0 comments on commit e6d76d1

Please sign in to comment.