Skip to content

Commit

Permalink
[Service Bus] Bug Fix: Sessions - Receive messages beyond 2048 in "re…
Browse files Browse the repository at this point in the history
…ceiveAndDelete" mode (Azure#13178)

### Problem
Send 10K messages... 10K messages are received just fine for non-sessionful queues. For sessionful queues, receiving stops after receiving 2048 messages with `subscribe` method and leaves the receiver hanging. (Same with `receiveMessages` API if requested for a large number of messages or called in a loop)

### Reason
The difference between messages from sessionful and non-sessionful queues is the "settled" flag - it is "true" for non-sessions but "false" for sessions which makes the circular buffer full. ("rhea" checks the "settled" flag to pop the deliveries from the circular buffer)

The "settled" flag is updated in rhea mainly by a couple of methods, auto_settle and auto_accept. And this "auto_accept" method can be invoked by setting the "autoaccept" option to true in the receiver options while creating the receiver in the receiveAndDelete mode, which is being set for the non-sessions case, but not for the sessions at the SB SDK level.

### Fix
Set the "autoaccept" option to true in the receiver options for the receiveAndDelete mode for sessions to be able to clear the buffer in "rhea".

### Related issues
Azure#13109 Azure#11633 Azure#8875

### TODO
- [x] There is common code in `messageReceiver.ts` and `messageSession.ts` that can be refactored.
- [x] Test that this PR fixes the receiveBatch scenarios too
- [x] Changelog
  • Loading branch information
HarshaNalluru authored and ljian3377 committed Jan 22, 2021
1 parent bef6d9d commit a5d2a92
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 51 deletions.
1 change: 1 addition & 0 deletions sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 7.0.2 (Unreleased)

