Skip to content

Commit

Permalink
[Event Hubs] Prefetch events (Azure#26065)
Browse files Browse the repository at this point in the history
Updates the partition receiver to use Rhea's prefetch window instead of
manually fetching the exact number of events needed. It also adds a
`subscribe` option, `prefetchCount`, that controls the max number of
events to prefetch.

Live run:
https://dev.azure.com/azure-sdk/internal/_build/results?buildId=2814550&view=results

Related issue: Azure#26055
  • Loading branch information
deyaaeldeen authored and minhanh-phan committed Jun 12, 2023
1 parent ce8f2c3 commit 7b1ea98
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 34 deletions.
10 changes: 3 additions & 7 deletions sdk/eventhub/event-hubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
# Release History

## 5.10.1 (Unreleased)

### Features Added

### Breaking Changes

### Bugs Fixed
## 5.11.0 (2023-06-06)

### Other Changes

- Use Rhea's prefetch window to prefetch events from the service. This improves the performance of the receiver by reducing the number of round trips to the service. The default prefetch window is 3 * `maxBatchSize` events. This can be configured by setting the `prefetchCount` option on the `subscribe` method on `EventHubConsumerClient`.

## 5.10.0 (2023-05-01)

### Bugs Fixed
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@azure/event-hubs",
"sdk-type": "client",
"version": "5.10.1",
"version": "5.11.0",
"description": "Azure Event Hubs SDK for JS.",
"author": "Microsoft Corporation",
"license": "MIT",
Expand Down
1 change: 1 addition & 0 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ export interface SubscribeOptions {
maxBatchSize?: number;
maxWaitTimeInSeconds?: number;
ownerLevel?: number;
prefetchCount?: number;
skipParsingBodyAsJson?: boolean;
startPosition?: EventPosition | {
[partitionId: string]: EventPosition;
Expand Down
4 changes: 4 additions & 0 deletions sdk/eventhub/event-hubs/src/eventHubConsumerClientModels.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ export interface SubscribeOptions {
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson?: boolean;
/**
* The count of events requested eagerly and queued without regard to whether a read was requested.
*/
prefetchCount?: number;
}

/**
Expand Down
4 changes: 4 additions & 0 deletions sdk/eventhub/event-hubs/src/models/private.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ export interface EventHubConsumerOptions {
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson?: boolean;
/**
* The count of events requested eagerly and queued without regard to whether a read was requested.
*/
prefetchCount?: number;
}

/**
Expand Down
1 change: 1 addition & 0 deletions sdk/eventhub/event-hubs/src/partitionPump.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ export class PartitionPump {
trackLastEnqueuedEventProperties: this._processorOptions.trackLastEnqueuedEventProperties,
retryOptions: this._processorOptions.retryOptions,
skipParsingBodyAsJson: this._processorOptions.skipParsingBodyAsJson,
prefetchCount: this._processorOptions.prefetchCount,
}
);

Expand Down
36 changes: 13 additions & 23 deletions sdk/eventhub/event-hubs/src/partitionReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ export interface PartitionReceiver {
interface ConnectOptions {
abortSignal: AbortSignalLike | undefined;
timeoutInMs: number;
prefetchCount: number;
}

interface ReceiverState {
Expand Down Expand Up @@ -153,7 +154,7 @@ export function createReceiver(
logger.verbose(`is open? -> ${isOpen}`);
return isOpen;
},
async connect({ abortSignal, timeoutInMs }: ConnectOptions): Promise<void> {
async connect({ abortSignal, timeoutInMs, prefetchCount }: ConnectOptions): Promise<void> {
if (state.isConnecting || obj.isOpen()) {
return;
}
Expand All @@ -170,6 +171,7 @@ export function createReceiver(
obj,
state,
queue,
prefetchCount,
eventPosition,
logger,
options,
Expand Down Expand Up @@ -211,26 +213,19 @@ export function createReceiver(
cleanupBeforeAbort();
return Promise.reject(new AbortError(StandardAbortMessage));
}
return obj.isClosed || ctx.wasConnectionCloseCalled
? Promise.resolve(queue.splice(0))
: eventsToRetrieveCount === 0
return obj.isClosed || ctx.wasConnectionCloseCalled || eventsToRetrieveCount === 0
? Promise.resolve(queue.splice(0, maxMessageCount))
: new Promise<void>((resolve, reject) => {
obj._onError = reject;
obj // eslint-disable-line promise/catch-or-return
.connect({
abortSignal,
timeoutInMs: getRetryAttemptTimeoutInMs(options.retryOptions),
prefetchCount: options.prefetchCount ?? maxMessageCount * 3,
})
.then(() => {
if (addCredits(state.link, eventsToRetrieveCount) > 0) {
return logger.verbose(
`setting the wait timer for ${maxWaitTimeInSeconds} seconds`
);
} else return;
})
.then(() =>
waitForEvents(
logger.verbose(`setting the wait timer for ${maxWaitTimeInSeconds} seconds`);
return waitForEvents(
maxMessageCount,
maxWaitTimeInSeconds * 1000,
qReadIntervalInMs,
Expand All @@ -252,8 +247,8 @@ export function createReceiver(
`no messages received when max wait time in seconds ${maxWaitTimeInSeconds} is over`
),
}
)
)
);
})
.catch(reject)
.then(resolve);
})
Expand Down Expand Up @@ -415,14 +410,6 @@ function setEventProps(eventProps: LastEnqueuedEventProperties, data: EventDataI
eventProps.retrievedOn = data.retrievalTime;
}

function addCredits(receiver: Link | undefined, eventsToRetrieveCount: number): number {
const creditsToAdd = eventsToRetrieveCount - (receiver?.credit ?? 0);
if (creditsToAdd > 0) {
receiver?.addCredit(creditsToAdd);
}
return creditsToAdd;
}

function clearHandlers(obj: WritableReceiver): void {
obj._onError = undefined;
}
Expand Down Expand Up @@ -513,6 +500,7 @@ function createRheaOptions(
obj: PartitionReceiver,
state: ReceiverState,
queue: ReceivedEventData[],
prefetchCount: number,
eventPosition: EventPosition,
logger: SimpleLogger,
options: EventHubConsumerOptions
Expand All @@ -523,7 +511,7 @@ function createRheaOptions(
source: {
address,
},
credit_window: 0,
credit_window: prefetchCount,
onClose: (context) => onClose(context, state, logger),
onSessionClose: (context) => onSessionClose(context, state, logger),
onError: (context) => onError(context, obj, state.link, logger),
Expand Down Expand Up @@ -555,6 +543,7 @@ async function setupLink(
obj: PartitionReceiver,
state: ReceiverState,
queue: ReceivedEventData[],
prefetchCount: number,
eventPosition: EventPosition,
logger: SimpleLogger,
options: EventHubConsumerOptions,
Expand All @@ -566,6 +555,7 @@ async function setupLink(
obj,
state,
queue,
prefetchCount,
eventPosition,
logger,
options
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/src/util/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/
export const packageJsonInfo = {
name: "@azure/event-hubs",
version: "5.10.1",
version: "5.11.0",
};

/**
Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhub/event-hubs/test/internal/cancellation.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ testWithServiceTypes((serviceVersion) => {
it(`initialize supports cancellation (${caseType})`, async () => {
const abortSignal = getSignal();
try {
await client.connect({ abortSignal, timeoutInMs: 60000 });
await client.connect({ abortSignal, timeoutInMs: 60000, prefetchCount: 1 });
throw new Error(TEST_FAILURE);
} catch (err: any) {
should.equal(err.name, "AbortError");
Expand All @@ -118,7 +118,7 @@ testWithServiceTypes((serviceVersion) => {

it(`receiveBatch supports cancellation when connection already exists (${caseType})`, async () => {
// Open the connection.
await client.connect({ abortSignal: undefined, timeoutInMs: 60000 });
await client.connect({ abortSignal: undefined, timeoutInMs: 60000, prefetchCount: 1 });
try {
const abortSignal = getSignal();
await client.receiveBatch(10, undefined, abortSignal);
Expand Down
4 changes: 4 additions & 0 deletions sdk/eventhub/event-hubs/test/internal/node/disconnect.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,12 @@ testWithServiceTypes((serviceVersion) => {
await receiver1.connect({
abortSignal: undefined,
timeoutInMs: 60000,
prefetchCount: 1,
});
await receiver2.connect({
abortSignal: undefined,
timeoutInMs: 60000,
prefetchCount: 1,
});

// We are going to override sender1's close method so that it also invokes receiver2's close method.
Expand Down Expand Up @@ -183,10 +185,12 @@ testWithServiceTypes((serviceVersion) => {
await receiver1.connect({
abortSignal: undefined,
timeoutInMs: 60000,
prefetchCount: 1,
});
await receiver2.connect({
abortSignal: undefined,
timeoutInMs: 60000,
prefetchCount: 1,
});

// We are going to override sender1's close method so that it also invokes receiver2's close method.
Expand Down

0 comments on commit 7b1ea98

Please sign in to comment.