[Event Hubs] Prefetch events #26065

Merged · 6 commits merged on Jun 6, 2023 · showing changes from 5 commits
10 changes: 3 additions & 7 deletions sdk/eventhub/event-hubs/CHANGELOG.md
@@ -1,15 +1,11 @@
# Release History

## 5.10.1 (Unreleased)

### Features Added

### Breaking Changes

### Bugs Fixed
## 5.11.0 (2023-06-06)

### Other Changes

- Use Rhea's prefetch window to prefetch events from the service. This improves the performance of the receiver by reducing the number of round trips to the service. The default prefetch window is 3 * `maxBatchSize` events; this can be configured by setting the `prefetchCount` option on the `subscribe` method of `EventHubConsumerClient`.
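
For illustration, a minimal sketch of how a consumer might opt into a custom prefetch window via `SubscribeOptions`; the connection string, hub name, and counts below are placeholders, not values from this PR:

```ts
import { EventHubConsumerClient } from "@azure/event-hubs";

// Placeholder connection details for illustration only.
const client = new EventHubConsumerClient(
  "$Default", // consumer group
  "<event-hubs-connection-string>",
  "<event-hub-name>"
);

const subscription = client.subscribe(
  {
    processEvents: async (events, context) => {
      console.log(`partition ${context.partitionId}: received ${events.length} events`);
    },
    processError: async (err, context) => {
      console.error(`error on partition ${context.partitionId}:`, err);
    },
  },
  {
    maxBatchSize: 100,
    maxWaitTimeInSeconds: 30,
    // Omitting prefetchCount falls back to the default window of 3 * maxBatchSize.
    prefetchCount: 300,
  }
);

// Later, stop receiving and release the link (along with any prefetched events):
// await subscription.close();
// await client.close();
```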
Member: What happens when the user closes the consumer/subscription as soon as the prefetch receives the events?

Member: This would be a problem in Service Bus, since the delivery count would be updated; it probably has no effect in Event Hubs, but I would like to understand more.

Member Author: The receiver will be discarded, including the prefetched list of events. This is OK in Event Hubs because when you receive from a partition, you start reading from a specific event position and not necessarily from the last event fetched.

Member Author: If no event position is specified, reading will start from the earliest event in the partition.
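
As a sketch of that point, a subscriber can pin explicit start positions per partition so a re-created receiver resumes from a well-defined place rather than from whatever had been prefetched; the partition IDs and connection details below are hypothetical:

```ts
import {
  EventHubConsumerClient,
  earliestEventPosition,
  latestEventPosition,
} from "@azure/event-hubs";

const client = new EventHubConsumerClient(
  "$Default",
  "<event-hubs-connection-string>",
  "<event-hub-name>"
);

client.subscribe(
  {
    processEvents: async (events, context) => {
      console.log(`partition ${context.partitionId}: ${events.length} events`);
    },
    processError: async (err) => {
      console.error(err);
    },
  },
  {
    // Explicit positions per partition: replay partition "0" from the start,
    // and read only newly enqueued events from partition "1".
    startPosition: {
      "0": earliestEventPosition,
      "1": latestEventPosition,
    },
  }
);
```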


## 5.10.0 (2023-05-01)

### Bugs Fixed
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/package.json
@@ -1,7 +1,7 @@
{
"name": "@azure/event-hubs",
"sdk-type": "client",
"version": "5.10.1",
"version": "5.11.0",
"description": "Azure Event Hubs SDK for JS.",
"author": "Microsoft Corporation",
"license": "MIT",
1 change: 1 addition & 0 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
@@ -346,6 +346,7 @@ export interface SubscribeOptions {
maxBatchSize?: number;
maxWaitTimeInSeconds?: number;
ownerLevel?: number;
prefetchCount?: number;
skipParsingBodyAsJson?: boolean;
startPosition?: EventPosition | {
[partitionId: string]: EventPosition;
4 changes: 4 additions & 0 deletions sdk/eventhub/event-hubs/src/eventHubConsumerClientModels.ts
@@ -204,6 +204,10 @@ export interface SubscribeOptions {
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson?: boolean;
/**
* The count of events requested eagerly and queued without regard to whether a read was requested.
*/
prefetchCount?: number;
}

/**
4 changes: 4 additions & 0 deletions sdk/eventhub/event-hubs/src/models/private.ts
@@ -125,6 +125,10 @@ export interface EventHubConsumerOptions {
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson?: boolean;
/**
* The count of events requested eagerly and queued without regard to whether a read was requested.
*/
PrefetchCount?: number;
}

/**
1 change: 1 addition & 0 deletions sdk/eventhub/event-hubs/src/partitionPump.ts
@@ -91,6 +91,7 @@ export class PartitionPump {
trackLastEnqueuedEventProperties: this._processorOptions.trackLastEnqueuedEventProperties,
retryOptions: this._processorOptions.retryOptions,
skipParsingBodyAsJson: this._processorOptions.skipParsingBodyAsJson,
PrefetchCount: this._processorOptions.prefetchCount,
}
);

36 changes: 13 additions & 23 deletions sdk/eventhub/event-hubs/src/partitionReceiver.ts
@@ -93,6 +93,7 @@ export interface PartitionReceiver {
interface ConnectOptions {
abortSignal: AbortSignalLike | undefined;
timeoutInMs: number;
prefetchCount: number;
}

interface ReceiverState {
@@ -153,7 +154,7 @@ export function createReceiver(
logger.verbose(`is open? -> ${isOpen}`);
return isOpen;
},
async connect({ abortSignal, timeoutInMs }: ConnectOptions): Promise<void> {
async connect({ abortSignal, timeoutInMs, prefetchCount }: ConnectOptions): Promise<void> {
if (state.isConnecting || obj.isOpen()) {
return;
}
@@ -170,6 +171,7 @@
obj,
state,
queue,
prefetchCount,
eventPosition,
logger,
options,
@@ -211,26 +213,19 @@
cleanupBeforeAbort();
return Promise.reject(new AbortError(StandardAbortMessage));
}
return obj.isClosed || ctx.wasConnectionCloseCalled
? Promise.resolve(queue.splice(0))
: eventsToRetrieveCount === 0
return obj.isClosed || ctx.wasConnectionCloseCalled || eventsToRetrieveCount === 0
? Promise.resolve(queue.splice(0, maxMessageCount))
: new Promise<void>((resolve, reject) => {
obj._onError = reject;
obj // eslint-disable-line promise/catch-or-return
.connect({
abortSignal,
timeoutInMs: getRetryAttemptTimeoutInMs(options.retryOptions),
prefetchCount: options.PrefetchCount ?? maxMessageCount * 3,
})
.then(() => {
if (addCredits(state.link, eventsToRetrieveCount) > 0) {
return logger.verbose(
`setting the wait timer for ${maxWaitTimeInSeconds} seconds`
);
} else return;
})
.then(() =>
waitForEvents(
logger.verbose(`setting the wait timer for ${maxWaitTimeInSeconds} seconds`);
return waitForEvents(
maxMessageCount,
maxWaitTimeInSeconds * 1000,
qReadIntervalInMs,
@@ -252,8 +247,8 @@
`no messages received when max wait time in seconds ${maxWaitTimeInSeconds} is over`
),
}
)
)
);
})
.catch(reject)
.then(resolve);
})
@@ -415,14 +410,6 @@ function setEventProps(eventProps: LastEnqueuedEventProperties, data: EventDataI
eventProps.retrievedOn = data.retrievalTime;
}

function addCredits(receiver: Link | undefined, eventsToRetrieveCount: number): number {
const creditsToAdd = eventsToRetrieveCount - (receiver?.credit ?? 0);
if (creditsToAdd > 0) {
receiver?.addCredit(creditsToAdd);
}
return creditsToAdd;
}

function clearHandlers(obj: WritableReceiver): void {
obj._onError = undefined;
}
@@ -513,6 +500,7 @@ function createRheaOptions(
obj: PartitionReceiver,
state: ReceiverState,
queue: ReceivedEventData[],
prefetchCount: number,
eventPosition: EventPosition,
logger: SimpleLogger,
options: EventHubConsumerOptions
@@ -523,7 +511,7 @@
source: {
address,
},
credit_window: 0,
credit_window: prefetchCount,
onClose: (context) => onClose(context, state, logger),
onSessionClose: (context) => onSessionClose(context, state, logger),
onError: (context) => onError(context, obj, state.link, logger),
@@ -555,6 +543,7 @@ async function setupLink(
obj: PartitionReceiver,
state: ReceiverState,
queue: ReceivedEventData[],
prefetchCount: number,
eventPosition: EventPosition,
logger: SimpleLogger,
options: EventHubConsumerOptions,
@@ -566,6 +555,7 @@
obj,
state,
queue,
prefetchCount,
eventPosition,
logger,
options
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/src/util/constants.ts
@@ -6,7 +6,7 @@
*/
export const packageJsonInfo = {
name: "@azure/event-hubs",
version: "5.10.1",
version: "5.11.0",
};

/**
4 changes: 2 additions & 2 deletions sdk/eventhub/event-hubs/test/internal/cancellation.spec.ts
@@ -97,7 +97,7 @@ testWithServiceTypes((serviceVersion) => {
it(`initialize supports cancellation (${caseType})`, async () => {
const abortSignal = getSignal();
try {
await client.connect({ abortSignal, timeoutInMs: 60000 });
await client.connect({ abortSignal, timeoutInMs: 60000, prefetchCount: 1 });
throw new Error(TEST_FAILURE);
} catch (err: any) {
should.equal(err.name, "AbortError");
@@ -118,7 +118,7 @@

it(`receiveBatch supports cancellation when connection already exists (${caseType})`, async () => {
// Open the connection.
await client.connect({ abortSignal: undefined, timeoutInMs: 60000 });
await client.connect({ abortSignal: undefined, timeoutInMs: 60000, prefetchCount: 1 });
try {
const abortSignal = getSignal();
await client.receiveBatch(10, undefined, abortSignal);
@@ -123,10 +123,12 @@ testWithServiceTypes((serviceVersion) => {
await receiver1.connect({
Member: How is this prefetch count support related to the user issue? I thought the user issue was about asking for a minimum number of messages. Also, I don't think this was a feature in 5.8.0, so how was rolling back to 5.8.0 fixing the issue for the user?

Member Author: It is not related. This PR improves performance by prefetching events, so chances are the received batch size will be larger than before. We still don't want to wait until we receive maxBatchSize events, because that works against the library's goal of maximizing throughput.

Now a question for you: how can we analyze the performance characteristics of this change? More specifically, can we show the average/median batch size before and after the change? Can we show how often the user callback was called per minute or second?
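
One simple way to collect those numbers, sketched below under the assumption of a plain `EventHubConsumerClient` with placeholder connection details (not the Azure SDK perf framework mentioned in the next comment), is to instrument the `processEvents` callback and periodically report the median batch size and the callback rate:

```ts
import { EventHubConsumerClient } from "@azure/event-hubs";

// Placeholder connection details; instrumentation for illustration only.
const client = new EventHubConsumerClient(
  "$Default",
  "<event-hubs-connection-string>",
  "<event-hub-name>"
);

const batchSizes: number[] = [];
let callbackCount = 0;
const startedAt = Date.now();

client.subscribe(
  {
    processEvents: async (events) => {
      callbackCount++;
      batchSizes.push(events.length);
    },
    processError: async (err) => {
      console.error(err);
    },
  },
  { maxBatchSize: 100, maxWaitTimeInSeconds: 10 }
);

// Report once a minute: median batch size and callbacks per second.
setInterval(() => {
  const sorted = [...batchSizes].sort((a, b) => a - b);
  const median = sorted.length > 0 ? sorted[Math.floor(sorted.length / 2)] : 0;
  const elapsedSeconds = (Date.now() - startedAt) / 1000;
  console.log(
    `median batch size: ${median}, callbacks/sec: ${(callbackCount / elapsedSeconds).toFixed(2)}`
  );
}, 60_000);
```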

Member (@HarshaNalluru, Jun 1, 2023): Yeah, these are good metrics to track. How often the user callback is called is already being tracked in the perf test; we would need to update the framework/test to track the batch size.

Let's work on testing this, so we don't miss anything obvious. 🙂

Member (@HarshaNalluru, Jun 5, 2023): Here is a stress test run for the checkpoint store test. Red ❤️ is the code from this PR, and blue 💙 is the 5.10.0 version.

[image: stress-test CPU and memory usage graphs]

@deyaaeldeen and I were discussing, and we speculate that the bigger spikes in CPU usage occur when credits are added for the new events to be received. With the new prefetch method, the spikes are a lot smaller and less dense. The memory usage graph also validates the prefetch's improvement.

Member Author: Thanks a lot @HarshaNalluru! Also, the performance analysis shows that the median size of the received batches increased, as did the max and mean throughput.

abortSignal: undefined,
timeoutInMs: 60000,
prefetchCount: 1,
});
await receiver2.connect({
abortSignal: undefined,
timeoutInMs: 60000,
prefetchCount: 1,
});

// We are going to override sender1's close method so that it also invokes receiver2's close method.
@@ -183,10 +185,12 @@ testWithServiceTypes((serviceVersion) => {
await receiver1.connect({
abortSignal: undefined,
timeoutInMs: 60000,
prefetchCount: 1,
});
await receiver2.connect({
abortSignal: undefined,
timeoutInMs: 60000,
prefetchCount: 1,
});

// We are going to override sender1's close method so that it also invokes receiver2's close method.