Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add type annotations to kombu/transport/azurestoragequeues #1632

Merged
merged 1 commit into from
Jan 3, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions kombu/transport/azurestoragequeues.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@

import string
from queue import Empty
from typing import Any, Optional

from azure.core.exceptions import ResourceExistsError

Expand Down Expand Up @@ -85,11 +86,11 @@
class Channel(virtual.Channel):
"""Azure Storage Queues channel."""

domain_format = 'kombu%(vhost)s'
_queue_service = None
_queue_name_cache = {}
no_ack = True
_noack_queues = set()
domain_format: str = 'kombu%(vhost)s'
_queue_service: Optional[QueueServiceClient] = None
_queue_name_cache: dict[Any, Any] = {}
no_ack: bool = True
_noack_queues: set[Any] = set()

def __init__(self, *args, **kwargs):
if QueueServiceClient is None:
Expand All @@ -112,7 +113,7 @@ def basic_consume(self, queue, no_ack, *args, **kwargs):
return super().basic_consume(queue, no_ack,
*args, **kwargs)

def entity_name(self, name, table=CHARS_REPLACE_TABLE):
def entity_name(self, name, table=CHARS_REPLACE_TABLE) -> str:
"""Format AMQP queue name into a valid Azure Storage Queue name."""
return str(safe_str(name)).translate(table)

Expand Down Expand Up @@ -173,7 +174,7 @@ def _purge(self, queue):
return n

@property
def queue_service(self):
def queue_service(self) -> QueueServiceClient:
if self._queue_service is None:
self._queue_service = QueueServiceClient(
account_url=self._url, credential=self._credential
Expand All @@ -190,7 +191,7 @@ def transport_options(self):
return self.connection.client.transport_options

@cached_property
def queue_name_prefix(self):
def queue_name_prefix(self) -> str:
return self.transport_options.get('queue_name_prefix', '')


Expand All @@ -199,9 +200,9 @@ class Transport(virtual.Transport):

Channel = Channel

polling_interval = 1
default_port = None
can_parse_url = True
polling_interval: int = 1
default_port: Optional[int] = None
can_parse_url: bool = True

@staticmethod
def parse_uri(uri: str) -> tuple[str | dict, str]:
Expand Down Expand Up @@ -253,9 +254,10 @@ def parse_uri(uri: str) -> tuple[str | dict, str]:
return credential, url

@classmethod
def as_uri(cls, uri: str, include_password=False, mask='**') -> str:
def as_uri(
cls, uri: str, include_password: bool = False, mask: str = "**"
) -> str:
credential, url = cls.parse_uri(uri)
return 'azurestoragequeues://{}@{}'.format(
credential if include_password else mask,
url
return "azurestoragequeues://{}@{}".format(
credential if include_password else mask, url
)