diff --git a/kombu/transport/azurestoragequeues.py b/kombu/transport/azurestoragequeues.py index ab5900195c..89343b427e 100644 --- a/kombu/transport/azurestoragequeues.py +++ b/kombu/transport/azurestoragequeues.py @@ -35,6 +35,7 @@ import string from queue import Empty +from typing import Any from azure.core.exceptions import ResourceExistsError @@ -59,11 +60,11 @@ class Channel(virtual.Channel): """Azure Storage Queues channel.""" - domain_format = 'kombu%(vhost)s' - _queue_service = None - _queue_name_cache = {} - no_ack = True - _noack_queues = set() + domain_format: str = 'kombu%(vhost)s' + _queue_service: QueueServiceClient | None = None + _queue_name_cache: dict[Any, Any] = {} + no_ack: bool = True + _noack_queues: set[Any] = set() def __init__(self, *args, **kwargs): if QueueServiceClient is None: @@ -86,7 +87,7 @@ def basic_consume(self, queue, no_ack, *args, **kwargs): return super().basic_consume(queue, no_ack, *args, **kwargs) - def entity_name(self, name, table=CHARS_REPLACE_TABLE): + def entity_name(self, name, table=CHARS_REPLACE_TABLE) -> str: """Format AMQP queue name into a valid Azure Storage Queue name.""" return str(safe_str(name)).translate(table) @@ -147,7 +148,7 @@ def _purge(self, queue): return n @property - def queue_service(self): + def queue_service(self) -> QueueServiceClient: if self._queue_service is None: self._queue_service = QueueServiceClient( account_url=self._url, credential=self._credential @@ -164,7 +165,7 @@ def transport_options(self): return self.connection.client.transport_options @cached_property - def queue_name_prefix(self): + def queue_name_prefix(self) -> str: return self.transport_options.get('queue_name_prefix', '') @@ -173,9 +174,9 @@ class Transport(virtual.Transport): Channel = Channel - polling_interval = 1 - default_port = None - can_parse_url = True + polling_interval: int = 1 + default_port: int | None = None + can_parse_url: bool = True @staticmethod def parse_uri(uri: str) -> tuple[str | dict, str]: @@ -210,9 +211,10 @@ def parse_uri(uri: str) -> tuple[str | dict, str]: return credential, url @classmethod - def as_uri(cls, uri: str, include_password=False, mask='**') -> str: + def as_uri( + cls, uri: str, include_password: bool = False, mask: str = "**" + ) -> str: credential, url = cls.parse_uri(uri) - return 'azurestoragequeues://{}@{}'.format( - credential if include_password else mask, - url + return "azurestoragequeues://{}@{}".format( + credential if include_password else mask, url )