Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[APM] Upgrade ES client #86594

Merged
merged 21 commits into from
Jan 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
aae68a8
[APM] Upgrade ES client
dgieselaar Dec 19, 2020
34ede87
Merge branch 'master' of github.com:elastic/kibana into upgrade-es-cl…
dgieselaar Dec 21, 2020
e04437c
Merge branch 'master' of github.com:elastic/kibana into upgrade-es-cl…
dgieselaar Dec 25, 2020
25ae7f8
Remove custom instrumentation
dgieselaar Dec 27, 2020
24c1d05
Merge branch 'master' into upgrade-es-client
kibanamachine Dec 29, 2020
df5ca31
Update x-pack/plugins/apm/server/lib/apm_telemetry/collect_data_telem…
dgieselaar Dec 29, 2020
0647f10
Merge branch 'master' of github.com:elastic/kibana into upgrade-es-cl…
dgieselaar Jan 4, 2021
f6a356d
Show ES error type in toast
dgieselaar Jan 4, 2021
5122af1
Merge branch 'upgrade-es-client' of github.com:dgieselaar/kibana into…
dgieselaar Jan 4, 2021
5b28a0a
Review feedback
dgieselaar Jan 6, 2021
70029d9
Merge branch 'master' of github.com:elastic/kibana into upgrade-es-cl…
dgieselaar Jan 6, 2021
dc4f6b1
Merge branch 'master' of github.com:elastic/kibana into upgrade-es-cl…
dgieselaar Jan 19, 2021
8841f31
Merge branch 'master' of github.com:elastic/kibana into upgrade-es-cl…
dgieselaar Jan 20, 2021
87bd80e
Review feedback
dgieselaar Jan 20, 2021
8d295b9
Merge branch 'master' of github.com:elastic/kibana into upgrade-es-cl…
dgieselaar Jan 22, 2021
9deda89
Revert change to .finally()
dgieselaar Jan 22, 2021
1a58b88
Merge branch 'master' into upgrade-es-client
kibanamachine Jan 25, 2021
e22af8b
Merge branch 'master' of github.com:elastic/kibana into upgrade-es-cl…
dgieselaar Jan 26, 2021
cf1b42f
Throw error so it is logged/sent to APM
dgieselaar Jan 26, 2021
bc78b62
Merge branch 'upgrade-es-client' of github.com:dgieselaar/kibana into…
dgieselaar Jan 26, 2021
c0659d7
Merge branch 'master' of github.com:elastic/kibana into upgrade-es-cl…
dgieselaar Jan 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions x-pack/plugins/apm/public/hooks/use_fetcher.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@ export interface FetcherResult<Data> {
error?: IHttpFetchError;
}

function getDetailsFromErrorResponse(error: IHttpFetchError) {
const message = error.body?.message ?? error.response?.statusText;
return (
<>
{message} ({error.response?.status})
<h5>
{i18n.translate('xpack.apm.fetcher.error.url', {
defaultMessage: `URL`,
})}
</h5>
{error.response?.url}
</>
);
}

