Skip to content

Commit

Permalink
[Search] Use async es client endpoints (#76872)
Browse files Browse the repository at this point in the history
* Use ES Client asyncSearch

* Rename to queryOptions

* Simplify options

* Update jest test and use delete route

* Common async options
  • Loading branch information
lizozom authored Sep 8, 2020
1 parent eecb0e5 commit 28c5f27
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,24 @@ const mockRollupResponse = {

describe('ES search strategy', () => {
const mockApiCaller = jest.fn();
const mockGetCaller = jest.fn();
const mockSubmitCaller = jest.fn();
const mockLogger: any = {
debug: () => {},
};
const mockContext = {
core: {
elasticsearch: { client: { asCurrentUser: { transport: { request: mockApiCaller } } } },
elasticsearch: {
client: {
asCurrentUser: {
asyncSearch: {
get: mockGetCaller,
submit: mockSubmitCaller,
},
transport: { request: mockApiCaller },
},
},
},
},
};
const mockConfig$ = pluginInitializerContextConfigMock<any>({}).legacy.globalConfig$;
Expand All @@ -56,47 +68,32 @@ describe('ES search strategy', () => {
});

it('makes a POST request to async search with params when no ID is provided', async () => {
mockApiCaller.mockResolvedValueOnce(mockAsyncResponse);
mockSubmitCaller.mockResolvedValueOnce(mockAsyncResponse);

const params = { index: 'logstash-*', body: { query: {} } };
const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger);

await esSearch.search((mockContext as unknown) as RequestHandlerContext, { params });

expect(mockApiCaller).toBeCalled();
const { method, path, body } = mockApiCaller.mock.calls[0][0];
expect(method).toBe('POST');
expect(path).toBe('/logstash-*/_async_search');
expect(body).toEqual({ query: {} });
expect(mockSubmitCaller).toBeCalled();
const request = mockSubmitCaller.mock.calls[0][0];
expect(request.index).toEqual(params.index);
expect(request.body).toEqual(params.body);
});

it('makes a GET request to async search with ID when ID is provided', async () => {
mockApiCaller.mockResolvedValueOnce(mockAsyncResponse);
mockGetCaller.mockResolvedValueOnce(mockAsyncResponse);

const params = { index: 'logstash-*', body: { query: {} } };
const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger);

await esSearch.search((mockContext as unknown) as RequestHandlerContext, { id: 'foo', params });

expect(mockApiCaller).toBeCalled();
const { method, path, body } = mockApiCaller.mock.calls[0][0];
expect(method).toBe('GET');
expect(path).toBe('/_async_search/foo');
expect(body).toEqual(undefined);
});

it('encodes special characters in the path', async () => {
mockApiCaller.mockResolvedValueOnce(mockAsyncResponse);

const params = { index: 'foo-程', body: {} };
const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger);

await esSearch.search((mockContext as unknown) as RequestHandlerContext, { params });

expect(mockApiCaller).toBeCalled();
const { method, path } = mockApiCaller.mock.calls[0][0];
expect(method).toBe('POST');
expect(path).toBe('/foo-%E7%A8%8B/_async_search');
expect(mockGetCaller).toBeCalled();
const request = mockGetCaller.mock.calls[0][0];
expect(request.id).toEqual('foo');
expect(request).toHaveProperty('wait_for_completion_timeout');
expect(request).toHaveProperty('keep_alive');
});

it('calls the rollup API if the index is a rollup type', async () => {
Expand All @@ -117,16 +114,16 @@ describe('ES search strategy', () => {
});

it('sets wait_for_completion_timeout and keep_alive in the request', async () => {
mockApiCaller.mockResolvedValueOnce(mockAsyncResponse);
mockSubmitCaller.mockResolvedValueOnce(mockAsyncResponse);

const params = { index: 'foo-*', body: {} };
const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger);

await esSearch.search((mockContext as unknown) as RequestHandlerContext, { params });

expect(mockApiCaller).toBeCalled();
const { querystring } = mockApiCaller.mock.calls[0][0];
expect(querystring).toHaveProperty('wait_for_completion_timeout');
expect(querystring).toHaveProperty('keep_alive');
expect(mockSubmitCaller).toBeCalled();
const request = mockSubmitCaller.mock.calls[0][0];
expect(request).toHaveProperty('wait_for_completion_timeout');
expect(request).toHaveProperty('keep_alive');
});
});
47 changes: 18 additions & 29 deletions x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ export const enhancedEsSearchStrategyProvider = (

const cancel = async (context: RequestHandlerContext, id: string) => {
logger.debug(`cancel ${id}`);
await context.core.elasticsearch.client.asCurrentUser.transport.request({
method: 'DELETE',
path: encodeURI(`/_async_search/${id}`),
await context.core.elasticsearch.client.asCurrentUser.asyncSearch.delete({
id,
});
};

Expand All @@ -84,39 +83,29 @@ async function asyncSearch(
request: IEnhancedEsSearchRequest,
options?: ISearchOptions
): Promise<IEsSearchResponse> {
const { timeout = undefined, restTotalHitsAsInt = undefined, ...params } = {
...request.params,
};

params.trackTotalHits = true; // Get the exact count of hits

// If we have an ID, then just poll for that ID, otherwise send the entire request body
const { body = undefined, index = undefined, ...queryParams } = request.id ? {} : params;

const method = request.id ? 'GET' : 'POST';
const path = encodeURI(request.id ? `/_async_search/${request.id}` : `/${index}/_async_search`);

// Only report partial results every 64 shards; this should be reduced when we actually display partial results
const batchedReduceSize = request.id ? undefined : 64;
let esResponse;

const asyncOptions = {
waitForCompletionTimeout: '100ms', // Wait up to 100ms for the response to return
keepAlive: '1m', // Extend the TTL for this search request by one minute
};

const querystring = toSnakeCase({
...asyncOptions,
...(batchedReduceSize && { batchedReduceSize }),
...queryParams,
});
// If we have an ID, then just poll for that ID, otherwise send the entire request body
if (!request.id) {
const submitOptions = toSnakeCase({
batchedReduceSize: 64, // Only report partial results every 64 shards; this should be reduced when we actually display partial results
trackTotalHits: true, // Get the exact count of hits
...asyncOptions,
...request.params,
});

// TODO: replace with async endpoints once https://github.com/elastic/elasticsearch-js/issues/1280 is resolved
const esResponse = await client.transport.request({
method,
path,
body,
querystring,
});
esResponse = await client.asyncSearch.submit(submitOptions);
} else {
esResponse = await client.asyncSearch.get({
id: request.id,
...toSnakeCase(asyncOptions),
});
}

const { id, response, is_partial: isPartial, is_running: isRunning } = esResponse.body;
return {
Expand Down

0 comments on commit 28c5f27

Please sign in to comment.