diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py index bc6c84cb1e32..f52f1d6d1237 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py @@ -11,6 +11,8 @@ import platform import time from typing import Optional, Dict, Tuple +from msrest.serialization import UTC + try: from urlparse import urlparse except ImportError: @@ -18,7 +20,6 @@ from uamqp import authentication, types -from ..exceptions import ServiceBusError from .._version import VERSION from .constants import ( JWT_TOKEN_SCOPE, @@ -26,85 +27,73 @@ TOKEN_TYPE_SASTOKEN, DEAD_LETTER_QUEUE_SUFFIX, TRANSFER_DEAD_LETTER_QUEUE_SUFFIX, - USER_AGENT_PREFIX + USER_AGENT_PREFIX, ) _log = logging.getLogger(__name__) -class UTC(datetime.tzinfo): - """Time Zone info for handling UTC""" - - def utcoffset(self, dt): - """UTF offset for UTC is 0.""" - return datetime.timedelta(0) - - def tzname(self, dt): - """Timestamp representation.""" - return "Z" - - def dst(self, dt): - """No daylight saving for UTC.""" - return datetime.timedelta(hours=1) - - -try: - from datetime import timezone # pylint: disable=ungrouped-imports - - TZ_UTC = timezone.utc # type: ignore -except ImportError: - TZ_UTC = UTC() # type: ignore - - def utc_from_timestamp(timestamp): - return datetime.datetime.fromtimestamp(timestamp, tz=TZ_UTC) + return datetime.datetime.fromtimestamp(timestamp, tz=UTC()) def utc_now(): - return datetime.datetime.now(tz=TZ_UTC) + return datetime.datetime.now(UTC()) # This parse_conn_str is used for mgmt, the other in base_handler for handlers. Should be unified. def parse_conn_str(conn_str): # type: (str) -> Tuple[str, Optional[str], Optional[str], str, Optional[str], Optional[int]] - endpoint = '' - shared_access_key_name = None # type: Optional[str] - shared_access_key = None # type: Optional[str] - entity_path = '' + endpoint = "" + shared_access_key_name = None # type: Optional[str] + shared_access_key = None # type: Optional[str] + entity_path = "" shared_access_signature = None # type: Optional[str] - shared_access_signature_expiry = None # type: Optional[int] - for element in conn_str.split(';'): - key, _, value = element.partition('=') - if key.lower() == 'endpoint': - endpoint = value.rstrip('/') - elif key.lower() == 'sharedaccesskeyname': + shared_access_signature_expiry = None # type: Optional[int] + for element in conn_str.split(";"): + key, _, value = element.partition("=") + if key.lower() == "endpoint": + endpoint = value.rstrip("/") + elif key.lower() == "sharedaccesskeyname": shared_access_key_name = value - elif key.lower() == 'sharedaccesskey': + elif key.lower() == "sharedaccesskey": shared_access_key = value - elif key.lower() == 'entitypath': + elif key.lower() == "entitypath": entity_path = value elif key.lower() == "sharedaccesssignature": shared_access_signature = value try: # Expiry can be stored in the "se=" clause of the token. ('&'-separated key-value pairs) # type: ignore - shared_access_signature_expiry = int(shared_access_signature.split('se=')[1].split('&')[0]) - except (IndexError, TypeError, ValueError): # Fallback since technically expiry is optional. + shared_access_signature_expiry = int( + shared_access_signature.split("se=")[1].split("&")[0] + ) + except ( + IndexError, + TypeError, + ValueError, + ): # Fallback since technically expiry is optional. # An arbitrary, absurdly large number, since you can't renew. shared_access_signature_expiry = int(time.time() * 2) - if not (all((endpoint, shared_access_key_name, shared_access_key)) or all((endpoint, shared_access_signature))) \ - or all((shared_access_key_name, shared_access_signature)): # this latter clause since we don't accept both + if not ( + all((endpoint, shared_access_key_name, shared_access_key)) + or all((endpoint, shared_access_signature)) + ) or all( + (shared_access_key_name, shared_access_signature) + ): # this latter clause since we don't accept both raise ValueError( "Invalid connection string. Should be in the format: " "Endpoint=sb:///;SharedAccessKeyName=;SharedAccessKey=" "\nWith alternate option of providing SharedAccessSignature instead of SharedAccessKeyName and Key" ) - return (endpoint, - str(shared_access_key_name) if shared_access_key_name else None, - str(shared_access_key) if shared_access_key else None, - entity_path, - str(shared_access_signature) if shared_access_signature else None, - shared_access_signature_expiry) + return ( + endpoint, + str(shared_access_key_name) if shared_access_key_name else None, + str(shared_access_key) if shared_access_key else None, + entity_path, + str(shared_access_signature) if shared_access_signature else None, + shared_access_signature_expiry, + ) def build_uri(address, entity): @@ -123,7 +112,8 @@ def create_properties(user_agent=None): Format the properties with which to instantiate the connection. This acts like a user agent over HTTP. - :param str user_agent: If specified, this will be added in front of the built-in user agent string. + :param str user_agent: If specified, + this will be added in front of the built-in user agent string. :rtype: dict """ @@ -189,15 +179,18 @@ def create_authentication(client): def generate_dead_letter_entity_name( - queue_name=None, - topic_name=None, - subscription_name=None, - transfer_deadletter=False + queue_name=None, topic_name=None, subscription_name=None, transfer_deadletter=False ): - entity_name = queue_name if queue_name else (topic_name + "/Subscriptions/" + subscription_name) + entity_name = ( + queue_name + if queue_name + else (topic_name + "/Subscriptions/" + subscription_name) + ) entity_name = "{}{}".format( entity_name, - TRANSFER_DEAD_LETTER_QUEUE_SUFFIX if transfer_deadletter else DEAD_LETTER_QUEUE_SUFFIX + TRANSFER_DEAD_LETTER_QUEUE_SUFFIX + if transfer_deadletter + else DEAD_LETTER_QUEUE_SUFFIX, ) return entity_name @@ -205,7 +198,8 @@ def generate_dead_letter_entity_name( def transform_messages_to_sendable_if_needed(messages): """ - This method is to convert single/multiple received messages to sendable messages to enable message resending. + This method is to convert single/multiple received messages + to sendable messages to enable message resending. """ # pylint: disable=protected-access try: