Skip to content

Commit

Permalink
Implement vertx-kafka-client instrumentation; batch processing
Browse files Browse the repository at this point in the history
  • Loading branch information
Mateusz Rzeszutek authored and trask committed May 5, 2022
1 parent 2a77003 commit 60bdbd1
Show file tree
Hide file tree
Showing 15 changed files with 725 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.kafka;
package io.opentelemetry.instrumentation.kafka.internal;

import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.kafka;
package io.opentelemetry.instrumentation.kafka.internal;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerRecordGetter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
Expand Down Expand Up @@ -118,4 +119,19 @@ public Instrumenter<ReceivedRecords, Void> createConsumerReceiveInstrumenter(
return builder.newConsumerInstrumenter(KafkaConsumerRecordGetter.INSTANCE);
}
}

public Instrumenter<ConsumerRecords<?, ?>, Void> createBatchProcessInstrumenter() {
KafkaBatchProcessAttributesGetter getter = KafkaBatchProcessAttributesGetter.INSTANCE;
MessageOperation operation = MessageOperation.PROCESS;

return Instrumenter.<ConsumerRecords<?, ?>, Void>builder(
openTelemetry,
instrumentationName,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation))
.addSpanLinksExtractor(
new KafkaBatchProcessSpanLinksExtractor(openTelemetry.getPropagators()))
.setErrorCauseExtractor(errorCauseExtractor)
.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand All @@ -19,26 +15,15 @@ public final class SpringKafkaSingletons {

private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-kafka-2.7";

private static final Instrumenter<ConsumerRecords<?, ?>, Void> BATCH_PROCESS_INSTRUMENTER =
buildBatchProcessInstrumenter();
private static final Instrumenter<ConsumerRecord<?, ?>, Void> PROCESS_INSTRUMENTER =
new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME)
.setErrorCauseExtractor(SpringKafkaErrorCauseExtractor.INSTANCE)
.createConsumerProcessInstrumenter();

private static Instrumenter<ConsumerRecords<?, ?>, Void> buildBatchProcessInstrumenter() {
KafkaBatchProcessAttributesGetter getter = KafkaBatchProcessAttributesGetter.INSTANCE;
MessageOperation operation = MessageOperation.PROCESS;

return Instrumenter.<ConsumerRecords<?, ?>, Void>builder(
GlobalOpenTelemetry.get(),
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation))
.addSpanLinksExtractor(
new KafkaBatchProcessSpanLinksExtractor(GlobalOpenTelemetry.getPropagators()))
.setErrorCauseExtractor(SpringKafkaErrorCauseExtractor.INSTANCE)
.newInstrumenter(SpanKindExtractor.alwaysConsumer());
private static final Instrumenter<ConsumerRecords<?, ?>, Void> BATCH_PROCESS_INSTRUMENTER;
private static final Instrumenter<ConsumerRecord<?, ?>, Void> PROCESS_INSTRUMENTER;

static {
KafkaInstrumenterFactory factory =
new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME)
.setErrorCauseExtractor(SpringKafkaErrorCauseExtractor.INSTANCE);
BATCH_PROCESS_INSTRUMENTER = factory.createBatchProcessInstrumenter();
PROCESS_INSTRUMENTER = factory.createConsumerProcessInstrumenter();
}

public static Instrumenter<ConsumerRecords<?, ?>, Void> batchProcessInstrumenter() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6;

import static io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6.VertxKafkaSingletons.batchProcessInstrumenter;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.field.VirtualField;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
import io.vertx.core.Handler;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public final class InstrumentedBatchRecordsHandler<K, V> implements Handler<ConsumerRecords<K, V>> {

private final VirtualField<ConsumerRecords<K, V>, Context> receiveContextField;
@Nullable private final Handler<ConsumerRecords<K, V>> delegate;

public InstrumentedBatchRecordsHandler(
VirtualField<ConsumerRecords<K, V>, Context> receiveContextField,
@Nullable Handler<ConsumerRecords<K, V>> delegate) {
this.receiveContextField = receiveContextField;
this.delegate = delegate;
}

@Override
public void handle(ConsumerRecords<K, V> records) {
Context parentContext = getParentContext(records);

if (!batchProcessInstrumenter().shouldStart(parentContext, records)) {
callDelegateHandler(records);
return;
}

// the instrumenter iterates over records when adding links, we need to suppress that
boolean previousWrappingEnabled = KafkaClientsConsumerProcessTracing.setEnabled(false);

Context context = batchProcessInstrumenter().start(parentContext, records);
Throwable error = null;
try (Scope ignored = context.makeCurrent()) {
callDelegateHandler(records);
} catch (Throwable t) {
error = t;
throw t;
} finally {
batchProcessInstrumenter().end(context, records, null, error);
KafkaClientsConsumerProcessTracing.setEnabled(previousWrappingEnabled);
}
}

private Context getParentContext(ConsumerRecords<K, V> records) {
Context receiveContext = receiveContextField.get(records);

// use the receive CONSUMER span as parent if it's available
return receiveContext != null ? receiveContext : Context.current();
}

private void callDelegateHandler(ConsumerRecords<K, V> records) {
if (delegate != null) {
delegate.handle(records);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public void handle(ConsumerRecord<K, V> record) {
}
}

private Context getParentContext(ConsumerRecord<K, V> records) {
Context receiveContext = receiveContextField.get(records);
private Context getParentContext(ConsumerRecord<K, V> record) {
Context receiveContext = receiveContextField.get(record);

// use the receive CONSUMER span as parent if it's available
return receiveContext != null ? receiveContext : Context.current();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ public static class BatchHandlerAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static <K, V> void onEnter(
@Advice.Argument(value = 0, readOnly = false) Handler<ConsumerRecords<K, V>> handler) {
// TODO: next PR

VirtualField<ConsumerRecords<K, V>, Context> receiveContextField =
VirtualField.find(ConsumerRecords.class, Context.class);
handler = new InstrumentedBatchRecordsHandler<>(receiveContextField, handler);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,25 @@
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public final class VertxKafkaSingletons {

private static final String INSTRUMENTATION_NAME = "io.opentelemetry.vertx-kafka-client-3.5";

private static final Instrumenter<ConsumerRecord<?, ?>, Void> PROCESS_INSTRUMENTER =
new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME)
.createConsumerProcessInstrumenter();
private static final Instrumenter<ConsumerRecords<?, ?>, Void> BATCH_PROCESS_INSTRUMENTER;
private static final Instrumenter<ConsumerRecord<?, ?>, Void> PROCESS_INSTRUMENTER;

static {
KafkaInstrumenterFactory factory =
new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME);
BATCH_PROCESS_INSTRUMENTER = factory.createBatchProcessInstrumenter();
PROCESS_INSTRUMENTER = factory.createConsumerProcessInstrumenter();
}

public static Instrumenter<ConsumerRecords<?, ?>, Void> batchProcessInstrumenter() {
return BATCH_PROCESS_INSTRUMENTER;
}

public static Instrumenter<ConsumerRecord<?, ?>, Void> processInstrumenter() {
return PROCESS_INSTRUMENTER;
Expand Down
Loading

0 comments on commit 60bdbd1

Please sign in to comment.