From a0d8c2ffaf0796251909571d89dbe6b2958a5125 Mon Sep 17 00:00:00 2001 From: KieranBrantnerMagee Date: Tue, 8 Sep 2020 09:34:57 -0700 Subject: [PATCH] [ServiceBus] Expose internal amqp message properties via AMQPMessage wrapper object on Message (#13564) * Expose internal amqp message properties via AMQPMessage wrapper object on Message. Add test, changelog notes and docstring. (Note: Cannot rename old message as uamqp relies on the internal property name. Should likely be adapted.) Co-authored-by: Adam Ling (MSFT) --- sdk/servicebus/azure-servicebus/CHANGELOG.md | 1 + .../azure/servicebus/_common/message.py | 79 +++++++++++++++++++ .../azure-servicebus/tests/test_queues.py | 39 +++++++++ 3 files changed, 119 insertions(+) diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index f1b832deb546..b80ba58c5888 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -4,6 +4,7 @@ **New Features** * Messages can now be sent twice in succession. +* Internal AMQP message properties (header, footer, annotations, properties, etc) are now exposed via `Message.amqp_message` **Breaking Changes** diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index 77871922401a..050828f2bb06 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -85,6 +85,8 @@ class Message(object): # pylint: disable=too-many-public-methods,too-many-insta :keyword str reply_to_session_id: The session identifier augmenting the `reply_to` address. :keyword str encoding: The encoding for string data. Default is UTF-8. + :ivar AMQPMessage amqp_message: Advanced use only. The internal AMQP message payload that is sent or received. + .. admonition:: Example: .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py @@ -105,6 +107,7 @@ def __init__(self, body, **kwargs): self._amqp_header = uamqp.message.MessageHeader() if 'message' in kwargs: + # Note: This cannot be renamed until UAMQP no longer relies on this specific name. self.message = kwargs['message'] self._amqp_properties = self.message.properties self._amqp_header = self.message.header @@ -123,6 +126,8 @@ def __init__(self, body, **kwargs): self.time_to_live = kwargs.pop("time_to_live", None) self.partition_key = kwargs.pop("partition_key", None) self.via_partition_key = kwargs.pop("via_partition_key", None) + # If message is the full message, amqp_message is the "public facing interface" for what we expose. + self.amqp_message = AMQPMessage(self.message) def __str__(self): return str(self.message) @@ -1069,3 +1074,77 @@ def renew_lock(self): expiry = self._receiver._renew_locks(token) # type: ignore self._expiry = utc_from_timestamp(expiry[MGMT_RESPONSE_MESSAGE_EXPIRATION][0]/1000.0) + + +class AMQPMessage(object): + """ + The internal AMQP message that this ServiceBusMessage represents. + + :param properties: Properties to add to the message. + :type properties: ~uamqp.message.MessageProperties + :param application_properties: Service specific application properties. + :type application_properties: dict + :param annotations: Service specific message annotations. Keys in the dictionary + must be `uamqp.types.AMQPSymbol` or `uamqp.types.AMQPuLong`. + :type annotations: dict + :param delivery_annotations: Delivery-specific non-standard properties at the head of the message. + Delivery annotations convey information from the sending peer to the receiving peer. + Keys in the dictionary must be `uamqp.types.AMQPSymbol` or `uamqp.types.AMQPuLong`. + :type delivery_annotations: dict + :param header: The message header. + :type header: ~uamqp.message.MessageHeader + :param footer: The message footer. + :type footer: dict + + """ + def __init__(self, message): + # type: (uamqp.Message) -> None + self._message = message + + @property + def properties(self): + return self._message.properties + + @properties.setter + def properties(self, value): + self._message.properties = value + + @property + def application_properties(self): + return self._message.application_properties + + @application_properties.setter + def application_properties(self, value): + self._message.application_properties = value + + @property + def annotations(self): + return self._message.annotations + + @annotations.setter + def annotations(self, value): + self._message.annotations = value + + @property + def delivery_annotations(self): + return self._message.delivery_annotations + + @delivery_annotations.setter + def delivery_annotations(self, value): + self._message.delivery_annotations = value + + @property + def header(self): + return self._message.header + + @header.setter + def header(self, value): + self._message.header = value + + @property + def footer(self): + return self._message.footer + + @footer.setter + def footer(self, value): + self._message.footer = value diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index c17e4e9b00a9..64412ddce7c7 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -1857,3 +1857,42 @@ def test_queue_receiver_invalid_mode(self, servicebus_namespace_connection_strin max_wait_time="oij") as receiver: assert receiver + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @ServiceBusQueuePreparer(name_prefix='servicebustest') + def test_message_inner_amqp_properties(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + + message = Message("body") + + with pytest.raises(TypeError): + message.amqp_message.properties = {"properties":1} + message.amqp_message.properties.subject = "subject" + + message.amqp_message.application_properties = {b"application_properties":1} + + message.amqp_message.annotations = {b"annotations":2} + message.amqp_message.delivery_annotations = {b"delivery_annotations":3} + + with pytest.raises(TypeError): + message.amqp_message.header = {"header":4} + message.amqp_message.header.priority = 5 + + message.amqp_message.footer = {b"footer":6} + + with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, logging_enable=False) as sb_client: + + with sb_client.get_queue_sender(servicebus_queue.name) as sender: + sender.send_messages(message) + with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver: + message = receiver.receive_messages()[0] + assert message.amqp_message.properties.subject == b"subject" + assert message.amqp_message.application_properties[b"application_properties"] == 1 + assert message.amqp_message.annotations[b"annotations"] == 2 + # delivery_annotations and footer disabled pending uamqp bug https://github.com/Azure/azure-uamqp-python/issues/169 + #assert message.amqp_message.delivery_annotations[b"delivery_annotations"] == 3 + assert message.amqp_message.header.priority == 5 + #assert message.amqp_message.footer[b"footer"] == 6 \ No newline at end of file