From a5d2a922689e14c4fbb15a92d4a9cd980942783e Mon Sep 17 00:00:00 2001 From: Harsha Nalluru Date: Wed, 13 Jan 2021 10:17:59 -0800 Subject: [PATCH] [Service Bus] Bug Fix: Sessions - Receive messages beyond 2048 in "receiveAndDelete" mode (#13178) ### Problem Send 10K messages... 10K messages are received just fine for non-sessionful queues. For sessionful queues, receiving stops after receiving 2048 messages with `subscribe` method and leaves the receiver hanging. (Same with `receiveMessages` API if requested for a large number of messages or called in a loop) ### Reason The difference between messages from sessionful and non-sessionful queues is the "settled" flag - it is "true" for non-sessions but "false" for sessions which makes the circular buffer full. ("rhea" checks the "settled" flag to pop the deliveries from the circular buffer) The "settled" flag is updated in rhea mainly by a couple of methods, auto_settle and auto_accept. And this "auto_accept" method can be invoked by setting the "autoaccept" option to true in the receiver options while creating the receiver in the receiveAndDelete mode, which is being set for the non-sessions case, but not for the sessions at the SB SDK level. ### Fix Set the "autoaccept" option to true in the receiver options for the receiveAndDelete mode for sessions to be able to clear the buffer in "rhea". ### Related issues https://github.com/Azure/azure-sdk-for-js/issues/13109 https://github.com/Azure/azure-sdk-for-js/issues/11633 https://github.com/Azure/azure-sdk-for-js/issues/8875 ### TODO - [x] There is common code in `messageReceiver.ts` and `messageSession.ts` that can be refactored. - [x] Test that this PR fixes the receiveBatch scenarios too - [x] Changelog --- sdk/servicebus/service-bus/CHANGELOG.md | 1 + .../service-bus/src/core/messageReceiver.ts | 41 +++++++---------- sdk/servicebus/service-bus/src/core/shared.ts | 45 ++++++++++++++++++- .../service-bus/src/core/streamingReceiver.ts | 4 +- .../service-bus/src/session/messageSession.ts | 45 +++++++++---------- 5 files changed, 85 insertions(+), 51 deletions(-) diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index 7df0b09b30dc..8bde7b23ca8a 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -2,6 +2,7 @@ ## 7.0.2 (Unreleased) +- [Bug Fix] Receiving messages from sessions in "receiveAndDelete" mode using the `subscribe()` method stops after receiving 2048 of them and leaves the receiver hanging. The bug has been fixed in [PR 13178](https://github.com/Azure/azure-sdk-for-js/pull/13178). Also fixes the same issue that is seen with the `receiveMessages` API when large number of messages are requested or if the API is called in a loop. ## 7.0.1 (2021-01-11) diff --git a/sdk/servicebus/service-bus/src/core/messageReceiver.ts b/sdk/servicebus/service-bus/src/core/messageReceiver.ts index 359858e64ef0..e7ecb25ab86c 100644 --- a/sdk/servicebus/service-bus/src/core/messageReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/messageReceiver.ts @@ -16,19 +16,15 @@ import { getUniqueName } from "../util/utils"; import { ProcessErrorArgs, ReceiveMode, SubscribeOptions } from "../models"; import { DispositionStatusOptions } from "./managementClient"; import { AbortSignalLike } from "@azure/core-http"; -import { onMessageSettled, DeferredPromiseAndTimer } from "./shared"; +import { + onMessageSettled, + DeferredPromiseAndTimer, + ReceiverHandlers, + createReceiverOptions +} from "./shared"; import { LockRenewer } from "./autoLockRenewer"; import { translateServiceBusError } from "../serviceBusError"; -/** - * @internal - * @hidden - */ -export type ReceiverHandlers = Pick< - ReceiverOptions, - "onMessage" | "onError" | "onClose" | "onSessionError" | "onSessionClose" ->; - /** * @internal * @hidden @@ -174,22 +170,19 @@ export abstract class MessageReceiver extends LinkEntity { useNewName: boolean, handlers: ReceiverHandlers ): ReceiverOptions { - const rcvrOptions: ReceiverOptions = { - name: useNewName ? getUniqueName(this.baseName) : this.name, - autoaccept: this.receiveMode === "receiveAndDelete" ? true : false, - // receiveAndDelete -> first(0), peekLock -> second (1) - rcv_settle_mode: this.receiveMode === "receiveAndDelete" ? 0 : 1, - // receiveAndDelete -> settled (1), peekLock -> unsettled (0) - snd_settle_mode: this.receiveMode === "receiveAndDelete" ? 1 : 0, - source: { + const rcvrOptions: ReceiverOptions = createReceiverOptions( + useNewName ? getUniqueName(this.baseName) : this.name, + this.receiveMode, + { address: this.address }, - credit_window: 0, - onSettled: (context) => { - return onMessageSettled(this.logPrefix, context.delivery, this._deliveryDispositionMap); - }, - ...handlers - }; + { + onSettled: (context: EventContext) => { + return onMessageSettled(this.logPrefix, context.delivery, this._deliveryDispositionMap); + }, + ...handlers + } + ); return rcvrOptions; } diff --git a/sdk/servicebus/service-bus/src/core/shared.ts b/sdk/servicebus/service-bus/src/core/shared.ts index e03bf2729fef..a26f9839031f 100644 --- a/sdk/servicebus/service-bus/src/core/shared.ts +++ b/sdk/servicebus/service-bus/src/core/shared.ts @@ -1,9 +1,19 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import { Delivery } from "rhea-promise"; +import { Delivery, ReceiverOptions, Source } from "rhea-promise"; import { translateServiceBusError } from "../serviceBusError"; import { receiverLogger } from "../log"; +import { ReceiveMode } from "../models"; + +/** + * @internal + * @hidden + */ +export type ReceiverHandlers = Pick< + ReceiverOptions, + "onMessage" | "onError" | "onClose" | "onSessionError" | "onSessionClose" | "onSettled" +>; /** * @internal @@ -68,3 +78,36 @@ export function onMessageSettled( } } } + +/** + * Creates the options that need to be specified while creating an AMQP receiver link. + * + * @internal + * @hidden + * @param {string} name + * @param {ReceiveMode} receiveMode + * @param {Source} source + * @param {ReceiverHandlers} handlers + */ +export function createReceiverOptions( + name: string, + receiveMode: ReceiveMode, + source: Source, + handlers: ReceiverHandlers +): ReceiverOptions { + const rcvrOptions: ReceiverOptions = { + name, + // "autoaccept" being true in the "receiveAndDelete" mode sets the "settled" flag to true on the deliveries + // which helps in clearing the circular buffer(size=2048) as it is needed to receive messages after 2048 of them are received. + autoaccept: receiveMode === "receiveAndDelete" ? true : false, + // receiveAndDelete -> first(0), peekLock -> second (1) + rcv_settle_mode: receiveMode === "receiveAndDelete" ? 0 : 1, + // receiveAndDelete -> settled (1), peekLock -> unsettled (0) + snd_settle_mode: receiveMode === "receiveAndDelete" ? 1 : 0, + source, + credit_window: 0, + ...handlers + }; + + return rcvrOptions; +} diff --git a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts index ed0163788f31..8ef4d8b54702 100644 --- a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts @@ -6,8 +6,7 @@ import { OnAmqpEventAsPromise, OnError, OnMessage, - ReceiveOptions, - ReceiverHandlers + ReceiveOptions } from "./messageReceiver"; import { ConnectionContext } from "../connectionContext"; @@ -29,6 +28,7 @@ import { ServiceBusMessageImpl } from "../serviceBusMessage"; import { AbortSignalLike } from "@azure/abort-controller"; import { translateServiceBusError } from "../serviceBusError"; import { abandonMessage, completeMessage } from "../receivers/shared"; +import { ReceiverHandlers } from "./shared"; /** * @internal diff --git a/sdk/servicebus/service-bus/src/session/messageSession.ts b/sdk/servicebus/service-bus/src/session/messageSession.ts index 49c7e6ed76cb..e84db4058520 100644 --- a/sdk/servicebus/service-bus/src/session/messageSession.ts +++ b/sdk/servicebus/service-bus/src/session/messageSession.ts @@ -23,7 +23,7 @@ import { StandardAbortMessage } from "../util/utils"; import { BatchingReceiverLite, MinimalReceiver } from "../core/batchingReceiver"; -import { onMessageSettled, DeferredPromiseAndTimer } from "../core/shared"; +import { onMessageSettled, DeferredPromiseAndTimer, createReceiverOptions } from "../core/shared"; import { AbortError, AbortSignalLike } from "@azure/abort-controller"; import { ReceiverHelper } from "../core/receiverHelper"; import { @@ -321,31 +321,28 @@ export class MessageSession extends LinkEntity { * Creates the options that need to be specified while creating an AMQP receiver link. */ private _createMessageSessionOptions(): ReceiverOptions { - const rcvrOptions: ReceiverOptions = { - name: this.name, - autoaccept: false, - // receiveAndDelete -> first(0), peekLock -> second (1) - rcv_settle_mode: this.receiveMode === "receiveAndDelete" ? 0 : 1, - // receiveAndDelete -> settled (1), peekLock -> unsettled (0) - snd_settle_mode: this.receiveMode === "receiveAndDelete" ? 1 : 0, - source: { + const rcvrOptions: ReceiverOptions = createReceiverOptions( + this.name, + this.receiveMode, + { address: this.address, - filter: {} + filter: { [Constants.sessionFilterName]: this.sessionId } }, - credit_window: 0, - onClose: (context) => - this._onAmqpClose(context).catch(() => { - /* */ - }), - onSessionClose: (context) => - this._onSessionClose(context).catch(() => { - /* */ - }), - onError: this._onAmqpError, - onSessionError: this._onSessionError, - onSettled: this._onSettled - }; - (rcvrOptions.source as any).filter[Constants.sessionFilterName] = this.sessionId; + { + onClose: (context) => + this._onAmqpClose(context).catch(() => { + /* */ + }), + onSessionClose: (context) => + this._onSessionClose(context).catch(() => { + /* */ + }), + onError: this._onAmqpError, + onSessionError: this._onSessionError, + onSettled: this._onSettled + } + ); + return rcvrOptions; }