Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Search] Add cancel function to pollSearch #85787

Merged
merged 21 commits into from
Jan 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions x-pack/plugins/data_enhanced/common/search/poll_search.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { pollSearch } from './poll_search';
import { AbortError } from '../../../../../src/plugins/kibana_utils/common';

describe('pollSearch', () => {
function getMockedSearch$(resolveOnI = 1, finishWithError = false) {
let counter = 0;
return jest.fn().mockImplementation(() => {
counter++;
const lastCall = counter === resolveOnI;
return new Promise((resolve) => {
if (lastCall) {
resolve({
isRunning: false,
isPartial: finishWithError,
});
} else {
resolve({
isRunning: true,
isPartial: true,
});
}
});
});
}

test('Defers execution', async () => {
const searchFn = getMockedSearch$(1);
const cancelFn = jest.fn();
pollSearch(searchFn, cancelFn);
expect(searchFn).toBeCalledTimes(0);
expect(cancelFn).toBeCalledTimes(0);
});

test('Resolves immediatelly', async () => {
const searchFn = getMockedSearch$(1);
const cancelFn = jest.fn();
await pollSearch(searchFn, cancelFn).toPromise();
expect(searchFn).toBeCalledTimes(1);
expect(cancelFn).toBeCalledTimes(0);
});

test('Resolves when complete', async () => {
const searchFn = getMockedSearch$(3);
const cancelFn = jest.fn();
await pollSearch(searchFn, cancelFn).toPromise();
expect(searchFn).toBeCalledTimes(3);
expect(cancelFn).toBeCalledTimes(0);
});

test('Throws Error on ES error response', async () => {
const searchFn = getMockedSearch$(2, true);
const cancelFn = jest.fn();
const poll = pollSearch(searchFn, cancelFn).toPromise();
await expect(poll).rejects.toThrow(Error);
expect(searchFn).toBeCalledTimes(2);
expect(cancelFn).toBeCalledTimes(0);
});

test('Throws AbortError on empty response', async () => {
const searchFn = jest.fn().mockResolvedValue(undefined);
const cancelFn = jest.fn();
const poll = pollSearch(searchFn, cancelFn).toPromise();
await expect(poll).rejects.toThrow(AbortError);
expect(searchFn).toBeCalledTimes(1);
expect(cancelFn).toBeCalledTimes(0);
});

test('Throws AbortError and cancels on abort', async () => {
const searchFn = getMockedSearch$(20);
const cancelFn = jest.fn();
const abortController = new AbortController();
const poll = pollSearch(searchFn, cancelFn, {
abortSignal: abortController.signal,
}).toPromise();

await new Promise((resolve) => setTimeout(resolve, 500));
abortController.abort();

await expect(poll).rejects.toThrow(AbortError);

await new Promise((resolve) => setTimeout(resolve, 1000));

expect(searchFn).toBeCalledTimes(1);
expect(cancelFn).toBeCalledTimes(1);
});

test("Stops, but doesn't cancel on unsubscribe", async () => {
const searchFn = getMockedSearch$(20);
const cancelFn = jest.fn();
const subscription = pollSearch(searchFn, cancelFn).subscribe(() => {});

await new Promise((resolve) => setTimeout(resolve, 500));
subscription.unsubscribe();
await new Promise((resolve) => setTimeout(resolve, 1000));

expect(searchFn).toBeCalledTimes(1);
expect(cancelFn).toBeCalledTimes(0);
});

test('Calls cancel even when consumer unsubscribes', async () => {
const searchFn = getMockedSearch$(20);
const cancelFn = jest.fn();
const abortController = new AbortController();
const subscription = pollSearch(searchFn, cancelFn, {
abortSignal: abortController.signal,
}).subscribe(() => {});
subscription.unsubscribe();
abortController.abort();

expect(searchFn).toBeCalledTimes(1);
expect(cancelFn).toBeCalledTimes(1);
});
});
46 changes: 30 additions & 16 deletions x-pack/plugins/data_enhanced/common/search/poll_search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,42 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { from, NEVER, Observable, timer } from 'rxjs';
import { expand, finalize, switchMap, takeUntil, takeWhile, tap } from 'rxjs/operators';
import { from, Observable, timer, defer, fromEvent, EMPTY } from 'rxjs';
import { expand, map, switchMap, takeUntil, takeWhile, tap } from 'rxjs/operators';
import type { IKibanaSearchResponse } from '../../../../../src/plugins/data/common';
import { isErrorResponse, isPartialResponse } from '../../../../../src/plugins/data/common';
import { AbortError, abortSignalToPromise } from '../../../../../src/plugins/kibana_utils/common';
import { AbortError } from '../../../../../src/plugins/kibana_utils/common';
import type { IAsyncSearchOptions } from './types';