// fetcher functions can return undefined OR a promise. Previously we had a more simple type
// but it led to issues when using object destructuring with default values
type InferResponseType<TReturn> = Exclude<TReturn, undefined> extends Promise<
Expand Down Expand Up @@ -82,25 +97,14 @@ export function useFetcher<TReturn>(

if (!didCancel) {
const errorDetails =
'response' in err ? (
<>
{err.response?.statusText} ({err.response?.status})
<h5>
{i18n.translate('xpack.apm.fetcher.error.url', {
defaultMessage: `URL`,
})}
</h5>
{err.response?.url}
</>
) : (
err.message
);
'response' in err ? getDetailsFromErrorResponse(err) : err.message;

if (showToastOnError) {
notifications.toasts.addWarning({
notifications.toasts.addDanger({
title: i18n.translate('xpack.apm.fetcher.error.title', {
defaultMessage: `Error while fetching resource`,
}),

text: toMountPoint(
<div>
<h5>
Expand Down
12 changes: 7 additions & 5 deletions x-pack/plugins/apm/scripts/upload-telemetry-data/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import { argv } from 'yargs';
import { Logger } from 'kibana/server';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { CollectTelemetryParams } from '../../server/lib/apm_telemetry/collect_data_telemetry';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { unwrapEsResponse } from '../../../observability/server/utils/unwrap_es_response';
import { downloadTelemetryTemplate } from '../shared/download-telemetry-template';
import { mergeApmTelemetryMapping } from '../../common/apm_telemetry';
import { generateSampleDocuments } from './generate-sample-documents';
Expand Down Expand Up @@ -80,18 +82,18 @@ async function uploadData() {
apmAgentConfigurationIndex: '.apm-agent-configuration',
},
search: (body) => {
return client.search(body as any).then((res) => res.body as any);
return unwrapEsResponse(client.search<any>(body));
},
indicesStats: (body) => {
return client.indices.stats(body as any).then((res) => res.body);
return unwrapEsResponse(client.indices.stats<any>(body));
},
transportRequest: ((params) => {
return client.transport
.request({
return unwrapEsResponse(
client.transport.request({
method: params.method,
path: params.path,
})
.then((res) => res.body);
);
}) as CollectTelemetryParams['transportRequest'],
},
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { merge } from 'lodash';
import { Logger, LegacyCallAPIOptions } from 'kibana/server';
import { IndicesStatsParams, Client } from 'elasticsearch';
import { Logger } from 'kibana/server';
import { RequestParams } from '@elastic/elasticsearch';
import {
ESSearchRequest,
ESSearchResponse,
Expand All @@ -20,9 +20,17 @@ type TelemetryTaskExecutor = (params: {
params: TSearchRequest
): Promise<ESSearchResponse<unknown, TSearchRequest>>;
indicesStats(
params: IndicesStatsParams,
options?: LegacyCallAPIOptions
): ReturnType<Client['indices']['stats']>;
params: RequestParams.IndicesStats
// promise returned by client has an abort property
dgieselaar marked this conversation as resolved.
Show resolved Hide resolved
// so we cannot use its ReturnType
): Promise<{
_all?: {
total?: { store?: { size_in_bytes?: number }; docs?: { count?: number } };
};
_shards?: {
total?: number;
};
}>;
transportRequest: (params: {
path: string;
method: 'get';
Expand Down
22 changes: 9 additions & 13 deletions x-pack/plugins/apm/server/lib/apm_telemetry/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
Logger,
SavedObjectsErrorHelpers,
} from '../../../../../../src/core/server';
import { unwrapEsResponse } from '../../../../observability/server';
import { APMConfig } from '../..';
import {
TaskManagerSetupContract,
Expand Down Expand Up @@ -65,27 +66,22 @@ export async function createApmTelemetry({
const collectAndStore = async () => {
const config = await config$.pipe(take(1)).toPromise();
const [{ elasticsearch }] = await core.getStartServices();
const esClient = elasticsearch.legacy.client;
const esClient = elasticsearch.client;

const indices = await getApmIndices({
config,
savedObjectsClient,
});

const search = esClient.callAsInternalUser.bind(
esClient,
'search'
) as CollectTelemetryParams['search'];
const search: CollectTelemetryParams['search'] = (params) =>
unwrapEsResponse(esClient.asInternalUser.search<any>(params));

const indicesStats = esClient.callAsInternalUser.bind(
esClient,
'indices.stats'
) as CollectTelemetryParams['indicesStats'];
const indicesStats: CollectTelemetryParams['indicesStats'] = (params) =>
unwrapEsResponse(esClient.asInternalUser.indices.stats(params));

const transportRequest = esClient.callAsInternalUser.bind(
esClient,
'transport.request'
) as CollectTelemetryParams['transportRequest'];
const transportRequest: CollectTelemetryParams['transportRequest'] = (
params
) => unwrapEsResponse(esClient.asInternalUser.transport.request(params));

const dataTelemetry = await collectDataTelemetry({
search,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,31 @@
/* eslint-disable no-console */

import chalk from 'chalk';
import {
LegacyAPICaller,
KibanaRequest,
} from '../../../../../../../src/core/server';
import { KibanaRequest } from '../../../../../../../src/core/server';

function formatObj(obj: Record<string, any>) {
return JSON.stringify(obj, null, 2);
}

export async function callClientWithDebug({
apiCaller,
operationName,
params,
export async function callAsyncWithDebug<T>({
cb,
getDebugMessage,
debug,
request,
}: {
apiCaller: LegacyAPICaller;
operationName: string;
params: Record<string, any>;
cb: () => Promise<T>;
getDebugMessage: () => { body: string; title: string };
debug: boolean;
request: KibanaRequest;
}) {
if (!debug) {
return cb();
}

const startTime = process.hrtime();

let res: any;
let esError = null;
try {
res = await apiCaller(operationName, params);
res = await cb();
} catch (e) {
// catch error and throw after outputting debug info
esError = e;
Expand All @@ -44,23 +41,14 @@ export async function callClientWithDebug({
const highlightColor = esError ? 'bgRed' : 'inverse';
const diff = process.hrtime(startTime);
const duration = `${Math.round(diff[0] * 1000 + diff[1] / 1e6)}ms`;
const routeInfo = `${request.route.method.toUpperCase()} ${
request.route.path
}`;

const { title, body } = getDebugMessage();

console.log(
chalk.bold[highlightColor](`=== Debug: ${routeInfo} (${duration}) ===`)
chalk.bold[highlightColor](`=== Debug: ${title} (${duration}) ===`)
);

if (operationName === 'search') {
console.log(`GET ${params.index}/_${operationName}`);
console.log(formatObj(params.body));
} else {
console.log(chalk.bold('ES operation:'), operationName);

console.log(chalk.bold('ES query:'));
console.log(formatObj(params));
}
console.log(body);
console.log(`\n`);
}

Expand All @@ -70,3 +58,19 @@ export async function callClientWithDebug({

return res;
}

export const getDebugBody = (
params: Record<string, any>,
operationName: string
) => {
if (operationName === 'search') {
return `GET ${params.index}/_search\n${formatObj(params.body)}`;
}

return `${chalk.bold('ES operation:')} ${operationName}\n${chalk.bold(
'ES query:'
)}\n${formatObj(params)}`;
};

export const getDebugTitle = (request: KibanaRequest) =>
`${request.route.method.toUpperCase()} ${request.route.path}`;
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { TransportRequestPromise } from '@elastic/elasticsearch/lib/Transport';
import { KibanaRequest } from 'src/core/server';

export function cancelEsRequestOnAbort<T extends TransportRequestPromise<any>>(
promise: T,
request: KibanaRequest
) {
const subscription = request.events.aborted$.subscribe(() => {
promise.abort();
});

// using .catch() here means unsubscribe will be called
// after it has thrown an error, so we use .then(onSuccess, onFailure)
// syntax
promise.then(
() => subscription.unsubscribe(),
() => subscription.unsubscribe()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we do this twice? It looks intentional but a comment explaining would help me understand.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second argument to .then() is a failure callback. The difference between .then(onSuccess, onFailure) and .then(onSuccess).catch(onFailure) is that the former calls onFailure only when the original promise is rejected, the second is also called when the onSuccess handler throws. I will add a comment.

Copy link
Member

@sorenlouv sorenlouv Jan 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this what finally is for (available since Node 10).

promise.finally(() => subscription.unsubscribe())

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's what I used initially but it failed on CI (with the API tests). Admittedly I was too lazy to figure out why that happened 😀 I'll have another look.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh duh I knew that I was just reading it as two chained thens for some reason. Thanks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bummer. Okay, thanks for trying.

);

return promise;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { contextServiceMock } from 'src/core/server/mocks';
import { createHttpServer } from 'src/core/server/test_utils';
import supertest from 'supertest';
import { createApmEventClient } from '.';

describe('createApmEventClient', () => {
let server: ReturnType<typeof createHttpServer>;

beforeEach(() => {
server = createHttpServer();
});

afterEach(async () => {
await server.stop();
});
it('cancels a search when a request is aborted', async () => {
const { server: innerServer, createRouter } = await server.setup({
context: contextServiceMock.createSetupContract(),
});
const router = createRouter('/');

const abort = jest.fn();
router.get(
{ path: '/', validate: false },
async (context, request, res) => {
const eventClient = createApmEventClient({
esClient: {
search: () => {
return Object.assign(
new Promise((resolve) => setTimeout(resolve, 3000)),
{ abort }
);
},
} as any,
debug: false,
request,
indices: {} as any,
options: {
includeFrozen: false,
},
});

await eventClient.search({
apm: {
events: [],
},
});

return res.ok({ body: 'ok' });
}
);

await server.start();

const incomingRequest = supertest(innerServer.listener)
.get('/')
// end required to send request
.end();

await new Promise((resolve) => {
setTimeout(() => {
incomingRequest.abort();
setTimeout(() => {
resolve(undefined);
}, 0);
}, 50);
});

expect(abort).toHaveBeenCalled();
});
});
Loading