Skip to content

Commit

Permalink
[Event Hubs] Improve prefetching (#27647)
Browse files Browse the repository at this point in the history
### Packages impacted by this PR
@Azure/event-hubs

### Issues associated with this PR
#27253

### Describe the problem that is addressed by this PR

#27253 (comment)

### What are the possible designs available to address the problem? If
there are more than one possible design, why was the one in this PR
chosen?


### Are there test cases added in this PR? _(If not, why?)_
To be tested using stress testing framework.

UPDATE: The results are in and it is confirmed there is no more space
leak!

### Provide a list of related PRs _(if any)_
#26065

### Command used to generate this PR:**_(Applicable only to SDK release
request PRs)_

### Checklists
- [x] Added impacted package name to the issue description
- [ ] Does this PR needs any fixes in the SDK Generator?** _(If so,
create an Issue in the
[Autorest/typescript](https://github.com/Azure/autorest.typescript)
repository and link it here)_
- [x] Added a changelog (if necessary)
  • Loading branch information
deyaaeldeen authored Nov 7, 2023
1 parent a5db245 commit e987353
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 15 deletions.
1 change: 1 addition & 0 deletions sdk/eventhub/event-hubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
### Breaking Changes

### Bugs Fixed
- Improve event prefetching to not overload the internal queue.

### Other Changes

Expand Down
20 changes: 11 additions & 9 deletions sdk/eventhub/event-hubs/src/partitionReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ export interface PartitionReceiver {
interface ConnectOptions {
abortSignal: AbortSignalLike | undefined;
timeoutInMs: number;
prefetchCount: number;
}

interface ReceiverState {
Expand Down Expand Up @@ -156,7 +155,7 @@ export function createReceiver(
logger.verbose(`is open? -> ${isOpen}`);
return isOpen;
},
async connect({ abortSignal, timeoutInMs, prefetchCount }: ConnectOptions): Promise<void> {
async connect({ abortSignal, timeoutInMs }: ConnectOptions): Promise<void> {
if (state.isConnecting || obj.isOpen()) {
return;
}
Expand All @@ -174,7 +173,6 @@ export function createReceiver(
obj,
state,
queue,
prefetchCount,
eventPosition,
logger,
options,
Expand Down Expand Up @@ -203,6 +201,7 @@ export function createReceiver(
maxWaitTimeInSeconds: number = 60,
abortSignal?: AbortSignalLike
) => {
const prefetchCount = options.prefetchCount ?? maxMessageCount * 3;
const cleanupBeforeAbort = (): Promise<void> => {
logger.info(abortLogMessage);
return obj.close();
Expand All @@ -224,10 +223,10 @@ export function createReceiver(
.connect({
abortSignal,
timeoutInMs: getRetryAttemptTimeoutInMs(options.retryOptions),
prefetchCount: options.prefetchCount ?? maxMessageCount * 3,
})
.then(() => {
logger.verbose(`setting the wait timer for ${maxWaitTimeInSeconds} seconds`);
addCredits(state.link, Math.max(prefetchCount, maxMessageCount) - queue.length);
logger.verbose(`setting the max wait time to ${maxWaitTimeInSeconds} seconds`);
return waitForEvents(
maxMessageCount,
maxWaitTimeInSeconds * 1000,
Expand Down Expand Up @@ -504,7 +503,6 @@ function createRheaOptions(
obj: PartitionReceiver,
state: ReceiverState,
queue: ReceivedEventData[],
prefetchCount: number,
eventPosition: EventPosition,
logger: SimpleLogger,
options: PartitionReceiverOptions
Expand All @@ -516,7 +514,7 @@ function createRheaOptions(
source: {
address,
},
credit_window: prefetchCount,
credit_window: 0,
properties: {
[receiverIdPropertyName]: consumerId,
},
Expand Down Expand Up @@ -550,7 +548,6 @@ async function setupLink(
obj: PartitionReceiver,
state: ReceiverState,
queue: ReceivedEventData[],
prefetchCount: number,
eventPosition: EventPosition,
logger: SimpleLogger,
options: PartitionReceiverOptions,
Expand All @@ -563,7 +560,6 @@ async function setupLink(
obj,
state,
queue,
prefetchCount,
eventPosition,
logger,
options
Expand All @@ -577,3 +573,9 @@ async function setupLink(
logger.verbose("is created successfully");
ctx.receivers[name] = obj;
}

function addCredits(receiver: Link | undefined, creditsToAdd: number): void {
if (creditsToAdd > 0) {
receiver?.addCredit(creditsToAdd);
}
}
4 changes: 2 additions & 2 deletions sdk/eventhub/event-hubs/test/internal/cancellation.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ testWithServiceTypes((serviceVersion) => {
it(`initialize supports cancellation (${caseType})`, async () => {
const abortSignal = getSignal();
try {
await client.connect({ abortSignal, timeoutInMs: 60000, prefetchCount: 1 });
await client.connect({ abortSignal, timeoutInMs: 60000 });
throw new Error(TEST_FAILURE);
} catch (err: any) {
should.equal(err.name, "AbortError");
Expand All @@ -119,7 +119,7 @@ testWithServiceTypes((serviceVersion) => {

it(`receiveBatch supports cancellation when connection already exists (${caseType})`, async () => {
// Open the connection.
await client.connect({ abortSignal: undefined, timeoutInMs: 60000, prefetchCount: 1 });
await client.connect({ abortSignal: undefined, timeoutInMs: 60000 });
try {
const abortSignal = getSignal();
await client.receiveBatch(10, undefined, abortSignal);
Expand Down
4 changes: 0 additions & 4 deletions sdk/eventhub/event-hubs/test/internal/node/disconnect.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,10 @@ testWithServiceTypes((serviceVersion) => {
await receiver1.connect({
abortSignal: undefined,
timeoutInMs: 60000,
prefetchCount: 1,
});
await receiver2.connect({
abortSignal: undefined,
timeoutInMs: 60000,
prefetchCount: 1,
});

// We are going to override sender1's close method so that it also invokes receiver2's close method.
Expand Down Expand Up @@ -199,12 +197,10 @@ testWithServiceTypes((serviceVersion) => {
await receiver1.connect({
abortSignal: undefined,
timeoutInMs: 60000,
prefetchCount: 1,
});
await receiver2.connect({
abortSignal: undefined,
timeoutInMs: 60000,
prefetchCount: 1,
});

// We are going to override sender1's close method so that it also invokes receiver2's close method.
Expand Down

0 comments on commit e987353

Please sign in to comment.