Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Search] fix cancelation related memory leaks #81996

Merged
merged 4 commits into from
Oct 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 72 additions & 7 deletions src/plugins/data/common/utils/abort_utils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ describe('AbortUtils', () => {
describe('rejects', () => {
test('should not reject if the signal does not abort', async () => {
const controller = new AbortController();
const promise = toPromise(controller.signal);
const promise = toPromise(controller.signal).promise;
const whenRejected = jest.fn();
promise.catch(whenRejected);
await flushPromises();
Expand All @@ -50,29 +50,46 @@ describe('AbortUtils', () => {

test('should reject if the signal does abort', async () => {
const controller = new AbortController();
const promise = toPromise(controller.signal);
const promise = toPromise(controller.signal).promise;
const whenRejected = jest.fn();
promise.catch(whenRejected);
controller.abort();
await flushPromises();
expect(whenRejected).toBeCalled();
expect(whenRejected.mock.calls[0][0]).toBeInstanceOf(AbortError);
});

test('should expose cleanup handler', () => {
const controller = new AbortController();
const promise = toPromise(controller.signal);
expect(promise.cleanup).toBeDefined();
});

test('calling clean up handler prevents rejects', async () => {
const controller = new AbortController();
const { promise, cleanup } = toPromise(controller.signal);
const whenRejected = jest.fn();
promise.catch(whenRejected);
cleanup();
controller.abort();
await flushPromises();
expect(whenRejected).not.toBeCalled();
});
});
});

describe('getCombinedSignal', () => {
test('should return an AbortSignal', () => {
const signal = getCombinedSignal([]);
expect(signal instanceof AbortSignal).toBe(true);
const signal = getCombinedSignal([]).signal;
expect(signal).toBeInstanceOf(AbortSignal);
});

test('should not abort if none of the signals abort', async () => {
const controller1 = new AbortController();
const controller2 = new AbortController();
setTimeout(() => controller1.abort(), 2000);
setTimeout(() => controller2.abort(), 1000);
const signal = getCombinedSignal([controller1.signal, controller2.signal]);
const signal = getCombinedSignal([controller1.signal, controller2.signal]).signal;
expect(signal.aborted).toBe(false);
jest.advanceTimersByTime(500);
await flushPromises();
Expand All @@ -84,7 +101,7 @@ describe('AbortUtils', () => {
const controller2 = new AbortController();
setTimeout(() => controller1.abort(), 2000);
setTimeout(() => controller2.abort(), 1000);
const signal = getCombinedSignal([controller1.signal, controller2.signal]);
const signal = getCombinedSignal([controller1.signal, controller2.signal]).signal;
expect(signal.aborted).toBe(false);
jest.advanceTimersByTime(1000);
await flushPromises();
Expand All @@ -95,8 +112,56 @@ describe('AbortUtils', () => {
const controller1 = new AbortController();
const controller2 = new AbortController();
controller1.abort();
const signal = getCombinedSignal([controller1.signal, controller2.signal]);
const signal = getCombinedSignal([controller1.signal, controller2.signal]).signal;
expect(signal.aborted).toBe(true);
});

describe('cleanup listener', () => {
const createMockController = () => {
const controller = new AbortController();
const spyAddListener = jest.spyOn(controller.signal, 'addEventListener');
const spyRemoveListener = jest.spyOn(controller.signal, 'removeEventListener');
return {
controller,
getTotalListeners: () =>
Math.max(spyAddListener.mock.calls.length - spyRemoveListener.mock.calls.length, 0),
};
};

test('cleanup should cleanup inner listeners', () => {
const controller1 = createMockController();
const controller2 = createMockController();

const { cleanup } = getCombinedSignal([
controller1.controller.signal,
controller2.controller.signal,
]);

expect(controller1.getTotalListeners()).toBe(1);
expect(controller2.getTotalListeners()).toBe(1);

cleanup();

expect(controller1.getTotalListeners()).toBe(0);
expect(controller2.getTotalListeners()).toBe(0);
});

test('abort should cleanup inner listeners', async () => {
const controller1 = createMockController();
const controller2 = createMockController();

getCombinedSignal([controller1.controller.signal, controller2.controller.signal]);

expect(controller1.getTotalListeners()).toBe(1);
expect(controller2.getTotalListeners()).toBe(1);

controller1.controller.abort();

await flushPromises();

expect(controller1.getTotalListeners()).toBe(0);
expect(controller2.getTotalListeners()).toBe(0);
});
});
});
});
35 changes: 28 additions & 7 deletions src/plugins/data/common/utils/abort_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,50 @@ export class AbortError extends Error {
*
* @param signal The `AbortSignal` to generate the `Promise` from
*/
export function toPromise(signal: AbortSignal): Promise<never> {
return new Promise((resolve, reject) => {
if (signal.aborted) reject(new AbortError());
const abortHandler = () => {
export function toPromise(signal: AbortSignal): { promise: Promise<never>; cleanup: () => void } {
let abortHandler: () => void;
const cleanup = () => {
if (abortHandler) {
signal.removeEventListener('abort', abortHandler);
}
};
const promise = new Promise<never>((resolve, reject) => {
if (signal.aborted) reject(new AbortError());
abortHandler = () => {
cleanup();
reject(new AbortError());
};
signal.addEventListener('abort', abortHandler);
});

return { promise, cleanup };
}

/**
* Returns an `AbortSignal` that will be aborted when the first of the given signals aborts.
*
* @param signals
*/
export function getCombinedSignal(signals: AbortSignal[]) {
export function getCombinedSignal(
signals: AbortSignal[]
): { signal: AbortSignal; cleanup: () => void } {
const controller = new AbortController();
let cleanup = () => {};

if (signals.some((signal) => signal.aborted)) {
controller.abort();
} else {
const promises = signals.map((signal) => toPromise(signal));
Promise.race(promises).catch(() => controller.abort());
cleanup = () => {
promises.forEach((p) => p.cleanup());
controller.signal.removeEventListener('abort', cleanup);
};
controller.signal.addEventListener('abort', cleanup);
Promise.race(promises.map((p) => p.promise)).catch(() => {
cleanup();
controller.abort();
});
}
return controller.signal;

return { signal: controller.signal, cleanup };
}
7 changes: 4 additions & 3 deletions src/plugins/data/public/search/search_interceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ import { PublicMethodsOf } from '@kbn/utility-types';
import { CoreStart, CoreSetup, ToastsSetup } from 'kibana/public';
import { i18n } from '@kbn/i18n';
import {
getCombinedSignal,
AbortError,
IKibanaSearchRequest,
IKibanaSearchResponse,
ISearchOptions,
ES_SEARCH_STRATEGY,
ISessionService,
getCombinedSignal,
} from '../../common';
import { SearchUsageCollector } from './collectors';
import {
Expand Down Expand Up @@ -171,11 +171,12 @@ export class SearchInterceptor {
...(abortSignal ? [abortSignal] : []),
];

const combinedSignal = getCombinedSignal(signals);
const { signal: combinedSignal, cleanup: cleanupCombinedSignal } = getCombinedSignal(signals);
const cleanup = () => {
subscription.unsubscribe();
combinedSignal.removeEventListener('abort', cleanup);
cleanupCombinedSignal();
};

combinedSignal.addEventListener('abort', cleanup);

return {
Expand Down
22 changes: 13 additions & 9 deletions src/plugins/expressions/common/execution/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ export class Execution<
* Races a given promise against the "abort" event of `abortController`.
*/
private race<T>(promise: Promise<T>): Promise<T> {
return Promise.race<T>([this.abortRejection, promise]);
return Promise.race<T>([this.abortRejection.promise, promise]);
}

/**
Expand Down Expand Up @@ -189,14 +189,18 @@ export class Execution<
else reject(error);
});

this.firstResultFuture.promise.then(
(result) => {
this.state.transitions.setResult(result);
},
(error) => {
this.state.transitions.setError(error);
}
);
this.firstResultFuture.promise
.then(
(result) => {
this.state.transitions.setResult(result);
},
(error) => {
this.state.transitions.setError(error);
}
)
.finally(() => {
this.abortRejection.cleanup();
});
}

async invokeChain(chainArr: ExpressionAstFunction[], input: unknown): Promise<any> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ describe('ExecutionContract', () => {

test('can cancel execution', () => {
const execution = createExecution('foo bar=123');
const spy = jest.spyOn(execution, 'cancel');
const spy = jest.spyOn(execution, 'cancel').mockImplementation(() => {});
Copy link
Contributor Author

@Dosant Dosant Oct 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that test were silently failing tests (on master)

(node:57027) UnhandledPromiseRejectionWarning: AbortError: Aborted
(node:57027) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
(node:57027) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

I assume that it warns is that cancel is called without start and there is no one subscribed to handled inner AbortRejection promise of the execution.
I mocked it, since this unit test for execution_contract and we are not testing an execution here.

Copy link
Contributor Author

@Dosant Dosant Oct 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just fixed that particular higher level test, but really, probably we should also handle and test edge case:

const ex = new Execution();
ex.cancel() // ?? what happens? 
ex.start() // ?? what happens? 

const contract = new ExecutionContract(execution);

expect(spy).toHaveBeenCalledTimes(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
abortSignal: options.abortSignal,
timeout: this.searchTimeout,
});
const aborted$ = from(toPromise(combinedSignal));
const abortedPromise = toPromise(combinedSignal);
const strategy = options?.strategy || ENHANCED_ES_SEARCH_STRATEGY;

this.pendingCount$.next(this.pendingCount$.getValue() + 1);
Expand All @@ -90,7 +90,7 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
})
);
}),
takeUntil(aborted$),
takeUntil(from(abortedPromise.promise)),
catchError((e: any) => {
// If we haven't received the response to the initial request, including the ID, then
// we don't need to send a follow-up request to delete this search. Otherwise, we
Expand All @@ -103,6 +103,7 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
finalize(() => {
this.pendingCount$.next(this.pendingCount$.getValue() - 1);
cleanup();
abortedPromise.cleanup();
})
);
}
Expand Down