Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Servicebus] message settlement and pyamqp #24754

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import uuid
import time
import threading
import functools
from datetime import timedelta
from typing import cast, Optional, Tuple, TYPE_CHECKING, Dict, Any, Callable, Union

Expand All @@ -15,9 +16,9 @@
from urllib import quote_plus # type: ignore
from urlparse import urlparse # type: ignore

import uamqp
from uamqp import utils, compat
from uamqp.message import MessageProperties
from ._pyamqp import constants, error as errors, utils as pyamqp_utils
from ._pyamqp.message import Message, Properties
from ._pyamqp.authentication import JWTTokenAuth

from azure.core.credentials import AccessToken, AzureSasCredential, AzureNamedKeyCredential
from azure.core.pipeline.policies import RetryMode
Expand All @@ -42,6 +43,7 @@
TRACE_COMPONENT,
TRACE_PEER_ADDRESS_PROPERTY,
TRACE_BUS_DESTINATION_PROPERTY,
JWT_TOKEN_SCOPE
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -146,11 +148,8 @@ def _generate_sas_token(uri, policy, key, expiry=None):
expiry = timedelta(hours=1) # Default to 1 hour.

abs_expiry = int(time.time()) + expiry.seconds
encoded_uri = quote_plus(uri).encode("utf-8") # pylint: disable=no-member
encoded_policy = quote_plus(policy).encode("utf-8") # pylint: disable=no-member
encoded_key = key.encode("utf-8")

token = utils.create_sas_token(encoded_policy, encoded_key, encoded_uri, expiry)
token = pyamqp_utils.generate_sas_token(uri, policy, key, abs_expiry).encode()
return AccessToken(token=token, expires_on=abs_expiry)

def _get_backoff_time(retry_mode, backoff_factor, backoff_max, retried_times):
Expand Down Expand Up @@ -266,7 +265,7 @@ def __init__(self, fully_qualified_namespace, entity_name, credential, **kwargs)
self._container_id = CONTAINER_PREFIX + str(uuid.uuid4())[:8]
self._config = Configuration(**kwargs)
self._running = False
self._handler = None # type: uamqp.AMQPClient
self._handler = None # type: AMQPClient
self._auth_uri = None
self._properties = create_properties(self._config.user_agent)
self._shutdown = threading.Event()
Expand Down Expand Up @@ -457,7 +456,7 @@ def _mgmt_request_response(
timeout=None,
**kwargs
):
# type: (bytes, Any, Callable, bool, Optional[float], Any) -> uamqp.Message
# type: (bytes, Any, Callable, bool, Optional[float], Any) -> Message
"""
Execute an amqp management operation.

Expand Down Expand Up @@ -485,9 +484,9 @@ def _mgmt_request_response(
except AttributeError:
pass

mgmt_msg = uamqp.Message(
mgmt_msg = Message(
body=message,
properties=MessageProperties(
properties=Properties(
reply_to=self._mgmt_target, encoding=self._config.encoding, **kwargs
),
application_properties=application_properties,
Expand All @@ -512,7 +511,7 @@ def _mgmt_request_response_with_retry(
# type: (bytes, Dict[str, Any], Callable, Optional[float], Any) -> Any
return self._do_retryable_operation(
self._mgmt_request_response,
mgmt_operation=mgmt_operation,
mgmt_operation=mgmt_operation.decode(),
message=message,
callback=callback,
timeout=timeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
from typing import Optional, Dict, Any
from urllib.parse import urlparse

from uamqp.constants import TransportType, DEFAULT_AMQP_WSS_PORT, DEFAULT_AMQPS_PORT
from .._pyamqp.constants import TransportType
from azure.core.pipeline.policies import RetryMode

DEFAULT_AMQPS_PORT = 1571
DEFAULT_AMQP_WSS_PORT = 443


class Configuration(object): # pylint:disable=too-many-instance-attributes
def __init__(self, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
# -------------------------------------------------------------------------
from enum import Enum

from uamqp import constants, types
from .._pyamqp import (
types,
constants,
)
from azure.core import CaseInsensitiveEnumMeta

VENDOR = b"com.microsoft"
Expand Down Expand Up @@ -179,8 +182,8 @@ class ServiceBusMessageState(int, Enum):

# To enable extensible string enums for the public facing parameter, and translate to the "real" uamqp constants.
ServiceBusToAMQPReceiveModeMap = {
ServiceBusReceiveMode.PEEK_LOCK: constants.ReceiverSettleMode.PeekLock,
ServiceBusReceiveMode.RECEIVE_AND_DELETE: constants.ReceiverSettleMode.ReceiveAndDelete,
ServiceBusReceiveMode.PEEK_LOCK: constants.ReceiverSettleMode.Second,
ServiceBusReceiveMode.RECEIVE_AND_DELETE: constants.ReceiverSettleMode.First,
}


Expand All @@ -193,11 +196,9 @@ class ServiceBusSubQueue(str, Enum, metaclass=CaseInsensitiveEnumMeta):
TRANSFER_DEAD_LETTER = "transferdeadletter"


ANNOTATION_SYMBOL_PARTITION_KEY = types.AMQPSymbol(_X_OPT_PARTITION_KEY)
ANNOTATION_SYMBOL_VIA_PARTITION_KEY = types.AMQPSymbol(_X_OPT_VIA_PARTITION_KEY)
ANNOTATION_SYMBOL_SCHEDULED_ENQUEUE_TIME = types.AMQPSymbol(
_X_OPT_SCHEDULED_ENQUEUE_TIME
)
ANNOTATION_SYMBOL_PARTITION_KEY = _X_OPT_PARTITION_KEY
ANNOTATION_SYMBOL_VIA_PARTITION_KEY = _X_OPT_VIA_PARTITION_KEY
ANNOTATION_SYMBOL_SCHEDULED_ENQUEUE_TIME = _X_OPT_SCHEDULED_ENQUEUE_TIME

ANNOTATION_SYMBOL_KEY_MAP = {
_X_OPT_PARTITION_KEY: ANNOTATION_SYMBOL_PARTITION_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@

import six

import uamqp.errors
import uamqp.message
from .._pyamqp.constants import MAX_FRAME_SIZE_BYTES as MAX_MESSAGE_LENGTH_BYTES
from .._pyamqp.message import Message, BatchMessage

from .constants import (
_BATCH_MESSAGE_OVERHEAD_COST,
Expand Down Expand Up @@ -670,11 +670,11 @@ class ServiceBusMessageBatch(object):

def __init__(self, max_size_in_bytes=None):
# type: (Optional[int]) -> None
self.message = uamqp.BatchMessage(
self.message = BatchMessage(
data=[], multi_messages=False, properties=None
)
self._max_size_in_bytes = (
max_size_in_bytes or uamqp.constants.MAX_MESSAGE_LENGTH_BYTES
max_size_in_bytes or MAX_MESSAGE_LENGTH_BYTES
)
self._size = self.message.gather()[0].get_message_encoded_size()
self._count = 0
Expand Down Expand Up @@ -782,7 +782,7 @@ class ServiceBusReceivedMessage(ServiceBusMessage):
"""

