Skip to content

Commit

Permalink
feat(winston): add ECS tracing fields if APM is in use (#39)
Browse files Browse the repository at this point in the history
Refs: #35
  • Loading branch information
trentm authored Jan 20, 2021
1 parent f777883 commit 09edd01
Show file tree
Hide file tree
Showing 10 changed files with 296 additions and 18 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,5 @@ typings/
.next

.ecs
.DS_Store
/tmp
7 changes: 5 additions & 2 deletions loggers/winston/benchmarks/bench-one
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ fi
set -o errexit
set -o pipefail

TOP=$(cd "$(dirname "$0")/.." >/dev/null && pwd)
AUTOCANNON=$TOP/node_modules/.bin/autocannon

echo "## Benchmarking `basename $1`"
echo
echo "node: $(node --version)"
Expand All @@ -29,7 +32,7 @@ server_pid=$!

echo
echo "Warmup 5s run"
autocannon -c 100 -d 5 -p 10 localhost:3000 > /dev/null 2>&1
$AUTOCANNON -c 100 -d 5 -p 10 localhost:3000 > /dev/null 2>&1
# Reported longer run (to attempt to get stddev down)
autocannon -c 100 -d 30 -p 10 localhost:3000
$AUTOCANNON -c 100 -d 30 -p 10 localhost:3000
echo
1 change: 0 additions & 1 deletion loggers/winston/examples/.npmrc

This file was deleted.

46 changes: 46 additions & 0 deletions loggers/winston/examples/http-with-elastic-apm.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

'use strict'

// This example shows how @elastic/ecs-winston-format logging will integrate
// with Elastic APM (https://www.elastic.co/apm).
//
// If usage of Elastic APM is detected (i.e. the "elastic-apm-node" package
// is being used), then log records will include trace identifiers, e.g.:
// "trace": { "id": "678f2a0189f259baf2ea17db8af5a4d0" },
// "transaction": { "id": "1cc6339964575165" },
// "span": { "id": "f72c52ceda81777a" },
// to correlate log and trace data in Kibana.

/* eslint-disable-next-line no-unused-vars */
const apm = require('elastic-apm-node').start({
serviceName: 'http-with-elastic-apm',
centralConfig: false,
captureExceptions: false,
metricsInterval: 0
})

const http = require('http')
const winston = require('winston')
const ecsFormat = require('../') // @elastic/ecs-winston-format

const logger = winston.createLogger({
level: 'info',
format: ecsFormat({ convertReqRes: true }),
transports: [
new winston.transports.Console()
]
})

const server = http.createServer(handler)
server.listen(3000, () => {
logger.info('listening at http://localhost:3000')
})

function handler (req, res) {
res.setHeader('Foo', 'Bar')
res.end('ok')
logger.info('handled request', { req, res })
}
10 changes: 0 additions & 10 deletions loggers/winston/examples/package.json

This file was deleted.

27 changes: 27 additions & 0 deletions loggers/winston/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ const {
formatHttpResponse
} = require('@elastic/ecs-helpers')

// We will query the Elastic APM agent if it is available.
let elasticApm = null
try {
elasticApm = require('elastic-apm-node')
} catch (ex) {
// Silently ignore.
}

const reservedFields = {
level: true,
'log.level': true,
Expand All @@ -31,6 +39,25 @@ function ecsTransform (info, opts) {
ecs: { version }
}

// https://www.elastic.co/guide/en/ecs/current/ecs-tracing.html
// istanbul ignore else
if (elasticApm) {
const tx = elasticApm.currentTransaction
if (tx) {
ecsFields.trace = ecsFields.trace || {}
ecsFields.trace.id = tx.traceId
ecsFields.transaction = ecsFields.transaction || {}
ecsFields.transaction.id = tx.id
const span = elasticApm.currentSpan
// istanbul ignore else
if (span) {
ecsFields.span = ecsFields.span || {}
ecsFields.span.id = span.id
}
}
}

// https://www.elastic.co/guide/en/ecs/current/ecs-http.html
if (info.req) {
if (opts.convertReqRes) {
formatHttpRequest(ecsFields, info.req)
Expand Down
9 changes: 6 additions & 3 deletions loggers/winston/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,23 @@
"homepage": "https://github.com/elastic/ecs-logging-js/blob/master/loggers/winston/README.md",
"scripts": {
"bench": "./benchmarks/bench",
"test": "standard && tap --100 --timeout 5"
"test": "standard && tap --100 --timeout 5 test/*.test.js"
},
"engines": {
"node": ">=10"
},
"dependencies": {
"@elastic/ecs-helpers": "^0.3.0"
"@elastic/ecs-helpers": "^0.4.0"
},
"devDependencies": {
"ajv": "^6.11.0",
"autocannon": "^7.0.1",
"elastic-apm-node": "^3.10.0",
"express": "^4.17.1",
"split2": "^3.2.2",
"standard": "^14.3.1",
"tap": "^14.x",
"winston": "^3.2.1"
"winston": "^3.3.3"
},
"standard": {
"ignore": [
Expand Down
152 changes: 152 additions & 0 deletions loggers/winston/test/apm.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

'use strict'

// Test integration with Elastic APM.

const http = require('http')
const path = require('path')
const { spawn } = require('child_process')
const zlib = require('zlib')

const Ajv = require('ajv')
const split = require('split2')
const test = require('tap').test

const ajv = Ajv({
allErrors: true,
verbose: true,
format: 'full'
})
const validate = ajv.compile(require('../../../utils/schema.json'))

test('tracing integration works', t => {
let apmServer
let app
const traceObjs = []
const logObjs = []
let stderr = ''

// 1. Setup a mock APM server to accept trace data. Callback when listening.
// Pass intake data to `collectTracesLogsAndCheck()`.
function step1StartMockApmServer (cb) {
apmServer = http.createServer(function apmServerReq (req, res) {
t.equal(req.method, 'POST')
t.equal(req.url, '/intake/v2/events')
var instream = req
if (req.headers['content-encoding'] === 'gzip') {
instream = req.pipe(zlib.createGunzip())
} else {
instream.setEncoding('utf8')
}
instream.pipe(split(JSON.parse)).on('data', function (traceObj) {
collectTracesLogsAndCheck(traceObj, null)
})
req.on('end', function () {
res.end('ok')
})
})
apmServer.listen(0, function () {
cb(null, 'http://localhost:' + apmServer.address().port)
})
}

// 2. Start a test app that uses APM and our mock APM Server.
// Callback on first log line, which includes the app's HTTP address.
// Pass parsed JSON log records to `collectTracesLogsAndCheck()`.
function step2StartApp (apmServerUrl, cb) {
app = spawn(
process.execPath,
[
path.join(__dirname, 'serve-one-http-req-with-apm.js'),
apmServerUrl
]
)
let handledFirstLogLine = false
app.stdout.pipe(split(JSON.parse)).on('data', function (logObj) {
if (!handledFirstLogLine) {
handledFirstLogLine = true
t.equal(logObj.message, 'listening')
t.ok(logObj.address, 'first listening log line has "address"')
cb(null, logObj.address)
} else {
collectTracesLogsAndCheck(null, logObj)
}
})
app.stderr.on('data', function (chunk) {
stderr += chunk
})
app.on('close', function (code) {
t.equal(stderr, '', 'empty stderr from app')
t.equal(code, 0, 'app exited 0')
})
}

// 3. Call the test app to generate a trace.
function step3CallApp (appUrl, cb) {
const req = http.request(appUrl + '/', function (res) {
res.on('data', function () {})
res.on('end', cb)
})
req.on('error', cb)
req.end()
}

// 4. Collect trace data from the APM Server, log data from the app, and when
// all the expected data is collected, then test it: assert matching tracing
// IDs.
function collectTracesLogsAndCheck (traceObj, logObj) {
if (traceObj) {
traceObjs.push(traceObj)
}
if (logObj) {
t.ok(validate(logObj), 'logObj is ECS valid')
logObjs.push(logObj)
}
if (traceObjs.length >= 3 && logObjs.length >= 1) {
t.ok(traceObjs[0].metadata, 'traceObjs[0] is metadata')
t.ok(traceObjs[1].transaction, 'traceObjs[1] is transaction')
t.ok(traceObjs[2].span, 'traceObjs[2] is span')
const span = traceObjs[2].span
t.equal(logObjs[0].trace.id, span.trace_id, 'trace.id matches')
t.equal(logObjs[0].transaction.id, span.transaction_id, 'transaction.id matches')
t.equal(logObjs[0].span.id, span.id, 'span.id matches')
finish()
}
}

function finish () {
app.on('close', function () {
apmServer.close(function () {
t.end()
})
})
}

step1StartMockApmServer(function onListening (apmServerErr, apmServerUrl) {
t.ifErr(apmServerErr)
if (apmServerErr) {
finish()
return
}
t.ok(apmServerUrl, 'apmServerUrl: ' + apmServerUrl)

step2StartApp(apmServerUrl, function onReady (appErr, appUrl) {
t.ifErr(appErr)
if (appErr) {
finish()
return
}
t.ok(appUrl, 'appUrl: ' + appUrl)

step3CallApp(appUrl, function (clientErr) {
t.ifErr(clientErr)

// The thread of control now is expected to be in
// `collectTracesLogsAndCheck()`.
})
})
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ const { MESSAGE } = require('triple-beam')
const Ajv = require('ajv')
const { version } = require('@elastic/ecs-helpers')

const ecsFormat = require('./')
const ecsFormat = require('../')

const ajv = Ajv({
allErrors: true,
verbose: true,
format: 'full'
})
const validate = ajv.compile(require('../../utils/schema.json'))
const validate = ajv.compile(require('../../../utils/schema.json'))

// Winston transport to capture logged records. Parsed JSON records are on
// `.records`. Raw records (what Winston calls `info` objects) are on `.infos`.
Expand Down
56 changes: 56 additions & 0 deletions loggers/winston/test/serve-one-http-req-with-apm.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

'use strict'

// This script is used to test @elastic/ecs-pino-format + APM.
//
// It will:
// - configure APM using the given APM server url (first arg)
// - start an HTTP server
// - log once when it is listening (with its address)
// - handle a single HTTP request
// - log that request
// - flush APM (i.e. ensure it has sent its data to its configured APM server)
// - exit

const serverUrl = process.argv[2]
/* eslint-disable-next-line no-unused-vars */
const apm = require('elastic-apm-node').start({
serverUrl,
serviceName: 'ecs-winston-format-test-apm',
centralConfig: false,
captureExceptions: false,
metricsInterval: 0
})

const http = require('http')
const ecsFormat = require('../') // @elastic/ecs-winston-format
const winston = require('winston')

const log = winston.createLogger({
level: 'info',
format: ecsFormat({ convertReqRes: true }),
transports: [
new winston.transports.Console()
]
})

const server = http.createServer()

server.once('request', function handler (req, res) {
var span = apm.startSpan('auth')
setImmediate(function doneAuth () {
span.end()
res.end('ok')
log.info('handled request', { req, res })
apm.flush(function onFlushed () {
server.close()
})
})
})

server.listen(0, () => {
log.info('listening', { address: `http://localhost:${server.address().port}` })
})

0 comments on commit 09edd01

Please sign in to comment.