Skip to content

Commit

Permalink
Clarify that _new_queue returns the queue URL
Browse files Browse the repository at this point in the history
It seems that prior to 129a9e4 it returned a queue
object but this is no longer the case so update comments
variable names accordingly to make it clearer.

Also remove the incorrect fallback which cannot
be correct any more given the return value has to
be the queue URL which must be a string.
  • Loading branch information
sparrowt committed Nov 23, 2022
1 parent d511dca commit 625af1a
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 24 deletions.
48 changes: 26 additions & 22 deletions kombu/transport/SQS.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ class Channel(virtual.Channel):
_predefined_queue_async_clients = {} # A client for each predefined queue
_sqs = None
_predefined_queue_clients = {} # A client for each predefined queue
_queue_cache = {}
_queue_cache = {} # SQS queue name => SQS queue URL
_noack_queues = set()
QoS = QoS

Expand Down Expand Up @@ -341,34 +341,38 @@ def canonical_queue_name(self, queue_name):
return self.entity_name(self.queue_name_prefix + queue_name)

def _new_queue(self, queue, **kwargs):
"""Ensure a queue with given name exists in SQS."""
if not isinstance(queue, str):
return queue
"""
Ensure a queue with given name exists in SQS.
Arguments:
queue (str): the AMQP queue name
Returns:
str: the SQS queue URL
"""
# Translate to SQS name for consistency with initial
# _queue_cache population.
queue = self.canonical_queue_name(queue)
sqs_qname = self.canonical_queue_name(queue)

# The SQS ListQueues method only returns 1000 queues. When you have
# so many queues, it's possible that the queue you are looking for is
# not cached. In this case, we could update the cache with the exact
# queue name first.
if queue not in self._queue_cache:
self._update_queue_cache(queue)
if sqs_qname not in self._queue_cache:
self._update_queue_cache(sqs_qname)
try:
return self._queue_cache[queue]
return self._queue_cache[sqs_qname]
except KeyError:
if self.predefined_queues:
raise UndefinedQueueException((
"Queue with name '{}' must be "
"defined in 'predefined_queues'."
).format(queue))
).format(sqs_qname))

attributes = {'VisibilityTimeout': str(self.visibility_timeout)}
if queue.endswith('.fifo'):
if sqs_qname.endswith('.fifo'):
attributes['FifoQueue'] = 'true'

resp = self._create_queue(queue, attributes)
self._queue_cache[queue] = resp['QueueUrl']
resp = self._create_queue(sqs_qname, attributes)
self._queue_cache[sqs_qname] = resp['QueueUrl']
return resp['QueueUrl']

def _create_queue(self, queue_name, attributes):
Expand Down Expand Up @@ -441,13 +445,13 @@ def _optional_b64_decode(byte_string):
pass
return byte_string

def _message_to_python(self, message, queue_name, queue):
def _message_to_python(self, message, queue_name, q_url):
body = self._optional_b64_decode(message['Body'].encode())
payload = loads(bytes_to_str(body))
if queue_name in self._noack_queues:
queue = self._new_queue(queue_name)
q_url = self._new_queue(queue_name)
self.asynsqs(queue=queue_name).delete_message(
queue,
q_url,
message['ReceiptHandle'],
)
else:
Expand All @@ -464,7 +468,7 @@ def _message_to_python(self, message, queue_name, queue):
})
# set delivery tag to SQS receipt handle
delivery_info.update({
'sqs_message': message, 'sqs_queue': queue,
'sqs_message': message, 'sqs_queue': q_url,
})
properties['delivery_tag'] = message['ReceiptHandle']
return payload
Expand All @@ -483,8 +487,8 @@ def _messages_to_python(self, messages, queue):
Returns:
List: A list of Payload objects
"""
q = self._new_queue(queue)
return [self._message_to_python(m, queue, q) for m in messages]
q_url = self._new_queue(queue)
return [self._message_to_python(m, queue, q_url) for m in messages]

def _get_bulk(self, queue,
max_if_unlimited=SQS_MAX_MESSAGES, callback=None):
Expand Down Expand Up @@ -628,24 +632,24 @@ def basic_ack(self, delivery_tag, multiple=False):

def _size(self, queue):
"""Return the number of messages in a queue."""
url = self._new_queue(queue)
q_url = self._new_queue(queue)
c = self.sqs(queue=self.canonical_queue_name(queue))
resp = c.get_queue_attributes(
QueueUrl=url,
QueueUrl=q_url,
AttributeNames=['ApproximateNumberOfMessages'])
return int(resp['Attributes']['ApproximateNumberOfMessages'])

def _purge(self, queue):
"""Delete all current messages in a queue."""
q = self._new_queue(queue)
q_url = self._new_queue(queue)
# SQS is slow at registering messages, so run for a few
# iterations to ensure messages are detected and deleted.
size = 0
for i in range(10):
size += int(self._size(queue))
if not size:
break
self.sqs(queue=queue).purge_queue(QueueUrl=q)
self.sqs(queue=queue).purge_queue(QueueUrl=q_url)
return size

def close(self):
Expand Down
4 changes: 2 additions & 2 deletions kombu/transport/virtual/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -698,8 +698,8 @@ def _lookup(self, exchange, routing_key, default=None):
"""Find all queues matching `routing_key` for the given `exchange`.
Returns:
str: queue name -- must return the string `default`
if no queues matched.
list[str]: queue names -- must return `[default]`
if default is set and no queues matched.
"""
if default is None:
default = self.deadletter_queue
Expand Down

0 comments on commit 625af1a

Please sign in to comment.