Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added instrumentation for kafkajs.Kafka.consumer #2244

Merged
merged 2 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion lib/instrumentation/kafkajs.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,4 @@
*/

'use strict'

module.exports = require('./kafkajs/index')
101 changes: 101 additions & 0 deletions lib/instrumentation/kafkajs/consumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2024 New Relic Corporation. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/

'use strict'
const { kafkaCtx } = require('../../symbols')
const { MessageSpec, MessageSubscribeSpec, RecorderSpec } = require('../../shim/specs')
const { DESTINATIONS } = require('../../config/attribute-filter')
const CONSUMER_METHODS = [
'connect',
'disconnect',
'subscribe',
'stop',
'commitOffsets',
'seek',
'pause',
'resume'
]
const SEGMENT_PREFIX = 'kafkajs.Kafka.consumer#'

module.exports = function instrumentConsumer({ shim, kafkajs }) {
shim.wrap(kafkajs.Kafka.prototype, 'consumer', function wrapConsumer(shim, orig) {
return function wrappedConsumer() {
const args = shim.argsToArray.apply(shim, arguments)
const consumer = orig.apply(this, args)
consumer.on(consumer.events.REQUEST, function listener(data) {
// storing broker for when we add `host`, `port` to messaging spans
consumer[kafkaCtx] = {
clientId: data?.payload?.clientId,
broker: data?.payload.broker
}
})
shim.record(consumer, CONSUMER_METHODS, function wrapper(shim, fn, name) {
return new RecorderSpec({
name: `${SEGMENT_PREFIX}${name}`,
promise: true
})
})
shim.recordSubscribedConsume(
consumer,
'run',
new MessageSubscribeSpec({
name: `${SEGMENT_PREFIX}#run`,
destinationType: shim.TOPIC,
promise: true,
consumer: shim.FIRST,
functions: ['eachMessage'],
messageHandler: handler.bind(null, consumer)
})
)
return consumer
}
})
}

/**
* Message handler that extracts the topic and headers from message being consumed.
*
* This also sets some metrics for byte length of message, and number of messages.
* Lastly, adds tx attributes for byteCount and clientId
*
* @param {object} consumer the instance of kafka consumer
* @param {MessageShim} shim instance of shim
* @param {Array} args arguments passed to the `eachMessage` function of the `consumer.run`
* @returns {MessageSpec} spec for message handling
*/
function handler(consumer, shim, args) {
const [data] = args
const { topic } = data
const segment = shim.getActiveSegment()

if (segment?.transaction) {
const tx = segment.transaction
const byteLength = data?.message.value?.byteLength
const metricPrefix = `Message/Kafka/Topic/Named/${topic}/Received`
// This will always be 1
tx.metrics.getOrCreateMetric(`${metricPrefix}/Messages`).recordValue(1)
if (byteLength) {
tx.metrics.measureBytes(`${metricPrefix}/Bytes`, byteLength)
tx.trace.attributes.addAttribute(
DESTINATIONS.TRANS_SCOPE,
'kafka.consume.byteCount',
byteLength
)
}
if (consumer?.[kafkaCtx]) {
tx.trace.attributes.addAttribute(
DESTINATIONS.TRANS_EVENT,
'kafka.consume.client_id',
consumer[kafkaCtx].clientId
)
}
}

return new MessageSpec({
destinationType: `Topic/Consume`,
destinationName: data?.topic,
headers: data?.message?.headers
})
}
4 changes: 2 additions & 2 deletions lib/instrumentation/kafkajs/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
'use strict'

const instrumentProducer = require('./producer')
const instrumentConsumer = require('./consumer')

