Skip to content

Commit

Permalink
[@Azure/event-hubs] Adding disableDeserialization option when subsc…
Browse files Browse the repository at this point in the history
…ribing (#18173)
  • Loading branch information
marcodalessandro authored Oct 28, 2021
1 parent f8a3be7 commit 8fcdbfe
Show file tree
Hide file tree
Showing 13 changed files with 331 additions and 153 deletions.
3 changes: 2 additions & 1 deletion sdk/eventhub/event-hubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# Release History

## 5.6.1 (Unreleased)
## 5.7.0 (Unreleased)

### Features Added
- Added `skipParsingBodyAsJson` optional parameter to `EventHubConsumerClient.subscribe` method. When set to `true` it will disable the client from running `JSON.parse()` on the message body when receiving the message. Not applicable if the message was sent with AMQP body type `value` or `sequence`.

### Breaking Changes

Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@azure/event-hubs",
"sdk-type": "client",
"version": "5.6.1",
"version": "5.7.0",
"description": "Azure Event Hubs SDK for JS.",
"author": "Microsoft Corporation",
"license": "MIT",
Expand Down
1 change: 1 addition & 0 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ export interface SubscribeOptions {
maxBatchSize?: number;
maxWaitTimeInSeconds?: number;
ownerLevel?: number;
skipParsingBodyAsJson?: boolean;
startPosition?: EventPosition | {
[partitionId: string]: EventPosition;
};
Expand Down
13 changes: 10 additions & 3 deletions sdk/eventhub/event-hubs/src/dataTransformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,29 @@ export const defaultDataTransformer = {
* indicating which part of the AMQP message the body was decoded from.
*
* @param body - The AMQP message body as received from rhea.
* @param skipParsingBodyAsJson - Boolean to skip running JSON.parse() on message body when body type is `content`.
* @returns The decoded/raw body and the body type.
*/
decode(body: unknown | RheaAmqpSection): { body: unknown; bodyType: BodyTypes } {
decode(
body: unknown | RheaAmqpSection,
skipParsingBodyAsJson: boolean
): { body: unknown; bodyType: BodyTypes } {
try {
if (isRheaAmqpSection(body)) {
switch (body.typecode) {
case dataSectionTypeCode:
return { body: tryToJsonDecode(body.content), bodyType: "data" };
return {
body: skipParsingBodyAsJson ? body.content : tryToJsonDecode(body.content),
bodyType: "data"
};
case sequenceSectionTypeCode:
return { body: body.content, bodyType: "sequence" };
case valueSectionTypeCode:
return { body: body.content, bodyType: "value" };
}
} else {
if (isBuffer(body)) {
return { body: tryToJsonDecode(body), bodyType: "data" };
return { body: skipParsingBodyAsJson ? body : tryToJsonDecode(body), bodyType: "data" };
}

return { body, bodyType: "value" };
Expand Down
8 changes: 6 additions & 2 deletions sdk/eventhub/event-hubs/src/eventData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,15 @@ const messagePropertiesMap = {
/**
* Converts the AMQP message to an EventData.
* @param msg - The AMQP message that needs to be converted to EventData.
* @param skipParsingBodyAsJson - Boolean to skip running JSON.parse() on message body when body type is `content`.
* @hidden
*/
export function fromRheaMessage(msg: RheaMessage): EventDataInternal {
export function fromRheaMessage(
msg: RheaMessage,
skipParsingBodyAsJson: boolean
): EventDataInternal {
const rawMessage = AmqpAnnotatedMessage.fromRheaMessage(msg);
const { body, bodyType } = defaultDataTransformer.decode(msg.body);
const { body, bodyType } = defaultDataTransformer.decode(msg.body, skipParsingBodyAsJson);
rawMessage.bodyType = bodyType;

const data: EventDataInternal = {
Expand Down
6 changes: 6 additions & 0 deletions sdk/eventhub/event-hubs/src/eventHubConsumerClientModels.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ export interface SubscribeOptions {
* Options for configuring tracing.
*/
tracingOptions?: OperationTracingOptions;
/**
* Option to disable the client from running JSON.parse() on the message body when receiving the message.
* Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson?: boolean;
}

/**
Expand Down
6 changes: 4 additions & 2 deletions sdk/eventhub/event-hubs/src/eventHubReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,10 @@ export class EventHubReceiver extends LinkEntity {
if (!context.message) {
return;
}

const data: EventDataInternal = fromRheaMessage(context.message);
const data: EventDataInternal = fromRheaMessage(
context.message,
!!this.options.skipParsingBodyAsJson
);
const rawMessage = data.getRawAmqpMessage();
const receivedEventData: ReceivedEventData = {
body: data.body,
Expand Down
12 changes: 11 additions & 1 deletion sdk/eventhub/event-hubs/src/models/private.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,18 @@ export interface CommonEventProcessorOptions
* consumers to fail if their `ownerLevel` is lower or doesn't exist.
* - `retryOptions`: The retry options used to govern retry attempts when an issue is encountered while receiving events.
* A simple usage can be `{ "maxRetries": 4 }`.
* - `skipParsingBodyAsJson` : Option to disable the client from running JSON.parse() on the message body when receiving the message.
* Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you prefer to work directly with
* the bytes present in the message body than have the client attempt to parse it.
*
* Example usage:
* ```js
* {
* retryOptions: {
* maxRetries: 4
* },
* trackLastEnqueuedEventProperties: false
* trackLastEnqueuedEventProperties: false,
* skipParsingBodyAsJson: true
* }
* ```
* @internal
Expand Down Expand Up @@ -113,4 +117,10 @@ export interface EventHubConsumerOptions {
* against periodically making requests for partition properties using the Event Hub client.
*/
trackLastEnqueuedEventProperties?: boolean;
/**
* Option to disable the client from running JSON.parse() on the message body when receiving the message.
* Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson?: boolean;
}
3 changes: 2 additions & 1 deletion sdk/eventhub/event-hubs/src/partitionPump.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ export class PartitionPump {
{
ownerLevel: this._processorOptions.ownerLevel,
trackLastEnqueuedEventProperties: this._processorOptions.trackLastEnqueuedEventProperties,
retryOptions: this._processorOptions.retryOptions
retryOptions: this._processorOptions.retryOptions,
skipParsingBodyAsJson: this._processorOptions.skipParsingBodyAsJson
}
);

Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/src/util/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
*/
export const packageJsonInfo = {
name: "@azure/event-hubs",
version: "5.6.1"
version: "5.7.0"
};
26 changes: 16 additions & 10 deletions sdk/eventhub/event-hubs/test/internal/amqp.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,29 @@ testWithServiceTypes(() => {
assert.isFalse(isAmqpAnnotatedMessage({ body: "hello world" }));
assert.isFalse(
isAmqpAnnotatedMessage(
fromRheaMessage({
message_annotations: {
[Constants.enqueuedTime]: Date.now()
fromRheaMessage(
{
message_annotations: {
[Constants.enqueuedTime]: Date.now()
},
body: undefined
},
body: undefined
})
false
)
)
);

assert.isTrue(
isAmqpAnnotatedMessage(
fromRheaMessage({
message_annotations: {
[Constants.enqueuedTime]: Date.now()
fromRheaMessage(
{
message_annotations: {
[Constants.enqueuedTime]: Date.now()
},
body: undefined
},
body: undefined
}).getRawAmqpMessage()
false
).getRawAmqpMessage()
)
);

Expand Down
Loading

0 comments on commit 8fcdbfe

Please sign in to comment.