Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
marcingrzejszczak committed Aug 31, 2023
1 parent 929b456 commit 25c248b
Show file tree
Hide file tree
Showing 17 changed files with 356 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
package io.micrometer.tracing.brave.bridge;

import brave.baggage.BaggageField;
import io.micrometer.common.lang.Nullable;
import io.micrometer.common.util.internal.logging.InternalLogger;
import io.micrometer.common.util.internal.logging.InternalLoggerFactory;
import io.micrometer.tracing.Baggage;
import io.micrometer.tracing.BaggageInScope;
import io.micrometer.tracing.TraceContext;
Expand All @@ -28,10 +31,19 @@
*/
class BraveBaggageInScope implements Baggage, BaggageInScope {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(BraveBaggageInScope.class);

private final BaggageField delegate;

BraveBaggageInScope(BaggageField delegate) {
private final String previousBaggage;

@Nullable
private brave.propagation.TraceContext traceContext;

BraveBaggageInScope(BaggageField delegate, @Nullable brave.propagation.TraceContext traceContext) {
this.delegate = delegate;
this.traceContext = traceContext;
this.previousBaggage = delegate.getValue();
}

@Override
Expand All @@ -41,7 +53,7 @@ public String name() {

@Override
public String get() {
return this.delegate.getValue();
return this.traceContext != null ? this.delegate.getValue(traceContext) : this.delegate.getValue();
}

@Override
Expand All @@ -52,36 +64,62 @@ public String get(TraceContext traceContext) {
@Override
@Deprecated
public Baggage set(String value) {
this.delegate.updateValue(value);
if (this.traceContext != null) {
this.delegate.updateValue(this.traceContext, value);
}
else {
this.delegate.updateValue(value);
}
return this;
}

@Override
@Deprecated
public Baggage set(TraceContext traceContext, String value) {
this.delegate.updateValue(BraveTraceContext.toBrave(traceContext), value);
brave.propagation.TraceContext braveContext = updateBraveTraceContext(traceContext);
this.delegate.updateValue(braveContext, value);
return this;
}

private brave.propagation.TraceContext updateBraveTraceContext(TraceContext traceContext) {
brave.propagation.TraceContext braveContext = BraveTraceContext.toBrave(traceContext);
if (this.traceContext != braveContext) {
logger.debug(
"Create on baggage was called on <{}> but now you want to set baggage on <{}>. That's unexpected.",
this.traceContext, traceContext);
this.traceContext = braveContext;
}
return braveContext;
}

@Override
public BaggageInScope makeCurrent() {
return this;
}

@Override
public BaggageInScope makeCurrent(String value) {
this.delegate.updateValue(value);
if (this.traceContext != null) {
this.delegate.updateValue(this.traceContext, value);
}
else {
this.delegate.updateValue(value);
}
return makeCurrent();
}

@Override
public BaggageInScope makeCurrent(TraceContext traceContext, String value) {
this.delegate.updateValue(BraveTraceContext.toBrave(traceContext), value);
brave.propagation.TraceContext braveContext = updateBraveTraceContext(traceContext);
this.delegate.updateValue(braveContext, value);
return makeCurrent();
}

@Override
public void close() {
if (this.traceContext != null) {
this.delegate.updateValue(this.traceContext, this.previousBaggage);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.micrometer.tracing.brave.bridge;

import brave.Tracing;
import brave.baggage.BaggageField;
import io.micrometer.tracing.Baggage;
import io.micrometer.tracing.BaggageInScope;
Expand All @@ -23,7 +24,6 @@

import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Brave implementation of a {@link BaggageManager}.
Expand All @@ -33,13 +33,19 @@
*/
public class BraveBaggageManager implements Closeable, BaggageManager {

private static final Map<String, BraveBaggageInScope> CACHE = new ConcurrentHashMap<>();

@Override
public Map<String, String> getAllBaggage() {
return BaggageField.getAllValues();
}

@Override
public Map<String, String> getAllBaggage(TraceContext traceContext) {
if (traceContext == null) {
return getAllBaggage();
}
return BaggageField.getAllValues(BraveTraceContext.toBrave(traceContext));
}

@Override
public Baggage getBaggage(String name) {
return createBaggage(name);
Expand All @@ -51,7 +57,7 @@ public Baggage getBaggage(TraceContext traceContext, String name) {
if (baggageField == null) {
return null;
}
return new BraveBaggageInScope(baggageField);
return new BraveBaggageInScope(baggageField, BraveTraceContext.toBrave(traceContext));
}

@Override
Expand All @@ -60,8 +66,18 @@ public Baggage createBaggage(String name) {
return baggage(name);
}

private BraveBaggageInScope baggage(String name, TraceContext traceContext) {
return new BraveBaggageInScope(BaggageField.create(name), BraveTraceContext.toBrave(traceContext));
}

private BraveBaggageInScope baggage(String name) {
return CACHE.computeIfAbsent(name, s -> new BraveBaggageInScope(BaggageField.create(s)));
return new BraveBaggageInScope(BaggageField.create(name), currentTraceContext());
}

// Taken from BraveField
private static brave.propagation.TraceContext currentTraceContext() {
Tracing tracing = Tracing.current();
return tracing != null ? tracing.currentTraceContext().get() : null;
}

@Override
Expand All @@ -77,12 +93,12 @@ public BaggageInScope createBaggageInScope(String name, String value) {

@Override
public BaggageInScope createBaggageInScope(TraceContext traceContext, String name, String value) {
return baggage(name).makeCurrent(traceContext, value);
return baggage(name, traceContext).makeCurrent(traceContext, value);
}

@Override
public void close() {
CACHE.clear();

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ public TraceContext.Builder traceContextBuilder() {
return new BraveTraceContextBuilder();
}

@Override
public Map<String, String> getAllBaggage(TraceContext traceContext) {
return this.braveBaggageManager.getAllBaggage(traceContext);
}

@Override
public Map<String, String> getAllBaggage() {
return this.braveBaggageManager.getAllBaggage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,8 @@ void should_work_with_baggage_with_legacy_api() {
then(tracer.getBaggage("from_span_in_scope 2").get()).as("[Out of scope] Baggage 2").isNull();
then(tracer.getBaggage("from_span").get()).as("[Out of scope] Baggage 3").isNull();

// You will retrieve the baggage value ALWAYS when you pass the context explicitly
then(tracer.getBaggage("from_span").get(span.context())).as("[Out of scope - with context] Baggage 3")
.isEqualTo("value 3");
// Baggage is present only within the scope
then(tracer.getBaggage("from_span").get(span.context())).as("[Out of scope - with context] Baggage 3").isNull();
}

@Test
Expand Down Expand Up @@ -256,9 +255,8 @@ void should_work_with_baggage() {
then(tracer.getBaggage("from_span_in_scope 2").get()).as("[Out of scope] Baggage 2").isNull();
then(tracer.getBaggage("from_span").get()).as("[Out of scope] Baggage 3").isNull();

// You will retrieve the baggage value ALWAYS when you pass the context explicitly
then(tracer.getBaggage("from_span").get(span.context())).as("[Out of scope - with context] Baggage 3")
.isEqualTo("value 3");
// Baggage is present only within the scope
then(tracer.getBaggage("from_span").get(span.context())).as("[Out of scope - with context] Baggage 3").isNull();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,33 @@
package io.micrometer.tracing.brave.contextpropagation;

import brave.Tracing;
import brave.baggage.BaggageField;
import brave.baggage.CorrelationFlushScopeArrayReader;
import brave.baggage.CorrelationScopeConfig;
import brave.baggage.*;
import brave.context.slf4j.MDCScopeDecorator;
import brave.propagation.B3Propagation;
import brave.propagation.CurrentTraceContext;
import brave.propagation.ThreadLocalCurrentTraceContext;
import brave.sampler.Sampler;
import io.micrometer.common.util.internal.logging.InternalLogger;
import io.micrometer.common.util.internal.logging.InternalLoggerFactory;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import io.micrometer.tracing.BaggageInScope;
import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
import io.micrometer.tracing.brave.bridge.BraveCurrentTraceContext;
import io.micrometer.tracing.brave.bridge.BraveTracer;
import io.micrometer.tracing.handler.DefaultTracingObservationHandler;
import io.micrometer.tracing.handler.TracingObservationHandler.TracingContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.BDDAssertions.then;
Expand All @@ -50,13 +53,23 @@ class ScopesTests {

CurrentTraceContext currentTraceContext = ThreadLocalCurrentTraceContext.newBuilder()
.addScopeDecorator(MDCScopeDecorator.newBuilder()
.add(CorrelationScopeConfig.SingleCorrelationField.newBuilder(BaggageField.create("foo"))
.add(CorrelationScopeConfig.SingleCorrelationField
.newBuilder(BaggageField.create("foo"))
.flushOnUpdate()
.build())
.build())
.build();

Tracing tracing = Tracing.newBuilder().currentTraceContext(currentTraceContext).build();
Tracing tracing = Tracing.newBuilder()
.currentTraceContext(currentTraceContext)
.supportsJoin(false)
.traceId128Bit(true)
.propagationFactory(BaggagePropagation.newFactoryBuilder(B3Propagation.FACTORY)
.add(BaggagePropagationConfig.SingleBaggageField
.remote(BaggageField.create("foo")))
.build())
.sampler(Sampler.ALWAYS_SAMPLE)
.build();

brave.Tracer braveTracer = tracing.tracer();

Expand Down Expand Up @@ -133,6 +146,66 @@ void should_open_and_close_scopes_with_reactor() {
logger.info("SPAN AFTER CLOSE 1 [" + tracer.currentSpan() + "]");
}

@Test
void should_open_and_close_scopes_with_reactor_with_baggage() {
Observation obs1 = Observation.start("1", observationRegistry);

Observation.Scope scope = obs1.openScope();
Span span1 = tracer.currentSpan();
BaggageInScope baggageInScope1 = tracer.createBaggageInScope("foo", "span1");
then(tracer.getAllBaggage().get("foo")).isEqualTo("span1");
then(tracer.getAllBaggage(span1.context()).get("foo")).isEqualTo("span1");

Observation obs2 = Observation.start("2", observationRegistry);
Observation.Scope scope2 = obs2.openScope();
Span span2 = tracer.currentSpan();
BaggageInScope baggageInScope2 = tracer.createBaggageInScope(span2.context(), "foo", "span2");

then(tracer.getAllBaggage().get("foo")).isEqualTo("span2");
then(tracer.getAllBaggage(span1.context()).get("foo")).isEqualTo("span1");
then(tracer.getAllBaggage(span2.context()).get("foo")).isEqualTo("span2");

AtomicReference<AssertionError> errorInFlatMap = new AtomicReference<>();
AtomicReference<AssertionError> errorInOnNext = new AtomicReference<>();

Mono.just(1).flatMap(integer -> {
return Mono.just(2).doOnNext(integer1 -> {
Map<String, String> baggageInEmpty = tracer.getAllBaggage();
logger.info("\n\n[2] BAGGAGE IN EMPTY [" + baggageInEmpty + "]");
assertBaggageInReactor(errorInFlatMap, baggageInEmpty, null);
}).contextWrite(context -> Context.empty());
}).doOnNext(integer -> {
Map<String, String> baggageInOnNext = tracer.getAllBaggage();
logger.info("\n\n[1] SPAN IN ON NEXT [" + baggageInOnNext + "]");
assertBaggageInReactor(errorInOnNext, baggageInOnNext, "span2");
}).contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, obs2)).block();

logger.info("Checking if there were no errors in reactor");
then(errorInFlatMap).hasValue(null);
then(errorInOnNext).hasValue(null);

then(tracer.getBaggage(span2.context(), "foo").get()).isEqualTo("span2");
then(tracer.getBaggage(span1.context(), "foo").get()).isEqualTo("span1");

baggageInScope2.close();
scope2.close();
obs2.stop();
TracingContext tracingContext2 = obs2.getContext().get(TracingContext.class);
then(tracingContext2.getBaggage()).isNullOrEmpty();

then(tracer.getBaggage("foo").get()).isEqualTo("span1");
then(tracer.getBaggage(span1.context(), "foo").get()).isEqualTo("span1");
then(tracer.getBaggage(span2.context(), "foo").get()).isEqualTo("span1");

baggageInScope1.close();
scope.close();
obs1.stop();
TracingContext tracingContext1 = obs1.getContext().get(TracingContext.class);
then(tracingContext1.getBaggage()).isNullOrEmpty();

then(tracer.getAllBaggage()).isEmpty();
}

private static void assertInReactor(AtomicReference<AssertionError> errors, Span spanWOnNext, Span expectedSpan) {
try {
then(spanWOnNext).isEqualTo(expectedSpan);
Expand All @@ -142,4 +215,13 @@ private static void assertInReactor(AtomicReference<AssertionError> errors, Span
}
}

private static void assertBaggageInReactor(AtomicReference<AssertionError> errors, Map<String, String> baggageMap, String expectedValue) {
try {
then(baggageMap.get("foo")).isEqualTo(expectedValue);
}
catch (AssertionError er) {
errors.set(er);
}
}

}
Loading

0 comments on commit 25c248b

Please sign in to comment.