Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework reactor netty context tracking #9286

Merged
merged 5 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST
if (SemconvStability.emitOldHttpSemconv()) {
internalSet(attributes, SemanticAttributes.HTTP_URL, fullUrl);
}

int resendCount = resendCountIncrementer.applyAsInt(parentContext);
if (resendCount > 0) {
attributes.put(SemanticAttributes.HTTP_RESEND_COUNT, resendCount);
}
Comment on lines +116 to +120
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍
I don't really remember why I decided to put that in onEnd(), but there should be no problem adding this attribute onStart(), we might as well move it there.

}

@Override
Expand All @@ -127,11 +132,6 @@ public void onEnd(
internalNetExtractor.onEnd(attributes, request, response);
internalNetworkExtractor.onEnd(attributes, request, response);
internalServerExtractor.onEnd(attributes, request, response);

int resendCount = resendCountIncrementer.applyAsInt(context);
if (resendCount > 0) {
attributes.put(SemanticAttributes.HTTP_RESEND_COUNT, resendCount);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ void normal() {
AttributeKey.stringArrayKey("http.request.header.custom_request_header"),
asList("123", "456")),
entry(SemanticAttributes.NET_PEER_NAME, "github.com"),
entry(SemanticAttributes.NET_PEER_PORT, 123L));
entry(SemanticAttributes.NET_PEER_PORT, 123L),
entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L));

AttributesBuilder endAttributes = Attributes.builder();
extractor.onEnd(endAttributes, Context.root(), request, response, null);
Expand All @@ -157,7 +158,6 @@ void normal() {
entry(SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH, 10L),
entry(SemanticAttributes.HTTP_STATUS_CODE, 202L),
entry(SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH, 20L),
entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L),
entry(
AttributeKey.stringArrayKey("http.response.header.custom_response_header"),
asList("654", "321")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ void normal() {
entry(SemanticAttributes.NET_PEER_NAME, "github.com"),
entry(SemanticAttributes.NET_PEER_PORT, 123L),
entry(SemanticAttributes.SERVER_ADDRESS, "github.com"),
entry(SemanticAttributes.SERVER_PORT, 123L));
entry(SemanticAttributes.SERVER_PORT, 123L),
entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L));

AttributesBuilder endAttributes = Attributes.builder();
extractor.onEnd(endAttributes, Context.root(), request, response, null);
Expand All @@ -157,7 +158,6 @@ void normal() {
entry(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 202L),
entry(SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH, 20L),
entry(SemanticAttributes.HTTP_RESPONSE_BODY_SIZE, 20L),
entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L),
entry(
AttributeKey.stringArrayKey("http.response.header.custom_response_header"),
asList("654", "321")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ void normal() {
AttributeKey.stringArrayKey("http.request.header.custom_request_header"),
asList("123", "456")),
entry(SemanticAttributes.SERVER_ADDRESS, "github.com"),
entry(SemanticAttributes.SERVER_PORT, 123L));
entry(SemanticAttributes.SERVER_PORT, 123L),
entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L));

AttributesBuilder endAttributes = Attributes.builder();
extractor.onEnd(endAttributes, Context.root(), request, response, null);
Expand All @@ -161,7 +162,6 @@ void normal() {
entry(SemanticAttributes.HTTP_REQUEST_BODY_SIZE, 10L),
entry(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 202L),
entry(SemanticAttributes.HTTP_RESPONSE_BODY_SIZE, 20L),
entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L),
entry(
AttributeKey.stringArrayKey("http.response.header.custom_response_header"),
asList("654", "321")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientResendCount;
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
import io.opentelemetry.instrumentation.api.internal.Timer;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
Expand All @@ -19,6 +20,8 @@
import reactor.netty.http.client.HttpClientResponse;

final class InstrumentationContexts {
private static final VirtualField<HttpClientRequest, Context> requestContextVirtualField =
VirtualField.find(HttpClientRequest.class, Context.class);

private static final AtomicReferenceFieldUpdater<InstrumentationContexts, Context>
parentContextUpdater =
Expand Down Expand Up @@ -56,18 +59,27 @@ Context startClientSpan(HttpClientRequest request) {
Context context = null;
if (instrumenter().shouldStart(parentContext, request)) {
context = instrumenter().start(parentContext, request);
requestContextVirtualField.set(request, context);
clientContexts.offer(new RequestAndContext(request, context));
}
return context;
}

// we are synchronizing here to ensure that spans are ended in the oder they are read from the
// queue
synchronized void endClientSpan(
@Nullable HttpClientResponse response, @Nullable Throwable error) {
void endClientSpan(@Nullable HttpClientResponse response, @Nullable Throwable error) {
HttpClientRequest request = null;
Context context = null;
RequestAndContext requestAndContext = clientContexts.poll();
if (requestAndContext != null) {
instrumenter().end(requestAndContext.context, requestAndContext.request, response, error);
if (response instanceof HttpClientRequest) {
request = (HttpClientRequest) response;
context = requestContextVirtualField.get(request);
} else if (requestAndContext != null) {
// this branch is taken when there was an error (e.g. timeout) and response was null
request = requestAndContext.request;
context = requestAndContext.context;
}

if (request != null && context != null) {
instrumenter().end(context, request, response, error);
}
}

Expand Down
Loading