def __init__(self, message, receive_mode=ServiceBusReceiveMode.PEEK_LOCK, **kwargs):
# type: (uamqp.message.Message, Union[ServiceBusReceiveMode, str], Any) -> None
# type: (Message, Union[ServiceBusReceiveMode, str], Any) -> None
super(ServiceBusReceivedMessage, self).__init__(None, message=message) # type: ignore
self._settled = receive_mode == ServiceBusReceiveMode.RECEIVE_AND_DELETE
self._received_timestamp_utc = utc_now()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# -------------------------------------------------------------------------

import logging
import uamqp
from .._pyamqp.message import Message
from .message import ServiceBusReceivedMessage
from ..exceptions import _handle_amqp_mgmt_error
from .constants import ServiceBusReceiveMode, MGMT_RESPONSE_MESSAGE_ERROR_CONDITION
Expand Down Expand Up @@ -64,7 +64,7 @@ def peek_op( # pylint: disable=inconsistent-return-statements
if status_code == 200:
parsed = []
for m in message.get_data()[b"messages"]:
wrapped = uamqp.Message.decode_from_bytes(bytearray(m[b"message"]))
wrapped = Message.decode_from_bytes(bytearray(m[b"message"]))
parsed.append(
ServiceBusReceivedMessage(
wrapped, is_peeked_message=True, receiver=receiver
Expand Down Expand Up @@ -112,7 +112,7 @@ def deferred_message_op( # pylint: disable=inconsistent-return-statements
if status_code == 200:
parsed = []
for m in message.get_data()[b"messages"]:
wrapped = uamqp.Message.decode_from_bytes(bytearray(m[b"message"]))
wrapped = Message.decode_from_bytes(bytearray(m[b"message"]))
parsed.append(
message_type(
wrapped, receive_mode, is_deferred_message=True, receiver=receiver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import functools
from typing import Optional, Callable

from uamqp import Source
from .._pyamqp.endpoints import Source

from .message import ServiceBusReceivedMessage
from .constants import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
except ImportError:
from urllib.parse import urlparse

from uamqp import authentication, types
from .._pyamqp.authentication import JWTTokenAuth
from .._pyamqp import types

from azure.core.settings import settings
from azure.core.tracing import SpanKind, Link
Expand Down Expand Up @@ -110,22 +111,22 @@ def create_properties(user_agent=None):
:rtype: dict
"""
properties = {}
properties[types.AMQPSymbol("product")] = USER_AGENT_PREFIX
properties[types.AMQPSymbol("version")] = VERSION
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can just drop the AMQPSymbol from these.... otherwise I think this is just going to be encoded as strings. I see this in a couple of places, so we should double check the payloads are being encoded as we think they are.

properties["product"] = USER_AGENT_PREFIX
properties["version"] = VERSION
framework = "Python/{}.{}.{}".format(
sys.version_info[0], sys.version_info[1], sys.version_info[2]
)
properties[types.AMQPSymbol("framework")] = framework
properties["framework"] = framework
platform_str = platform.platform()
properties[types.AMQPSymbol("platform")] = platform_str
properties["platform"] = platform_str

final_user_agent = "{}/{} {} ({})".format(
USER_AGENT_PREFIX, VERSION, framework, platform_str
)
if user_agent:
final_user_agent = "{} {}".format(user_agent, final_user_agent)

properties[types.AMQPSymbol("user-agent")] = final_user_agent
properties["user-agent"] = final_user_agent
return properties


Expand Down Expand Up @@ -165,7 +166,7 @@ def create_authentication(client):
except AttributeError:
token_type = TOKEN_TYPE_JWT
if token_type == TOKEN_TYPE_SASTOKEN:
auth = authentication.JWTTokenAuth(
auth =JWTTokenAuth(
client._auth_uri,
client._auth_uri,
functools.partial(client._credential.get_token, client._auth_uri),
Expand All @@ -179,7 +180,7 @@ def create_authentication(client):
)
auth.update_token()
return auth
return authentication.JWTTokenAuth(
return JWTTokenAuth(
client._auth_uri,
client._auth_uri,
functools.partial(client._credential.get_token, JWT_TOKEN_SCOPE),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#-------------------------------------------------------------------------

__version__ = "2.0.0a1"


from ._connection import Connection
from ._transport import SSLTransport

from .client import AMQPClient, ReceiveClient, SendClient
Loading