[APM] Optimize synthtrace (#116091)

dgieselaar authored Oct 26, 2021
1 parent d442213 commit 9d9eeb0
Showing 18 changed files with 482 additions and 301 deletions.
31 changes: 16 additions & 15 deletions packages/elastic-apm-synthtrace/README.md
@@ -93,23 +93,24 @@ const esEvents = toElasticsearchOutput([

Via the CLI, you can upload scenarios, either using a fixed time range or continuously generating data. Some examples are available in `src/scripts/examples`. Here's an example for live data:

`$ node packages/elastic-apm-synthtrace/src/scripts/run packages/elastic-apm-synthtrace/src/examples/01_simple_trace.ts --target=http://admin:changeme@localhost:9200 --live`
`$ node packages/elastic-apm-synthtrace/src/scripts/run packages/elastic-apm-generator/src/examples/01_simple_trace.ts --target=http://admin:changeme@localhost:9200 --live`

For a fixed time window:
`$ node packages/elastic-apm-synthtrace/src/scripts/run packages/elastic-apm-synthtrace/src/examples/01_simple_trace.ts --target=http://admin:changeme@localhost:9200 --from=now-24h --to=now`
`$ node packages/elastic-apm-synthtrace/src/scripts/run packages/elastic-apm-generator/src/examples/01_simple_trace.ts --target=http://admin:changeme@localhost:9200 --from=now-24h --to=now`

The script will try to automatically find bootstrapped APM indices. **If these indices do not exist, the script will exit with an error. It will not bootstrap the indices itself.**
The script will try to automatically find bootstrapped APM indices. __If these indices do not exist, the script will exit with an error. It will not bootstrap the indices itself.__

The following options are supported:
| Option | Description | Default |
| -------------- | ------------------------------------------------------- | ------------ |
| `--from` | The start of the time window. | `now - 15m` |
| `--to` | The end of the time window. | `now` |
| `--live` | Continuously ingest data. | `false` |
| `--bucketSize` | Size of bucket for which to generate data. | `15m` |
| `--clean` | Clean APM indices before indexing new data. | `false` |
| `--interval` | The interval at which to index data. | `10s` |
| `--logLevel` | Log level. | `info` |
| `--lookback` | The lookback window for which data should be generated. | `15m` |
| `--target` | Elasticsearch target, including username/password. | **Required** |
| `--workers` | Amount of simultaneously connected ES clients. | `1` |
| Option | Description | Default |
| ----------------- | ------------------------------------------------------- | ------------ |
| `--target` | Elasticsearch target, including username/password. | **Required** |
| `--from` | The start of the time window. | `now - 15m` |
| `--to` | The end of the time window. | `now` |
| `--live` | Continuously ingest data. | `false` |
| `--clean` | Clean APM indices before indexing new data. | `false` |
| `--workers` | Number of Node.js worker threads. | `5` |
| `--bucketSize` | Size of bucket for which to generate data. | `15m` |
| `--interval` | The interval at which to index data. | `10s` |
| `--clientWorkers` | Number of simultaneously connected ES clients. | `5` |
| `--batchSize` | Number of documents per bulk index request. | `1000` |
| `--logLevel` | Log level. | `info` |
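
For example, to backfill 24 hours of data while exercising the new throughput options (the flag values here are illustrative, not recommendations):

`$ node packages/elastic-apm-synthtrace/src/scripts/run packages/elastic-apm-synthtrace/src/examples/01_simple_trace.ts --target=http://admin:changeme@localhost:9200 --from=now-24h --to=now --workers=5 --clientWorkers=5 --batchSize=1000`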
@@ -22,6 +22,18 @@ export interface ElasticsearchOutputWriteTargets {
metric: string;
}

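// Note: the observer defaults below are flattened into a single template
// object once at module load, so they are no longer recomputed per event.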
const observerDefaults = getObserverDefaults();

const esDocumentDefaults = {
ecs: {
version: '1.4',
},
};

// eslint-disable-next-line guard-for-in
for (const key in observerDefaults) {
set(esDocumentDefaults, key, observerDefaults[key as keyof typeof observerDefaults]);
}
export function toElasticsearchOutput({
events,
writeTargets,
@@ -30,22 +42,25 @@ export function toElasticsearchOutput({
writeTargets: ElasticsearchOutputWriteTargets;
}): ElasticsearchOutput[] {
return events.map((event) => {
const values = {
...event,
...getObserverDefaults(),
const values = {};

Object.assign(values, event, {
'@timestamp': new Date(event['@timestamp']!).toISOString(),
'timestamp.us': event['@timestamp']! * 1000,
'ecs.version': '1.4',
'service.node.name':
event['service.node.name'] || event['container.id'] || event['host.name'],
};
});

const document = {};

Object.assign(document, esDocumentDefaults);

// eslint-disable-next-line guard-for-in
for (const key in values) {
const val = values[key as keyof typeof values];
set(document, key, val);
}

return {
_index: writeTargets[event['processor.event'] as keyof ElasticsearchOutputWriteTargets],
_source: document,
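
The per-event loop above expands flattened keys into nested objects with lodash's `set`. A minimal standalone sketch of that expansion (not part of this commit):

```ts
import { set } from 'lodash';

// A synthtrace event is a flat record of dotted field names.
const flat: Record<string, unknown> = {
  'service.node.name': 'instance-1',
  'timestamp.us': 1635235200000000,
};

const document = {};
for (const key of Object.keys(flat)) {
  // `set` creates intermediate objects along the dotted path.
  set(document, key, flat[key]);
}

// document: { service: { node: { name: 'instance-1' } }, timestamp: { us: 1635235200000000 } }
```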
13 changes: 5 additions & 8 deletions packages/elastic-apm-synthtrace/src/lib/utils/generate_id.ts
@@ -6,20 +6,17 @@
* Side Public License, v 1.
*/

import uuidv5 from 'uuid/v5';

let seq = 0;

const namespace = 'f38d5b83-8eee-4f5b-9aa6-2107e15a71e3';

function generateId(seed?: string) {
return uuidv5(seed ?? String(seq++), namespace).replace(/-/g, '');
function generateId(seed?: string, length: number = 32) {
const str = seed ?? String(seq++);
return str.padStart(length, '0');
}

export function generateShortId(seed?: string) {
return generateId(seed).substr(0, 16);
return generateId(seed, 16);
}

export function generateLongId(seed?: string) {
return generateId(seed).substr(0, 32);
return generateId(seed, 32);
}
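
The rewritten helpers drop the uuidv5 dependency: an unseeded ID is now just a zero-padded counter, and a seeded ID is the seed itself left-padded, so IDs are cheap to produce and deterministic across runs. Roughly what the new helpers return (illustrative; the import path is assumed):

```ts
import { generateShortId, generateLongId } from './generate_id';

generateShortId();     // '0000000000000000'  (seq 0, padded to 16 chars)
generateShortId();     // '0000000000000001'
generateLongId();      // '00000000000000000000000000000002'  (padded to 32 chars)
generateLongId('abc'); // '00000000000000000000000000000abc'  (seed is padded, not hashed)
```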
@@ -10,16 +10,20 @@ import { service, timerange, getTransactionMetrics, getSpanDestinationMetrics }
import { getBreakdownMetrics } from '../../lib/utils/get_breakdown_metrics';

export default function ({ from, to }: { from: number; to: number }) {
const instance = service('opbeans-go', 'production', 'go').instance('instance');
const numServices = 3;

const range = timerange(from, to);

const transactionName = '240rpm/75% 1000ms';

const successfulTraceEvents = range
.interval('1s')
.rate(3)
.flatMap((timestamp) =>
const successfulTimestamps = range.interval('1s').rate(3);

const failedTimestamps = range.interval('1s').rate(1);

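// One pass per simulated service; the timestamp ranges above are computed
// once and shared across all instances.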
return new Array(numServices).fill(undefined).flatMap((_, index) => {
const instance = service(`opbeans-go-${index}`, 'production', 'go').instance('instance');

const successfulTraceEvents = successfulTimestamps.flatMap((timestamp) =>
instance
.transaction(transactionName)
.timestamp(timestamp)
@@ -37,10 +41,7 @@
.serialize()
);

const failedTraceEvents = range
.interval('1s')
.rate(1)
.flatMap((timestamp) =>
const failedTraceEvents = failedTimestamps.flatMap((timestamp) =>
instance
.transaction(transactionName)
.timestamp(timestamp)
@@ -52,27 +53,28 @@
.serialize()
);

const metricsets = range
.interval('30s')
.rate(1)
.flatMap((timestamp) =>
instance
.appMetrics({
'system.memory.actual.free': 800,
'system.memory.total': 1000,
'system.cpu.total.norm.pct': 0.6,
'system.process.cpu.total.norm.pct': 0.7,
})
.timestamp(timestamp)
.serialize()
);
const events = successfulTraceEvents.concat(failedTraceEvents);
const metricsets = range
.interval('30s')
.rate(1)
.flatMap((timestamp) =>
instance
.appMetrics({
'system.memory.actual.free': 800,
'system.memory.total': 1000,
'system.cpu.total.norm.pct': 0.6,
'system.process.cpu.total.norm.pct': 0.7,
})
.timestamp(timestamp)
.serialize()
);
const events = successfulTraceEvents.concat(failedTraceEvents);

return [
...events,
...metricsets,
...getTransactionMetrics(events),
...getSpanDestinationMetrics(events),
...getBreakdownMetrics(events),
];
return [
...events,
...metricsets,
...getTransactionMetrics(events),
...getSpanDestinationMetrics(events),
...getBreakdownMetrics(events),
];
});
}
85 changes: 55 additions & 30 deletions packages/elastic-apm-synthtrace/src/scripts/run.ts
@@ -8,15 +8,6 @@
import datemath from '@elastic/datemath';
import yargs from 'yargs/yargs';
import { cleanWriteTargets } from './utils/clean_write_targets';
import {
bucketSizeOption,
cleanOption,
fileOption,
intervalOption,
targetOption,
workerOption,
logLevelOption,
} from './utils/common_options';
import { intervalToMs } from './utils/interval_to_ms';
import { getCommonResources } from './utils/get_common_resources';
import { startHistoricalDataUpload } from './utils/start_historical_data_upload';
@@ -28,13 +19,16 @@ yargs(process.argv.slice(2))
'Generate data and index into Elasticsearch',
(y) => {
return y
.positional('file', fileOption)
.option('bucketSize', bucketSizeOption)
.option('workers', workerOption)
.option('interval', intervalOption)
.option('clean', cleanOption)
.option('target', targetOption)
.option('logLevel', logLevelOption)
.positional('file', {
describe: 'File that contains the trace scenario',
demandOption: true,
string: true,
})
.option('target', {
describe: 'Elasticsearch target, including username/password',
demandOption: true,
string: true,
})
.option('from', {
description: 'The start of the time window',
})
@@ -45,20 +39,47 @@
description: 'Generate and index data continuously',
boolean: true,
})
.option('clean', {
describe: 'Clean APM indices before indexing new data',
default: false,
boolean: true,
})
.option('workers', {
describe: 'Amount of Node.js worker threads',
default: 5,
})
.option('bucketSize', {
describe: 'Size of bucket for which to generate data',
default: '15m',
})
.option('interval', {
describe: 'The interval at which to index data',
default: '10s',
})
.option('clientWorkers', {
describe: 'Number of concurrently connected ES clients',
default: 5,
})
.option('batchSize', {
describe: 'Number of documents per bulk index request',
default: 1000,
})
.option('logLevel', {
describe: 'Log level',
default: 'info',
})
.conflicts('to', 'live');
},
async (argv) => {
const {
scenario,
intervalInMs,
bucketSizeInMs,
target,
workers,
clean,
logger,
writeTargets,
client,
} = await getCommonResources(argv);
const file = String(argv.file || argv._[0]);

const { target, workers, clean, clientWorkers, batchSize } = argv;

const { scenario, intervalInMs, bucketSizeInMs, logger, writeTargets, client, logLevel } =
await getCommonResources({
...argv,
file,
});

if (clean) {
await cleanWriteTargets({ writeTargets, client, logger });
Expand Down Expand Up @@ -91,13 +112,16 @@ yargs(process.argv.slice(2))
startHistoricalDataUpload({
from,
to,
scenario,
intervalInMs,
file,
bucketSizeInMs,
client,
workers,
clientWorkers,
batchSize,
writeTargets,
logger,
logLevel,
target,
});

if (live) {
Expand All @@ -108,7 +132,8 @@ yargs(process.argv.slice(2))
logger,
scenario,
start: to,
workers,
clientWorkers,
batchSize,
writeTargets,
});
}

This file was deleted.
