Skip to content

Commit

Permalink
Context propagation to elasticsearch-rest callbacks (#3858)
Browse files Browse the repository at this point in the history
* Context propagation to elasticsearch-rest callbacks

* remove unused import
  • Loading branch information
laurit committed Aug 19, 2021
1 parent f89579a commit f05c378
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ public static void onEnter(
@Advice.Local("otelScope") Scope scope,
@Advice.Argument(value = 5, readOnly = false) ResponseListener responseListener) {

context = tracer().startSpan(currentContext(), null, method + " " + endpoint);
Context parentContext = currentContext();
context = tracer().startSpan(parentContext, null, method + " " + endpoint);
scope = context.makeCurrent();

responseListener = new RestResponseListener(responseListener, context);
responseListener = new RestResponseListener(responseListener, context, parentContext);
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
*/

import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.INTERNAL

import groovy.json.JsonSlurper
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.util.concurrent.CountDownLatch
import org.apache.http.HttpHost
import org.apache.http.client.config.RequestConfig
import org.apache.http.util.EntityUtils
import org.elasticsearch.client.Response
import org.elasticsearch.client.ResponseListener
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.RestClientBuilder
import org.testcontainers.elasticsearch.ElasticsearchContainer
Expand Down Expand Up @@ -78,4 +81,66 @@ class Elasticsearch5RestClientTest extends AgentInstrumentationSpecification {
}
}
}

