diff --git a/x-pack/plugins/data_enhanced/common/search/poll_search.test.ts b/x-pack/plugins/data_enhanced/common/search/poll_search.test.ts new file mode 100644 index 00000000000000..ddb595c90444f3 --- /dev/null +++ b/x-pack/plugins/data_enhanced/common/search/poll_search.test.ts @@ -0,0 +1,119 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { pollSearch } from './poll_search'; +import { AbortError } from '../../../../../src/plugins/kibana_utils/common'; + +describe('pollSearch', () => { + function getMockedSearch$(resolveOnI = 1, finishWithError = false) { + let counter = 0; + return jest.fn().mockImplementation(() => { + counter++; + const lastCall = counter === resolveOnI; + return new Promise((resolve) => { + if (lastCall) { + resolve({ + isRunning: false, + isPartial: finishWithError, + }); + } else { + resolve({ + isRunning: true, + isPartial: true, + }); + } + }); + }); + } + + test('Defers execution', async () => { + const searchFn = getMockedSearch$(1); + const cancelFn = jest.fn(); + pollSearch(searchFn, cancelFn); + expect(searchFn).toBeCalledTimes(0); + expect(cancelFn).toBeCalledTimes(0); + }); + + test('Resolves immediatelly', async () => { + const searchFn = getMockedSearch$(1); + const cancelFn = jest.fn(); + await pollSearch(searchFn, cancelFn).toPromise(); + expect(searchFn).toBeCalledTimes(1); + expect(cancelFn).toBeCalledTimes(0); + }); + + test('Resolves when complete', async () => { + const searchFn = getMockedSearch$(3); + const cancelFn = jest.fn(); + await pollSearch(searchFn, cancelFn).toPromise(); + expect(searchFn).toBeCalledTimes(3); + expect(cancelFn).toBeCalledTimes(0); + }); + + test('Throws Error on ES error response', async () => { + const searchFn = getMockedSearch$(2, true); + const cancelFn = jest.fn(); + const poll = pollSearch(searchFn, cancelFn).toPromise(); + await expect(poll).rejects.toThrow(Error); + expect(searchFn).toBeCalledTimes(2); + expect(cancelFn).toBeCalledTimes(0); + }); + + test('Throws AbortError on empty response', async () => { + const searchFn = jest.fn().mockResolvedValue(undefined); + const cancelFn = jest.fn(); + const poll = pollSearch(searchFn, cancelFn).toPromise(); + await expect(poll).rejects.toThrow(AbortError); + expect(searchFn).toBeCalledTimes(1); + expect(cancelFn).toBeCalledTimes(0); + }); + + test('Throws AbortError and cancels on abort', async () => { + const searchFn = getMockedSearch$(20); + const cancelFn = jest.fn(); + const abortController = new AbortController(); + const poll = pollSearch(searchFn, cancelFn, { + abortSignal: abortController.signal, + }).toPromise(); + + await new Promise((resolve) => setTimeout(resolve, 500)); + abortController.abort(); + + await expect(poll).rejects.toThrow(AbortError); + + await new Promise((resolve) => setTimeout(resolve, 1000)); + + expect(searchFn).toBeCalledTimes(1); + expect(cancelFn).toBeCalledTimes(1); + }); + + test("Stops, but doesn't cancel on unsubscribe", async () => { + const searchFn = getMockedSearch$(20); + const cancelFn = jest.fn(); + const subscription = pollSearch(searchFn, cancelFn).subscribe(() => {}); + + await new Promise((resolve) => setTimeout(resolve, 500)); + subscription.unsubscribe(); + await new Promise((resolve) => setTimeout(resolve, 1000)); + + expect(searchFn).toBeCalledTimes(1); + expect(cancelFn).toBeCalledTimes(0); + }); + + test('Calls cancel even when consumer unsubscribes', async () => { + const searchFn = getMockedSearch$(20); + const cancelFn = jest.fn(); + const abortController = new AbortController(); + const subscription = pollSearch(searchFn, cancelFn, { + abortSignal: abortController.signal, + }).subscribe(() => {}); + subscription.unsubscribe(); + abortController.abort(); + + expect(searchFn).toBeCalledTimes(1); + expect(cancelFn).toBeCalledTimes(1); + }); +}); diff --git a/x-pack/plugins/data_enhanced/common/search/poll_search.ts b/x-pack/plugins/data_enhanced/common/search/poll_search.ts index c0e289c691cfda..2cc55836f8cc12 100644 --- a/x-pack/plugins/data_enhanced/common/search/poll_search.ts +++ b/x-pack/plugins/data_enhanced/common/search/poll_search.ts @@ -4,28 +4,42 @@ * you may not use this file except in compliance with the Elastic License. */ -import { from, NEVER, Observable, timer } from 'rxjs'; -import { expand, finalize, switchMap, takeUntil, takeWhile, tap } from 'rxjs/operators'; +import { from, Observable, timer, defer, fromEvent, EMPTY } from 'rxjs'; +import { expand, map, switchMap, takeUntil, takeWhile, tap } from 'rxjs/operators'; import type { IKibanaSearchResponse } from '../../../../../src/plugins/data/common'; import { isErrorResponse, isPartialResponse } from '../../../../../src/plugins/data/common'; -import { AbortError, abortSignalToPromise } from '../../../../../src/plugins/kibana_utils/common'; +import { AbortError } from '../../../../../src/plugins/kibana_utils/common'; import type { IAsyncSearchOptions } from './types'; export const pollSearch = ( search: () => Promise, - { pollInterval = 1000, ...options }: IAsyncSearchOptions = {} + cancel?: () => void, + { pollInterval = 1000, abortSignal }: IAsyncSearchOptions = {} ): Observable => { - const aborted = options?.abortSignal - ? abortSignalToPromise(options?.abortSignal) - : { promise: NEVER, cleanup: () => {} }; + return defer(() => { + if (abortSignal?.aborted) { + throw new AbortError(); + } - return from(search()).pipe( - expand(() => timer(pollInterval).pipe(switchMap(search))), - tap((response) => { - if (isErrorResponse(response)) throw new AbortError(); - }), - takeWhile(isPartialResponse, true), - takeUntil(from(aborted.promise)), - finalize(aborted.cleanup) - ); + if (cancel) { + abortSignal?.addEventListener('abort', cancel, { once: true }); + } + + const aborted$ = (abortSignal ? fromEvent(abortSignal, 'abort') : EMPTY).pipe( + map(() => { + throw new AbortError(); + }) + ); + + return from(search()).pipe( + expand(() => timer(pollInterval).pipe(switchMap(search))), + tap((response) => { + if (isErrorResponse(response)) { + throw response ? new Error('Received partial response') : new AbortError(); + } + }), + takeWhile(isPartialResponse, true), + takeUntil(aborted$) + ); + }); }; diff --git a/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts b/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts index d1bb672b985f4e..1a6fc724e2cf21 100644 --- a/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts +++ b/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts @@ -192,7 +192,7 @@ describe('EnhancedSearchInterceptor', () => { await timeTravel(10); expect(error).toHaveBeenCalled(); - expect(error.mock.calls[0][0]).toBeInstanceOf(AbortError); + expect(error.mock.calls[0][0]).toBeInstanceOf(Error); }); test('should abort on user abort', async () => { @@ -262,7 +262,7 @@ describe('EnhancedSearchInterceptor', () => { expect(error.mock.calls[0][0]).toBeInstanceOf(AbortError); expect(fetchMock).toHaveBeenCalledTimes(2); - expect(mockCoreSetup.http.delete).toHaveBeenCalled(); + expect(mockCoreSetup.http.delete).toHaveBeenCalledTimes(1); }); test('should not DELETE a running async search on async timeout prior to first response', async () => { @@ -326,7 +326,7 @@ describe('EnhancedSearchInterceptor', () => { expect(error).toHaveBeenCalled(); expect(error.mock.calls[0][0]).toBeInstanceOf(SearchTimeoutError); expect(fetchMock).toHaveBeenCalledTimes(2); - expect(mockCoreSetup.http.delete).toHaveBeenCalled(); + expect(mockCoreSetup.http.delete).toHaveBeenCalledTimes(1); }); test('should DELETE a running async search on async timeout on error from fetch', async () => { @@ -343,8 +343,6 @@ describe('EnhancedSearchInterceptor', () => { time: 10, value: { error: 'oh no', - isPartial: false, - isRunning: false, id: 1, }, isError: true, @@ -368,7 +366,7 @@ describe('EnhancedSearchInterceptor', () => { expect(error).toHaveBeenCalled(); expect(error.mock.calls[0][0]).toBe(responses[1].value); expect(fetchMock).toHaveBeenCalledTimes(2); - expect(mockCoreSetup.http.delete).toHaveBeenCalled(); + expect(mockCoreSetup.http.delete).toHaveBeenCalledTimes(1); }); test('should NOT DELETE a running SAVED async search on abort', async () => { diff --git a/x-pack/plugins/data_enhanced/public/search/search_interceptor.ts b/x-pack/plugins/data_enhanced/public/search/search_interceptor.ts index 72d2cce49477ba..9145e35c4485f9 100644 --- a/x-pack/plugins/data_enhanced/public/search/search_interceptor.ts +++ b/x-pack/plugins/data_enhanced/public/search/search_interceptor.ts @@ -4,6 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ +import { once } from 'lodash'; import { throwError, Subscription } from 'rxjs'; import { tap, finalize, catchError, filter, take, skip } from 'rxjs/operators'; import { @@ -14,7 +15,6 @@ import { IKibanaSearchRequest, SearchSessionState, } from '../../../../../src/plugins/data/public'; -import { AbortError } from '../../../../../src/plugins/kibana_utils/common'; import { ENHANCED_ES_SEARCH_STRATEGY, IAsyncSearchOptions, pollSearch } from '../../common'; export class EnhancedSearchInterceptor extends SearchInterceptor { @@ -88,10 +88,14 @@ export class EnhancedSearchInterceptor extends SearchInterceptor { isSavedToBackground = true; }); - return pollSearch(search, { ...options, abortSignal: combinedSignal }).pipe( + const cancel = once(() => { + if (id && !isSavedToBackground) this.deps.http.delete(`/internal/search/${strategy}/${id}`); + }); + + return pollSearch(search, cancel, { ...options, abortSignal: combinedSignal }).pipe( tap((response) => (id = response.id)), - catchError((e: AbortError) => { - if (id && !isSavedToBackground) this.deps.http.delete(`/internal/search/${strategy}/${id}`); + catchError((e: Error) => { + cancel(); return throwError(this.handleSearchError(e, timeoutSignal, options)); }), finalize(() => { diff --git a/x-pack/plugins/data_enhanced/server/search/eql_search_strategy.ts b/x-pack/plugins/data_enhanced/server/search/eql_search_strategy.ts index e615d9d2a660ac..a0d4e9dcd19b99 100644 --- a/x-pack/plugins/data_enhanced/server/search/eql_search_strategy.ts +++ b/x-pack/plugins/data_enhanced/server/search/eql_search_strategy.ts @@ -5,7 +5,7 @@ */ import { tap } from 'rxjs/operators'; -import type { Logger } from 'kibana/server'; +import type { IScopedClusterClient, Logger } from 'kibana/server'; import type { ISearchStrategy } from '../../../../../src/plugins/data/server'; import type { EqlSearchStrategyRequest, @@ -21,10 +21,14 @@ import { EqlSearchResponse } from './types'; export const eqlSearchStrategyProvider = ( logger: Logger ): ISearchStrategy => { + async function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) { + await esClient.asCurrentUser.asyncSearch.delete({ id }); + } + return { cancel: async (id, options, { esClient }) => { logger.debug(`_eql/delete ${id}`); - await esClient.asCurrentUser.eql.delete({ id }); + await cancelAsyncSearch(id, esClient); }, search: ({ id, ...request }, options: IAsyncSearchOptions, { esClient, uiSettingsClient }) => { @@ -54,7 +58,13 @@ export const eqlSearchStrategyProvider = ( return toEqlKibanaSearchResponse(response); }; - return pollSearch(search, options).pipe(tap((response) => (id = response.id))); + const cancel = async () => { + if (id) { + await cancelAsyncSearch(id, esClient); + } + }; + + return pollSearch(search, cancel, options).pipe(tap((response) => (id = response.id))); }, extend: async (id, keepAlive, options, { esClient }) => { diff --git a/x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts b/x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts index c1520d931c272e..54ed59b30952af 100644 --- a/x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts +++ b/x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts @@ -5,7 +5,7 @@ */ import type { Observable } from 'rxjs'; -import type { Logger, SharedGlobalConfig } from 'kibana/server'; +import type { IScopedClusterClient, Logger, SharedGlobalConfig } from 'kibana/server'; import { first, tap } from 'rxjs/operators'; import { SearchResponse } from 'elasticsearch'; import { from } from 'rxjs'; @@ -40,6 +40,10 @@ export const enhancedEsSearchStrategyProvider = ( logger: Logger, usage?: SearchUsage ): ISearchStrategy => { + async function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) { + await esClient.asCurrentUser.asyncSearch.delete({ id }); + } + function asyncSearch( { id, ...request }: IEsSearchRequest, options: IAsyncSearchOptions, @@ -58,7 +62,13 @@ export const enhancedEsSearchStrategyProvider = ( return toAsyncKibanaSearchResponse(body); }; - return pollSearch(search, options).pipe( + const cancel = async () => { + if (id) { + await cancelAsyncSearch(id, esClient); + } + }; + + return pollSearch(search, cancel, options).pipe( tap((response) => (id = response.id)), tap(searchUsageObserver(logger, usage)) ); @@ -109,7 +119,7 @@ export const enhancedEsSearchStrategyProvider = ( }, cancel: async (id, options, { esClient }) => { logger.debug(`cancel ${id}`); - await esClient.asCurrentUser.asyncSearch.delete({ id }); + await cancelAsyncSearch(id, esClient); }, extend: async (id, keepAlive, options, { esClient }) => { logger.debug(`extend ${id} by ${keepAlive}`);