Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Search] Add cancel function to pollSearch #85787

Merged
merged 21 commits into from
Jan 14, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions x-pack/plugins/data_enhanced/common/search/poll_search.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { pollSearch } from './poll_search';
import { AbortError } from '../../../../../src/plugins/kibana_utils/common';

describe('pollSearch', () => {
function getMockedSearch$(resolveOnI = 1, finishWithError = false) {
let counter = 0;
return jest.fn().mockImplementation(() => {
counter++;
const lastCall = counter === resolveOnI;
return new Promise((resolve) => {
if (lastCall) {
resolve({
isRunning: false,
isPartial: finishWithError,
});
} else {
resolve({
isRunning: true,
isPartial: true,
});
}
});
});
}

test('Resolves immediatelly', async () => {
const searchFn = getMockedSearch$(1);
const cancelFn = jest.fn();
await pollSearch(searchFn, cancelFn).toPromise();
expect(searchFn).toBeCalledTimes(1);
expect(cancelFn).toBeCalledTimes(0);
});

test('Resolves when complete', async () => {
const searchFn = getMockedSearch$(3);
const cancelFn = jest.fn();
await pollSearch(searchFn, cancelFn).toPromise();
expect(searchFn).toBeCalledTimes(3);
expect(cancelFn).toBeCalledTimes(0);
});

test('Throws AbortError and cancels on error', async () => {
const searchFn = getMockedSearch$(2, true);
const cancelFn = jest.fn();
const poll = pollSearch(searchFn, cancelFn).toPromise();
await expect(poll).rejects.toThrow(AbortError);
expect(searchFn).toBeCalledTimes(2);
expect(cancelFn).toBeCalledTimes(1);
});

test('Throws AbortError and cancels on abort', async () => {
const searchFn = getMockedSearch$(20);
const cancelFn = jest.fn();
const abortController = new AbortController();
const poll = pollSearch(searchFn, cancelFn, {
abortSignal: abortController.signal,
}).toPromise();

await new Promise((resolve) => setTimeout(resolve, 500));
abortController.abort();

await expect(poll).rejects.toThrow(AbortError);

await new Promise((resolve) => setTimeout(resolve, 1000));

expect(searchFn).toBeCalledTimes(1);
expect(cancelFn).toBeCalledTimes(1);
});

test("Stops, but doesn't cancel on unsubscribe", async () => {
const searchFn = getMockedSearch$(20);
const cancelFn = jest.fn();
const subscription = pollSearch(searchFn, cancelFn).subscribe(() => {});

await new Promise((resolve) => setTimeout(resolve, 500));
subscription.unsubscribe();
await new Promise((resolve) => setTimeout(resolve, 1000));

expect(searchFn).toBeCalledTimes(1);
expect(cancelFn).toBeCalledTimes(0);
});
});
19 changes: 15 additions & 4 deletions x-pack/plugins/data_enhanced/common/search/poll_search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,36 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { from, NEVER, Observable, timer } from 'rxjs';
import { expand, finalize, switchMap, takeUntil, takeWhile, tap } from 'rxjs/operators';
import { from, Observable, throwError, timer } from 'rxjs';
import { catchError, expand, finalize, switchMap, takeUntil, takeWhile, tap } from 'rxjs/operators';
import type { IKibanaSearchResponse } from '../../../../../src/plugins/data/common';
import { isErrorResponse, isPartialResponse } from '../../../../../src/plugins/data/common';
import { AbortError, abortSignalToPromise } from '../../../../../src/plugins/kibana_utils/common';
import type { IAsyncSearchOptions } from './types';

