Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Search] fix cancelation related memory leaks #81996

Merged
merged 4 commits into from
Oct 30, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions src/plugins/data/common/utils/abort_utils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

import { AbortError, toPromise, getCombinedSignal } from './abort_utils';
import { AbortError, toPromise, getCombinedController } from './abort_utils';

jest.useFakeTimers();

Expand Down Expand Up @@ -58,21 +58,38 @@ describe('AbortUtils', () => {
expect(whenRejected).toBeCalled();
expect(whenRejected.mock.calls[0][0]).toBeInstanceOf(AbortError);
});

test('should expose cleanup handler', () => {
const controller = new AbortController();
const promise = toPromise(controller.signal);
expect(promise.cleanup).toBeDefined();
});

test('calling clean up handler prevents rejects', async () => {
const controller = new AbortController();
const promise = toPromise(controller.signal);
const whenRejected = jest.fn();
promise.catch(whenRejected);
promise.cleanup();
controller.abort();
await flushPromises();
expect(whenRejected).not.toBeCalled();
});
});
});

describe('getCombinedSignal', () => {
test('should return an AbortSignal', () => {
const signal = getCombinedSignal([]);
expect(signal instanceof AbortSignal).toBe(true);
describe('getCombinedController', () => {
test('should return an AbortController', () => {
const controller = getCombinedController([]);
expect(controller).toBeInstanceOf(AbortController);
});

test('should not abort if none of the signals abort', async () => {
const controller1 = new AbortController();
const controller2 = new AbortController();
setTimeout(() => controller1.abort(), 2000);
setTimeout(() => controller2.abort(), 1000);
const signal = getCombinedSignal([controller1.signal, controller2.signal]);
const signal = getCombinedController([controller1.signal, controller2.signal]).signal;
expect(signal.aborted).toBe(false);
jest.advanceTimersByTime(500);
await flushPromises();
Expand All @@ -84,7 +101,7 @@ describe('AbortUtils', () => {
const controller2 = new AbortController();
setTimeout(() => controller1.abort(), 2000);
setTimeout(() => controller2.abort(), 1000);
const signal = getCombinedSignal([controller1.signal, controller2.signal]);
const signal = getCombinedController([controller1.signal, controller2.signal]).signal;
expect(signal.aborted).toBe(false);
jest.advanceTimersByTime(1000);
await flushPromises();
Expand All @@ -95,7 +112,7 @@ describe('AbortUtils', () => {
const controller1 = new AbortController();
const controller2 = new AbortController();
controller1.abort();
const signal = getCombinedSignal([controller1.signal, controller2.signal]);
const signal = getCombinedController([controller1.signal, controller2.signal]).signal;
expect(signal.aborted).toBe(true);
});
});
Expand Down
34 changes: 26 additions & 8 deletions src/plugins/data/common/utils/abort_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,47 @@ export class AbortError extends Error {
*
* @param signal The `AbortSignal` to generate the `Promise` from
*/
export function toPromise(signal: AbortSignal): Promise<never> {
return new Promise((resolve, reject) => {
if (signal.aborted) reject(new AbortError());
const abortHandler = () => {
export function toPromise(signal: AbortSignal): Promise<never> & { cleanup: () => void } {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I'm not a huge fan of adding a method to a built-in class (Promise). We might consider returning an object instead (like { promise, cleanup }). I think it will also help to enforce that any consumers of this function will handle cleaning up and not just use the promise returned.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind initial approach I took mostly because it is clearly typed that it has a cleanup method, nevertheless:

I think it will also help to enforce that any consumers of this function will handle cleaning up and not just use the promise returned.

I agree with this 👍 changed {promise, cleanup}

one tiny caveat is that now we can't just use:

Promise.race(signals.map(toPromise))

but have to:

Promise.race(signals.map(toPromise).map(pr => pr.promise))

let abortHandler: () => void;
const cleanup = () => {
if (abortHandler) {
signal.removeEventListener('abort', abortHandler);
}
};
const promise = new Promise<never>((resolve, reject) => {
if (signal.aborted) reject(new AbortError());
abortHandler = () => {
cleanup();
reject(new AbortError());
};
signal.addEventListener('abort', abortHandler);
});

return Object.assign(promise, { cleanup });
}

/**
* Returns an `AbortSignal` that will be aborted when the first of the given signals aborts.
* Returns an `AbortController` that will be aborted when the first of the given signals aborts.
*
* @param signals
*/
export function getCombinedSignal(signals: AbortSignal[]) {
export function getCombinedController(signals: AbortSignal[]) {
const controller = new AbortController();

if (signals.some((signal) => signal.aborted)) {
controller.abort();
} else {
const promises = signals.map((signal) => toPromise(signal));
Promise.race(promises).catch(() => controller.abort());
const cleanup = () => {
promises.forEach((p) => p.cleanup());
controller.signal.removeEventListener('abort', cleanup);
};
controller.signal.addEventListener('abort', cleanup);
Promise.race(promises).catch(() => {
cleanup();
controller.abort();
});
}
return controller.signal;

return controller;
}
2 changes: 1 addition & 1 deletion src/plugins/data/common/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@

/** @internal */
export { shortenDottedString } from './shorten_dotted_string';
export { AbortError, toPromise, getCombinedSignal } from './abort_utils';
export { AbortError, toPromise, getCombinedController } from './abort_utils';
11 changes: 6 additions & 5 deletions src/plugins/data/public/search/search_interceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ import { PublicMethodsOf } from '@kbn/utility-types';
import { CoreStart, CoreSetup, ToastsSetup } from 'kibana/public';
import { i18n } from '@kbn/i18n';
import {
getCombinedSignal,
AbortError,
IKibanaSearchRequest,
IKibanaSearchResponse,
ISearchOptions,
ES_SEARCH_STRATEGY,
ISessionService,
getCombinedController,
} from '../../common';
import { SearchUsageCollector } from './collectors';
import {
Expand Down Expand Up @@ -171,16 +171,17 @@ export class SearchInterceptor {
...(abortSignal ? [abortSignal] : []),
];

const combinedSignal = getCombinedSignal(signals);
const combinedController = getCombinedController(signals);
const cleanup = () => {
subscription.unsubscribe();
combinedController.signal.removeEventListener('abort', cleanup);
combinedController.abort();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the reason we need to abort here because we need to trigger the internal cleanup from getCombinedController? If so, maybe we can do similarly to what I've suggested above (instead of returning just the signal from getCombinedSignal, return something like {signal, cleanup}). Thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the reason we need to abort here because we need to trigger the internal cleanup from getCombinedController

Yes, exactly.

If so, maybe we can do similarly to what I've suggested above (instead of returning just the signal from getCombinedSignal, return something like {signal, cleanup})

Agree. Done!

};

combinedSignal.addEventListener('abort', cleanup);
combinedController.signal.addEventListener('abort', cleanup);

return {
timeoutSignal,
combinedSignal,
combinedSignal: combinedController.signal,
cleanup,
};
}
Expand Down
20 changes: 12 additions & 8 deletions src/plugins/expressions/common/execution/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,18 @@ export class Execution<
else reject(error);
});

this.firstResultFuture.promise.then(
(result) => {
this.state.transitions.setResult(result);
},
(error) => {
this.state.transitions.setError(error);
}
);
this.firstResultFuture.promise
.then(
(result) => {
this.state.transitions.setResult(result);
},
(error) => {
this.state.transitions.setError(error);
}
)
.finally(() => {
this.abortRejection.cleanup();
});
}

async invokeChain(chainArr: ExpressionAstFunction[], input: unknown): Promise<any> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ describe('ExecutionContract', () => {

test('can cancel execution', () => {
const execution = createExecution('foo bar=123');
const spy = jest.spyOn(execution, 'cancel');
const spy = jest.spyOn(execution, 'cancel').mockImplementation(() => {});
Copy link
Contributor Author

@Dosant Dosant Oct 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that test were silently failing tests (on master)

(node:57027) UnhandledPromiseRejectionWarning: AbortError: Aborted
(node:57027) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
(node:57027) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

I assume that it warns is that cancel is called without start and there is no one subscribed to handled inner AbortRejection promise of the execution.
I mocked it, since this unit test for execution_contract and we are not testing an execution here.

Copy link
Contributor Author

@Dosant Dosant Oct 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just fixed that particular higher level test, but really, probably we should also handle and test edge case:

const ex = new Execution();
ex.cancel() // ?? what happens? 
ex.start() // ?? what happens? 

const contract = new ExecutionContract(execution);

expect(spy).toHaveBeenCalledTimes(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
abortSignal: options.abortSignal,
timeout: this.searchTimeout,
});
const aborted$ = from(toPromise(combinedSignal));
const abortedPromise = toPromise(combinedSignal);
const strategy = options?.strategy || ENHANCED_ES_SEARCH_STRATEGY;

this.pendingCount$.next(this.pendingCount$.getValue() + 1);
Expand All @@ -90,7 +90,7 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
})
);
}),
takeUntil(aborted$),
takeUntil(from(abortedPromise)),
catchError((e: any) => {
// If we haven't received the response to the initial request, including the ID, then
// we don't need to send a follow-up request to delete this search. Otherwise, we
Expand All @@ -103,6 +103,7 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
finalize(() => {
this.pendingCount$.next(this.pendingCount$.getValue() - 1);
cleanup();
abortedPromise.cleanup();
})
);
}
Expand Down