-
Notifications
You must be signed in to change notification settings - Fork 8.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Search] Add cancel function to pollSearch #85787
Changes from all commits
e8132ab
4a09685
0810f82
22aa4af
14ad169
b532398
32d33a9
a989b34
0408f42
806c7e4
551f930
bb4af4d
2f8e3d8
7240de3
2016703
3df3d74
0df0079
844441b
8d5e2f4
e281285
9b8085f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
import { pollSearch } from './poll_search'; | ||
import { AbortError } from '../../../../../src/plugins/kibana_utils/common'; | ||
|
||
describe('pollSearch', () => { | ||
function getMockedSearch$(resolveOnI = 1, finishWithError = false) { | ||
let counter = 0; | ||
return jest.fn().mockImplementation(() => { | ||
counter++; | ||
const lastCall = counter === resolveOnI; | ||
return new Promise((resolve) => { | ||
if (lastCall) { | ||
resolve({ | ||
isRunning: false, | ||
isPartial: finishWithError, | ||
}); | ||
} else { | ||
resolve({ | ||
isRunning: true, | ||
isPartial: true, | ||
}); | ||
} | ||
}); | ||
}); | ||
} | ||
|
||
test('Defers execution', async () => { | ||
const searchFn = getMockedSearch$(1); | ||
const cancelFn = jest.fn(); | ||
pollSearch(searchFn, cancelFn); | ||
expect(searchFn).toBeCalledTimes(0); | ||
expect(cancelFn).toBeCalledTimes(0); | ||
}); | ||
|
||
test('Resolves immediatelly', async () => { | ||
const searchFn = getMockedSearch$(1); | ||
const cancelFn = jest.fn(); | ||
await pollSearch(searchFn, cancelFn).toPromise(); | ||
expect(searchFn).toBeCalledTimes(1); | ||
expect(cancelFn).toBeCalledTimes(0); | ||
}); | ||
|
||
test('Resolves when complete', async () => { | ||
const searchFn = getMockedSearch$(3); | ||
const cancelFn = jest.fn(); | ||
await pollSearch(searchFn, cancelFn).toPromise(); | ||
expect(searchFn).toBeCalledTimes(3); | ||
expect(cancelFn).toBeCalledTimes(0); | ||
}); | ||
|
||
test('Throws Error on ES error response', async () => { | ||
const searchFn = getMockedSearch$(2, true); | ||
const cancelFn = jest.fn(); | ||
const poll = pollSearch(searchFn, cancelFn).toPromise(); | ||
await expect(poll).rejects.toThrow(Error); | ||
expect(searchFn).toBeCalledTimes(2); | ||
expect(cancelFn).toBeCalledTimes(0); | ||
}); | ||
|
||
test('Throws AbortError on empty response', async () => { | ||
const searchFn = jest.fn().mockResolvedValue(undefined); | ||
const cancelFn = jest.fn(); | ||
const poll = pollSearch(searchFn, cancelFn).toPromise(); | ||
await expect(poll).rejects.toThrow(AbortError); | ||
expect(searchFn).toBeCalledTimes(1); | ||
expect(cancelFn).toBeCalledTimes(0); | ||
}); | ||
|
||
test('Throws AbortError and cancels on abort', async () => { | ||
const searchFn = getMockedSearch$(20); | ||
const cancelFn = jest.fn(); | ||
const abortController = new AbortController(); | ||
const poll = pollSearch(searchFn, cancelFn, { | ||
abortSignal: abortController.signal, | ||
}).toPromise(); | ||
|
||
await new Promise((resolve) => setTimeout(resolve, 500)); | ||
abortController.abort(); | ||
|
||
await expect(poll).rejects.toThrow(AbortError); | ||
|
||
await new Promise((resolve) => setTimeout(resolve, 1000)); | ||
|
||
expect(searchFn).toBeCalledTimes(1); | ||
expect(cancelFn).toBeCalledTimes(1); | ||
}); | ||
|
||
test("Stops, but doesn't cancel on unsubscribe", async () => { | ||
const searchFn = getMockedSearch$(20); | ||
const cancelFn = jest.fn(); | ||
const subscription = pollSearch(searchFn, cancelFn).subscribe(() => {}); | ||
|
||
await new Promise((resolve) => setTimeout(resolve, 500)); | ||
subscription.unsubscribe(); | ||
await new Promise((resolve) => setTimeout(resolve, 1000)); | ||
|
||
expect(searchFn).toBeCalledTimes(1); | ||
expect(cancelFn).toBeCalledTimes(0); | ||
}); | ||
|
||
test('Calls cancel even when consumer unsubscribes', async () => { | ||
const searchFn = getMockedSearch$(20); | ||
const cancelFn = jest.fn(); | ||
const abortController = new AbortController(); | ||
const subscription = pollSearch(searchFn, cancelFn, { | ||
abortSignal: abortController.signal, | ||
}).subscribe(() => {}); | ||
subscription.unsubscribe(); | ||
abortController.abort(); | ||
|
||
expect(searchFn).toBeCalledTimes(1); | ||
expect(cancelFn).toBeCalledTimes(1); | ||
}); | ||
}); |
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -4,6 +4,7 @@ | |||||||||
* you may not use this file except in compliance with the Elastic License. | ||||||||||
*/ | ||||||||||
|
||||||||||
import { once } from 'lodash'; | ||||||||||
import { throwError, Subscription } from 'rxjs'; | ||||||||||
import { tap, finalize, catchError, filter, take, skip } from 'rxjs/operators'; | ||||||||||
import { | ||||||||||
|
@@ -14,7 +15,6 @@ import { | |||||||||
IKibanaSearchRequest, | ||||||||||
SearchSessionState, | ||||||||||
} from '../../../../../src/plugins/data/public'; | ||||||||||
import { AbortError } from '../../../../../src/plugins/kibana_utils/common'; | ||||||||||
import { ENHANCED_ES_SEARCH_STRATEGY, IAsyncSearchOptions, pollSearch } from '../../common'; | ||||||||||
|
||||||||||
export class EnhancedSearchInterceptor extends SearchInterceptor { | ||||||||||
|
@@ -84,10 +84,14 @@ export class EnhancedSearchInterceptor extends SearchInterceptor { | |||||||||
isSavedToBackground = true; | ||||||||||
}); | ||||||||||
|
||||||||||
return pollSearch(search, { ...options, abortSignal: combinedSignal }).pipe( | ||||||||||
const cancel = once(() => { | ||||||||||
if (id && !isSavedToBackground) this.deps.http.delete(`/internal/search/${strategy}/${id}`); | ||||||||||
}); | ||||||||||
|
||||||||||
return pollSearch(search, cancel, { ...options, abortSignal: combinedSignal }).pipe( | ||||||||||
tap((response) => (id = response.id)), | ||||||||||
catchError((e: AbortError) => { | ||||||||||
if (id && !isSavedToBackground) this.deps.http.delete(`/internal/search/${strategy}/${id}`); | ||||||||||
catchError((e: Error) => { | ||||||||||
cancel(); | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The de-duplication is neat, but we have to guard against calling this when the
Suggested change
Otherwise it will be sent twice on explicit abort. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @lukasolson are you still considering to fix this scenario? |
||||||||||
return throwError(this.handleSearchError(e, timeoutSignal, options)); | ||||||||||
}), | ||||||||||
finalize(() => { | ||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
elegant 👍