export const pollSearch = <Response extends IKibanaSearchResponse>(
search: () => Promise<Response>,
{ pollInterval = 1000, ...options }: IAsyncSearchOptions = {}
cancel?: () => void,
{ pollInterval = 1000, abortSignal }: IAsyncSearchOptions = {}
): Observable<Response> => {
const aborted = options?.abortSignal
? abortSignalToPromise(options?.abortSignal)
: { promise: NEVER, cleanup: () => {} };
return defer(() => {
if (abortSignal?.aborted) {
throw new AbortError();
}

return from(search()).pipe(
expand(() => timer(pollInterval).pipe(switchMap(search))),
tap((response) => {
if (isErrorResponse(response)) throw new AbortError();
}),
takeWhile<Response>(isPartialResponse, true),
takeUntil<Response>(from(aborted.promise)),
finalize(aborted.cleanup)
);
if (cancel) {
abortSignal?.addEventListener('abort', cancel, { once: true });
}

const aborted$ = (abortSignal ? fromEvent(abortSignal, 'abort') : EMPTY).pipe(
map(() => {
throw new AbortError();
})
);

return from(search()).pipe(
expand(() => timer(pollInterval).pipe(switchMap(search))),
tap((response) => {
if (isErrorResponse(response)) {
throw response ? new Error('Received partial response') : new AbortError();
}
}),
takeWhile<Response>(isPartialResponse, true),
takeUntil<Response>(aborted$)
);
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ describe('EnhancedSearchInterceptor', () => {
await timeTravel(10);

expect(error).toHaveBeenCalled();
expect(error.mock.calls[0][0]).toBeInstanceOf(AbortError);
expect(error.mock.calls[0][0]).toBeInstanceOf(Error);
});

test('should abort on user abort', async () => {
Expand Down Expand Up @@ -262,7 +262,7 @@ describe('EnhancedSearchInterceptor', () => {
expect(error.mock.calls[0][0]).toBeInstanceOf(AbortError);

expect(fetchMock).toHaveBeenCalledTimes(2);
expect(mockCoreSetup.http.delete).toHaveBeenCalled();
expect(mockCoreSetup.http.delete).toHaveBeenCalledTimes(1);
});

test('should not DELETE a running async search on async timeout prior to first response', async () => {
Expand Down Expand Up @@ -326,7 +326,7 @@ describe('EnhancedSearchInterceptor', () => {
expect(error).toHaveBeenCalled();
expect(error.mock.calls[0][0]).toBeInstanceOf(SearchTimeoutError);
expect(fetchMock).toHaveBeenCalledTimes(2);
expect(mockCoreSetup.http.delete).toHaveBeenCalled();
expect(mockCoreSetup.http.delete).toHaveBeenCalledTimes(1);
});

test('should DELETE a running async search on async timeout on error from fetch', async () => {
Expand All @@ -343,8 +343,6 @@ describe('EnhancedSearchInterceptor', () => {
time: 10,
value: {
error: 'oh no',
isPartial: false,
isRunning: false,
id: 1,
},
isError: true,
Expand All @@ -368,7 +366,7 @@ describe('EnhancedSearchInterceptor', () => {
expect(error).toHaveBeenCalled();
expect(error.mock.calls[0][0]).toBe(responses[1].value);
expect(fetchMock).toHaveBeenCalledTimes(2);
expect(mockCoreSetup.http.delete).toHaveBeenCalled();
expect(mockCoreSetup.http.delete).toHaveBeenCalledTimes(1);
});

test('should NOT DELETE a running SAVED async search on abort', async () => {
Expand Down
12 changes: 8 additions & 4 deletions x-pack/plugins/data_enhanced/public/search/search_interceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { once } from 'lodash';
import { throwError, Subscription } from 'rxjs';
import { tap, finalize, catchError, filter, take, skip } from 'rxjs/operators';
import {
Expand All @@ -14,7 +15,6 @@ import {
IKibanaSearchRequest,
SearchSessionState,
} from '../../../../../src/plugins/data/public';
import { AbortError } from '../../../../../src/plugins/kibana_utils/common';
import { ENHANCED_ES_SEARCH_STRATEGY, IAsyncSearchOptions, pollSearch } from '../../common';

export class EnhancedSearchInterceptor extends SearchInterceptor {
Expand Down Expand Up @@ -84,10 +84,14 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
isSavedToBackground = true;
});

return pollSearch(search, { ...options, abortSignal: combinedSignal }).pipe(
const cancel = once(() => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

elegant 👍

if (id && !isSavedToBackground) this.deps.http.delete(`/internal/search/${strategy}/${id}`);
});

return pollSearch(search, cancel, { ...options, abortSignal: combinedSignal }).pipe(
tap((response) => (id = response.id)),
catchError((e: AbortError) => {
if (id && !isSavedToBackground) this.deps.http.delete(`/internal/search/${strategy}/${id}`);
catchError((e: Error) => {
cancel();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The de-duplication is neat, but we have to guard against calling this when the abortSignal has been triggered, because that has already caused the DELETE request to be sent.

Suggested change
cancel();
if (!combinedSignal.aborted) {
cancel();
}

Otherwise it will be sent twice on explicit abort.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lukasolson are you still considering to fix this scenario?

return throwError(this.handleSearchError(e, timeoutSignal, options));
}),
finalize(() => {
Expand Down
16 changes: 13 additions & 3 deletions x-pack/plugins/data_enhanced/server/search/eql_search_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/

import { tap } from 'rxjs/operators';
import type { Logger } from 'kibana/server';
import type { IScopedClusterClient, Logger } from 'kibana/server';
import type { ISearchStrategy } from '../../../../../src/plugins/data/server';
import type {
EqlSearchStrategyRequest,
Expand All @@ -21,10 +21,14 @@ import { EqlSearchResponse } from './types';
export const eqlSearchStrategyProvider = (
logger: Logger
): ISearchStrategy<EqlSearchStrategyRequest, EqlSearchStrategyResponse> => {
async function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) {
await esClient.asCurrentUser.asyncSearch.delete({ id });
}

return {
cancel: async (id, options, { esClient }) => {
logger.debug(`_eql/delete ${id}`);
await esClient.asCurrentUser.eql.delete({ id });
await cancelAsyncSearch(id, esClient);
},

search: ({ id, ...request }, options: IAsyncSearchOptions, { esClient, uiSettingsClient }) => {
Expand Down Expand Up @@ -54,7 +58,13 @@ export const eqlSearchStrategyProvider = (
return toEqlKibanaSearchResponse(response);
};

return pollSearch(search, options).pipe(tap((response) => (id = response.id)));
const cancel = async () => {
if (id) {
await cancelAsyncSearch(id, esClient);
}
};

return pollSearch(search, cancel, options).pipe(tap((response) => (id = response.id)));
},

extend: async (id, keepAlive, options, { esClient }) => {
Expand Down
16 changes: 13 additions & 3 deletions x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/

import type { Observable } from 'rxjs';
import type { Logger, SharedGlobalConfig } from 'kibana/server';
import type { IScopedClusterClient, Logger, SharedGlobalConfig } from 'kibana/server';
import { first, tap } from 'rxjs/operators';
import { SearchResponse } from 'elasticsearch';
import { from } from 'rxjs';
Expand Down Expand Up @@ -40,6 +40,10 @@ export const enhancedEsSearchStrategyProvider = (
logger: Logger,
usage?: SearchUsage
): ISearchStrategy<IEsSearchRequest> => {
async function cancelAsyncSearch(id: string, esClient: IScopedClusterClient) {
await esClient.asCurrentUser.asyncSearch.delete({ id });
}

function asyncSearch(
{ id, ...request }: IEsSearchRequest,
options: IAsyncSearchOptions,
Expand All @@ -58,7 +62,13 @@ export const enhancedEsSearchStrategyProvider = (
return toAsyncKibanaSearchResponse(body);
};

return pollSearch(search, options).pipe(
const cancel = async () => {
lizozom marked this conversation as resolved.
Show resolved Hide resolved
if (id) {
await cancelAsyncSearch(id, esClient);
}
};

return pollSearch(search, cancel, options).pipe(
tap((response) => (id = response.id)),
tap(searchUsageObserver(logger, usage))
);
Expand Down Expand Up @@ -109,7 +119,7 @@ export const enhancedEsSearchStrategyProvider = (
},
cancel: async (id, options, { esClient }) => {
logger.debug(`cancel ${id}`);
await esClient.asCurrentUser.asyncSearch.delete({ id });
await cancelAsyncSearch(id, esClient);
},
extend: async (id, keepAlive, options, { esClient }) => {
logger.debug(`extend ${id} by ${keepAlive}`);
Expand Down