From 7833fb36f18eb30f9c50298adf7c05c99c271822 Mon Sep 17 00:00:00 2001 From: Anton Dosov Date: Fri, 30 Oct 2020 17:38:37 +0100 Subject: [PATCH] [Search] fix cancelation related memory leaks (#81996) --- .../data/common/utils/abort_utils.test.ts | 79 +++++++++++++++++-- src/plugins/data/common/utils/abort_utils.ts | 35 ++++++-- .../data/public/search/search_interceptor.ts | 7 +- .../expressions/common/execution/execution.ts | 22 +++--- .../execution/execution_contract.test.ts | 2 +- .../public/search/search_interceptor.ts | 5 +- 6 files changed, 121 insertions(+), 29 deletions(-) diff --git a/src/plugins/data/common/utils/abort_utils.test.ts b/src/plugins/data/common/utils/abort_utils.test.ts index ca187e95f528ba..358f00e5e82bd9 100644 --- a/src/plugins/data/common/utils/abort_utils.test.ts +++ b/src/plugins/data/common/utils/abort_utils.test.ts @@ -41,7 +41,7 @@ describe('AbortUtils', () => { describe('rejects', () => { test('should not reject if the signal does not abort', async () => { const controller = new AbortController(); - const promise = toPromise(controller.signal); + const promise = toPromise(controller.signal).promise; const whenRejected = jest.fn(); promise.catch(whenRejected); await flushPromises(); @@ -50,7 +50,7 @@ describe('AbortUtils', () => { test('should reject if the signal does abort', async () => { const controller = new AbortController(); - const promise = toPromise(controller.signal); + const promise = toPromise(controller.signal).promise; const whenRejected = jest.fn(); promise.catch(whenRejected); controller.abort(); @@ -58,13 +58,30 @@ describe('AbortUtils', () => { expect(whenRejected).toBeCalled(); expect(whenRejected.mock.calls[0][0]).toBeInstanceOf(AbortError); }); + + test('should expose cleanup handler', () => { + const controller = new AbortController(); + const promise = toPromise(controller.signal); + expect(promise.cleanup).toBeDefined(); + }); + + test('calling clean up handler prevents rejects', async () => { + const controller = new AbortController(); + const { promise, cleanup } = toPromise(controller.signal); + const whenRejected = jest.fn(); + promise.catch(whenRejected); + cleanup(); + controller.abort(); + await flushPromises(); + expect(whenRejected).not.toBeCalled(); + }); }); }); describe('getCombinedSignal', () => { test('should return an AbortSignal', () => { - const signal = getCombinedSignal([]); - expect(signal instanceof AbortSignal).toBe(true); + const signal = getCombinedSignal([]).signal; + expect(signal).toBeInstanceOf(AbortSignal); }); test('should not abort if none of the signals abort', async () => { @@ -72,7 +89,7 @@ describe('AbortUtils', () => { const controller2 = new AbortController(); setTimeout(() => controller1.abort(), 2000); setTimeout(() => controller2.abort(), 1000); - const signal = getCombinedSignal([controller1.signal, controller2.signal]); + const signal = getCombinedSignal([controller1.signal, controller2.signal]).signal; expect(signal.aborted).toBe(false); jest.advanceTimersByTime(500); await flushPromises(); @@ -84,7 +101,7 @@ describe('AbortUtils', () => { const controller2 = new AbortController(); setTimeout(() => controller1.abort(), 2000); setTimeout(() => controller2.abort(), 1000); - const signal = getCombinedSignal([controller1.signal, controller2.signal]); + const signal = getCombinedSignal([controller1.signal, controller2.signal]).signal; expect(signal.aborted).toBe(false); jest.advanceTimersByTime(1000); await flushPromises(); @@ -95,8 +112,56 @@ describe('AbortUtils', () => { const controller1 = new AbortController(); const controller2 = new AbortController(); controller1.abort(); - const signal = getCombinedSignal([controller1.signal, controller2.signal]); + const signal = getCombinedSignal([controller1.signal, controller2.signal]).signal; expect(signal.aborted).toBe(true); }); + + describe('cleanup listener', () => { + const createMockController = () => { + const controller = new AbortController(); + const spyAddListener = jest.spyOn(controller.signal, 'addEventListener'); + const spyRemoveListener = jest.spyOn(controller.signal, 'removeEventListener'); + return { + controller, + getTotalListeners: () => + Math.max(spyAddListener.mock.calls.length - spyRemoveListener.mock.calls.length, 0), + }; + }; + + test('cleanup should cleanup inner listeners', () => { + const controller1 = createMockController(); + const controller2 = createMockController(); + + const { cleanup } = getCombinedSignal([ + controller1.controller.signal, + controller2.controller.signal, + ]); + + expect(controller1.getTotalListeners()).toBe(1); + expect(controller2.getTotalListeners()).toBe(1); + + cleanup(); + + expect(controller1.getTotalListeners()).toBe(0); + expect(controller2.getTotalListeners()).toBe(0); + }); + + test('abort should cleanup inner listeners', async () => { + const controller1 = createMockController(); + const controller2 = createMockController(); + + getCombinedSignal([controller1.controller.signal, controller2.controller.signal]); + + expect(controller1.getTotalListeners()).toBe(1); + expect(controller2.getTotalListeners()).toBe(1); + + controller1.controller.abort(); + + await flushPromises(); + + expect(controller1.getTotalListeners()).toBe(0); + expect(controller2.getTotalListeners()).toBe(0); + }); + }); }); }); diff --git a/src/plugins/data/common/utils/abort_utils.ts b/src/plugins/data/common/utils/abort_utils.ts index a26fec9423f832..81f30b7454c7b4 100644 --- a/src/plugins/data/common/utils/abort_utils.ts +++ b/src/plugins/data/common/utils/abort_utils.ts @@ -36,15 +36,23 @@ export class AbortError extends Error { * * @param signal The `AbortSignal` to generate the `Promise` from */ -export function toPromise(signal: AbortSignal): Promise { - return new Promise((resolve, reject) => { - if (signal.aborted) reject(new AbortError()); - const abortHandler = () => { +export function toPromise(signal: AbortSignal): { promise: Promise; cleanup: () => void } { + let abortHandler: () => void; + const cleanup = () => { + if (abortHandler) { signal.removeEventListener('abort', abortHandler); + } + }; + const promise = new Promise((resolve, reject) => { + if (signal.aborted) reject(new AbortError()); + abortHandler = () => { + cleanup(); reject(new AbortError()); }; signal.addEventListener('abort', abortHandler); }); + + return { promise, cleanup }; } /** @@ -52,13 +60,26 @@ export function toPromise(signal: AbortSignal): Promise { * * @param signals */ -export function getCombinedSignal(signals: AbortSignal[]) { +export function getCombinedSignal( + signals: AbortSignal[] +): { signal: AbortSignal; cleanup: () => void } { const controller = new AbortController(); + let cleanup = () => {}; + if (signals.some((signal) => signal.aborted)) { controller.abort(); } else { const promises = signals.map((signal) => toPromise(signal)); - Promise.race(promises).catch(() => controller.abort()); + cleanup = () => { + promises.forEach((p) => p.cleanup()); + controller.signal.removeEventListener('abort', cleanup); + }; + controller.signal.addEventListener('abort', cleanup); + Promise.race(promises.map((p) => p.promise)).catch(() => { + cleanup(); + controller.abort(); + }); } - return controller.signal; + + return { signal: controller.signal, cleanup }; } diff --git a/src/plugins/data/public/search/search_interceptor.ts b/src/plugins/data/public/search/search_interceptor.ts index 087ca9e4f5c479..0f069d54ee9c8b 100644 --- a/src/plugins/data/public/search/search_interceptor.ts +++ b/src/plugins/data/public/search/search_interceptor.ts @@ -24,13 +24,13 @@ import { PublicMethodsOf } from '@kbn/utility-types'; import { CoreStart, CoreSetup, ToastsSetup } from 'kibana/public'; import { i18n } from '@kbn/i18n'; import { - getCombinedSignal, AbortError, IKibanaSearchRequest, IKibanaSearchResponse, ISearchOptions, ES_SEARCH_STRATEGY, ISessionService, + getCombinedSignal, } from '../../common'; import { SearchUsageCollector } from './collectors'; import { @@ -171,11 +171,12 @@ export class SearchInterceptor { ...(abortSignal ? [abortSignal] : []), ]; - const combinedSignal = getCombinedSignal(signals); + const { signal: combinedSignal, cleanup: cleanupCombinedSignal } = getCombinedSignal(signals); const cleanup = () => { subscription.unsubscribe(); + combinedSignal.removeEventListener('abort', cleanup); + cleanupCombinedSignal(); }; - combinedSignal.addEventListener('abort', cleanup); return { diff --git a/src/plugins/expressions/common/execution/execution.ts b/src/plugins/expressions/common/execution/execution.ts index 2bcf441b14203e..ba115a7538604d 100644 --- a/src/plugins/expressions/common/execution/execution.ts +++ b/src/plugins/expressions/common/execution/execution.ts @@ -99,7 +99,7 @@ export class Execution< * Races a given promise against the "abort" event of `abortController`. */ private race(promise: Promise): Promise { - return Promise.race([this.abortRejection, promise]); + return Promise.race([this.abortRejection.promise, promise]); } /** @@ -189,14 +189,18 @@ export class Execution< else reject(error); }); - this.firstResultFuture.promise.then( - (result) => { - this.state.transitions.setResult(result); - }, - (error) => { - this.state.transitions.setError(error); - } - ); + this.firstResultFuture.promise + .then( + (result) => { + this.state.transitions.setResult(result); + }, + (error) => { + this.state.transitions.setError(error); + } + ) + .finally(() => { + this.abortRejection.cleanup(); + }); } async invokeChain(chainArr: ExpressionAstFunction[], input: unknown): Promise { diff --git a/src/plugins/expressions/common/execution/execution_contract.test.ts b/src/plugins/expressions/common/execution/execution_contract.test.ts index 856b22470d782b..eaf7e6ea862eb4 100644 --- a/src/plugins/expressions/common/execution/execution_contract.test.ts +++ b/src/plugins/expressions/common/execution/execution_contract.test.ts @@ -59,7 +59,7 @@ describe('ExecutionContract', () => { test('can cancel execution', () => { const execution = createExecution('foo bar=123'); - const spy = jest.spyOn(execution, 'cancel'); + const spy = jest.spyOn(execution, 'cancel').mockImplementation(() => {}); const contract = new ExecutionContract(execution); expect(spy).toHaveBeenCalledTimes(0); diff --git a/x-pack/plugins/data_enhanced/public/search/search_interceptor.ts b/x-pack/plugins/data_enhanced/public/search/search_interceptor.ts index aee32a7c62759d..3226ecf6f0ddab 100644 --- a/x-pack/plugins/data_enhanced/public/search/search_interceptor.ts +++ b/x-pack/plugins/data_enhanced/public/search/search_interceptor.ts @@ -64,7 +64,7 @@ export class EnhancedSearchInterceptor extends SearchInterceptor { abortSignal: options.abortSignal, timeout: this.searchTimeout, }); - const aborted$ = from(toPromise(combinedSignal)); + const abortedPromise = toPromise(combinedSignal); const strategy = options?.strategy || ENHANCED_ES_SEARCH_STRATEGY; this.pendingCount$.next(this.pendingCount$.getValue() + 1); @@ -90,7 +90,7 @@ export class EnhancedSearchInterceptor extends SearchInterceptor { }) ); }), - takeUntil(aborted$), + takeUntil(from(abortedPromise.promise)), catchError((e: any) => { // If we haven't received the response to the initial request, including the ID, then // we don't need to send a follow-up request to delete this search. Otherwise, we @@ -103,6 +103,7 @@ export class EnhancedSearchInterceptor extends SearchInterceptor { finalize(() => { this.pendingCount$.next(this.pendingCount$.getValue() - 1); cleanup(); + abortedPromise.cleanup(); }) ); }