Skip to content

Commit

Permalink
[service-bus] Allow users to subscribe() if they've closed the active…
Browse files Browse the repository at this point in the history
… subscription. (#10448)

Allow users to make subscribe() calls so long as there is no active subscription.

This commit also fleshes out some of our unit test primitives so you can be a bit more realistic with your unit tests and not have to stub as much using the createClientEntityContextForTests() and createRheaReceiverForTests().

Fixes #9829
  • Loading branch information
richardpark-msft authored Aug 7, 2020
1 parent 33028ab commit e8ab4d2
Show file tree
Hide file tree
Showing 12 changed files with 758 additions and 113 deletions.
1 change: 1 addition & 0 deletions sdk/servicebus/service-bus/src/core/messageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ export class MessageReceiver extends LinkEntity {
);

this._receiver = await this._context.namespace.connection.createReceiver(options);

this.isConnecting = false;
checkAborted();

Expand Down
49 changes: 38 additions & 11 deletions sdk/servicebus/service-bus/src/core/receiverHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import * as log from "../log";
* @ignore
*/
export class ReceiverHelper {
private _stopReceivingMessages: boolean = false;
private _isSuspended: boolean = false;

constructor(private _getCurrentReceiver: () => Receiver | undefined) {}

Expand All @@ -25,43 +25,66 @@ export class ReceiverHelper {
* @returns true if credits were added, false if there is no current receiver instance
* or `stopReceivingMessages` has been called.
*/
public addCredit(credits: number): boolean {
addCredit(credits: number): boolean {
const receiver = this._getCurrentReceiver();

if (this._stopReceivingMessages || receiver == null) {
if (!this.canReceiveMessages()) {
return false;
}

receiver.addCredit(credits);
if (receiver != null) {
receiver.addCredit(credits);
}

return true;
}

/**
* Prevents us from receiving any further messages.
* Drains the credits for the receiver and prevents the `receiverHelper.addCredit()` method from adding credits.
* Call `resume()` to enable the `addCredit()` method.
*/
public async stopReceivingMessages(): Promise<void> {
async suspend(): Promise<void> {
const receiver = this._getCurrentReceiver();

if (receiver == null) {
this._isSuspended = true;

if (!this._isValidReceiver(receiver)) {
return;
}

log.receiver(
`[${receiver.name}] User has requested to stop receiving new messages, attempting to drain the credits.`
);
this._stopReceivingMessages = true;

return this.drain();
}

/**
* Resets tracking so `addCredit` works again.
*/
resume(): void {
this._isSuspended = false;
}

/**
* Whether the receiver can receive messages.
*
* This checks if the the caller has decided to disable adding
* credits via 'suspend' as well as whether the receiver itself is
* still open.
*/
canReceiveMessages(): boolean {
const receiver = this._getCurrentReceiver();
return !this._isSuspended && this._isValidReceiver(receiver);
}

/**
* Initiates a drain for the current receiver and resolves when
* the drain has completed.
*/
public async drain(): Promise<void> {
async drain(): Promise<void> {
const receiver = this._getCurrentReceiver();

if (receiver == null) {
if (!this._isValidReceiver(receiver)) {
return;
}

Expand All @@ -82,4 +105,8 @@ export class ReceiverHelper {

return drainPromise;
}

private _isValidReceiver(receiver: Receiver | undefined): receiver is Receiver {
return receiver != null && receiver.isOpen();
}
}
45 changes: 26 additions & 19 deletions sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ export class StreamingReceiver extends MessageReceiver {
*/
protected _onAmqpMessage: OnAmqpEventAsPromise;

/**
* Whether we are currently registered for receiving messages.
*/
public get isReceivingMessages(): boolean {
// for the streaming receiver so long as we can receive messages then we
// _are_ receiving messages - there's no in-between state like there is
// with BatchingReceiver.
return this._receiverHelper.canReceiveMessages();
}

/**
* Instantiate a new Streaming receiver for receiving messages with handlers.
*
Expand Down Expand Up @@ -500,13 +510,18 @@ export class StreamingReceiver extends MessageReceiver {
};
}

stopReceivingMessages(): Promise<void> {
return this._receiverHelper.stopReceivingMessages();
async stopReceivingMessages(): Promise<void> {
await this._receiverHelper.suspend();
}

init(useNewName: boolean, abortSignal?: AbortSignalLike): Promise<void> {
async init(useNewName: boolean, abortSignal?: AbortSignalLike): Promise<void> {
const options = this._createReceiverOptions(useNewName, this._getHandlers());
return super._init(options, abortSignal);
await this._init(options, abortSignal);

// this might seem odd but in reality this entire class is like one big function call that
// results in a receive(). Once we're being initialized we should consider ourselves the
// "owner" of the receiver and that it's now being locked into being the actual receiver.
this._receiverHelper.resume();
}

/**
Expand All @@ -515,7 +530,7 @@ export class StreamingReceiver extends MessageReceiver {
* @param {OnMessage} onMessage The message handler to receive servicebus messages.
* @param {OnError} onError The error handler to receive an error that occurs while receivin messages.
*/
receive(onMessage: OnMessage, onError: OnError): void {
subscribe(onMessage: OnMessage, onError: OnError): void {
throwErrorIfConnectionClosed(this._context.namespace);

this._onMessage = onMessage;
Expand Down Expand Up @@ -555,22 +570,12 @@ export class StreamingReceiver extends MessageReceiver {
}

this._isDetaching = true;

try {
// Clears the token renewal timer. Closes the link and its session if they are open.
// Removes the link and its session if they are present in rhea's cache.
await this._closeLink(this._receiver);

if (this.receiverType === ReceiverType.batching) {
log.error(
"[%s] Receiver '%s' with address '%s' is a Batching Receiver, so we will not be " +
"re-establishing the receiver link.",
connectionId,
this.name,
this.address
);
return;
}

const translatedError = receiverError ? translate(receiverError) : receiverError;

// Track-1
Expand Down Expand Up @@ -695,7 +700,7 @@ export class StreamingReceiver extends MessageReceiver {
context: ClientEntityContext,
options?: ReceiveOptions &
Pick<OperationOptionsBase, "abortSignal"> & {
_createStreamingReceiver?: (
_createStreamingReceiverStubForTests?: (
context: ClientEntityContext,
options?: ReceiveOptions
) => StreamingReceiver;
Expand All @@ -707,8 +712,10 @@ export class StreamingReceiver extends MessageReceiver {

let sReceiver: StreamingReceiver;

if (options?._createStreamingReceiver) {
sReceiver = options._createStreamingReceiver(context, options);
if (context.streamingReceiver) {
sReceiver = context.streamingReceiver;
} else if (options?._createStreamingReceiverStubForTests) {
sReceiver = options._createStreamingReceiverStubForTests(context, options);
} else {
sReceiver = new StreamingReceiver(context, options);
}
Expand Down
18 changes: 16 additions & 2 deletions sdk/servicebus/service-bus/src/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,20 @@ export interface MessageHandlers<ReceivedMessageT> {
processError(err: Error): Promise<void>;
}

/**
* @internal
* @ignore
*/
export interface InternalMessageHandlers<ReceivedMessageT>
extends MessageHandlers<ReceivedMessageT> {
/**
* Called when the connection is initialized but before we subscribe to messages or add credits.
*
* NOTE: This handler is completely internal and only used for tests.
*/
processInitialize?: () => Promise<void>;
}

/**
* Represents the possible receive modes for the receiver.
*/
Expand All @@ -44,9 +58,9 @@ export interface CreateReceiverOptions<ReceiveModeT extends ReceiveMode> {
* queue/subscription.
*
* Messages that are not settled within the lock duration will be redelivered as many times as
* the max delivery count set on the queue/subscription, after which they get sent to a separate
* the max delivery count set on the queue/subscription, after which they get sent to a separate
* dead letter queue.
*
*
* You can settle a message by calling complete(), abandon(), defer() or deadletter() methods on
* the message.
*
Expand Down
28 changes: 25 additions & 3 deletions sdk/servicebus/service-bus/src/receivers/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import {
GetMessageIteratorOptions,
MessageHandlers,
ReceiveMessagesOptions,
SubscribeOptions
SubscribeOptions,
InternalMessageHandlers
} from "../models";
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { ReceivedMessage } from "..";
Expand Down Expand Up @@ -206,6 +207,7 @@ export class ReceiverImpl<ReceivedMessageT extends ReceivedMessage | ReceivedMes
* @throws MessagingError if the service returns an error while receiving messages. These are bubbled up to be handled by user provided `onError` handler.
*/
private _registerMessageHandler(
onInitialize: () => Promise<void>,
onMessage: OnMessage,
onError: OnError,
options?: SubscribeOptions
Expand All @@ -231,8 +233,15 @@ export class ReceiverImpl<ReceivedMessageT extends ReceivedMessage | ReceivedMes
if (!sReceiver) {
return;
}

try {
await onInitialize();
} catch (err) {
onError(err);
}

if (!this.isClosed) {
sReceiver.receive(onMessage, onError);
sReceiver.subscribe(onMessage, onError);
} else {
await sReceiver.close();
}
Expand Down Expand Up @@ -393,7 +402,16 @@ export class ReceiverImpl<ReceivedMessageT extends ReceivedMessage | ReceivedMes

const processError = wrapProcessErrorHandler(handlers);

const internalMessageHandlers = handlers as
| InternalMessageHandlers<ReceivedMessageT>
| undefined;

this._registerMessageHandler(
async () => {
if (internalMessageHandlers?.processInitialize) {
await internalMessageHandlers.processInitialize();
}
},
async (message: ServiceBusMessageImpl) => {
return handlers.processMessage((message as any) as ReceivedMessageT);
},
Expand Down Expand Up @@ -441,7 +459,11 @@ export class ReceiverImpl<ReceivedMessageT extends ReceivedMessage | ReceivedMes
* When this returns true, new `registerMessageHandler()` or `receiveMessages()` calls cannot be made.
*/
private _isReceivingMessages(): boolean {
if (this._context.streamingReceiver && this._context.streamingReceiver.isOpen()) {
if (
this._context.streamingReceiver &&
this._context.streamingReceiver.isOpen() &&
this._context.streamingReceiver.isReceivingMessages
) {
return true;
}
if (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ export class SessionReceiverImpl<ReceivedMessageT extends ReceivedMessage | Rece

return {
close: async (): Promise<void> => {
return this._messageSession?.receiverHelper.stopReceivingMessages();
return this._messageSession?.receiverHelper.suspend();
}
};
}
Expand Down
Loading

0 comments on commit e8ab4d2

Please sign in to comment.