// eslint-disable-next-line no-unused-vars
module.exports = function initialize(agent, kafkajs, _moduleName, shim) {
if (agent.config.feature_flag.kafkajs_instrumentation === false) {
shim.logger.debug(
Expand All @@ -17,6 +17,6 @@ module.exports = function initialize(agent, kafkajs, _moduleName, shim) {
}

shim.setLibrary(shim.KAFKA)

instrumentConsumer({ shim, kafkajs })
instrumentProducer({ shim, kafkajs })
}
2 changes: 2 additions & 0 deletions lib/instrumentation/kafkajs/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ module.exports = function instrumentProducer({ shim, kafkajs }) {
}

return new MessageSpec({
promise: true,
destinationName: data.topic,
destinationType: shim.TOPIC,
headers: firstMessage.headers
Expand All @@ -45,6 +46,7 @@ module.exports = function instrumentProducer({ shim, kafkajs }) {
}

return new MessageSpec({
promise: true,
destinationName: data.topicMessages[0].topic,
destinationType: shim.TOPIC,
headers: firstMessage.headers
Expand Down
11 changes: 10 additions & 1 deletion lib/shim/message-shim/subscribe-consume.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,22 @@ function createSubscriberWrapper({ shim, fn, spec, destNameIsArg }) {
}
}

if (consumerIdx !== null) {
if (consumerIdx !== null && !spec.functions) {
bizob2828 marked this conversation as resolved.
Show resolved Hide resolved
args[consumerIdx] = shim.wrap(
args[consumerIdx],
makeWrapConsumer({ spec, queue, destinationName, destNameIsArg })
)
}

if (consumerIdx !== null && spec.functions) {
bizob2828 marked this conversation as resolved.
Show resolved Hide resolved
spec.functions.forEach((fn) => {
args[consumerIdx][fn] = shim.wrap(
args[consumerIdx][fn],
makeWrapConsumer({ spec, queue, destinationName, destNameIsArg })
)
})
}

return fn.apply(this, args)
}
}
Expand Down
25 changes: 25 additions & 0 deletions lib/shim/specs/message-subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,30 @@ class MessageSubscribeSpec extends MessageSpec {
*/
consumer

/**
* Indicates names of functions to be wrapped for message consumption.
* This must be used in tandem with consumer.
* @type {Array<string>|null}
* @example
* // Wrap the eachMessage method on a consumer
* class Consumer() {
* constructor() {}
* async run(consumer) {
* consumer.eachMessage({ message })
* }
* }
*
* const spec = new MessageSubscribeSpec({
* name: 'Consumer#run'
* promise: true
* consumer: shim.FIRST,
* functions: ['eachMessage']
* })
*
* shim.recordSubscribedConsume(Consumer.prototype, 'run', spec)
*/
functions

/* eslint-disable jsdoc/require-param-description */
/**
* @param {MessageSubscribeSpecParams} params
Expand All @@ -35,6 +59,7 @@ class MessageSubscribeSpec extends MessageSpec {
super(params)

this.consumer = params.consumer ?? null
this.functions = Array.isArray(params.functions) ? params.functions : null
}
}

Expand Down
1 change: 1 addition & 0 deletions lib/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module.exports = {
databaseName: Symbol('databaseName'),
disableDT: Symbol('Disable distributed tracing'), // description for backwards compatibility
executorContext: Symbol('executorContext'),
kafkaCtx: Symbol('kafkaCtx'),
koaBody: Symbol('body'),
koaBodySet: Symbol('bodySet'),
koaRouter: Symbol('koaRouter'),
Expand Down
9 changes: 8 additions & 1 deletion lib/transaction/tracecontext.js
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,9 @@ class TraceContext {
return traceParentInfo
}

if (Buffer.isBuffer(traceparent)) {
traceparent = traceparent.toString()
}
const trimmed = traceparent.trim()
const parts = trimmed.split('-')

Expand Down Expand Up @@ -445,10 +448,14 @@ class TraceContext {
}

_parseTraceState(params) {
const { tracestate, hasTrustKey, expectedNrKey } = params
const { hasTrustKey, expectedNrKey } = params
let { tracestate } = params
let nrTraceStateValue = null
const finalListMembers = []
const vendors = []
if (Buffer.isBuffer(tracestate)) {
tracestate = tracestate.toString()
}
const incomingListMembers = tracestate.split(',')
for (let i = 0; i < incomingListMembers.length; i++) {
const listMember = incomingListMembers[i].trim()
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@
"scripts": {
"bench": "node ./bin/run-bench.js",
"docker-env": "./bin/docker-env-vars.sh",
"docs": "npm ci && jsdoc -c ./jsdoc-conf.json --private -r .",
"docs": "rm -rf ./out && jsdoc -c ./jsdoc-conf.jsonc --private -r .",
"integration": "npm run prepare-test && npm run sub-install && time c8 -o ./coverage/integration tap --test-regex='(\\/|^test\\/integration\\/.*\\.tap\\.js)$' --timeout=600 --no-coverage --reporter classic",
"integration:esm": "time c8 -o ./coverage/integration-esm tap --node-arg='--loader=./esm-loader.mjs' --test-regex='(test\\/integration\\/.*\\.tap\\.mjs)$' --timeout=600 --no-coverage --reporter classic",
"prepare-test": "npm run ssl && npm run docker-env",
Expand Down
31 changes: 31 additions & 0 deletions test/unit/distributed_tracing/tracecontext.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,14 @@ tap.test('TraceContext', function (t) {
t.equal(traceContext._validateAndParseTraceParentHeader(shorterStr).entryValid, false)
t.end()
})

t.test('should handle if traceparent is a buffer', (t) => {
const { traceContext } = t.context
const traceparent = '00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00'
const bufferTraceParent = Buffer.from(traceparent, 'utf8')
t.ok(traceContext._validateAndParseTraceParentHeader(bufferTraceParent).entryValid)
t.end()
})
})

t.test('_validateAndParseTraceStateHeader', (t) => {
Expand Down Expand Up @@ -283,6 +291,29 @@ tap.test('TraceContext', function (t) {
t.end()
})

t.test('should pass a valid tracestate header if a buffer', (t) => {
const { agent, traceContext } = t.context
agent.config.trusted_account_key = '190'
const goodTraceStateHeader =
/* eslint-disable-next-line max-len */
'190@nr=0-0-709288-8599547-f85f42fd82a4cf1d-164d3b4b0d09cb05-1-0.789-1563574856827,234234@foo=bar'
const bufferTraceState = Buffer.from(goodTraceStateHeader, 'utf8')
const valid = traceContext._validateAndParseTraceStateHeader(bufferTraceState)
t.ok(valid)
t.equal(valid.entryFound, true)
t.equal(valid.entryValid, true)
t.equal(valid.intrinsics.version, 0)
t.equal(valid.intrinsics.parentType, 'App')
t.equal(valid.intrinsics.accountId, '709288')
t.equal(valid.intrinsics.appId, '8599547')
t.equal(valid.intrinsics.spanId, 'f85f42fd82a4cf1d')
t.equal(valid.intrinsics.transactionId, '164d3b4b0d09cb05')
t.equal(valid.intrinsics.sampled, true)
t.equal(valid.intrinsics.priority, 0.789)
t.equal(valid.intrinsics.timestamp, 1563574856827)
t.end()
})

t.test('should fail mismatched trusted account ID in tracestate header', (t) => {
const { agent, traceContext } = t.context
agent.config.trusted_account_key = '666'
Expand Down
28 changes: 28 additions & 0 deletions test/unit/shim/message-shim.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1170,5 +1170,33 @@ tap.test('MessageShim', function (t) {
t.ok(parent)
})
})

t.test('should wrap object key of consumer', function (t) {
t.plan(3)
const message = { foo: 'bar' }
const subscriber = function subscriber(consumer) {
consumer.eachMessage(message)
}
const wrapped = shim.recordSubscribedConsume(subscriber, {
name: 'Channel#subscribe',
consumer: shim.FIRST,
functions: ['eachMessage'],
messageHandler: function (shim, args) {
t.same(args[0], message)
return {
destinationName: 'exchange.foo',
destinationType: shim.EXCHANGE
}
}
})
wrapped({
eachMessage: function consumer(msg) {
const segment = shim.getSegment()
t.equal(segment.name, 'OtherTransaction/Message/RabbitMQ/Exchange/Named/exchange.foo')
t.equal(msg, message)
t.end()
}
})
})
})
})
14 changes: 14 additions & 0 deletions test/unit/shim/shim.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3149,4 +3149,18 @@ tap.test('Shim', function (t) {
t.ok(shim.specs.params.QueueMessageParameters)
t.end()
})

t.test('should not use functions in MessageSubscribeSpec if it is not an array', (t) => {
const agent = helper.loadMockedAgent()
t.teardown(() => {
helper.unloadAgent(agent)
})

const shim = new Shim(agent, 'test-mod')
const spec = new shim.specs.MessageSubscribeSpec({
functions: 'foo-bar'
})
t.notOk(spec.functions)
t.end()
})
})
Loading
Loading