export const pollSearch = <Response extends IKibanaSearchResponse>(
search: () => Promise<Response>,
cancel?: () => void,
{ pollInterval = 1000, ...options }: IAsyncSearchOptions = {}
): Observable<Response> => {
const aborted = options?.abortSignal
? abortSignalToPromise(options?.abortSignal)
: { promise: NEVER, cleanup: () => {} };
: { promise: new Promise(() => {}), cleanup: () => {} };

aborted.promise.catch(() => {
if (cancel) cancel();
});

return from(search()).pipe(
expand(() => timer(pollInterval).pipe(switchMap(search))),
tap((response) => {
if (isErrorResponse(response)) throw new AbortError();
if (isErrorResponse(response)) {
throw new Error('Received incomplete result');
}
}),
catchError((e: any) => {
if (cancel) cancel();
return throwError(new AbortError());
lizozom marked this conversation as resolved.
Show resolved Hide resolved
}),
takeWhile<Response>(isPartialResponse, true),
takeUntil<Response>(from(aborted.promise)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ jest.useFakeTimers();
function mockFetchImplementation(responses: any[]) {
let i = 0;
fetchMock.mockImplementation(() => {
const { time = 0, value = {}, isError = false } = responses[i++];
const { time = 0, value = {}, isFetchError = false } = responses[i++];
return new Promise((resolve, reject) =>
setTimeout(() => {
return (isError ? reject : resolve)(value);
return (isFetchError ? reject : resolve)(value);
}, time)
);
});
Expand Down Expand Up @@ -343,11 +343,11 @@ describe('EnhancedSearchInterceptor', () => {
time: 10,
value: {
error: 'oh no',
isPartial: false,
isPartial: true,
isRunning: false,
id: 1,
},
isError: true,
isFetchError: true,
},
];
mockFetchImplementation(responses);
Expand All @@ -366,7 +366,48 @@ describe('EnhancedSearchInterceptor', () => {
await timeTravel(10);

expect(error).toHaveBeenCalled();
expect(error.mock.calls[0][0]).toBe(responses[1].value);
expect(error.mock.calls[0][0]).toBeInstanceOf(AbortError);
expect(fetchMock).toHaveBeenCalledTimes(2);
expect(mockCoreSetup.http.delete).toHaveBeenCalled();
});

test('should DELETE a running async search on async timeout on failed ES result', async () => {
const responses = [
{
time: 10,
value: {
isPartial: true,
isRunning: true,
id: 1,
},
},
{
time: 10,
value: {
error: 'oh no',
isPartial: true,
isRunning: false,
id: 1,
},
},
];
mockFetchImplementation(responses);

const response = searchInterceptor.search({}, { pollInterval: 0 });
response.subscribe({ next, error });

await timeTravel(10);

expect(next).toHaveBeenCalled();
expect(error).not.toHaveBeenCalled();
expect(fetchMock).toHaveBeenCalled();
expect(mockCoreSetup.http.delete).not.toHaveBeenCalled();

// Long enough to reach the timeout but not long enough to reach the next response
await timeTravel(10);

expect(error).toHaveBeenCalled();
expect(error.mock.calls[0][0]).toBeInstanceOf(AbortError);
expect(fetchMock).toHaveBeenCalledTimes(2);
expect(mockCoreSetup.http.delete).toHaveBeenCalled();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,13 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
isSavedToBackground = true;
});

return pollSearch(search, { ...options, abortSignal: combinedSignal }).pipe(
const cancel = () => {
if (id && !isSavedToBackground) this.deps.http.delete(`/internal/search/${strategy}/${id}`);
};

return pollSearch(search, cancel, { ...options, abortSignal: combinedSignal }).pipe(
tap((response) => (id = response.id)),
catchError((e: AbortError) => {
if (id && !isSavedToBackground) this.deps.http.delete(`/internal/search/${strategy}/${id}`);
return throwError(this.handleSearchError(e, timeoutSignal, options));
}),
finalize(() => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,13 @@ export const eqlSearchStrategyProvider = (
return toEqlKibanaSearchResponse(response);
};

return pollSearch(search, options).pipe(tap((response) => (id = response.id)));
const cancel = async () => {
if (id) {
await esClient.asCurrentUser.eql.delete({ id });
}
};

return pollSearch(search, cancel, options).pipe(tap((response) => (id = response.id)));
},
};
};
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,13 @@ export const enhancedEsSearchStrategyProvider = (
return toAsyncKibanaSearchResponse(body);
};

return pollSearch(search, options).pipe(
const cancel = async () => {
lizozom marked this conversation as resolved.
Show resolved Hide resolved
if (id) {
await esClient.asCurrentUser.asyncSearch.delete({ id });
}
};

return pollSearch(search, cancel, options).pipe(
tap((response) => (id = response.id)),
tap(searchUsageObserver(logger, usage))
);
Expand Down