- [Bug Fix] Receiving messages from sessions in "receiveAndDelete" mode using the `subscribe()` method stops after receiving 2048 of them and leaves the receiver hanging. The bug has been fixed in [PR 13178](https://github.com/Azure/azure-sdk-for-js/pull/13178). Also fixes the same issue that is seen with the `receiveMessages` API when large number of messages are requested or if the API is called in a loop.

## 7.0.1 (2021-01-11)

Expand Down
41 changes: 17 additions & 24 deletions sdk/servicebus/service-bus/src/core/messageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,15 @@ import { getUniqueName } from "../util/utils";
import { ProcessErrorArgs, ReceiveMode, SubscribeOptions } from "../models";
import { DispositionStatusOptions } from "./managementClient";
import { AbortSignalLike } from "@azure/core-http";
import { onMessageSettled, DeferredPromiseAndTimer } from "./shared";
import {
onMessageSettled,
DeferredPromiseAndTimer,
ReceiverHandlers,
createReceiverOptions
} from "./shared";
import { LockRenewer } from "./autoLockRenewer";
import { translateServiceBusError } from "../serviceBusError";

/**
* @internal
* @hidden
*/
export type ReceiverHandlers = Pick<
ReceiverOptions,
"onMessage" | "onError" | "onClose" | "onSessionError" | "onSessionClose"
>;

/**
* @internal
* @hidden
Expand Down Expand Up @@ -174,22 +170,19 @@ export abstract class MessageReceiver extends LinkEntity<Receiver> {
useNewName: boolean,
handlers: ReceiverHandlers
): ReceiverOptions {
const rcvrOptions: ReceiverOptions = {
name: useNewName ? getUniqueName(this.baseName) : this.name,
autoaccept: this.receiveMode === "receiveAndDelete" ? true : false,
// receiveAndDelete -> first(0), peekLock -> second (1)
rcv_settle_mode: this.receiveMode === "receiveAndDelete" ? 0 : 1,
// receiveAndDelete -> settled (1), peekLock -> unsettled (0)
snd_settle_mode: this.receiveMode === "receiveAndDelete" ? 1 : 0,
source: {
const rcvrOptions: ReceiverOptions = createReceiverOptions(
useNewName ? getUniqueName(this.baseName) : this.name,
this.receiveMode,
{
address: this.address
},
credit_window: 0,
onSettled: (context) => {
return onMessageSettled(this.logPrefix, context.delivery, this._deliveryDispositionMap);
},
...handlers
};
{
onSettled: (context: EventContext) => {
return onMessageSettled(this.logPrefix, context.delivery, this._deliveryDispositionMap);
},
...handlers
}
);

return rcvrOptions;
}
Expand Down
45 changes: 44 additions & 1 deletion sdk/servicebus/service-bus/src/core/shared.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { Delivery } from "rhea-promise";
import { Delivery, ReceiverOptions, Source } from "rhea-promise";
import { translateServiceBusError } from "../serviceBusError";
import { receiverLogger } from "../log";
import { ReceiveMode } from "../models";

/**
* @internal
* @hidden
*/
export type ReceiverHandlers = Pick<
ReceiverOptions,
"onMessage" | "onError" | "onClose" | "onSessionError" | "onSessionClose" | "onSettled"
>;

/**
* @internal
Expand Down Expand Up @@ -68,3 +78,36 @@ export function onMessageSettled(
}
}
}

/**
* Creates the options that need to be specified while creating an AMQP receiver link.
*
* @internal
* @hidden
* @param {string} name
* @param {ReceiveMode} receiveMode
* @param {Source} source
* @param {ReceiverHandlers} handlers
*/
export function createReceiverOptions(
name: string,
receiveMode: ReceiveMode,
source: Source,
handlers: ReceiverHandlers
): ReceiverOptions {
const rcvrOptions: ReceiverOptions = {
name,
// "autoaccept" being true in the "receiveAndDelete" mode sets the "settled" flag to true on the deliveries
// which helps in clearing the circular buffer(size=2048) as it is needed to receive messages after 2048 of them are received.
autoaccept: receiveMode === "receiveAndDelete" ? true : false,
// receiveAndDelete -> first(0), peekLock -> second (1)
rcv_settle_mode: receiveMode === "receiveAndDelete" ? 0 : 1,
// receiveAndDelete -> settled (1), peekLock -> unsettled (0)
snd_settle_mode: receiveMode === "receiveAndDelete" ? 1 : 0,
source,
credit_window: 0,
...handlers
};

return rcvrOptions;
}
4 changes: 2 additions & 2 deletions sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import {
OnAmqpEventAsPromise,
OnError,
OnMessage,
ReceiveOptions,
ReceiverHandlers
ReceiveOptions
} from "./messageReceiver";
import { ConnectionContext } from "../connectionContext";

Expand All @@ -29,6 +28,7 @@ import { ServiceBusMessageImpl } from "../serviceBusMessage";
import { AbortSignalLike } from "@azure/abort-controller";
import { translateServiceBusError } from "../serviceBusError";
import { abandonMessage, completeMessage } from "../receivers/shared";
import { ReceiverHandlers } from "./shared";

/**
* @internal
Expand Down
45 changes: 21 additions & 24 deletions sdk/servicebus/service-bus/src/session/messageSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import {
StandardAbortMessage
} from "../util/utils";
import { BatchingReceiverLite, MinimalReceiver } from "../core/batchingReceiver";
import { onMessageSettled, DeferredPromiseAndTimer } from "../core/shared";
import { onMessageSettled, DeferredPromiseAndTimer, createReceiverOptions } from "../core/shared";
import { AbortError, AbortSignalLike } from "@azure/abort-controller";
import { ReceiverHelper } from "../core/receiverHelper";
import {
Expand Down Expand Up @@ -321,31 +321,28 @@ export class MessageSession extends LinkEntity<Receiver> {
* Creates the options that need to be specified while creating an AMQP receiver link.
*/
private _createMessageSessionOptions(): ReceiverOptions {
const rcvrOptions: ReceiverOptions = {
name: this.name,
autoaccept: false,
// receiveAndDelete -> first(0), peekLock -> second (1)
rcv_settle_mode: this.receiveMode === "receiveAndDelete" ? 0 : 1,
// receiveAndDelete -> settled (1), peekLock -> unsettled (0)
snd_settle_mode: this.receiveMode === "receiveAndDelete" ? 1 : 0,
source: {
const rcvrOptions: ReceiverOptions = createReceiverOptions(
this.name,
this.receiveMode,
{
address: this.address,
filter: {}
filter: { [Constants.sessionFilterName]: this.sessionId }
},
credit_window: 0,
onClose: (context) =>
this._onAmqpClose(context).catch(() => {
/* */
}),
onSessionClose: (context) =>
this._onSessionClose(context).catch(() => {
/* */
}),
onError: this._onAmqpError,
onSessionError: this._onSessionError,
onSettled: this._onSettled
};
(rcvrOptions.source as any).filter[Constants.sessionFilterName] = this.sessionId;
{
onClose: (context) =>
this._onAmqpClose(context).catch(() => {
/* */
}),
onSessionClose: (context) =>
this._onSessionClose(context).catch(() => {
/* */
}),
onError: this._onAmqpError,
onSessionError: this._onSessionError,
onSettled: this._onSettled
}
);

return rcvrOptions;
}

Expand Down

0 comments on commit a5d2a92

Please sign in to comment.