Skip to content

Commit

Permalink
[ServiceBus] Expose internal amqp message properties via AMQPMessage …
Browse files Browse the repository at this point in the history
…wrapper object on Message (#13564)

* Expose internal amqp message properties via AMQPMessage wrapper object on Message.  Add test, changelog notes and docstring.  (Note: Cannot rename old message as uamqp relies on the internal property name.  Should likely be adapted.)

Co-authored-by: Adam Ling (MSFT) <adam_ling@outlook.com>
  • Loading branch information
KieranBrantnerMagee and yunhaoling authored Sep 8, 2020
1 parent bec61d9 commit a0d8c2f
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 0 deletions.
1 change: 1 addition & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

**New Features**
* Messages can now be sent twice in succession.
* Internal AMQP message properties (header, footer, annotations, properties, etc) are now exposed via `Message.amqp_message`

**Breaking Changes**

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ class Message(object): # pylint: disable=too-many-public-methods,too-many-insta
:keyword str reply_to_session_id: The session identifier augmenting the `reply_to` address.
:keyword str encoding: The encoding for string data. Default is UTF-8.
:ivar AMQPMessage amqp_message: Advanced use only. The internal AMQP message payload that is sent or received.
.. admonition:: Example:
.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
Expand All @@ -105,6 +107,7 @@ def __init__(self, body, **kwargs):
self._amqp_header = uamqp.message.MessageHeader()

if 'message' in kwargs:
# Note: This cannot be renamed until UAMQP no longer relies on this specific name.
self.message = kwargs['message']
self._amqp_properties = self.message.properties
self._amqp_header = self.message.header
Expand All @@ -123,6 +126,8 @@ def __init__(self, body, **kwargs):
self.time_to_live = kwargs.pop("time_to_live", None)
self.partition_key = kwargs.pop("partition_key", None)
self.via_partition_key = kwargs.pop("via_partition_key", None)
# If message is the full message, amqp_message is the "public facing interface" for what we expose.
self.amqp_message = AMQPMessage(self.message)

def __str__(self):
return str(self.message)
Expand Down Expand Up @@ -1069,3 +1074,77 @@ def renew_lock(self):

expiry = self._receiver._renew_locks(token) # type: ignore
self._expiry = utc_from_timestamp(expiry[MGMT_RESPONSE_MESSAGE_EXPIRATION][0]/1000.0)


class AMQPMessage(object):
"""
The internal AMQP message that this ServiceBusMessage represents.
:param properties: Properties to add to the message.
:type properties: ~uamqp.message.MessageProperties
:param application_properties: Service specific application properties.
:type application_properties: dict
:param annotations: Service specific message annotations. Keys in the dictionary
must be `uamqp.types.AMQPSymbol` or `uamqp.types.AMQPuLong`.
:type annotations: dict
:param delivery_annotations: Delivery-specific non-standard properties at the head of the message.
Delivery annotations convey information from the sending peer to the receiving peer.
Keys in the dictionary must be `uamqp.types.AMQPSymbol` or `uamqp.types.AMQPuLong`.
:type delivery_annotations: dict
:param header: The message header.
:type header: ~uamqp.message.MessageHeader
:param footer: The message footer.
:type footer: dict
"""
def __init__(self, message):
# type: (uamqp.Message) -> None
self._message = message

@property
def properties(self):
return self._message.properties

@properties.setter
def properties(self, value):
self._message.properties = value

@property
def application_properties(self):
return self._message.application_properties

@application_properties.setter
def application_properties(self, value):
self._message.application_properties = value

@property
def annotations(self):
return self._message.annotations

@annotations.setter
def annotations(self, value):
self._message.annotations = value

@property
def delivery_annotations(self):
return self._message.delivery_annotations

@delivery_annotations.setter
def delivery_annotations(self, value):
self._message.delivery_annotations = value

@property
def header(self):
return self._message.header

@header.setter
def header(self, value):
self._message.header = value

@property
def footer(self):
return self._message.footer

@footer.setter
def footer(self, value):
self._message.footer = value
39 changes: 39 additions & 0 deletions sdk/servicebus/azure-servicebus/tests/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -1857,3 +1857,42 @@ def test_queue_receiver_invalid_mode(self, servicebus_namespace_connection_strin
max_wait_time="oij") as receiver:

assert receiver

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest')
def test_message_inner_amqp_properties(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):

message = Message("body")

with pytest.raises(TypeError):
message.amqp_message.properties = {"properties":1}
message.amqp_message.properties.subject = "subject"

message.amqp_message.application_properties = {b"application_properties":1}

message.amqp_message.annotations = {b"annotations":2}
message.amqp_message.delivery_annotations = {b"delivery_annotations":3}

with pytest.raises(TypeError):
message.amqp_message.header = {"header":4}
message.amqp_message.header.priority = 5

message.amqp_message.footer = {b"footer":6}

with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False) as sb_client:

with sb_client.get_queue_sender(servicebus_queue.name) as sender:
sender.send_messages(message)
with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver:
message = receiver.receive_messages()[0]
assert message.amqp_message.properties.subject == b"subject"
assert message.amqp_message.application_properties[b"application_properties"] == 1
assert message.amqp_message.annotations[b"annotations"] == 2
# delivery_annotations and footer disabled pending uamqp bug https://github.com/Azure/azure-uamqp-python/issues/169
#assert message.amqp_message.delivery_annotations[b"delivery_annotations"] == 3
assert message.amqp_message.header.priority == 5
#assert message.amqp_message.footer[b"footer"] == 6

0 comments on commit a0d8c2f

Please sign in to comment.