From e987353a1906aebd4bed28f3a636026715a7153b Mon Sep 17 00:00:00 2001 From: Deyaaeldeen Almahallawi Date: Mon, 6 Nov 2023 16:06:35 -0800 Subject: [PATCH] [Event Hubs] Improve prefetching (#27647) ### Packages impacted by this PR @azure/event-hubs ### Issues associated with this PR https://github.com/Azure/azure-sdk-for-js/issues/27253 ### Describe the problem that is addressed by this PR https://github.com/Azure/azure-sdk-for-js/issues/27253#issuecomment-1791749605 ### What are the possible designs available to address the problem? If there are more than one possible design, why was the one in this PR chosen? ### Are there test cases added in this PR? _(If not, why?)_ To be tested using stress testing framework. UPDATE: The results are in and it is confirmed there is no more space leak! ### Provide a list of related PRs _(if any)_ https://github.com/Azure/azure-sdk-for-js/pull/26065 ### Command used to generate this PR:**_(Applicable only to SDK release request PRs)_ ### Checklists - [x] Added impacted package name to the issue description - [ ] Does this PR needs any fixes in the SDK Generator?** _(If so, create an Issue in the [Autorest/typescript](https://github.com/Azure/autorest.typescript) repository and link it here)_ - [x] Added a changelog (if necessary) --- sdk/eventhub/event-hubs/CHANGELOG.md | 1 + .../event-hubs/src/partitionReceiver.ts | 20 ++++++++++--------- .../test/internal/cancellation.spec.ts | 4 ++-- .../test/internal/node/disconnect.spec.ts | 4 ---- 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/sdk/eventhub/event-hubs/CHANGELOG.md b/sdk/eventhub/event-hubs/CHANGELOG.md index 713fbd00870c..50f0e046e209 100644 --- a/sdk/eventhub/event-hubs/CHANGELOG.md +++ b/sdk/eventhub/event-hubs/CHANGELOG.md @@ -7,6 +7,7 @@ ### Breaking Changes ### Bugs Fixed +- Improve event prefetching to not overload the internal queue. ### Other Changes diff --git a/sdk/eventhub/event-hubs/src/partitionReceiver.ts b/sdk/eventhub/event-hubs/src/partitionReceiver.ts index a9f35ba58258..173ad0f14a20 100644 --- a/sdk/eventhub/event-hubs/src/partitionReceiver.ts +++ b/sdk/eventhub/event-hubs/src/partitionReceiver.ts @@ -94,7 +94,6 @@ export interface PartitionReceiver { interface ConnectOptions { abortSignal: AbortSignalLike | undefined; timeoutInMs: number; - prefetchCount: number; } interface ReceiverState { @@ -156,7 +155,7 @@ export function createReceiver( logger.verbose(`is open? -> ${isOpen}`); return isOpen; }, - async connect({ abortSignal, timeoutInMs, prefetchCount }: ConnectOptions): Promise { + async connect({ abortSignal, timeoutInMs }: ConnectOptions): Promise { if (state.isConnecting || obj.isOpen()) { return; } @@ -174,7 +173,6 @@ export function createReceiver( obj, state, queue, - prefetchCount, eventPosition, logger, options, @@ -203,6 +201,7 @@ export function createReceiver( maxWaitTimeInSeconds: number = 60, abortSignal?: AbortSignalLike ) => { + const prefetchCount = options.prefetchCount ?? maxMessageCount * 3; const cleanupBeforeAbort = (): Promise => { logger.info(abortLogMessage); return obj.close(); @@ -224,10 +223,10 @@ export function createReceiver( .connect({ abortSignal, timeoutInMs: getRetryAttemptTimeoutInMs(options.retryOptions), - prefetchCount: options.prefetchCount ?? maxMessageCount * 3, }) .then(() => { - logger.verbose(`setting the wait timer for ${maxWaitTimeInSeconds} seconds`); + addCredits(state.link, Math.max(prefetchCount, maxMessageCount) - queue.length); + logger.verbose(`setting the max wait time to ${maxWaitTimeInSeconds} seconds`); return waitForEvents( maxMessageCount, maxWaitTimeInSeconds * 1000, @@ -504,7 +503,6 @@ function createRheaOptions( obj: PartitionReceiver, state: ReceiverState, queue: ReceivedEventData[], - prefetchCount: number, eventPosition: EventPosition, logger: SimpleLogger, options: PartitionReceiverOptions @@ -516,7 +514,7 @@ function createRheaOptions( source: { address, }, - credit_window: prefetchCount, + credit_window: 0, properties: { [receiverIdPropertyName]: consumerId, }, @@ -550,7 +548,6 @@ async function setupLink( obj: PartitionReceiver, state: ReceiverState, queue: ReceivedEventData[], - prefetchCount: number, eventPosition: EventPosition, logger: SimpleLogger, options: PartitionReceiverOptions, @@ -563,7 +560,6 @@ async function setupLink( obj, state, queue, - prefetchCount, eventPosition, logger, options @@ -577,3 +573,9 @@ async function setupLink( logger.verbose("is created successfully"); ctx.receivers[name] = obj; } + +function addCredits(receiver: Link | undefined, creditsToAdd: number): void { + if (creditsToAdd > 0) { + receiver?.addCredit(creditsToAdd); + } +} diff --git a/sdk/eventhub/event-hubs/test/internal/cancellation.spec.ts b/sdk/eventhub/event-hubs/test/internal/cancellation.spec.ts index 50def7904be0..6b742bf04bb2 100644 --- a/sdk/eventhub/event-hubs/test/internal/cancellation.spec.ts +++ b/sdk/eventhub/event-hubs/test/internal/cancellation.spec.ts @@ -98,7 +98,7 @@ testWithServiceTypes((serviceVersion) => { it(`initialize supports cancellation (${caseType})`, async () => { const abortSignal = getSignal(); try { - await client.connect({ abortSignal, timeoutInMs: 60000, prefetchCount: 1 }); + await client.connect({ abortSignal, timeoutInMs: 60000 }); throw new Error(TEST_FAILURE); } catch (err: any) { should.equal(err.name, "AbortError"); @@ -119,7 +119,7 @@ testWithServiceTypes((serviceVersion) => { it(`receiveBatch supports cancellation when connection already exists (${caseType})`, async () => { // Open the connection. - await client.connect({ abortSignal: undefined, timeoutInMs: 60000, prefetchCount: 1 }); + await client.connect({ abortSignal: undefined, timeoutInMs: 60000 }); try { const abortSignal = getSignal(); await client.receiveBatch(10, undefined, abortSignal); diff --git a/sdk/eventhub/event-hubs/test/internal/node/disconnect.spec.ts b/sdk/eventhub/event-hubs/test/internal/node/disconnect.spec.ts index d317e247646f..1445d0341027 100644 --- a/sdk/eventhub/event-hubs/test/internal/node/disconnect.spec.ts +++ b/sdk/eventhub/event-hubs/test/internal/node/disconnect.spec.ts @@ -131,12 +131,10 @@ testWithServiceTypes((serviceVersion) => { await receiver1.connect({ abortSignal: undefined, timeoutInMs: 60000, - prefetchCount: 1, }); await receiver2.connect({ abortSignal: undefined, timeoutInMs: 60000, - prefetchCount: 1, }); // We are going to override sender1's close method so that it also invokes receiver2's close method. @@ -199,12 +197,10 @@ testWithServiceTypes((serviceVersion) => { await receiver1.connect({ abortSignal: undefined, timeoutInMs: 60000, - prefetchCount: 1, }); await receiver2.connect({ abortSignal: undefined, timeoutInMs: 60000, - prefetchCount: 1, }); // We are going to override sender1's close method so that it also invokes receiver2's close method.