Skip to content

Commit

Permalink
[Search] fix cancelation related memory leaks (#81996)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dosant authored Oct 30, 2020
1 parent f5b1fae commit 7833fb3
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 29 deletions.
79 changes: 72 additions & 7 deletions src/plugins/data/common/utils/abort_utils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ describe('AbortUtils', () => {
describe('rejects', () => {
test('should not reject if the signal does not abort', async () => {
const controller = new AbortController();
const promise = toPromise(controller.signal);
const promise = toPromise(controller.signal).promise;
const whenRejected = jest.fn();
promise.catch(whenRejected);
await flushPromises();
Expand All @@ -50,29 +50,46 @@ describe('AbortUtils', () => {

test('should reject if the signal does abort', async () => {
const controller = new AbortController();
const promise = toPromise(controller.signal);
const promise = toPromise(controller.signal).promise;
const whenRejected = jest.fn();
promise.catch(whenRejected);
controller.abort();
await flushPromises();
expect(whenRejected).toBeCalled();
expect(whenRejected.mock.calls[0][0]).toBeInstanceOf(AbortError);
});

test('should expose cleanup handler', () => {
const controller = new AbortController();
const promise = toPromise(controller.signal);
expect(promise.cleanup).toBeDefined();
});

test('calling clean up handler prevents rejects', async () => {
const controller = new AbortController();
const { promise, cleanup } = toPromise(controller.signal);
const whenRejected = jest.fn();
promise.catch(whenRejected);
cleanup();
controller.abort();
await flushPromises();
expect(whenRejected).not.toBeCalled();
});
});
});

describe('getCombinedSignal', () => {
test('should return an AbortSignal', () => {
const signal = getCombinedSignal([]);
expect(signal instanceof AbortSignal).toBe(true);
const signal = getCombinedSignal([]).signal;
expect(signal).toBeInstanceOf(AbortSignal);
});

test('should not abort if none of the signals abort', async () => {
const controller1 = new AbortController();
const controller2 = new AbortController();
setTimeout(() => controller1.abort(), 2000);
setTimeout(() => controller2.abort(), 1000);
const signal = getCombinedSignal([controller1.signal, controller2.signal]);
const signal = getCombinedSignal([controller1.signal, controller2.signal]).signal;
expect(signal.aborted).toBe(false);
jest.advanceTimersByTime(500);
await flushPromises();
Expand All @@ -84,7 +101,7 @@ describe('AbortUtils', () => {
const controller2 = new AbortController();
setTimeout(() => controller1.abort(), 2000);
setTimeout(() => controller2.abort(), 1000);
const signal = getCombinedSignal([controller1.signal, controller2.signal]);
const signal = getCombinedSignal([controller1.signal, controller2.signal]).signal;
expect(signal.aborted).toBe(false);
jest.advanceTimersByTime(1000);
await flushPromises();
Expand All @@ -95,8 +112,56 @@ describe('AbortUtils', () => {
const controller1 = new AbortController();
const controller2 = new AbortController();
controller1.abort();
const signal = getCombinedSignal([controller1.signal, controller2.signal]);
const signal = getCombinedSignal([controller1.signal, controller2.signal]).signal;
expect(signal.aborted).toBe(true);
});

describe('cleanup listener', () => {
const createMockController = () => {
const controller = new AbortController();
const spyAddListener = jest.spyOn(controller.signal, 'addEventListener');
const spyRemoveListener = jest.spyOn(controller.signal, 'removeEventListener');
return {
controller,
getTotalListeners: () =>
Math.max(spyAddListener.mock.calls.length - spyRemoveListener.mock.calls.length, 0),
};
};

test('cleanup should cleanup inner listeners', () => {
const controller1 = createMockController();
const controller2 = createMockController();

const { cleanup } = getCombinedSignal([
controller1.controller.signal,
controller2.controller.signal,
]);

expect(controller1.getTotalListeners()).toBe(1);
expect(controller2.getTotalListeners()).toBe(1);

cleanup();

expect(controller1.getTotalListeners()).toBe(0);
expect(controller2.getTotalListeners()).toBe(0);
});

test('abort should cleanup inner listeners', async () => {
const controller1 = createMockController();
const controller2 = createMockController();

getCombinedSignal([controller1.controller.signal, controller2.controller.signal]);

expect(controller1.getTotalListeners()).toBe(1);
expect(controller2.getTotalListeners()).toBe(1);

controller1.controller.abort();

await flushPromises();

expect(controller1.getTotalListeners()).toBe(0);
expect(controller2.getTotalListeners()).toBe(0);
});
});
});
});
35 changes: 28 additions & 7 deletions src/plugins/data/common/utils/abort_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,50 @@ export class AbortError extends Error {
*
* @param signal The `AbortSignal` to generate the `Promise` from
*/
export function toPromise(signal: AbortSignal): Promise<never> {
return new Promise((resolve, reject) => {
if (signal.aborted) reject(new AbortError());
const abortHandler = () => {
export function toPromise(signal: AbortSignal): { promise: Promise<never>; cleanup: () => void } {
let abortHandler: () => void;
const cleanup = () => {
if (abortHandler) {
signal.removeEventListener('abort', abortHandler);
}
};
const promise = new Promise<never>((resolve, reject) => {
if (signal.aborted) reject(new AbortError());
abortHandler = () => {
cleanup();
reject(new AbortError());
};
signal.addEventListener('abort', abortHandler);
});

return { promise, cleanup };
}

/**
* Returns an `AbortSignal` that will be aborted when the first of the given signals aborts.
*
* @param signals
*/
export function getCombinedSignal(signals: AbortSignal[]) {
export function getCombinedSignal(
signals: AbortSignal[]
): { signal: AbortSignal; cleanup: () => void } {
const controller = new AbortController();
let cleanup = () => {};

if (signals.some((signal) => signal.aborted)) {
controller.abort();
} else {
const promises = signals.map((signal) => toPromise(signal));
Promise.race(promises).catch(() => controller.abort());
cleanup = () => {
promises.forEach((p) => p.cleanup());
controller.signal.removeEventListener('abort', cleanup);
};
controller.signal.addEventListener('abort', cleanup);
Promise.race(promises.map((p) => p.promise)).catch(() => {
cleanup();
controller.abort();
});
}
return controller.signal;

return { signal: controller.signal, cleanup };
}
7 changes: 4 additions & 3 deletions src/plugins/data/public/search/search_interceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ import { PublicMethodsOf } from '@kbn/utility-types';
import { CoreStart, CoreSetup, ToastsSetup } from 'kibana/public';
import { i18n } from '@kbn/i18n';
import {
getCombinedSignal,
AbortError,
IKibanaSearchRequest,
IKibanaSearchResponse,
ISearchOptions,
ES_SEARCH_STRATEGY,
ISessionService,
getCombinedSignal,
} from '../../common';
import { SearchUsageCollector } from './collectors';
import {
Expand Down Expand Up @@ -171,11 +171,12 @@ export class SearchInterceptor {
...(abortSignal ? [abortSignal] : []),
];

const combinedSignal = getCombinedSignal(signals);
const { signal: combinedSignal, cleanup: cleanupCombinedSignal } = getCombinedSignal(signals);
const cleanup = () => {
subscription.unsubscribe();
combinedSignal.removeEventListener('abort', cleanup);
cleanupCombinedSignal();
};

combinedSignal.addEventListener('abort', cleanup);

return {
Expand Down
22 changes: 13 additions & 9 deletions src/plugins/expressions/common/execution/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ export class Execution<
* Races a given promise against the "abort" event of `abortController`.
*/
private race<T>(promise: Promise<T>): Promise<T> {
return Promise.race<T>([this.abortRejection, promise]);
return Promise.race<T>([this.abortRejection.promise, promise]);
}

/**
Expand Down Expand Up @@ -189,14 +189,18 @@ export class Execution<
else reject(error);
});

this.firstResultFuture.promise.then(
(result) => {
this.state.transitions.setResult(result);
},
(error) => {
this.state.transitions.setError(error);
}
);
this.firstResultFuture.promise
.then(
(result) => {
this.state.transitions.setResult(result);
},
(error) => {
this.state.transitions.setError(error);
}
)
.finally(() => {
this.abortRejection.cleanup();
});
}

async invokeChain(chainArr: ExpressionAstFunction[], input: unknown): Promise<any> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ describe('ExecutionContract', () => {

test('can cancel execution', () => {
const execution = createExecution('foo bar=123');
const spy = jest.spyOn(execution, 'cancel');
const spy = jest.spyOn(execution, 'cancel').mockImplementation(() => {});
const contract = new ExecutionContract(execution);

expect(spy).toHaveBeenCalledTimes(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
abortSignal: options.abortSignal,
timeout: this.searchTimeout,
});
const aborted$ = from(toPromise(combinedSignal));
const abortedPromise = toPromise(combinedSignal);
const strategy = options?.strategy || ENHANCED_ES_SEARCH_STRATEGY;

this.pendingCount$.next(this.pendingCount$.getValue() + 1);
Expand All @@ -90,7 +90,7 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
})
);
}),
takeUntil(aborted$),
takeUntil(from(abortedPromise.promise)),
catchError((e: any) => {
// If we haven't received the response to the initial request, including the ID, then
// we don't need to send a follow-up request to delete this search. Otherwise, we
Expand All @@ -103,6 +103,7 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
finalize(() => {
this.pendingCount$.next(this.pendingCount$.getValue() - 1);
cleanup();
abortedPromise.cleanup();
})
);
}
Expand Down

0 comments on commit 7833fb3

Please sign in to comment.