Skip to content

Commit

Permalink
[service-bus] Receiver unification - BatchingReceiver (#10277)
Browse files Browse the repository at this point in the history
As part of the ongoing enhancements to Track 2 we've been running into issues where code is getting duplicated between session and non-session (or worse, is drifting apart). 

This PR is first in a series to unify the implementations, starting with unifying BatchingReceiver and MessageSession to the same body of code.

This is tangentially related to #9829, which will get the same treatment but for StreamingReceiver and MessageSession.
  • Loading branch information
richardpark-msft authored Jul 28, 2020
1 parent 7888918 commit 2513783
Show file tree
Hide file tree
Showing 9 changed files with 1,674 additions and 1,638 deletions.
594 changes: 285 additions & 309 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts

Large diffs are not rendered by default.

135 changes: 44 additions & 91 deletions sdk/servicebus/service-bus/src/core/messageReceiver.ts

Large diffs are not rendered by default.

70 changes: 70 additions & 0 deletions sdk/servicebus/service-bus/src/core/shared.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { Delivery } from "rhea-promise";
import { translate } from "@azure/core-amqp";
import * as log from "../log";

/**
* @internal
* @ignore
*/
export interface DeferredPromiseAndTimer {
resolve: (value?: any) => void;
reject: (reason?: any) => void;
timer: NodeJS.Timer;
}

/**
* This is the shared onSettled handler for all of the receiver implementations.
*
* The sequence is basically:
* 1. User calls `await <ServiceBusMessage instance>.complete()` (or other settlement methods)
* 2. This creates a `Promise` that gets stored in the _deliveryDispositionMap
* 3. When the service acknowledges the settlement this method gets called for that message.
* 4. We resolve() the promise from the _deliveryDispositionMap.
* 5. User's code after the settlement continues.
*
* @internal
* @ignore
*/
export function onMessageSettled(
connectionId: string,
delivery: Delivery | undefined,
deliveryDispositionMap: Map<number, DeferredPromiseAndTimer>
): void {
if (delivery) {
const id = delivery.id;
const state = delivery.remote_state;
const settled = delivery.remote_settled;
log.receiver(
"[%s] Delivery with id %d, remote_settled: %s, remote_state: %o has been " + "received.",
connectionId,
id,
settled,
state && state.error ? state.error : state
);
if (settled && deliveryDispositionMap.has(id)) {
const promise = deliveryDispositionMap.get(id) as DeferredPromiseAndTimer;
clearTimeout(promise.timer);
log.receiver(
"[%s] Found the delivery with id %d in the map and cleared the timer.",
connectionId,
id
);
const deleteResult = deliveryDispositionMap.delete(id);
log.receiver(
"[%s] Successfully deleted the delivery with id %d from the map.",
connectionId,
id,
deleteResult
);
if (state && state.error && (state.error.condition || state.error.description)) {
const error = translate(state.error);
return promise.reject(error);
}

return promise.resolve();
}
}
}
5 changes: 3 additions & 2 deletions sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,8 @@ export class SessionReceiverImpl<ReceivedMessageT extends ReceivedMessage | Rece
const receivedMessages = await this._messageSession!.receiveMessages(
maxMessageCount,
options?.maxWaitTimeInMs ?? Constants.defaultOperationTimeoutInMs,
defaultMaxTimeAfterFirstMessageForBatchingMs
defaultMaxTimeAfterFirstMessageForBatchingMs,
options?.abortSignal
);

return (receivedMessages as any) as ReceivedMessageT[];
Expand Down Expand Up @@ -475,7 +476,7 @@ export class SessionReceiverImpl<ReceivedMessageT extends ReceivedMessage | Rece
}

try {
this._messageSession.receive(onMessage, onError, options);
this._messageSession.subscribe(onMessage, onError, options);
} catch (err) {
onError(err);
}
Expand Down
Loading

0 comments on commit 2513783

Please sign in to comment.