Skip to content

Commit

Permalink
azure service bus: add type annotations and use cached property
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonwbarnett committed Jan 4, 2023
1 parent 54cd277 commit 646bbdc
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 32 deletions.
53 changes: 23 additions & 30 deletions kombu/transport/azureservicebus.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@

import string
from queue import Empty
from typing import Any
from typing import Any, Dict, Set

import azure.core.exceptions
import azure.servicebus.exceptions
Expand Down Expand Up @@ -87,8 +87,8 @@ class SendReceive:
def __init__(self,
receiver: ServiceBusReceiver | None = None,
sender: ServiceBusSender | None = None):
self.receiver = receiver # type: ServiceBusReceiver
self.sender = sender # type: ServiceBusSender
self.receiver: ServiceBusReceiver = receiver
self.sender: ServiceBusSender = sender

def close(self) -> None:
if self.receiver:
Expand All @@ -102,21 +102,19 @@ def close(self) -> None:
class Channel(virtual.Channel):
"""Azure Service Bus channel."""

default_wait_time_seconds = 5 # in seconds
default_peek_lock_seconds = 60 # in seconds (default 60, max 300)
default_wait_time_seconds: int = 5 # in seconds
default_peek_lock_seconds: int = 60 # in seconds (default 60, max 300)
# in seconds (is the default from service bus repo)
default_uamqp_keep_alive_interval = 30
default_uamqp_keep_alive_interval: int = 30
# number of retries (is the default from service bus repo)
default_retry_total = 3
default_retry_total: int = 3
# exponential backoff factor (is the default from service bus repo)
default_retry_backoff_factor = 0.8
default_retry_backoff_factor: float = 0.8
# Max time to backoff (is the default from service bus repo)
default_retry_backoff_max = 120
domain_format = 'kombu%(vhost)s'
_queue_service = None # type: ServiceBusClient
_queue_mgmt_service = None # type: ServiceBusAdministrationClient
_queue_cache = {} # type: Dict[str, SendReceive]
_noack_queues = set() # type: Set[str]
default_retry_backoff_max: int = 120
domain_format: str = 'kombu%(vhost)s'
_queue_cache: Dict[str, SendReceive] = {}
_noack_queues: Set[str] = set()

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand Down Expand Up @@ -229,7 +227,7 @@ def _delete(self, queue: str, *args, **kwargs) -> None:
"""Delete queue by name."""
queue = self.entity_name(self.queue_name_prefix + queue)

self._queue_mgmt_service.delete_queue(queue)
self.queue_mgmt_service.delete_queue(queue)
send_receive_obj = self._queue_cache.pop(queue, None)
if send_receive_obj:
send_receive_obj.close()
Expand Down Expand Up @@ -300,7 +298,7 @@ def _size(self, queue: str) -> int:

return props.total_message_count

def _purge(self, queue):
def _purge(self, queue) -> int:
"""Delete all current messages in a queue."""
# Azure doesn't provide a purge api yet
n = 0
Expand Down Expand Up @@ -339,24 +337,19 @@ def close(self) -> None:
if self.connection is not None:
self.connection.close_channel(self)

@property
@cached_property
def queue_service(self) -> ServiceBusClient:
if self._queue_service is None:
self._queue_service = ServiceBusClient.from_connection_string(
self._connection_string,
retry_total=self.retry_total,
retry_backoff_factor=self.retry_backoff_factor,
retry_backoff_max=self.retry_backoff_max
)
return self._queue_service
return ServiceBusClient.from_connection_string(
self._connection_string,
retry_total=self.retry_total,
retry_backoff_factor=self.retry_backoff_factor,
retry_backoff_max=self.retry_backoff_max
)

@property
@cached_property
def queue_mgmt_service(self) -> ServiceBusAdministrationClient:
if self._queue_mgmt_service is None:
self._queue_mgmt_service = \
ServiceBusAdministrationClient.from_connection_string(
return ServiceBusAdministrationClient.from_connection_string(
self._connection_string)
return self._queue_mgmt_service

@property
def conninfo(self):
Expand Down
23 changes: 21 additions & 2 deletions t/unit/transport/test_azureservicebus.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,33 @@ def mock_asb_management(mock_asb):
)


def sbac_class_patch():
with patch('kombu.transport.azureservicebus.ServiceBusAdministrationClient') as sbac: # noqa
yield sbac


def sbc_class_patch():
with patch('kombu.transport.azureservicebus.ServiceBusClient') as sbc: # noqa
yield sbc


@pytest.fixture(autouse=True)
def mock_clients(
sbc_class_patch,
sbac_class_patch,
mock_asb,
mock_asb_management
):
sbc_class_patch.from_connection_string.return_value = mock_asb
sbac_class_patch.from_connection_string.return_value = mock_asb_management


@pytest.fixture
def mock_queue(mock_asb, mock_asb_management, random_queue) -> MockQueue:
exchange = Exchange('test_servicebus', type='direct')
queue = Queue(random_queue, exchange, random_queue)
conn = Connection(URL_CREDS, transport=azureservicebus.Transport)
channel = conn.channel()
channel._queue_service = mock_asb
channel._queue_mgmt_service = mock_asb_management

queue(channel).declare()
producer = messaging.Producer(channel, exchange, routing_key=random_queue)
Expand Down

0 comments on commit 646bbdc

Please sign in to comment.