Skip to content

Commit

Permalink
add type annotations to kombu/transport/azurestoragequeues
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonwbarnett committed Dec 18, 2022
1 parent 9ee6fde commit 73bca59
Showing 1 changed file with 17 additions and 15 deletions.
32 changes: 17 additions & 15 deletions kombu/transport/azurestoragequeues.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import string
from queue import Empty
from typing import Any

from azure.core.exceptions import ResourceExistsError

Expand All @@ -59,11 +60,11 @@
class Channel(virtual.Channel):
"""Azure Storage Queues channel."""

domain_format = 'kombu%(vhost)s'
_queue_service = None
_queue_name_cache = {}
no_ack = True
_noack_queues = set()
domain_format: str = 'kombu%(vhost)s'
_queue_service: QueueServiceClient | None = None
_queue_name_cache: dict[Any, Any] = {}
no_ack: bool = True
_noack_queues: set[Any] = set()

def __init__(self, *args, **kwargs):
if QueueServiceClient is None:
Expand All @@ -86,7 +87,7 @@ def basic_consume(self, queue, no_ack, *args, **kwargs):
return super().basic_consume(queue, no_ack,
*args, **kwargs)

def entity_name(self, name, table=CHARS_REPLACE_TABLE):
def entity_name(self, name, table=CHARS_REPLACE_TABLE) -> str:
"""Format AMQP queue name into a valid Azure Storage Queue name."""
return str(safe_str(name)).translate(table)

Expand Down Expand Up @@ -147,7 +148,7 @@ def _purge(self, queue):
return n

@property
def queue_service(self):
def queue_service(self) -> QueueServiceClient:
if self._queue_service is None:
self._queue_service = QueueServiceClient(
account_url=self._url, credential=self._credential
Expand All @@ -164,7 +165,7 @@ def transport_options(self):
return self.connection.client.transport_options

@cached_property
def queue_name_prefix(self):
def queue_name_prefix(self) -> str:
return self.transport_options.get('queue_name_prefix', '')


Expand All @@ -173,9 +174,9 @@ class Transport(virtual.Transport):

Channel = Channel

polling_interval = 1
default_port = None
can_parse_url = True
polling_interval: int = 1
default_port: int | None = None
can_parse_url: bool = True

@staticmethod
def parse_uri(uri: str) -> tuple[str | dict, str]:
Expand Down Expand Up @@ -210,9 +211,10 @@ def parse_uri(uri: str) -> tuple[str | dict, str]:
return credential, url

@classmethod
def as_uri(cls, uri: str, include_password=False, mask='**') -> str:
def as_uri(
cls, uri: str, include_password: bool = False, mask: str = "**"
) -> str:
credential, url = cls.parse_uri(uri)
return 'azurestoragequeues://{}@{}'.format(
credential if include_password else mask,
url
return "azurestoragequeues://{}@{}".format(
credential if include_password else mask, url
)

0 comments on commit 73bca59

Please sign in to comment.