Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] Object mapping support #17080

Merged
14 commits merged into from
Mar 6, 2021
10 changes: 5 additions & 5 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@

**New Features**

* Updated the following methods so that lists and single instances of dict representations are accepted for corresponding strongly-typed object arguments (PR #14807, thanks @bradleydamato):
- `update_queue`, `update_topic`, `update_subscription`, and `update_rule` on `ServiceBusAdministrationClient` accept dict representations of `QueueProperties`, `TopicProperties`, `SubscriptionProperties`, and `RuleProperties`, respectively.
- `send_messages` and `schedule_messages` on both sync and async versions of `ServiceBusSender` accept a list of or single instance of dict representations of `ServiceBusMessage`.
- `add_message` on `ServiceBusMessageBatch` now accepts a dict representation of `ServiceBusMessage`.
- Note: This is ongoing work and is the first step in supporting the above as respresentation of type `typing.Mapping`.
* Updated the following methods so that lists and single instances of Mapping representations are accepted for corresponding strongly-typed object arguments (PR #14807, thanks @bradleydamato):
- `update_queue`, `update_topic`, `update_subscription`, and `update_rule` on `ServiceBusAdministrationClient` accept Mapping representations of `QueueProperties`, `TopicProperties`, `SubscriptionProperties`, and `RuleProperties`, respectively.
- `send_messages` and `schedule_messages` on both sync and async versions of `ServiceBusSender` accept a list of or single instance of Mapping representations of `ServiceBusMessage`.
- `add_message` on `ServiceBusMessageBatch` now accepts a Mapping representation of `ServiceBusMessage`.

**BugFixes**

* Operations failing due to `uamqp.errors.LinkForceDetach` caused by no activity on the connection for 10 minutes will now be retried internally except for the session receiver case.
* `uamqp.errors.AMQPConnectionError` errors with condition code `amqp:unknown-error` are now categorized into `ServiceBusConnectionError` instead of the general `ServiceBusError`.
* The `update_*` methods on `ServiceBusManagementClient` will now raise a `TypeError` rather than an `AttributeError` in the case of unsupported input type.

## 7.0.1 (2021-01-12)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import uuid
import logging
import copy
from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Any
from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Any, Mapping, cast

import six

Expand Down Expand Up @@ -537,15 +537,8 @@ def __len__(self):

def _from_list(self, messages, parent_span=None):
# type: (Iterable[ServiceBusMessage], AbstractSpan) -> None
for each in messages:
if not isinstance(each, (ServiceBusMessage, dict)):
raise TypeError(
"Only ServiceBusMessage or an iterable object containing ServiceBusMessage "
"objects are accepted. Received instead: {}".format(
each.__class__.__name__
)
)
self._add(each, parent_span)
for message in messages:
self._add(message, parent_span)

@property
def max_size_in_bytes(self):
Expand All @@ -566,7 +559,7 @@ def size_in_bytes(self):
return self._size

def add_message(self, message):
# type: (ServiceBusMessage) -> None
# type: (Union[ServiceBusMessage, Mapping[str, Any]]) -> None
"""Try to add a single Message to the batch.

The total size of an added message is the sum of its body, properties, etc.
Expand All @@ -581,12 +574,12 @@ def add_message(self, message):

return self._add(message)

def _add(self, message, parent_span=None):
# type: (ServiceBusMessage, AbstractSpan) -> None
def _add(self, add_message, parent_span=None):
# type: (Union[ServiceBusMessage, Mapping[str, Any]], AbstractSpan) -> None
"""Actual add implementation. The shim exists to hide the internal parameters such as parent_span."""

message = create_messages_from_dicts_if_needed(message, ServiceBusMessage) # type: ignore
message = create_messages_from_dicts_if_needed(add_message, ServiceBusMessage)
message = transform_messages_to_sendable_if_needed(message)
message = cast(ServiceBusMessage, message)
trace_message(
message, parent_span
) # parent_span is e.g. if built as part of a send operation.
Expand Down
53 changes: 31 additions & 22 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
Optional,
Type,
TYPE_CHECKING,
Union
Union,
cast
)
from contextlib import contextmanager
from msrest.serialization import UTC
Expand Down Expand Up @@ -59,19 +60,10 @@
from .receiver_mixins import ReceiverMixin
from .._servicebus_session import BaseSession

# pylint: disable=unused-import, ungrouped-imports
DictMessageType = Union[
Mapping,
MessagesType = Union[
Mapping[str, Any],
ServiceBusMessage,
List[Mapping[str, Any]],
List[ServiceBusMessage],
ServiceBusMessageBatch
]

DictMessageReturnType = Union[
ServiceBusMessage,
List[ServiceBusMessage],
ServiceBusMessageBatch
List[Union[Mapping[str, Any], ServiceBusMessage]]
]

_log = logging.getLogger(__name__)
Expand Down Expand Up @@ -222,20 +214,37 @@ def transform_messages_to_sendable_if_needed(messages):
except AttributeError:
return messages


def _single_message_from_dict(message, message_type):
# type: (Union[ServiceBusMessage, Mapping[str, Any]], Type[ServiceBusMessage]) -> ServiceBusMessage
if isinstance(message, message_type):
return message
try:
return message_type(**cast(Mapping[str, Any], message))
except TypeError:
raise TypeError(
"Only ServiceBusMessage instances or Mappings representing messages are supported. "
"Received instead: {}".format(
message.__class__.__name__
)
)


def create_messages_from_dicts_if_needed(messages, message_type):
# type: (DictMessageType, type) -> DictMessageReturnType
# type: (MessagesType, Type[ServiceBusMessage]) -> Union[ServiceBusMessage, List[ServiceBusMessage]]
"""
This method is used to convert dict representations
of messages to a list of ServiceBusMessage objects or ServiceBusBatchMessage.
:param DictMessageType messages: A list or single instance of messages of type ServiceBusMessages or
dict representations of type ServiceBusMessage. Also accepts ServiceBusBatchMessage.
:rtype: DictMessageReturnType
This method is used to convert dict representations of one or more messages to
one or more ServiceBusMessage objects.

:param Messages messages: A list or single instance of messages of type ServiceBusMessage or
dict representations of type ServiceBusMessage.
:param Type[ServiceBusMessage] message_type: The class type to return the messages as.
:rtype: Union[ServiceBusMessage, List[ServiceBusMessage]]
"""
if isinstance(messages, list):
return [(message_type(**message) if isinstance(message, dict) else message) for message in messages]
return [_single_message_from_dict(m, message_type) for m in messages]
return _single_message_from_dict(messages, message_type)

return_messages = message_type(**messages) if isinstance(messages, dict) else messages
return return_messages

def strip_protocol_from_uri(uri):
# type: (str) -> str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
import time
import uuid
from typing import Any, TYPE_CHECKING, Union, List, Optional
from typing import Any, TYPE_CHECKING, Union, List, Optional, Mapping, cast

import uamqp
from uamqp import SendClient, types
Expand Down Expand Up @@ -42,6 +42,16 @@
import datetime
from azure.core.credentials import TokenCredential

MessageTypes = Union[
Mapping[str, Any],
ServiceBusMessage,
List[Union[Mapping[str, Any], ServiceBusMessage]]
]
MessageObjTypes = Union[
ServiceBusMessage,
ServiceBusMessageBatch,
List[ServiceBusMessage]]

_LOGGER = logging.getLogger(__name__)


Expand Down Expand Up @@ -248,7 +258,7 @@ def _send(self, message, timeout=None, last_exception=None):
self._set_msg_timeout(default_timeout, None)

def schedule_messages(self, messages, schedule_time_utc, **kwargs):
# type: (Union[ServiceBusMessage, List[ServiceBusMessage]], datetime.datetime, Any) -> List[int]
# type: (MessageTypes, datetime.datetime, Any) -> List[int]
"""Send Message or multiple Messages to be enqueued at a specific time.
Returns a list of the sequence numbers of the enqueued messages.

Expand All @@ -272,21 +282,21 @@ def schedule_messages(self, messages, schedule_time_utc, **kwargs):
# pylint: disable=protected-access

self._check_live()
messages = create_messages_from_dicts_if_needed(messages, ServiceBusMessage) # type: ignore
obj_messages = create_messages_from_dicts_if_needed(messages, ServiceBusMessage)
timeout = kwargs.pop("timeout", None)
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")

with send_trace_context_manager(span_name=SPAN_NAME_SCHEDULE) as send_span:
if isinstance(messages, ServiceBusMessage):
if isinstance(obj_messages, ServiceBusMessage):
request_body = self._build_schedule_request(
schedule_time_utc, send_span, messages
schedule_time_utc, send_span, obj_messages
)
else:
if len(messages) == 0:
if len(obj_messages) == 0:
return [] # No-op on empty list.
request_body = self._build_schedule_request(
schedule_time_utc, send_span, *messages
schedule_time_utc, send_span, *obj_messages
)
if send_span:
self._add_span_request_attributes(send_span)
Expand Down Expand Up @@ -338,7 +348,7 @@ def cancel_scheduled_messages(self, sequence_numbers, **kwargs):
)

def send_messages(self, message, **kwargs):
# type: (Union[ServiceBusMessage, ServiceBusMessageBatch, List[ServiceBusMessage]], Any) -> None
# type: (Union[MessageTypes, ServiceBusMessageBatch], Any) -> None
"""Sends message and blocks until acknowledgement is received or operation times out.

If a list of messages was provided, attempts to send them as a single batch, throwing a
Expand Down Expand Up @@ -368,48 +378,44 @@ def send_messages(self, message, **kwargs):
:caption: Send message.

"""

self._check_live()
message = create_messages_from_dicts_if_needed(message, ServiceBusMessage)
timeout = kwargs.pop("timeout", None)
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")

with send_trace_context_manager() as send_span:
# Ensure message is sendable (not a ReceivedMessage), and if needed (a list) is batched. Adds tracing.
message = transform_messages_to_sendable_if_needed(message)
try:
for each_message in iter(message): # type: ignore # Ignore type (and below) as it will except if wrong.
add_link_to_send(each_message, send_span)
batch = self.create_message_batch()
batch._from_list(message, send_span) # type: ignore # pylint: disable=protected-access
message = batch
except TypeError: # Message was not a list or generator. Do needed tracing.
if isinstance(message, ServiceBusMessageBatch):
for (
batch_message
) in message.message._body_gen: # pylint: disable=protected-access
add_link_to_send(batch_message, send_span)
elif isinstance(message, ServiceBusMessage):
trace_message(message, send_span)
add_link_to_send(message, send_span)
if isinstance(message, ServiceBusMessageBatch):
for (
batch_message
) in message.message._body_gen: # pylint: disable=protected-access
add_link_to_send(batch_message, send_span)
obj_message = message # type: MessageObjTypes
else:
obj_message = create_messages_from_dicts_if_needed(message, ServiceBusMessage)
# Ensure message is sendable (not a ReceivedMessage), and if needed (a list) is batched. Adds tracing.
obj_message = transform_messages_to_sendable_if_needed(obj_message)
try:
# Ignore type (and below) as it will except if wrong.
for each_message in iter(obj_message): # type: ignore
add_link_to_send(each_message, send_span)
batch = self.create_message_batch()
batch._from_list(obj_message, send_span) # type: ignore # pylint: disable=protected-access
obj_message = batch
except TypeError: # Message was not a list or generator. Do needed tracing.
trace_message(cast(ServiceBusMessage, obj_message), send_span)
add_link_to_send(obj_message, send_span)

if (
isinstance(message, ServiceBusMessageBatch) and len(message) == 0
isinstance(obj_message, ServiceBusMessageBatch) and len(obj_message) == 0
): # pylint: disable=len-as-condition
return # Short circuit noop if an empty list or batch is provided.
if not isinstance(message, (ServiceBusMessageBatch, ServiceBusMessage)):
raise TypeError(
"Can only send azure.servicebus.<ServiceBusMessageBatch, ServiceBusMessage> "
"or lists of ServiceBusMessage."
)
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved

if send_span:
self._add_span_request_attributes(send_span)

self._do_retryable_operation(
self._send,
message=message,
message=obj_message,
timeout=timeout,
operation_requires_timeout=True,
require_last_exception=True,
Expand Down
Loading