Skip to content

Commit

Permalink
[ServiceBus] Object mapping support (#17080)
Browse files Browse the repository at this point in the history
  • Loading branch information
annatisch authored Mar 6, 2021
1 parent f48aca8 commit d7e61a7
Show file tree
Hide file tree
Showing 18 changed files with 273 additions and 158 deletions.
10 changes: 5 additions & 5 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@

**New Features**

* Updated the following methods so that lists and single instances of dict representations are accepted for corresponding strongly-typed object arguments (PR #14807, thanks @bradleydamato):
- `update_queue`, `update_topic`, `update_subscription`, and `update_rule` on `ServiceBusAdministrationClient` accept dict representations of `QueueProperties`, `TopicProperties`, `SubscriptionProperties`, and `RuleProperties`, respectively.
- `send_messages` and `schedule_messages` on both sync and async versions of `ServiceBusSender` accept a list of or single instance of dict representations of `ServiceBusMessage`.
- `add_message` on `ServiceBusMessageBatch` now accepts a dict representation of `ServiceBusMessage`.
- Note: This is ongoing work and is the first step in supporting the above as respresentation of type `typing.Mapping`.
* Updated the following methods so that lists and single instances of Mapping representations are accepted for corresponding strongly-typed object arguments (PR #14807, thanks @bradleydamato):
- `update_queue`, `update_topic`, `update_subscription`, and `update_rule` on `ServiceBusAdministrationClient` accept Mapping representations of `QueueProperties`, `TopicProperties`, `SubscriptionProperties`, and `RuleProperties`, respectively.
- `send_messages` and `schedule_messages` on both sync and async versions of `ServiceBusSender` accept a list of or single instance of Mapping representations of `ServiceBusMessage`.
- `add_message` on `ServiceBusMessageBatch` now accepts a Mapping representation of `ServiceBusMessage`.

**BugFixes**

* Operations failing due to `uamqp.errors.LinkForceDetach` caused by no activity on the connection for 10 minutes will now be retried internally except for the session receiver case.
* `uamqp.errors.AMQPConnectionError` errors with condition code `amqp:unknown-error` are now categorized into `ServiceBusConnectionError` instead of the general `ServiceBusError`.
* The `update_*` methods on `ServiceBusManagementClient` will now raise a `TypeError` rather than an `AttributeError` in the case of unsupported input type.

## 7.0.1 (2021-01-12)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import uuid
import logging
import copy
from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Any
from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Any, Mapping, cast

import six

Expand Down Expand Up @@ -537,15 +537,8 @@ def __len__(self):

def _from_list(self, messages, parent_span=None):
# type: (Iterable[ServiceBusMessage], AbstractSpan) -> None
for each in messages:
if not isinstance(each, (ServiceBusMessage, dict)):
raise TypeError(
"Only ServiceBusMessage or an iterable object containing ServiceBusMessage "
"objects are accepted. Received instead: {}".format(
each.__class__.__name__
)
)
self._add(each, parent_span)
for message in messages:
self._add(message, parent_span)

@property
def max_size_in_bytes(self):
Expand All @@ -566,7 +559,7 @@ def size_in_bytes(self):
return self._size

def add_message(self, message):
# type: (ServiceBusMessage) -> None
# type: (Union[ServiceBusMessage, Mapping[str, Any]]) -> None
"""Try to add a single Message to the batch.
The total size of an added message is the sum of its body, properties, etc.
Expand All @@ -581,12 +574,12 @@ def add_message(self, message):

return self._add(message)

def _add(self, message, parent_span=None):
# type: (ServiceBusMessage, AbstractSpan) -> None
def _add(self, add_message, parent_span=None):
# type: (Union[ServiceBusMessage, Mapping[str, Any]], AbstractSpan) -> None
"""Actual add implementation. The shim exists to hide the internal parameters such as parent_span."""

message = create_messages_from_dicts_if_needed(message, ServiceBusMessage) # type: ignore
message = create_messages_from_dicts_if_needed(add_message, ServiceBusMessage)
message = transform_messages_to_sendable_if_needed(message)
message = cast(ServiceBusMessage, message)
trace_message(
message, parent_span
) # parent_span is e.g. if built as part of a send operation.
Expand Down
53 changes: 31 additions & 22 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
Optional,
Type,
TYPE_CHECKING,
Union
Union,
cast
)
from contextlib import contextmanager
from msrest.serialization import UTC
Expand Down Expand Up @@ -59,19 +60,10 @@
from .receiver_mixins import ReceiverMixin
from .._servicebus_session import BaseSession

# pylint: disable=unused-import, ungrouped-imports
DictMessageType = Union[
Mapping,
MessagesType = Union[
Mapping[str, Any],
ServiceBusMessage,
List[Mapping[str, Any]],
List[ServiceBusMessage],
ServiceBusMessageBatch
]

DictMessageReturnType = Union[
ServiceBusMessage,
List[ServiceBusMessage],
ServiceBusMessageBatch
List[Union[Mapping[str, Any], ServiceBusMessage]]
]

_log = logging.getLogger(__name__)
Expand Down Expand Up @@ -222,20 +214,37 @@ def transform_messages_to_sendable_if_needed(messages):
except AttributeError:
return messages


def _single_message_from_dict(message, message_type):
# type: (Union[ServiceBusMessage, Mapping[str, Any]], Type[ServiceBusMessage]) -> ServiceBusMessage
if isinstance(message, message_type):
return message
try:
return message_type(**cast(Mapping[str, Any], message))
except TypeError:
raise TypeError(
"Only ServiceBusMessage instances or Mappings representing messages are supported. "
"Received instead: {}".format(
message.__class__.__name__
)
)


def create_messages_from_dicts_if_needed(messages, message_type):
# type: (DictMessageType, type) -> DictMessageReturnType
# type: (MessagesType, Type[ServiceBusMessage]) -> Union[ServiceBusMessage, List[ServiceBusMessage]]
"""
This method is used to convert dict representations
of messages to a list of ServiceBusMessage objects or ServiceBusBatchMessage.
:param DictMessageType messages: A list or single instance of messages of type ServiceBusMessages or
dict representations of type ServiceBusMessage. Also accepts ServiceBusBatchMessage.
:rtype: DictMessageReturnType
This method is used to convert dict representations of one or more messages to
one or more ServiceBusMessage objects.
:param Messages messages: A list or single instance of messages of type ServiceBusMessage or
dict representations of type ServiceBusMessage.
:param Type[ServiceBusMessage] message_type: The class type to return the messages as.
:rtype: Union[ServiceBusMessage, List[ServiceBusMessage]]
"""
if isinstance(messages, list):
return [(message_type(**message) if isinstance(message, dict) else message) for message in messages]
return [_single_message_from_dict(m, message_type) for m in messages]
return _single_message_from_dict(messages, message_type)

return_messages = message_type(**messages) if isinstance(messages, dict) else messages
return return_messages

def strip_protocol_from_uri(uri):
# type: (str) -> str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
import time
import uuid
from typing import Any, TYPE_CHECKING, Union, List, Optional
from typing import Any, TYPE_CHECKING, Union, List, Optional, Mapping, cast

import uamqp
from uamqp import SendClient, types
Expand Down Expand Up @@ -42,6 +42,16 @@
import datetime
from azure.core.credentials import TokenCredential

MessageTypes = Union[
Mapping[str, Any],
ServiceBusMessage,
List[Union[Mapping[str, Any], ServiceBusMessage]]
]
MessageObjTypes = Union[
ServiceBusMessage,
ServiceBusMessageBatch,
List[ServiceBusMessage]]

_LOGGER = logging.getLogger(__name__)


Expand Down Expand Up @@ -248,7 +258,7 @@ def _send(self, message, timeout=None, last_exception=None):
self._set_msg_timeout(default_timeout, None)

def schedule_messages(self, messages, schedule_time_utc, **kwargs):
# type: (Union[ServiceBusMessage, List[ServiceBusMessage]], datetime.datetime, Any) -> List[int]
# type: (MessageTypes, datetime.datetime, Any) -> List[int]
"""Send Message or multiple Messages to be enqueued at a specific time.
Returns a list of the sequence numbers of the enqueued messages.
Expand All @@ -272,21 +282,21 @@ def schedule_messages(self, messages, schedule_time_utc, **kwargs):
# pylint: disable=protected-access

self._check_live()
messages = create_messages_from_dicts_if_needed(messages, ServiceBusMessage) # type: ignore
obj_messages = create_messages_from_dicts_if_needed(messages, ServiceBusMessage)
timeout = kwargs.pop("timeout", None)
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")

with send_trace_context_manager(span_name=SPAN_NAME_SCHEDULE) as send_span:
if isinstance(messages, ServiceBusMessage):
if isinstance(obj_messages, ServiceBusMessage):
request_body = self._build_schedule_request(
schedule_time_utc, send_span, messages
schedule_time_utc, send_span, obj_messages
)
else:
if len(messages) == 0:
if len(obj_messages) == 0:
return [] # No-op on empty list.
request_body = self._build_schedule_request(
schedule_time_utc, send_span, *messages
schedule_time_utc, send_span, *obj_messages
)
if send_span:
self._add_span_request_attributes(send_span)
Expand Down Expand Up @@ -338,7 +348,7 @@ def cancel_scheduled_messages(self, sequence_numbers, **kwargs):
)

def send_messages(self, message, **kwargs):
# type: (Union[ServiceBusMessage, ServiceBusMessageBatch, List[ServiceBusMessage]], Any) -> None
# type: (Union[MessageTypes, ServiceBusMessageBatch], Any) -> None
"""Sends message and blocks until acknowledgement is received or operation times out.
If a list of messages was provided, attempts to send them as a single batch, throwing a
Expand Down Expand Up @@ -368,48 +378,44 @@ def send_messages(self, message, **kwargs):
:caption: Send message.
"""

self._check_live()
message = create_messages_from_dicts_if_needed(message, ServiceBusMessage)
timeout = kwargs.pop("timeout", None)
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")

with send_trace_context_manager() as send_span:
# Ensure message is sendable (not a ReceivedMessage), and if needed (a list) is batched. Adds tracing.
message = transform_messages_to_sendable_if_needed(message)
try:
for each_message in iter(message): # type: ignore # Ignore type (and below) as it will except if wrong.
add_link_to_send(each_message, send_span)
batch = self.create_message_batch()
batch._from_list(message, send_span) # type: ignore # pylint: disable=protected-access
message = batch
except TypeError: # Message was not a list or generator. Do needed tracing.
if isinstance(message, ServiceBusMessageBatch):
for (
batch_message
) in message.message._body_gen: # pylint: disable=protected-access
add_link_to_send(batch_message, send_span)
elif isinstance(message, ServiceBusMessage):
trace_message(message, send_span)
add_link_to_send(message, send_span)
if isinstance(message, ServiceBusMessageBatch):
for (
batch_message
) in message.message._body_gen: # pylint: disable=protected-access
add_link_to_send(batch_message, send_span)
obj_message = message # type: MessageObjTypes
else:
obj_message = create_messages_from_dicts_if_needed(message, ServiceBusMessage)
# Ensure message is sendable (not a ReceivedMessage), and if needed (a list) is batched. Adds tracing.
obj_message = transform_messages_to_sendable_if_needed(obj_message)
try:
# Ignore type (and below) as it will except if wrong.
for each_message in iter(obj_message): # type: ignore
add_link_to_send(each_message, send_span)
batch = self.create_message_batch()
batch._from_list(obj_message, send_span) # type: ignore # pylint: disable=protected-access
obj_message = batch
except TypeError: # Message was not a list or generator. Do needed tracing.
trace_message(cast(ServiceBusMessage, obj_message), send_span)
add_link_to_send(obj_message, send_span)

if (
isinstance(message, ServiceBusMessageBatch) and len(message) == 0
isinstance(obj_message, ServiceBusMessageBatch) and len(obj_message) == 0
): # pylint: disable=len-as-condition
return # Short circuit noop if an empty list or batch is provided.
if not isinstance(message, (ServiceBusMessageBatch, ServiceBusMessage)):
raise TypeError(
"Can only send azure.servicebus.<ServiceBusMessageBatch, ServiceBusMessage> "
"or lists of ServiceBusMessage."
)

if send_span:
self._add_span_request_attributes(send_span)

self._do_retryable_operation(
self._send,
message=message,
message=obj_message,
timeout=timeout,
operation_requires_timeout=True,
require_last_exception=True,
Expand Down
Loading

0 comments on commit d7e61a7

Please sign in to comment.