def "test elasticsearch status async"() {
setup:
Response requestResponse = null
Exception exception = null
CountDownLatch countDownLatch = new CountDownLatch(1)
ResponseListener responseListener = new ResponseListener() {
@Override
void onSuccess(Response response) {
runWithSpan("callback") {
requestResponse = response
countDownLatch.countDown()
}
}

@Override
void onFailure(Exception e) {
runWithSpan("callback") {
exception = e
countDownLatch.countDown()
}
}
}
runWithSpan("parent") {
client.performRequestAsync("GET", "_cluster/health", responseListener)
}
countDownLatch.await()

if (exception != null) {
throw exception
}
Map result = new JsonSlurper().parseText(EntityUtils.toString(requestResponse.entity))

expect:
result.status == "green"

assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "GET _cluster/health"
kind CLIENT
childOf(span(0))
attributes {
"${SemanticAttributes.DB_SYSTEM.key}" "elasticsearch"
"${SemanticAttributes.DB_OPERATION.key}" "GET _cluster/health"
"${SemanticAttributes.NET_PEER_NAME.key}" httpHost.hostName
"${SemanticAttributes.NET_PEER_PORT.key}" httpHost.port
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,13 @@ public static void onEnter(
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {

Context parentContext = currentContext();
context =
tracer()
.startSpan(currentContext(), null, request.getMethod() + " " + request.getEndpoint());
.startSpan(parentContext, null, request.getMethod() + " " + request.getEndpoint());
scope = context.makeCurrent();

responseListener = new RestResponseListener(responseListener, context);
responseListener = new RestResponseListener(responseListener, context, parentContext);
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
*/

import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.INTERNAL

import groovy.json.JsonSlurper
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.util.concurrent.CountDownLatch
import org.apache.http.HttpHost
import org.apache.http.client.config.RequestConfig
import org.apache.http.util.EntityUtils
import org.elasticsearch.client.Response
import org.elasticsearch.client.ResponseListener
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.RestClientBuilder
import org.testcontainers.elasticsearch.ElasticsearchContainer
Expand Down Expand Up @@ -73,4 +76,66 @@ class Elasticsearch6RestClientTest extends AgentInstrumentationSpecification {
}
}
}

def "test elasticsearch status async"() {
setup:
Response requestResponse = null
Exception exception = null
CountDownLatch countDownLatch = new CountDownLatch(1)
ResponseListener responseListener = new ResponseListener() {
@Override
void onSuccess(Response response) {
runWithSpan("callback") {
requestResponse = response
countDownLatch.countDown()
}
}

@Override
void onFailure(Exception e) {
runWithSpan("callback") {
exception = e
countDownLatch.countDown()
}
}
}
runWithSpan("parent") {
client.performRequestAsync("GET", "_cluster/health", responseListener)
}
countDownLatch.await()

if (exception != null) {
throw exception
}
Map result = new JsonSlurper().parseText(EntityUtils.toString(requestResponse.entity))

expect:
result.status == "green"

assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "GET _cluster/health"
kind CLIENT
childOf(span(0))
attributes {
"${SemanticAttributes.DB_SYSTEM.key}" "elasticsearch"
"${SemanticAttributes.DB_OPERATION.key}" "GET _cluster/health"
"${SemanticAttributes.NET_PEER_NAME.key}" httpHost.hostName
"${SemanticAttributes.NET_PEER_PORT.key}" httpHost.port
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,13 @@ public static void onEnter(
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {

Context parentContext = currentContext();
context =
tracer()
.startSpan(currentContext(), null, request.getMethod() + " " + request.getEndpoint());
.startSpan(parentContext, null, request.getMethod() + " " + request.getEndpoint());
scope = context.makeCurrent();

responseListener = new RestResponseListener(responseListener, context);
responseListener = new RestResponseListener(responseListener, context, parentContext);
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/

import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.INTERNAL

import groovy.json.JsonSlurper
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
Expand Down Expand Up @@ -83,17 +84,23 @@ class Elasticsearch7RestClientTest extends AgentInstrumentationSpecification {
ResponseListener responseListener = new ResponseListener() {
@Override
void onSuccess(Response response) {
requestResponse = response
countDownLatch.countDown()
runWithSpan("callback") {
requestResponse = response
countDownLatch.countDown()
}
}

@Override
void onFailure(Exception e) {
exception = e
countDownLatch.countDown()
runWithSpan("callback") {
exception = e
countDownLatch.countDown()
}
}
}
client.performRequestAsync(new Request("GET", "_cluster/health"), responseListener)
runWithSpan("parent") {
client.performRequestAsync(new Request("GET", "_cluster/health"), responseListener)
}
countDownLatch.await()

if (exception != null) {
Expand All @@ -105,18 +112,28 @@ class Elasticsearch7RestClientTest extends AgentInstrumentationSpecification {
result.status == "green"

assertTraces(1) {
trace(0, 1) {
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "GET _cluster/health"
kind CLIENT
hasNoParent()
childOf(span(0))
attributes {
"${SemanticAttributes.DB_SYSTEM.key}" "elasticsearch"
"${SemanticAttributes.DB_OPERATION.key}" "GET _cluster/health"
"${SemanticAttributes.NET_PEER_NAME.key}" httpHost.hostName
"${SemanticAttributes.NET_PEER_PORT.key}" httpHost.port
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,20 @@
import static io.opentelemetry.javaagent.instrumentation.elasticsearch.rest.ElasticsearchRestClientTracer.tracer;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;

public class RestResponseListener implements ResponseListener {

private final ResponseListener listener;
private final Context context;
private final Context parentContext;

public RestResponseListener(ResponseListener listener, Context context) {
public RestResponseListener(ResponseListener listener, Context context, Context parentContext) {
this.listener = listener;
this.context = context;
this.parentContext = parentContext;
}

@Override
Expand All @@ -28,12 +31,16 @@ public void onSuccess(Response response) {
}
tracer().end(context);

listener.onSuccess(response);
try (Scope ignored = parentContext.makeCurrent()) {
listener.onSuccess(response);
}
}

@Override
public void onFailure(Exception e) {
tracer().endExceptionally(context, e);
listener.onFailure(e);
try (Scope ignored = parentContext.makeCurrent()) {
listener.onFailure(e);
}
}
}

0 comments on commit f05c378

Please sign in to comment.