diff --git a/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/main/java/io/micrometer/tracing/brave/bridge/BraveBaggageInScope.java b/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/main/java/io/micrometer/tracing/brave/bridge/BraveBaggageInScope.java index 97ddffc2..c2284b0f 100755 --- a/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/main/java/io/micrometer/tracing/brave/bridge/BraveBaggageInScope.java +++ b/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/main/java/io/micrometer/tracing/brave/bridge/BraveBaggageInScope.java @@ -16,6 +16,9 @@ package io.micrometer.tracing.brave.bridge; import brave.baggage.BaggageField; +import io.micrometer.common.lang.Nullable; +import io.micrometer.common.util.internal.logging.InternalLogger; +import io.micrometer.common.util.internal.logging.InternalLoggerFactory; import io.micrometer.tracing.Baggage; import io.micrometer.tracing.BaggageInScope; import io.micrometer.tracing.TraceContext; @@ -28,10 +31,19 @@ */ class BraveBaggageInScope implements Baggage, BaggageInScope { + private static final InternalLogger logger = InternalLoggerFactory.getInstance(BraveBaggageInScope.class); + private final BaggageField delegate; - BraveBaggageInScope(BaggageField delegate) { + private final String previousBaggage; + + @Nullable + private brave.propagation.TraceContext traceContext; + + BraveBaggageInScope(BaggageField delegate, @Nullable brave.propagation.TraceContext traceContext) { this.delegate = delegate; + this.traceContext = traceContext; + this.previousBaggage = delegate.getValue(); } @Override @@ -41,7 +53,7 @@ public String name() { @Override public String get() { - return this.delegate.getValue(); + return this.traceContext != null ? this.delegate.getValue(traceContext) : this.delegate.getValue(); } @Override @@ -52,17 +64,34 @@ public String get(TraceContext traceContext) { @Override @Deprecated public Baggage set(String value) { - this.delegate.updateValue(value); + if (this.traceContext != null) { + this.delegate.updateValue(this.traceContext, value); + } + else { + this.delegate.updateValue(value); + } return this; } @Override @Deprecated public Baggage set(TraceContext traceContext, String value) { - this.delegate.updateValue(BraveTraceContext.toBrave(traceContext), value); + brave.propagation.TraceContext braveContext = updateBraveTraceContext(traceContext); + this.delegate.updateValue(braveContext, value); return this; } + private brave.propagation.TraceContext updateBraveTraceContext(TraceContext traceContext) { + brave.propagation.TraceContext braveContext = BraveTraceContext.toBrave(traceContext); + if (this.traceContext != braveContext) { + logger.debug( + "Create on baggage was called on <{}> but now you want to set baggage on <{}>. That's unexpected.", + this.traceContext, traceContext); + this.traceContext = braveContext; + } + return braveContext; + } + @Override public BaggageInScope makeCurrent() { return this; @@ -70,18 +99,27 @@ public BaggageInScope makeCurrent() { @Override public BaggageInScope makeCurrent(String value) { - this.delegate.updateValue(value); + if (this.traceContext != null) { + this.delegate.updateValue(this.traceContext, value); + } + else { + this.delegate.updateValue(value); + } return makeCurrent(); } @Override public BaggageInScope makeCurrent(TraceContext traceContext, String value) { - this.delegate.updateValue(BraveTraceContext.toBrave(traceContext), value); + brave.propagation.TraceContext braveContext = updateBraveTraceContext(traceContext); + this.delegate.updateValue(braveContext, value); return makeCurrent(); } @Override public void close() { + if (this.traceContext != null) { + this.delegate.updateValue(this.traceContext, this.previousBaggage); + } } } diff --git a/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/main/java/io/micrometer/tracing/brave/bridge/BraveBaggageManager.java b/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/main/java/io/micrometer/tracing/brave/bridge/BraveBaggageManager.java index 0ffe0532..2c563ecf 100755 --- a/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/main/java/io/micrometer/tracing/brave/bridge/BraveBaggageManager.java +++ b/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/main/java/io/micrometer/tracing/brave/bridge/BraveBaggageManager.java @@ -15,16 +15,16 @@ */ package io.micrometer.tracing.brave.bridge; +import java.io.Closeable; +import java.util.Map; + +import brave.Tracing; import brave.baggage.BaggageField; import io.micrometer.tracing.Baggage; import io.micrometer.tracing.BaggageInScope; import io.micrometer.tracing.BaggageManager; import io.micrometer.tracing.TraceContext; -import java.io.Closeable; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - /** * Brave implementation of a {@link BaggageManager}. * @@ -33,13 +33,19 @@ */ public class BraveBaggageManager implements Closeable, BaggageManager { - private static final Map CACHE = new ConcurrentHashMap<>(); - @Override public Map getAllBaggage() { return BaggageField.getAllValues(); } + @Override + public Map getAllBaggage(TraceContext traceContext) { + if (traceContext == null) { + return getAllBaggage(); + } + return BaggageField.getAllValues(BraveTraceContext.toBrave(traceContext)); + } + @Override public Baggage getBaggage(String name) { return createBaggage(name); @@ -51,7 +57,7 @@ public Baggage getBaggage(TraceContext traceContext, String name) { if (baggageField == null) { return null; } - return new BraveBaggageInScope(baggageField); + return new BraveBaggageInScope(baggageField, BraveTraceContext.toBrave(traceContext)); } @Override @@ -60,8 +66,18 @@ public Baggage createBaggage(String name) { return baggage(name); } + private BraveBaggageInScope baggage(String name, TraceContext traceContext) { + return new BraveBaggageInScope(BaggageField.create(name), BraveTraceContext.toBrave(traceContext)); + } + private BraveBaggageInScope baggage(String name) { - return CACHE.computeIfAbsent(name, s -> new BraveBaggageInScope(BaggageField.create(s))); + return new BraveBaggageInScope(BaggageField.create(name), currentTraceContext()); + } + + // Taken from BraveField + private static brave.propagation.TraceContext currentTraceContext() { + Tracing tracing = Tracing.current(); + return tracing != null ? tracing.currentTraceContext().get() : null; } @Override @@ -77,12 +93,12 @@ public BaggageInScope createBaggageInScope(String name, String value) { @Override public BaggageInScope createBaggageInScope(TraceContext traceContext, String name, String value) { - return baggage(name).makeCurrent(traceContext, value); + return baggage(name, traceContext).makeCurrent(traceContext, value); } @Override public void close() { - CACHE.clear(); + // We used to cache baggage fields } } diff --git a/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/main/java/io/micrometer/tracing/brave/bridge/BraveTracer.java b/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/main/java/io/micrometer/tracing/brave/bridge/BraveTracer.java index a5f43098..d0974f54 100755 --- a/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/main/java/io/micrometer/tracing/brave/bridge/BraveTracer.java +++ b/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/main/java/io/micrometer/tracing/brave/bridge/BraveTracer.java @@ -106,6 +106,11 @@ public TraceContext.Builder traceContextBuilder() { return new BraveTraceContextBuilder(); } + @Override + public Map getAllBaggage(TraceContext traceContext) { + return this.braveBaggageManager.getAllBaggage(traceContext); + } + @Override public Map getAllBaggage() { return this.braveBaggageManager.getAllBaggage(); diff --git a/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/tracing/brave/bridge/BaggageTests.java b/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/tracing/brave/bridge/BaggageTests.java index f2a12eb9..077e6ac3 100644 --- a/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/tracing/brave/bridge/BaggageTests.java +++ b/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/tracing/brave/bridge/BaggageTests.java @@ -15,6 +15,9 @@ */ package io.micrometer.tracing.brave.bridge; +import java.util.HashMap; +import java.util.Map; + import brave.Tracing; import brave.baggage.BaggageField; import brave.baggage.BaggagePropagation; @@ -26,16 +29,18 @@ import brave.test.TestSpanHandler; import io.micrometer.common.util.internal.logging.InternalLogger; import io.micrometer.common.util.internal.logging.InternalLoggerFactory; -import io.micrometer.tracing.*; +import io.micrometer.context.ContextRegistry; +import io.micrometer.tracing.BaggageInScope; +import io.micrometer.tracing.CurrentTraceContext; +import io.micrometer.tracing.Span; +import io.micrometer.tracing.Tracer; +import io.micrometer.tracing.contextpropagation.ObservationAwareSpanThreadLocalAccessor; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import reactor.core.publisher.Hooks; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; -import java.util.HashMap; -import java.util.Map; - import static org.assertj.core.api.BDDAssertions.then; class BaggageTests { @@ -158,6 +163,7 @@ void injectAndExtractKeepsTheBaggageWithLegacyApi() { @Test void baggageWithContextPropagation() { + ContextRegistry.getInstance().registerThreadLocalAccessor(new ObservationAwareSpanThreadLocalAccessor(tracer)); Hooks.enableAutomaticContextPropagation(); Span span = tracer.nextSpan().start(); @@ -167,20 +173,20 @@ void baggageWithContextPropagation() { then(baggageOutside).isEqualTo(VALUE_1); log.info( "BAGGAGE OUTSIDE OF REACTOR [" + baggageOutside + "], thread [" + Thread.currentThread() + "]"); - Baggage baggageFromReactor = Mono.just(KEY_1) + String baggageFromReactor = Mono.just(KEY_1) .publishOn(Schedulers.boundedElastic()) - .flatMap(s -> Mono.just(this.tracer.getBaggage(s)) + .flatMap(s -> Mono.just(this.tracer.getBaggage(s).get()) .doOnNext(baggage -> log.info("BAGGAGE IN OF REACTOR [" + baggageOutside + "], thread [" + Thread.currentThread() + "]"))) .block(); - then(baggageFromReactor).isNotNull(); - then(baggageFromReactor.get()).isEqualTo(VALUE_1); + then(baggageFromReactor).isEqualTo(VALUE_1); } } } @Test void baggageWithContextPropagationWithLegacyApi() { + ContextRegistry.getInstance().registerThreadLocalAccessor(new ObservationAwareSpanThreadLocalAccessor(tracer)); Hooks.enableAutomaticContextPropagation(); Span span = tracer.nextSpan().start(); @@ -190,14 +196,13 @@ void baggageWithContextPropagationWithLegacyApi() { then(baggageOutside).isEqualTo(VALUE_1); log.info( "BAGGAGE OUTSIDE OF REACTOR [" + baggageOutside + "], thread [" + Thread.currentThread() + "]"); - Baggage baggageFromReactor = Mono.just(KEY_1) + String baggageFromReactor = Mono.just(KEY_1) .publishOn(Schedulers.boundedElastic()) - .flatMap(s -> Mono.just(this.tracer.getBaggage(s)) + .flatMap(s -> Mono.just(this.tracer.getBaggage(s).get()) .doOnNext(baggage -> log.info("BAGGAGE IN OF REACTOR [" + baggageOutside + "], thread [" + Thread.currentThread() + "]"))) .block(); - then(baggageFromReactor).isNotNull(); - then(baggageFromReactor.get()).isEqualTo(VALUE_1); + then(baggageFromReactor).isEqualTo(VALUE_1); } } } diff --git a/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/tracing/brave/bridge/BraveTracingApiTests.java b/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/tracing/brave/bridge/BraveTracingApiTests.java index 90adf50b..4e81b033 100644 --- a/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/tracing/brave/bridge/BraveTracingApiTests.java +++ b/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/tracing/brave/bridge/BraveTracingApiTests.java @@ -211,9 +211,8 @@ void should_work_with_baggage_with_legacy_api() { then(tracer.getBaggage("from_span_in_scope 2").get()).as("[Out of scope] Baggage 2").isNull(); then(tracer.getBaggage("from_span").get()).as("[Out of scope] Baggage 3").isNull(); - // You will retrieve the baggage value ALWAYS when you pass the context explicitly - then(tracer.getBaggage("from_span").get(span.context())).as("[Out of scope - with context] Baggage 3") - .isEqualTo("value 3"); + // Baggage is present only within the scope + then(tracer.getBaggage("from_span").get(span.context())).as("[Out of scope - with context] Baggage 3").isNull(); } @Test @@ -256,9 +255,8 @@ void should_work_with_baggage() { then(tracer.getBaggage("from_span_in_scope 2").get()).as("[Out of scope] Baggage 2").isNull(); then(tracer.getBaggage("from_span").get()).as("[Out of scope] Baggage 3").isNull(); - // You will retrieve the baggage value ALWAYS when you pass the context explicitly - then(tracer.getBaggage("from_span").get(span.context())).as("[Out of scope - with context] Baggage 3") - .isEqualTo("value 3"); + // Baggage is present only within the scope + then(tracer.getBaggage("from_span").get(span.context())).as("[Out of scope - with context] Baggage 3").isNull(); } @Test diff --git a/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/tracing/brave/contextpropagation/ScopesTests.java b/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/tracing/brave/contextpropagation/ScopesTests.java index 9732292b..27e90819 100644 --- a/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/tracing/brave/contextpropagation/ScopesTests.java +++ b/micrometer-tracing-bridges/micrometer-tracing-bridge-brave/src/test/java/io/micrometer/tracing/brave/contextpropagation/ScopesTests.java @@ -16,23 +16,25 @@ package io.micrometer.tracing.brave.contextpropagation; import brave.Tracing; -import brave.baggage.BaggageField; -import brave.baggage.CorrelationFlushScopeArrayReader; -import brave.baggage.CorrelationScopeConfig; +import brave.baggage.*; import brave.context.slf4j.MDCScopeDecorator; +import brave.propagation.B3Propagation; import brave.propagation.CurrentTraceContext; import brave.propagation.ThreadLocalCurrentTraceContext; +import brave.sampler.Sampler; import io.micrometer.common.util.internal.logging.InternalLogger; import io.micrometer.common.util.internal.logging.InternalLoggerFactory; import io.micrometer.observation.Observation; import io.micrometer.observation.ObservationRegistry; import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor; +import io.micrometer.tracing.BaggageInScope; import io.micrometer.tracing.Span; import io.micrometer.tracing.Tracer; import io.micrometer.tracing.brave.bridge.BraveBaggageManager; import io.micrometer.tracing.brave.bridge.BraveCurrentTraceContext; import io.micrometer.tracing.brave.bridge.BraveTracer; import io.micrometer.tracing.handler.DefaultTracingObservationHandler; +import io.micrometer.tracing.handler.TracingObservationHandler.TracingContext; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -40,6 +42,7 @@ import reactor.core.publisher.Mono; import reactor.util.context.Context; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.BDDAssertions.then; @@ -56,7 +59,15 @@ class ScopesTests { .build()) .build(); - Tracing tracing = Tracing.newBuilder().currentTraceContext(currentTraceContext).build(); + Tracing tracing = Tracing.newBuilder() + .currentTraceContext(currentTraceContext) + .supportsJoin(false) + .traceId128Bit(true) + .propagationFactory(BaggagePropagation.newFactoryBuilder(B3Propagation.FACTORY) + .add(BaggagePropagationConfig.SingleBaggageField.remote(BaggageField.create("foo"))) + .build()) + .sampler(Sampler.ALWAYS_SAMPLE) + .build(); brave.Tracer braveTracer = tracing.tracer(); @@ -133,6 +144,66 @@ void should_open_and_close_scopes_with_reactor() { logger.info("SPAN AFTER CLOSE 1 [" + tracer.currentSpan() + "]"); } + @Test + void should_open_and_close_scopes_with_reactor_with_baggage() { + Observation obs1 = Observation.start("1", observationRegistry); + + Observation.Scope scope = obs1.openScope(); + Span span1 = tracer.currentSpan(); + BaggageInScope baggageInScope1 = tracer.createBaggageInScope("foo", "span1"); + then(tracer.getAllBaggage().get("foo")).isEqualTo("span1"); + then(tracer.getAllBaggage(span1.context()).get("foo")).isEqualTo("span1"); + + Observation obs2 = Observation.start("2", observationRegistry); + Observation.Scope scope2 = obs2.openScope(); + Span span2 = tracer.currentSpan(); + BaggageInScope baggageInScope2 = tracer.createBaggageInScope(span2.context(), "foo", "span2"); + + then(tracer.getAllBaggage().get("foo")).isEqualTo("span2"); + then(tracer.getAllBaggage(span1.context()).get("foo")).isEqualTo("span1"); + then(tracer.getAllBaggage(span2.context()).get("foo")).isEqualTo("span2"); + + AtomicReference errorInFlatMap = new AtomicReference<>(); + AtomicReference errorInOnNext = new AtomicReference<>(); + + Mono.just(1).flatMap(integer -> { + return Mono.just(2).doOnNext(integer1 -> { + Map baggageInEmpty = tracer.getAllBaggage(); + logger.info("\n\n[2] BAGGAGE IN EMPTY [" + baggageInEmpty + "]"); + assertBaggageInReactor(errorInFlatMap, baggageInEmpty, null); + }).contextWrite(context -> Context.empty()); + }).doOnNext(integer -> { + Map baggageInOnNext = tracer.getAllBaggage(); + logger.info("\n\n[1] SPAN IN ON NEXT [" + baggageInOnNext + "]"); + assertBaggageInReactor(errorInOnNext, baggageInOnNext, "span2"); + }).contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, obs2)).block(); + + logger.info("Checking if there were no errors in reactor"); + then(errorInFlatMap).hasValue(null); + then(errorInOnNext).hasValue(null); + + then(tracer.getBaggage(span2.context(), "foo").get()).isEqualTo("span2"); + then(tracer.getBaggage(span1.context(), "foo").get()).isEqualTo("span1"); + + baggageInScope2.close(); + scope2.close(); + obs2.stop(); + TracingContext tracingContext2 = obs2.getContext().get(TracingContext.class); + then(tracingContext2.getBaggage()).isNullOrEmpty(); + + then(tracer.getBaggage("foo").get()).isEqualTo("span1"); + then(tracer.getBaggage(span1.context(), "foo").get()).isEqualTo("span1"); + then(tracer.getBaggage(span2.context(), "foo").get()).isEqualTo("span1"); + + baggageInScope1.close(); + scope.close(); + obs1.stop(); + TracingContext tracingContext1 = obs1.getContext().get(TracingContext.class); + then(tracingContext1.getBaggage()).isNullOrEmpty(); + + then(tracer.getAllBaggage()).isEmpty(); + } + private static void assertInReactor(AtomicReference errors, Span spanWOnNext, Span expectedSpan) { try { then(spanWOnNext).isEqualTo(expectedSpan); @@ -142,4 +213,14 @@ private static void assertInReactor(AtomicReference errors, Span } } + private static void assertBaggageInReactor(AtomicReference errors, Map baggageMap, + String expectedValue) { + try { + then(baggageMap.get("foo")).isEqualTo(expectedValue); + } + catch (AssertionError er) { + errors.set(er); + } + } + } diff --git a/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelBaggageInScope.java b/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelBaggageInScope.java index 5b5783bb..64331421 100644 --- a/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelBaggageInScope.java +++ b/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelBaggageInScope.java @@ -15,6 +15,12 @@ */ package io.micrometer.tracing.otel.bridge; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +import io.micrometer.common.util.internal.logging.InternalLogger; +import io.micrometer.common.util.internal.logging.InternalLoggerFactory; import io.micrometer.tracing.BaggageInScope; import io.micrometer.tracing.CurrentTraceContext; import io.micrometer.tracing.TraceContext; @@ -24,9 +30,6 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; -import java.util.List; -import java.util.concurrent.atomic.AtomicReference; - /** * OpenTelemetry implementation of a {@link BaggageInScope}. * @@ -35,6 +38,8 @@ */ class OtelBaggageInScope implements io.micrometer.tracing.Baggage, BaggageInScope { + private static final InternalLogger log = InternalLoggerFactory.getInstance(OtelBaggageInScope.class); + private final OtelBaggageManager otelBaggageManager; private final CurrentTraceContext currentTraceContext; @@ -43,6 +48,10 @@ class OtelBaggageInScope implements io.micrometer.tracing.Baggage, BaggageInScop private final AtomicReference entry = new AtomicReference<>(); + private final AtomicReference contextWithoutBaggage = new AtomicReference<>(null); + + private final AtomicReference mutatedTraceContext = new AtomicReference<>(null); + private final AtomicReference contextWithBaggage = new AtomicReference<>(null); private final AtomicReference scope = new AtomicReference<>(); @@ -51,6 +60,16 @@ class OtelBaggageInScope implements io.micrometer.tracing.Baggage, BaggageInScop List tagFields, Entry entry) { this.otelBaggageManager = otelBaggageManager; this.currentTraceContext = currentTraceContext; + this.mutatedTraceContext.set((OtelTraceContext) currentTraceContext.context()); + this.tagFields = tagFields; + this.entry.set(entry); + } + + OtelBaggageInScope(OtelBaggageManager otelBaggageManager, CurrentTraceContext currentTraceContext, + OtelTraceContext traceContext, List tagFields, Entry entry) { + this.otelBaggageManager = otelBaggageManager; + this.currentTraceContext = currentTraceContext; + this.mutatedTraceContext.set(traceContext); this.tagFields = tagFields; this.entry.set(entry); } @@ -62,6 +81,9 @@ public String name() { @Override public String get() { + if (entry.get() != null) { + return entry.get().value; + } return this.otelBaggageManager.currentBaggage().getEntryValue(entry().getKey()); } @@ -88,7 +110,13 @@ private io.micrometer.tracing.Baggage doSet(TraceContext context, String value) Span currentSpan = Span.current(); io.opentelemetry.api.baggage.Baggage baggage; OtelTraceContext ctx = (OtelTraceContext) context; + if (!Objects.equals(mutatedTraceContext.get(), ctx)) { + log.trace( + "This is unexpected - someone created baggage when mutatedTraceContext was current and now when makeCurrent() was called a new traceContext is present"); + } + mutatedTraceContext.set(ctx); Context storedCtx = ctx.context(); + contextWithoutBaggage.set(storedCtx); Baggage fromContext = Baggage.fromContext(storedCtx); BaggageBuilder newBaggageBuilder = fromContext.toBuilder(); @@ -131,18 +159,22 @@ public BaggageInScope makeCurrent(TraceContext traceContext, String value) { @Override public BaggageInScope makeCurrent() { - Entry entry = entry(); + Entry storedEntry = entry(); Context context = contextWithBaggage.get(); if (context == null) { context = Context.current(); } Baggage baggage = Baggage.fromContext(context) .toBuilder() - .put(entry.getKey(), entry.getValue(), entry.getMetadata()) + .put(storedEntry.getKey(), storedEntry.getValue(), storedEntry.getMetadata()) .build(); Context updated = context.with(baggage); - Scope scope = updated.makeCurrent(); - this.scope.set(scope); + OtelTraceContext otelTraceContext = this.mutatedTraceContext.get(); + if (otelTraceContext != null) { + otelTraceContext.updateContext(updated); + } + Scope currentScope = updated.makeCurrent(); + this.scope.set(currentScope); return this; } @@ -152,6 +184,10 @@ public void close() { if (scope != null) { this.scope.set(null); scope.close(); + OtelTraceContext traceContext = this.mutatedTraceContext.get(); + if (traceContext != null) { + traceContext.updateContext(this.contextWithoutBaggage.get()); + } } } diff --git a/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelBaggageManager.java b/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelBaggageManager.java index 1e7a6b7f..aabceb9d 100644 --- a/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelBaggageManager.java +++ b/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelBaggageManager.java @@ -15,6 +15,18 @@ */ package io.micrometer.tracing.otel.bridge; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Deque; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.BiConsumer; + import io.micrometer.common.lang.Nullable; import io.micrometer.tracing.BaggageInScope; import io.micrometer.tracing.BaggageManager; @@ -26,9 +38,6 @@ import io.opentelemetry.api.baggage.BaggageEntryMetadata; import io.opentelemetry.context.Context; -import java.util.*; -import java.util.function.BiConsumer; - import static java.util.Collections.unmodifiableCollection; import static java.util.Collections.unmodifiableMap; import static java.util.function.Function.identity; @@ -68,19 +77,34 @@ public OtelBaggageManager(CurrentTraceContext currentTraceContext, List @Override public Map getAllBaggage() { + return toMap(currentBaggage()); + } + + private Map toMap(CompositeBaggage compositeBaggage) { Map baggage = new HashMap<>(); - currentBaggage().getEntries().forEach(entry -> baggage.put(entry.getKey(), entry.getValue())); + compositeBaggage.getEntries().forEach(entry -> baggage.put(entry.getKey(), entry.getValue())); return baggage; } + @Override + public Map getAllBaggage(TraceContext traceContext) { + if (traceContext == null) { + return getAllBaggage(); + } + return toMap(baggage((OtelTraceContext) traceContext)); + } + CompositeBaggage currentBaggage() { - OtelTraceContext traceContext = (OtelTraceContext) currentTraceContext.context(); + return baggage((OtelTraceContext) currentTraceContext.context()); + } + + private CompositeBaggage baggage(OtelTraceContext traceContext) { Context context = Context.current(); Deque stack = new ArrayDeque<>(); - if (traceContext != null) { + stack.addFirst(context); + if (traceContext != null && traceContext.context() != null) { stack.addFirst(traceContext.context()); } - stack.addFirst(context); return new CompositeBaggage(stack); } @@ -104,11 +128,11 @@ private Entry getBaggage(String name, io.opentelemetry.api.baggage.Baggage bagga @Override public io.micrometer.tracing.Baggage getBaggage(TraceContext traceContext, String name) { OtelTraceContext context = (OtelTraceContext) traceContext; - Deque stack = new ArrayDeque<>(); + LinkedList stack = new LinkedList<>(); Context current = Context.current(); Context traceContextContext = context.context(); stack.addFirst(current); - if (current != traceContextContext) { + if (!Objects.equals(current, traceContextContext)) { stack.addFirst(traceContextContext); } Context ctx = removeFirst(stack); @@ -118,7 +142,7 @@ public io.micrometer.tracing.Baggage getBaggage(TraceContext traceContext, Strin ctx = removeFirst(stack); } if (entry != null) { - return otelBaggage(entry); + return otelBaggage(context, entry); } return null; } @@ -145,6 +169,10 @@ private io.micrometer.tracing.Baggage otelBaggage(Entry entry) { return new OtelBaggageInScope(this, this.currentTraceContext, this.tagFields, entry); } + private io.micrometer.tracing.Baggage otelBaggage(OtelTraceContext otelTraceContext, Entry entry) { + return new OtelBaggageInScope(this, this.currentTraceContext, otelTraceContext, this.tagFields, entry); + } + @Override @Deprecated public io.micrometer.tracing.Baggage createBaggage(String name) { diff --git a/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelCurrentTraceContext.java b/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelCurrentTraceContext.java index 13cdecb2..8a58ae20 100644 --- a/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelCurrentTraceContext.java +++ b/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelCurrentTraceContext.java @@ -15,6 +15,10 @@ */ package io.micrometer.tracing.otel.bridge; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; + import io.micrometer.tracing.CurrentTraceContext; import io.micrometer.tracing.TraceContext; import io.opentelemetry.api.baggage.Baggage; @@ -22,10 +26,7 @@ import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.context.Context; - -import java.util.concurrent.Callable; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; +import io.opentelemetry.context.ContextKey; /** * OpenTelemetry implementation of a {@link CurrentTraceContext}. @@ -36,14 +37,22 @@ */ public class OtelCurrentTraceContext implements CurrentTraceContext { + private static final String TRACING_OTEL_CONTEXT_KEY = "otelTraceContext"; + + private static final ContextKey OTEL_CONTEXT_KEY = ContextKey.named(TRACING_OTEL_CONTEXT_KEY); + @Override public TraceContext context() { + OtelTraceContext otelTraceContext = Context.current().get(OTEL_CONTEXT_KEY); + if (otelTraceContext != null) { + return otelTraceContext; + } Span currentSpan = Span.current(); if (Span.getInvalid().equals(currentSpan)) { return null; } if (currentSpan instanceof SpanFromSpanContext) { - return new OtelTraceContext((SpanFromSpanContext) currentSpan); + return ((SpanFromSpanContext) currentSpan).parentTraceContext; } return new OtelTraceContext(currentSpan); } @@ -62,7 +71,7 @@ public Scope newScope(TraceContext context) { return new WrappedScope(io.opentelemetry.context.Scope.noop()); } Context current = Context.current(); - Context old = otelTraceContext.context(); + Context oldContext = otelTraceContext.context(); // Check if there's a span in the static OTel context Span spanFromCurrentCtx = Span.fromContext(current); // Check if there's a span in the ctx attached to TraceContext @@ -73,16 +82,16 @@ public Scope newScope(TraceContext context) { SpanFromSpanContext fromContext = new SpanFromSpanContext(((OtelTraceContext) context).span, spanContext, otelTraceContext); Baggage currentBaggage = Baggage.fromContext(current); - Baggage oldBaggage = Baggage.fromContext(old); + Baggage oldBaggage = Baggage.fromContext(oldContext); boolean sameBaggage = sameBaggage(currentBaggage, oldBaggage); if (sameSpan && sameBaggage) { return new WrappedScope(io.opentelemetry.context.Scope.noop()); } Baggage updatedBaggage = mergeBaggage(currentBaggage, oldBaggage); - Context newContext = old.with(fromContext).with(updatedBaggage); - io.opentelemetry.context.Scope attach = newContext.makeCurrent(); + Context newContext = oldContext.with(fromContext).with(updatedBaggage).with(OTEL_CONTEXT_KEY, otelTraceContext); + io.opentelemetry.context.Scope newScope = newContext.makeCurrent(); otelTraceContext.updateContext(newContext); - return new WrappedScope(attach, otelTraceContext, old); + return new WrappedScope(newScope, otelTraceContext, oldContext); } private static Baggage mergeBaggage(Baggage currentBaggage, Baggage oldBaggage) { diff --git a/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelSpan.java b/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelSpan.java index cce57561..63a3c4eb 100644 --- a/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelSpan.java +++ b/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelSpan.java @@ -15,15 +15,14 @@ */ package io.micrometer.tracing.otel.bridge; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + import io.micrometer.tracing.Span; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.context.Context; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; -import java.util.Objects; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - /** * OpenTelemetry implementation of a {@link Span}. * @@ -34,22 +33,21 @@ class OtelSpan implements Span { final io.opentelemetry.api.trace.Span delegate; - private final AtomicReference context; + final OtelTraceContext otelTraceContext; OtelSpan(io.opentelemetry.api.trace.Span delegate) { this.delegate = delegate; - if (delegate instanceof SpanFromSpanContext) { - SpanFromSpanContext fromSpanContext = (SpanFromSpanContext) delegate; - this.context = fromSpanContext.otelTraceContext.context; - } - else { - this.context = new AtomicReference<>(Context.current()); - } + this.otelTraceContext = new OtelTraceContext(delegate.getSpanContext(), delegate); } OtelSpan(io.opentelemetry.api.trace.Span delegate, Context context) { this.delegate = delegate; - this.context = new AtomicReference<>(context); + this.otelTraceContext = new OtelTraceContext(context, delegate.getSpanContext(), delegate); + } + + OtelSpan(OtelTraceContext traceContext) { + this.delegate = traceContext.span != null ? traceContext.span : io.opentelemetry.api.trace.Span.current(); + this.otelTraceContext = traceContext; } static io.opentelemetry.api.trace.Span toOtel(Span span) { @@ -74,7 +72,7 @@ public OtelTraceContext context() { if (this.delegate == null) { return null; } - return new OtelTraceContext(this.context, this.delegate.getSpanContext(), this.delegate); + return this.otelTraceContext; } @Override diff --git a/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelSpanBuilder.java b/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelSpanBuilder.java index f0361c00..cd753ef1 100644 --- a/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelSpanBuilder.java +++ b/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelSpanBuilder.java @@ -15,6 +15,13 @@ */ package io.micrometer.tracing.otel.bridge; +import java.util.AbstractMap.SimpleEntry; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + import io.micrometer.common.util.StringUtils; import io.micrometer.tracing.Link; import io.micrometer.tracing.Span; @@ -28,10 +35,6 @@ import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; -import java.util.*; -import java.util.AbstractMap.SimpleEntry; -import java.util.concurrent.TimeUnit; - /** * OpenTelemetry implementation of a {@link Span.Builder}. * @@ -218,12 +221,11 @@ public Span start() { span.recordException(this.error); } this.annotations.forEach(span::addEvent); - Span otelSpan = OtelSpan.fromOtel(span); if (this.parentTraceContext != null) { return OtelSpan.fromOtel( new SpanFromSpanContext(span, span.getSpanContext(), (OtelTraceContext) this.parentTraceContext)); } - return otelSpan; + return OtelSpan.fromOtel(span); } } diff --git a/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelTraceContext.java b/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelTraceContext.java index c9e9e092..816ae520 100644 --- a/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelTraceContext.java +++ b/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelTraceContext.java @@ -33,34 +33,38 @@ */ public class OtelTraceContext implements TraceContext { - final AtomicReference context; + final AtomicReference otelContext; final SpanContext delegate; final Span span; OtelTraceContext(Context context, SpanContext delegate, @Nullable Span span) { - this(new AtomicReference<>(context), delegate, span); + this(new AtomicReference<>(context == null ? Context.current() : context), delegate, span); } OtelTraceContext(AtomicReference context, SpanContext delegate, @Nullable Span span) { - this.context = context; + this.otelContext = context; this.delegate = delegate; this.span = span; } OtelTraceContext(SpanContext delegate, @Nullable Span span) { - this.context = new AtomicReference<>(Context.current()); + this.otelContext = context(span); this.delegate = delegate; this.span = span; } OtelTraceContext(Span span) { - this(Context.current(), span.getSpanContext(), span); + this(context(span), span.getSpanContext(), span); } - OtelTraceContext(SpanFromSpanContext span) { - this(span.otelTraceContext.context.get(), span.getSpanContext(), span); + private static AtomicReference context(Span span) { + if (span instanceof SpanFromSpanContext) { + Context contextFromParent = ((SpanFromSpanContext) span).parentTraceContext.context(); + return new AtomicReference<>(contextFromParent); + } + return new AtomicReference<>(Context.current()); } /** @@ -108,9 +112,10 @@ public String traceId() { @Override @Nullable public String parentId() { - Span span = this.span instanceof SpanFromSpanContext ? ((SpanFromSpanContext) this.span).span : this.span; - if (span instanceof ReadableSpan) { - ReadableSpan readableSpan = (ReadableSpan) span; + Span spanContextSpanOrSpan = this.span instanceof SpanFromSpanContext ? ((SpanFromSpanContext) this.span).span + : this.span; + if (spanContextSpanOrSpan instanceof ReadableSpan) { + ReadableSpan readableSpan = (ReadableSpan) spanContextSpanOrSpan; return readableSpan.toSpanData().getParentSpanId(); } return null; @@ -131,12 +136,9 @@ public String toString() { return this.delegate != null ? this.delegate.toString() : "null"; } - Span span() { - return this.span; - } - Context context() { - return this.context.get(); + Context ctx = this.otelContext.get(); + return ctx != null ? ctx : Context.root(); } SpanContext spanContext() { @@ -144,7 +146,7 @@ SpanContext spanContext() { } void updateContext(Context context) { - this.context.set(context); + this.otelContext.set(context); } @Override @@ -155,8 +157,8 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - OtelTraceContext context = (OtelTraceContext) o; - return Objects.equals(this.delegate, context.delegate); + OtelTraceContext otelTraceContext = (OtelTraceContext) o; + return Objects.equals(this.delegate, otelTraceContext.delegate); } @Override diff --git a/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelTracer.java b/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelTracer.java index 710a7081..b1eae51e 100644 --- a/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelTracer.java +++ b/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/OtelTracer.java @@ -15,13 +15,20 @@ */ package io.micrometer.tracing.otel.bridge; -import io.micrometer.tracing.*; +import java.util.Map; + +import io.micrometer.tracing.Baggage; +import io.micrometer.tracing.BaggageInScope; +import io.micrometer.tracing.BaggageManager; +import io.micrometer.tracing.CurrentTraceContext; +import io.micrometer.tracing.ScopedSpan; +import io.micrometer.tracing.Span; +import io.micrometer.tracing.SpanCustomizer; +import io.micrometer.tracing.TraceContext; +import io.micrometer.tracing.Tracer; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; -import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; - /** * OpenTelemetry implementation of a {@link Tracer}. * @@ -70,8 +77,7 @@ public Span nextSpan(Span parent) { return nextSpan(); } OtelSpan otelSpan = (OtelSpan) parent; - AtomicReference context = otelSpan.context().context; - Context otelContext = context.get(); + Context otelContext = otelSpan.context().context(); Scope scope = null; if (otelContext != null && Context.current() != otelContext) { // This shouldn't // happen @@ -91,20 +97,22 @@ public Span nextSpan(Span parent) { @Override public Tracer.SpanInScope withSpan(Span span) { - io.opentelemetry.api.trace.Span delegate = delegate(span); - CurrentTraceContext.Scope scope = this.otelCurrentTraceContext - .maybeScope(OtelSpan.fromOtel(delegate).context()); + TraceContext traceContext = traceContext(span); + CurrentTraceContext.Scope scope = this.otelCurrentTraceContext.maybeScope(traceContext); return new WrappedSpanInScope(scope); } - private io.opentelemetry.api.trace.Span delegate(Span span) { + private TraceContext traceContext(Span span) { if (span == null) { // remove any existing span/baggage data from the current state of anything // that might be holding on to it. this.publisher.publishEvent(new EventPublishingContextWrapper.ScopeClosedEvent()); - return io.opentelemetry.api.trace.Span.getInvalid(); + return new OtelTraceContext(io.opentelemetry.api.trace.Span.getInvalid()); + } + else if (span instanceof OtelSpan) { + return span.context(); } - return ((OtelSpan) span).delegate; + return new OtelTraceContext(io.opentelemetry.api.trace.Span.getInvalid()); } @Override @@ -114,6 +122,10 @@ public SpanCustomizer currentSpanCustomizer() { @Override public Span currentSpan() { + OtelTraceContext context = (OtelTraceContext) this.otelCurrentTraceContext.context(); + if (context != null && context.span != null) { + return new OtelSpan(context); + } io.opentelemetry.api.trace.Span currentSpan = io.opentelemetry.api.trace.Span.current(); if (currentSpan == null || currentSpan.equals(io.opentelemetry.api.trace.Span.getInvalid())) { return null; @@ -152,6 +164,11 @@ public Map getAllBaggage() { return this.otelBaggageManager.getAllBaggage(); } + @Override + public Map getAllBaggage(TraceContext traceContext) { + return this.otelBaggageManager.getAllBaggage(traceContext); + } + @Override public Baggage getBaggage(String name) { return this.otelBaggageManager.getBaggage(name); diff --git a/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/SpanFromSpanContext.java b/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/SpanFromSpanContext.java index c451a984..b2fd90c7 100644 --- a/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/SpanFromSpanContext.java +++ b/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/SpanFromSpanContext.java @@ -15,6 +15,10 @@ */ package io.micrometer.tracing.otel.bridge; +import java.time.Instant; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + import io.micrometer.common.lang.Nullable; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; @@ -22,23 +26,19 @@ import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.context.Context; -import java.time.Instant; -import java.util.Objects; -import java.util.concurrent.TimeUnit; - class SpanFromSpanContext implements io.opentelemetry.api.trace.Span { final io.opentelemetry.api.trace.Span span; final SpanContext newSpanContext; - final OtelTraceContext otelTraceContext; + final OtelTraceContext parentTraceContext; SpanFromSpanContext(io.opentelemetry.api.trace.Span span, SpanContext newSpanContext, - OtelTraceContext otelTraceContext) { + OtelTraceContext parentTraceContext) { this.span = span != null ? span : io.opentelemetry.api.trace.Span.wrap(newSpanContext); this.newSpanContext = newSpanContext; - this.otelTraceContext = otelTraceContext; + this.parentTraceContext = parentTraceContext; } @Override diff --git a/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/test/java/io/micrometer/tracing/otel/bridge/BaggageTests.java b/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/test/java/io/micrometer/tracing/otel/bridge/BaggageTests.java index c6f01c5d..690e215c 100644 --- a/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/test/java/io/micrometer/tracing/otel/bridge/BaggageTests.java +++ b/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/test/java/io/micrometer/tracing/otel/bridge/BaggageTests.java @@ -15,12 +15,17 @@ */ package io.micrometer.tracing.otel.bridge; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + import io.micrometer.common.util.internal.logging.InternalLogger; import io.micrometer.common.util.internal.logging.InternalLoggerFactory; -import io.micrometer.tracing.Baggage; +import io.micrometer.context.ContextRegistry; import io.micrometer.tracing.BaggageInScope; import io.micrometer.tracing.Span; import io.micrometer.tracing.Tracer; +import io.micrometer.tracing.contextpropagation.ObservationAwareSpanThreadLocalAccessor; import io.micrometer.tracing.otel.propagation.BaggageTextMapPropagator; import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator; import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; @@ -34,10 +39,6 @@ import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - import static org.assertj.core.api.BDDAssertions.then; class BaggageTests { @@ -135,6 +136,7 @@ void injectAndExtractKeepsTheBaggage() { @Test void baggageWithContextPropagation() { + ContextRegistry.getInstance().registerThreadLocalAccessor(new ObservationAwareSpanThreadLocalAccessor(tracer)); Hooks.enableAutomaticContextPropagation(); Span span = tracer.nextSpan().start(); @@ -144,20 +146,20 @@ void baggageWithContextPropagation() { then(baggageOutside).isEqualTo(VALUE_1); log.info( "BAGGAGE OUTSIDE OF REACTOR [" + baggageOutside + "], thread [" + Thread.currentThread() + "]"); - Baggage baggageFromReactor = Mono.just(KEY_1) + String baggageFromReactor = Mono.just(KEY_1) .publishOn(Schedulers.boundedElastic()) - .flatMap(s -> Mono.just(this.tracer.getBaggage(s)) + .flatMap(s -> Mono.just(this.tracer.getBaggage(s).get()) .doOnNext(baggage -> log.info("BAGGAGE IN OF REACTOR [" + baggageOutside + "], thread [" + Thread.currentThread() + "]"))) .block(); - then(baggageFromReactor).isNotNull(); - then(baggageFromReactor.get()).isEqualTo(VALUE_1); + then(baggageFromReactor).isEqualTo(VALUE_1); } } } @Test void baggageWithContextPropagationWithLegacyApi() { + ContextRegistry.getInstance().registerThreadLocalAccessor(new ObservationAwareSpanThreadLocalAccessor(tracer)); Hooks.enableAutomaticContextPropagation(); Span span = tracer.nextSpan().start(); @@ -167,14 +169,13 @@ void baggageWithContextPropagationWithLegacyApi() { then(baggageOutside).isEqualTo(VALUE_1); log.info( "BAGGAGE OUTSIDE OF REACTOR [" + baggageOutside + "], thread [" + Thread.currentThread() + "]"); - Baggage baggageFromReactor = Mono.just(KEY_1) + String baggageFromReactor = Mono.just(KEY_1) .publishOn(Schedulers.boundedElastic()) - .flatMap(s -> Mono.just(this.tracer.getBaggage(s)) + .flatMap(s -> Mono.just(this.tracer.getBaggage(s).get()) .doOnNext(baggage -> log.info("BAGGAGE IN OF REACTOR [" + baggageOutside + "], thread [" + Thread.currentThread() + "]"))) .block(); - then(baggageFromReactor).isNotNull(); - then(baggageFromReactor.get()).isEqualTo(VALUE_1); + then(baggageFromReactor).isEqualTo(VALUE_1); } } } diff --git a/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/test/java/io/micrometer/tracing/otel/bridge/OtelTracingApiTests.java b/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/test/java/io/micrometer/tracing/otel/bridge/OtelTracingApiTests.java index 4c18f45b..cdd684f9 100644 --- a/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/test/java/io/micrometer/tracing/otel/bridge/OtelTracingApiTests.java +++ b/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/test/java/io/micrometer/tracing/otel/bridge/OtelTracingApiTests.java @@ -211,9 +211,8 @@ void should_work_with_baggage_with_legacy_api() { then(tracer.getBaggage("from_span_in_scope 2").get()).as("[Out of scope] Baggage 2").isNull(); then(tracer.getBaggage("from_span").get()).as("[Out of scope] Baggage 3").isNull(); - // You will retrieve the baggage value ALWAYS when you pass the context explicitly - then(tracer.getBaggage("from_span").get(span.context())).as("[Out of scope - with context] Baggage 3") - .isEqualTo("value 3"); + // Baggage is present only within the scope + then(tracer.getBaggage("from_span").get(span.context())).as("[Out of scope - with context] Baggage 3").isNull(); } @Test @@ -256,9 +255,8 @@ void should_work_with_baggage() { then(tracer.getBaggage("from_span_in_scope 2").get()).as("[Out of scope] Baggage 2").isNull(); then(tracer.getBaggage("from_span").get()).as("[Out of scope] Baggage 3").isNull(); - // You will retrieve the baggage value ALWAYS when you pass the context explicitly - then(tracer.getBaggage("from_span").get(span.context())).as("[Out of scope - with context] Baggage 3") - .isEqualTo("value 3"); + // Baggage is present only within the scope + then(tracer.getBaggage("from_span").get(span.context())).as("[Out of scope - with context] Baggage 3").isNull(); } @Test diff --git a/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/test/java/io/micrometer/tracing/otel/contextpropagation/ScopesTests.java b/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/test/java/io/micrometer/tracing/otel/contextpropagation/ScopesTests.java index 36a4674e..b5e041f6 100644 --- a/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/test/java/io/micrometer/tracing/otel/contextpropagation/ScopesTests.java +++ b/micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/test/java/io/micrometer/tracing/otel/contextpropagation/ScopesTests.java @@ -15,14 +15,20 @@ */ package io.micrometer.tracing.otel.contextpropagation; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + import io.micrometer.common.util.internal.logging.InternalLogger; import io.micrometer.common.util.internal.logging.InternalLoggerFactory; import io.micrometer.observation.Observation; import io.micrometer.observation.ObservationRegistry; import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor; +import io.micrometer.tracing.BaggageInScope; import io.micrometer.tracing.Span; import io.micrometer.tracing.Tracer; import io.micrometer.tracing.handler.DefaultTracingObservationHandler; +import io.micrometer.tracing.handler.TracingObservationHandler.TracingContext; import io.micrometer.tracing.otel.bridge.ArrayListSpanProcessor; import io.micrometer.tracing.otel.bridge.OtelBaggageManager; import io.micrometer.tracing.otel.bridge.OtelCurrentTraceContext; @@ -32,15 +38,13 @@ import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import reactor.core.publisher.Hooks; import reactor.core.publisher.Mono; import reactor.util.context.Context; -import java.util.Collections; -import java.util.concurrent.atomic.AtomicReference; - import static org.assertj.core.api.BDDAssertions.then; class ScopesTests { @@ -62,7 +66,8 @@ class ScopesTests { io.opentelemetry.api.trace.Tracer otelTracer = openTelemetrySdk.getTracer("io.micrometer.micrometer-tracing"); Tracer tracer = new OtelTracer(otelTracer, new OtelCurrentTraceContext(), event -> { - }, new OtelBaggageManager(new OtelCurrentTraceContext(), Collections.emptyList(), Collections.emptyList())); + }, new OtelBaggageManager(new OtelCurrentTraceContext(), Collections.singletonList("foo"), + Collections.emptyList())); DefaultTracingObservationHandler handler = new DefaultTracingObservationHandler(tracer); @@ -76,6 +81,12 @@ void setup() { ObservationThreadLocalAccessor.getInstance().setObservationRegistry(observationRegistry); } + @AfterEach + void ensureScopesNotLeaking() { + then(io.opentelemetry.context.Context.current()).as("All scopes must be closed") + .isSameAs(io.opentelemetry.context.Context.root()); + } + @Test void should_open_and_close_scopes_with_reactor() { Observation obs1 = Observation.start("1", observationRegistry); @@ -122,6 +133,66 @@ void should_open_and_close_scopes_with_reactor() { logger.info("SPAN AFTER CLOSE 1 [" + tracer.currentSpan() + "]"); } + @Test + void should_open_and_close_scopes_with_reactor_with_baggage() { + Observation obs1 = Observation.start("1", observationRegistry); + + Observation.Scope scope = obs1.openScope(); + Span span1 = tracer.currentSpan(); + BaggageInScope baggageInScope1 = tracer.createBaggageInScope("foo", "span1"); + then(tracer.getAllBaggage().get("foo")).isEqualTo("span1"); + then(tracer.getAllBaggage(span1.context()).get("foo")).isEqualTo("span1"); + + Observation obs2 = Observation.start("2", observationRegistry); + Observation.Scope scope2 = obs2.openScope(); + Span span2 = tracer.currentSpan(); + BaggageInScope baggageInScope2 = tracer.createBaggageInScope(span2.context(), "foo", "span2"); + + then(tracer.getAllBaggage().get("foo")).isEqualTo("span2"); + then(tracer.getAllBaggage(span1.context()).get("foo")).isEqualTo("span1"); + then(tracer.getAllBaggage(span2.context()).get("foo")).isEqualTo("span2"); + + AtomicReference errorInFlatMap = new AtomicReference<>(); + AtomicReference errorInOnNext = new AtomicReference<>(); + + Mono.just(1).flatMap(integer -> { + return Mono.just(2).doOnNext(integer1 -> { + Map baggageInEmpty = tracer.getAllBaggage(); + logger.info("\n\n[2] BAGGAGE IN EMPTY [" + baggageInEmpty + "]"); + assertBaggageInReactor(errorInFlatMap, baggageInEmpty, null); + }).contextWrite(context -> Context.empty()); + }).doOnNext(integer -> { + Map baggageInOnNext = tracer.getAllBaggage(); + logger.info("\n\n[1] SPAN IN ON NEXT [" + baggageInOnNext + "]"); + assertBaggageInReactor(errorInOnNext, baggageInOnNext, "span2"); + }).contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, obs2)).block(); + + logger.info("Checking if there were no errors in reactor"); + then(errorInFlatMap).hasValue(null); + then(errorInOnNext).hasValue(null); + + then(tracer.getBaggage(span2.context(), "foo").get()).isEqualTo("span2"); + then(tracer.getBaggage(span1.context(), "foo").get()).isEqualTo("span1"); + + baggageInScope2.close(); + scope2.close(); + obs2.stop(); + TracingContext tracingContext2 = obs2.getContext().get(TracingContext.class); + then(tracingContext2.getBaggage()).isNullOrEmpty(); + + then(tracer.getBaggage("foo").get()).isEqualTo("span1"); + then(tracer.getBaggage(span1.context(), "foo").get()).isEqualTo("span1"); + then(tracer.getBaggage(span2.context(), "foo").get()).isEqualTo("span1"); + + baggageInScope1.close(); + scope.close(); + obs1.stop(); + TracingContext tracingContext1 = obs1.getContext().get(TracingContext.class); + then(tracingContext1.getBaggage()).isNullOrEmpty(); + + then(tracer.getAllBaggage()).isEmpty(); + } + private static void assertInReactor(AtomicReference errors, Span spanWOnNext, Span expectedSpan) { try { then(spanWOnNext).isEqualTo(expectedSpan); @@ -131,4 +202,14 @@ private static void assertInReactor(AtomicReference errors, Span } } + private static void assertBaggageInReactor(AtomicReference errors, Map baggageMap, + String expectedValue) { + try { + then(baggageMap.get("foo")).isEqualTo(expectedValue); + } + catch (AssertionError er) { + errors.set(er); + } + } + } diff --git a/micrometer-tracing/src/main/java/io/micrometer/tracing/BaggageManager.java b/micrometer-tracing/src/main/java/io/micrometer/tracing/BaggageManager.java index 858ecad9..a6c7b399 100644 --- a/micrometer-tracing/src/main/java/io/micrometer/tracing/BaggageManager.java +++ b/micrometer-tracing/src/main/java/io/micrometer/tracing/BaggageManager.java @@ -75,6 +75,18 @@ public BaggageInScope createBaggageInScope(TraceContext traceContext, String nam */ Map getAllBaggage(); + /** + * @param traceContext trace context with baggage. If {@code null} will try to get all + * baggage from current available context + * @return mapping of all baggage entries from the given scope + */ + default Map getAllBaggage(@Nullable TraceContext traceContext) { + if (traceContext == null) { + return getAllBaggage(); + } + return Collections.emptyMap(); + } + /** * Retrieves {@link Baggage} for the given name. * @param name baggage name diff --git a/micrometer-tracing/src/main/java/io/micrometer/tracing/handler/DefaultTracingObservationHandler.java b/micrometer-tracing/src/main/java/io/micrometer/tracing/handler/DefaultTracingObservationHandler.java index 351d48b6..58922d6b 100755 --- a/micrometer-tracing/src/main/java/io/micrometer/tracing/handler/DefaultTracingObservationHandler.java +++ b/micrometer-tracing/src/main/java/io/micrometer/tracing/handler/DefaultTracingObservationHandler.java @@ -50,7 +50,7 @@ public void onStop(Observation.Context context) { Span span = getRequiredSpan(context); span.name(getSpanName(context)); tagSpan(context, span); - span.end(); + endSpan(context, span); } @Override diff --git a/micrometer-tracing/src/main/java/io/micrometer/tracing/handler/PropagatingReceiverTracingObservationHandler.java b/micrometer-tracing/src/main/java/io/micrometer/tracing/handler/PropagatingReceiverTracingObservationHandler.java index c2c7b053..ce0b8f3e 100644 --- a/micrometer-tracing/src/main/java/io/micrometer/tracing/handler/PropagatingReceiverTracingObservationHandler.java +++ b/micrometer-tracing/src/main/java/io/micrometer/tracing/handler/PropagatingReceiverTracingObservationHandler.java @@ -98,7 +98,7 @@ public void onStop(T context) { tagSpan(context, span); customizeReceiverSpan(context, span); span.name(context.getContextualName() != null ? context.getContextualName() : context.getName()); - span.end(); + endSpan(context, span); } /** diff --git a/micrometer-tracing/src/main/java/io/micrometer/tracing/handler/PropagatingSenderTracingObservationHandler.java b/micrometer-tracing/src/main/java/io/micrometer/tracing/handler/PropagatingSenderTracingObservationHandler.java index c03e4e93..612c1c79 100644 --- a/micrometer-tracing/src/main/java/io/micrometer/tracing/handler/PropagatingSenderTracingObservationHandler.java +++ b/micrometer-tracing/src/main/java/io/micrometer/tracing/handler/PropagatingSenderTracingObservationHandler.java @@ -101,7 +101,7 @@ public void onStop(T context) { tagSpan(context, span); customizeSenderSpan(context, span); span.name(context.getContextualName() != null ? context.getContextualName() : context.getName()); - span.end(); + endSpan(context, span); } /** diff --git a/micrometer-tracing/src/main/java/io/micrometer/tracing/handler/RevertingScope.java b/micrometer-tracing/src/main/java/io/micrometer/tracing/handler/RevertingScope.java index 8fa66a8a..308709f6 100644 --- a/micrometer-tracing/src/main/java/io/micrometer/tracing/handler/RevertingScope.java +++ b/micrometer-tracing/src/main/java/io/micrometer/tracing/handler/RevertingScope.java @@ -16,7 +16,10 @@ package io.micrometer.tracing.handler; import io.micrometer.tracing.CurrentTraceContext; +import io.micrometer.tracing.CurrentTraceContext.Scope; +import io.micrometer.tracing.handler.TracingObservationHandler.TracingContext; +import java.util.Map; import java.util.Objects; class RevertingScope implements CurrentTraceContext.Scope { @@ -27,17 +30,21 @@ class RevertingScope implements CurrentTraceContext.Scope { private final CurrentTraceContext.Scope previousScope; - RevertingScope(TracingObservationHandler.TracingContext tracingContext, CurrentTraceContext.Scope currentScope, - CurrentTraceContext.Scope previousScope) { + private final Map previousBaggage; + + RevertingScope(TracingContext tracingContext, Scope currentScope, Scope previousScope, + Map previousBaggage) { this.tracingContext = tracingContext; this.currentScope = currentScope; this.previousScope = previousScope; + this.previousBaggage = previousBaggage; } @Override public void close() { this.currentScope.close(); this.tracingContext.setScope(this.previousScope); + this.tracingContext.setBaggage(this.previousBaggage); } @Override @@ -55,12 +62,13 @@ public boolean equals(Object o) { } RevertingScope that = (RevertingScope) o; return Objects.equals(tracingContext, that.tracingContext) && Objects.equals(currentScope, that.currentScope) - && Objects.equals(previousScope, that.previousScope); + && Objects.equals(previousScope, that.previousScope) + && Objects.equals(previousBaggage, that.previousBaggage); } @Override public int hashCode() { - return Objects.hash(tracingContext, currentScope, previousScope); + return Objects.hash(tracingContext, currentScope, previousScope, previousBaggage); } } diff --git a/micrometer-tracing/src/main/java/io/micrometer/tracing/handler/TracingObservationHandler.java b/micrometer-tracing/src/main/java/io/micrometer/tracing/handler/TracingObservationHandler.java index cddb9ad7..916d76a5 100755 --- a/micrometer-tracing/src/main/java/io/micrometer/tracing/handler/TracingObservationHandler.java +++ b/micrometer-tracing/src/main/java/io/micrometer/tracing/handler/TracingObservationHandler.java @@ -29,6 +29,7 @@ import io.micrometer.tracing.Tracer; import io.micrometer.tracing.internal.SpanNameUtil; +import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -93,10 +94,16 @@ default void onScopeOpened(T context) { default void setMaybeScopeOnTracingContext(TracingContext tracingContext, @Nullable Span newSpan) { Span spanFromThisObservation = tracingContext.getSpan(); TraceContext newContext = newSpan != null ? newSpan.context() : null; + Map previousBaggage = Collections.emptyMap(); + if (spanFromThisObservation != null) { + previousBaggage = getTracer().getAllBaggage(spanFromThisObservation.context()); + } CurrentTraceContext.Scope scope = getTracer().currentTraceContext().maybeScope(newContext); CurrentTraceContext.Scope previousScopeOnThisObservation = tracingContext.getScope(); + Map newBaggage = getTracer().getAllBaggage(newContext); + tracingContext.setBaggage(newBaggage); tracingContext.setSpanAndScope(spanFromThisObservation, - new RevertingScope(tracingContext, scope, previousScopeOnThisObservation)); + new RevertingScope(tracingContext, scope, previousScopeOnThisObservation, previousBaggage)); } @Override @@ -197,6 +204,17 @@ default Span getRequiredSpan(T context) { return span; } + /** + * Ends span and clears resources. + * @param context context + * @param span span to end + */ + default void endSpan(T context, Span span) { + TracingContext tracingContext = getTracingContext(context); + tracingContext.close(); + span.end(); + } + /** * Returns the {@link Tracer}. * @return tracer @@ -209,12 +227,14 @@ default Span getRequiredSpan(T context) { * @author Marcin Grzejszczak * @since 1.0.0 */ - class TracingContext { + class TracingContext implements AutoCloseable { private Span span; private Map scopes = new ConcurrentHashMap<>(); + private Map> baggage = new ConcurrentHashMap<>(); + /** * Returns the span. * @return span @@ -252,6 +272,27 @@ public void setScope(CurrentTraceContext.Scope scope) { } } + /** + * Returns the baggage corresponding to this span. + * @return baggage attached to the span + */ + public Map getBaggage() { + return this.baggage.get(Thread.currentThread()); + } + + /** + * Sets the baggage + * @param baggage baggage to set + */ + public void setBaggage(Map baggage) { + if (baggage == null) { + this.baggage.remove(Thread.currentThread()); + } + else { + this.baggage.put(Thread.currentThread(), baggage); + } + } + /** * Convenience method to set both span and scope. * @param span span to set @@ -262,9 +303,15 @@ public void setSpanAndScope(Span span, CurrentTraceContext.Scope scope) { setScope(scope); } + @Override + public void close() { + this.baggage.clear(); + this.scopes.clear(); + } + @Override public String toString() { - return "TracingContext{" + "span=" + traceContextFromSpan() + '}'; + return "TracingContext{" + "span=" + traceContextFromSpan() + ",baggage=" + baggage + '}'; } private String traceContextFromSpan() { diff --git a/micrometer-tracing/src/test/java/io/micrometer/tracing/handler/TracingObservationHandlerTests.java b/micrometer-tracing/src/test/java/io/micrometer/tracing/handler/TracingObservationHandlerTests.java index a229321c..57c9280a 100644 --- a/micrometer-tracing/src/test/java/io/micrometer/tracing/handler/TracingObservationHandlerTests.java +++ b/micrometer-tracing/src/test/java/io/micrometer/tracing/handler/TracingObservationHandlerTests.java @@ -24,6 +24,8 @@ import org.mockito.BDDMockito; import org.mockito.InOrder; +import java.util.Collections; + import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.BDDAssertions.thenNoException; import static org.assertj.core.api.BDDAssertions.thenThrownBy; @@ -58,8 +60,9 @@ void spanShouldBeClearedOnScopeReset() { TracingObservationHandler.TracingContext tracingContext = new TracingObservationHandler.TracingContext(); CurrentTraceContext.Scope scope1 = mock(CurrentTraceContext.Scope.class); CurrentTraceContext.Scope scope2 = mock(CurrentTraceContext.Scope.class); - RevertingScope revertingScope1 = new RevertingScope(tracingContext, scope1, null); - RevertingScope revertingScope2 = new RevertingScope(tracingContext, scope2, revertingScope1); + RevertingScope revertingScope1 = new RevertingScope(tracingContext, scope1, null, Collections.emptyMap()); + RevertingScope revertingScope2 = new RevertingScope(tracingContext, scope2, revertingScope1, + Collections.emptyMap()); tracingContext.setScope(revertingScope2); context.put(TracingObservationHandler.TracingContext.class, tracingContext);