
Commit

Use terms aggregations with execution_hint: map for events - 100x speedup on the MVP cluster (#66)

* Remove aggregations from events query - 5x speedup on MVP

* Add compatibility wrappers

* Speed up aggs with execution_hint: 'map' and apply compat wrappers
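
In short, the events query drops the composite aggregation over StackTraceID in favor of a plain terms aggregation with execution_hint: 'map'. A minimal sketch of the new aggregation shape (the 'Count' sum field and the exact total_count definition are assumptions for illustration; the actual change is in flamechart.ts below):

const eventsAggs = {
  group_by: {
    terms: {
      field: 'StackTraceID',
      // Up to ~100k distinct stack traces per event index; 150k leaves headroom.
      size: 150000,
      // Skip building global ordinals for this high-cardinality keyword field.
      execution_hint: 'map',
    },
    aggs: {
      count: { sum: { field: 'Count' } }, // assumed field name
    },
  },
  total_count: { sum: { field: 'Count' } }, // assumed field name
};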
rockdaboot committed Jul 4, 2022
1 parent 0e5caec commit 58980f4
Showing 6 changed files with 96 additions and 29 deletions.
1 change: 0 additions & 1 deletion src/plugins/profiling/common/flamegraph.ts
@@ -77,7 +77,6 @@ export class FlameGraph {

private getExeFileName(exe: any, type: number) {
if (exe?.FileName === undefined) {
this.logger.warn('missing executable FileName');
return '';
}
if (exe.FileName !== '') {
57 changes: 57 additions & 0 deletions src/plugins/profiling/server/routes/compat.ts
@@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

// Code that works around incompatibilities between different
// versions of Kibana / ES.
// Currently, we work with 8.1 and 8.3 and thus this code only needs
// to address the incompatibilities between those two versions.

import type { TransportResult } from '@elastic/transport/lib/types';
import type {
SearchResponse,
SearchHitsMetadata,
SearchHit,
MgetResponse,
MgetResponseItem,
AggregationsAggregate,
} from '@elastic/elasticsearch/lib/api/types';
import type { ElasticsearchClient } from 'kibana/server';
import type { DataRequestHandlerContext } from '../../../data/server';

// Search results in 8.1 have 'body' but not in 8.3.
export function getHits(
res: TransportResult<SearchResponse<unknown, Record<string, AggregationsAggregate>>, unknown>
): SearchHitsMetadata<unknown> {
return 'body' in res ? res.body.hits : res.hits;
}

export function getAggs(
res: TransportResult<SearchResponse<unknown, Record<string, AggregationsAggregate>>, unknown>
): Record<string, AggregationsAggregate> | undefined {
return 'body' in res ? res.body.aggregations : res.aggregations;
}

export function getHitsItems(
res: TransportResult<SearchResponse<unknown, Record<string, AggregationsAggregate>>, unknown>
): Array<SearchHit<unknown>> {
return getHits(res)?.hits ?? [];
}

// Mget results in 8.1 have 'body' but not in 8.3.
export function getDocs(
res: TransportResult<MgetResponse<any>, unknown>
): Array<MgetResponseItem<any>> {
return ('body' in res ? res.body.docs : res.docs) ?? [];
}

// In 8.3, context.core is a Promise.
export async function getClient(context: DataRequestHandlerContext): Promise<ElasticsearchClient> {
return typeof context.core.then === 'function'
? (await context.core).elasticsearch.client.asCurrentUser
: context.core.elasticsearch.client.asCurrentUser;
}
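
As a rough usage sketch (hypothetical handler code, not part of this commit; the index name and aggregation are made up, and depending on the client typings the response may need a cast to TransportResult), the wrappers let the same route code run against both the 8.1 and 8.3 Elasticsearch clients:

import type { DataRequestHandlerContext } from '../../../data/server';
import { getClient, getAggs, getHitsItems } from './compat';

export async function exampleQuery(context: DataRequestHandlerContext) {
  // getClient() awaits context.core when it is a Promise (8.3) and uses it directly otherwise (8.1).
  const client = await getClient(context);
  const res = await client.search({
    index: 'profiling-events-all', // made-up index name
    size: 10,
    aggs: { hosts: { terms: { field: 'HostID' } } }, // made-up aggregation
  });
  // getHitsItems()/getAggs() read from res.body on 8.1 and directly from res on 8.3.
  return { hits: getHitsItems(res), hosts: getAggs(res)?.hosts };
}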
3 changes: 2 additions & 1 deletion src/plugins/profiling/server/routes/downsampling.ts
@@ -10,6 +10,7 @@ import seedrandom from 'seedrandom';
import type { ElasticsearchClient, Logger } from 'kibana/server';
import { ProjectTimeQuery } from './mappings';
import { StackTraceID } from '../../common/profiling';
import { getHits } from './compat';

export interface DownsampledEventsIndex {
name: string;
@@ -97,7 +98,7 @@ export async function findDownsampledIndex(
track_total_hits: true,
},
});
sampleCountFromInitialExp = resp.body.hits.total?.value as number;
sampleCountFromInitialExp = getHits(resp).total?.value as number;
} catch (e) {
logger.info(e.message);
}
34 changes: 16 additions & 18 deletions src/plugins/profiling/server/routes/flamechart.ts
@@ -15,6 +15,7 @@ import { logExecutionLatency } from './logger';
import { newProjectTimeQuery, ProjectTimeQuery } from './mappings';
import { downsampleEventsRandomly, findDownsampledIndex } from './downsampling';
import { mgetExecutables, mgetStackFrames, mgetStackTraces, searchStackTraces } from './stacktrace';
import { getHitsItems, getAggs, getClient } from './compat';

export function parallelMget(
nQueries: number,
@@ -69,21 +70,18 @@ async function queryFlameGraph(
return await client.search(
{
index: eventsIndex.name,
size: 0,
track_total_hits: false,
query: filter,
aggs: {
group_by: {
composite: {
size: 100000, // This is the upper limit of entries per event index.
sources: [
{
traceid: {
terms: {
field: 'StackTraceID',
},
},
},
],
terms: {
              // 'size' should be at most 100k (the upper limit of distinct stack traces per event index),
              // but might be slightly more, so stay on the safe side.
size: 150000,
field: 'StackTraceID',
              // 'execution_hint: map' skips the slow building of global ordinals, which we don't need here.
              // With high-cardinality fields, building those ordinals makes aggregations really slow.
              // E.g. it reduces the latency from 70s to 0.7s on our 8.1 MVP cluster (as of 2022-04-28).
execution_hint: 'map',
},
aggs: {
count: {
@@ -113,12 +111,12 @@ async function queryFlameGraph(
}
);

let totalCount: number = resEvents.body.aggregations?.total_count.value;
let stackTraceEvents = new Map<StackTraceID, number>();
let totalCount: number = getAggs(resEvents)?.total_count.value;
const stackTraceEvents = new Map<StackTraceID, number>();

await logExecutionLatency(logger, 'processing events data', async () => {
resEvents.body.aggregations?.group_by.buckets.forEach((item: any) => {
const traceid: StackTraceID = item.key.traceid;
getAggs(resEvents)?.group_by.buckets.forEach((item: any) => {
const traceid: StackTraceID = item.key;
stackTraceEvents.set(traceid, item.count.value);
});
});
@@ -180,7 +178,7 @@ export function registerFlameChartElasticSearchRoute(
const targetSampleSize = 20000; // minimum number of samples to get statistically sound results

try {
const esClient = context.core.elasticsearch.client.asCurrentUser;
const esClient = await getClient(context);
const filter = newProjectTimeQuery(projectID!, timeFrom!, timeTo!);

const flamegraph = await queryFlameGraph(
@@ -231,7 +229,7 @@ export function registerFlameChartPixiSearchRoute(
const targetSampleSize = 20000; // minimum number of samples to get statistically sound results

try {
const esClient = context.core.elasticsearch.client.asCurrentUser;
const esClient = await getClient(context);
const filter = newProjectTimeQuery(projectID!, timeFrom!, timeTo!);

const flamegraph = await queryFlameGraph(
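
Because the composite aggregation is gone, the bucket shape changes too, which is why the processing code above now reads item.key instead of item.key.traceid (illustrative values, not from a real response):

// composite aggregation bucket (before): key is an object per source
//   { key: { traceid: '<StackTraceID>' }, doc_count: 3, count: { value: 3 } }
// terms aggregation bucket (after): key is the field value itself
//   { key: '<StackTraceID>', doc_count: 3, count: { value: 3 } }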
25 changes: 18 additions & 7 deletions src/plugins/profiling/server/routes/stacktrace.ts
@@ -18,6 +18,7 @@ import {
StackTraceID,
} from '../../common/profiling';
import { logExecutionLatency } from './logger';
import { getHitsItems, getDocs } from './compat';

const traceLRU = new LRUCache<StackTraceID, StackTrace>({ max: 20000 });
const frameIDToFileIDCache = new LRUCache<string, FileID>({ max: 100000 });
@@ -105,7 +106,7 @@ export async function searchStackTraces(
const executableDocIDs = new Set<string>(); // Set of unique executable FileIDs.

await logExecutionLatency(logger, 'processing data', async () => {
const traces = stackResponses.flatMap((response) => response.body.hits.hits);
const traces = stackResponses.flatMap((response) => getHitsItems(response));
for (const trace of traces) {
const frameIDs = trace.fields.FrameID as string[];
const fileIDs = extractFileIDArrayFromFrameIDArray(frameIDs);
@@ -149,7 +150,7 @@ export async function mgetStackTraces(

const stackResponses = await logExecutionLatency(
logger,
'mget query for ' + events.size + ' stacktraces',
'mget query (' + concurrency + ' parallel) for ' + events.size + ' stacktraces',
async () => {
return await Promise.all(
chunks.map((ids) => {
@@ -172,7 +173,7 @@
await logExecutionLatency(logger, 'processing data', async () => {
// flatMap() is significantly slower than an explicit for loop
for (const res of stackResponses) {
for (const trace of res.body.docs) {
for (const trace of getDocs(res)) {
// Sometimes we don't find the trace.
// This is due to ES's delayed write visibility (data is not immediately searchable after a write).
// Also, ES doesn't know about transactions.
Expand Down Expand Up @@ -220,6 +221,12 @@ export async function mgetStackFrames(
client: ElasticsearchClient,
stackFrameIDs: Set<string>
): Promise<Map<StackFrameID, StackFrame>> {
const stackFrames = new Map<StackFrameID, StackFrame>();

if (stackFrameIDs.size === 0) {
return stackFrames;
}

const resStackFrames = await logExecutionLatency(
logger,
'mget query for ' + stackFrameIDs.size + ' stackframes',
@@ -233,10 +240,9 @@
);

// Create a lookup map StackFrameID -> StackFrame.
const stackFrames = new Map<StackFrameID, StackFrame>();
let framesFound = 0;
await logExecutionLatency(logger, 'processing data', async () => {
const docs = resStackFrames.body?.docs ?? [];
const docs = getDocs(resStackFrames);
for (const frame of docs) {
if (frame.found) {
stackFrames.set(frame._id, frame._source);
Expand All @@ -263,6 +269,12 @@ export async function mgetExecutables(
client: ElasticsearchClient,
executableIDs: Set<string>
): Promise<Map<FileID, Executable>> {
const executables = new Map<FileID, Executable>();

if (executableIDs.size === 0) {
return executables;
}

const resExecutables = await logExecutionLatency(
logger,
'mget query for ' + executableIDs.size + ' executables',
@@ -276,9 +288,8 @@
);

// Create a lookup map FileID -> Executable.
const executables = new Map<FileID, Executable>();
await logExecutionLatency(logger, 'processing data', async () => {
const docs = resExecutables.body?.docs ?? [];
const docs = getDocs(resExecutables);
for (const exe of docs) {
if (exe.found) {
executables.set(exe._id, exe._source);
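
The new early returns in mgetStackFrames() and mgetExecutables() skip the mget round trip entirely when there is nothing to look up (Elasticsearch rejects an mget request with no ids as a validation error). A minimal sketch of the pattern:

const executables = new Map<FileID, Executable>();
if (executableIDs.size === 0) {
  // Nothing to fetch: return early instead of sending an invalid empty mget.
  return executables;
}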
5 changes: 3 additions & 2 deletions src/plugins/profiling/server/routes/topn.ts
@@ -20,6 +20,7 @@ import { findDownsampledIndex } from './downsampling';
import { logExecutionLatency } from './logger';
import { autoHistogramSumCountOnGroupByField, newProjectTimeQuery } from './mappings';
import { mgetExecutables, mgetStackFrames, mgetStackTraces } from './stacktrace';
import { getClient, getAggs } from './compat';

export async function topNElasticSearchQuery(
client: ElasticsearchClient,
@@ -67,7 +68,7 @@ export async function topNElasticSearchQuery(
}
);

const histogram = resEvents.body.aggregations?.histogram as AggregationsHistogramAggregate;
const histogram = getAggs(resEvents)?.histogram as AggregationsHistogramAggregate;
const topN = createTopNBucketsByDate(histogram);

if (searchField !== 'StackTraceID') {
@@ -136,7 +137,7 @@ export function queryTopNCommon(
},
async (context, request, response) => {
const { index, projectID, timeFrom, timeTo, n } = request.query;
const client = context.core.elasticsearch.client.asCurrentUser;
const client = await getClient(context);

try {
return await topNElasticSearchQuery(

