Skip to content

Commit

Permalink
Adds minimal traceparent header support to Elasticsearch (#74210)
Browse files Browse the repository at this point in the history
This adds just enough support for the traceparent header to be useful in es8.
Since Elasticsearch already logs in ECS format extending it with support for transaction.id and trace.id is a quick win.
This allows us to surface server/deprecation slow logs from an instrumented application using the Trace Logs feature.
Parsing `traceparent` in http layer and populating tasks with `trace_id` which is preserved in thread context.
  • Loading branch information
Mpdreamz authored Jun 21, 2021
1 parent 40b1dc0 commit c412aae
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public void testDeprecatedMessageWithoutXOpaqueId() throws IOException {
public void testCompatibleLog() throws Exception {
withThreadContext(threadContext -> {
threadContext.putHeader(Task.X_OPAQUE_ID, "someId");
threadContext.putHeader(Task.TRACE_ID, "someTraceId");
final DeprecationLogger testLogger = DeprecationLogger.getLogger("org.elasticsearch.test");
testLogger.deprecate(DeprecationCategory.OTHER,"someKey", "deprecated message1")
.compatibleApiWarning("compatibleKey","compatible API message");
Expand Down Expand Up @@ -143,6 +144,7 @@ public void testCompatibleLog() throws Exception {
hasEntry("message", "deprecated message1"),
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "someKey"),
hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"),
hasEntry(Task.TRACE_ID, "someTraceId"),
hasEntry("elasticsearch.event.category", "other")
),
allOf(
Expand All @@ -159,6 +161,7 @@ public void testCompatibleLog() throws Exception {
hasEntry("message", "compatible API message"),
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "compatibleKey"),
hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"),
hasEntry(Task.TRACE_ID, "someTraceId"),
hasEntry("elasticsearch.event.category", "compatible_api")
)
)
Expand All @@ -172,6 +175,7 @@ public void testCompatibleLog() throws Exception {
public void testParseFieldEmittingDeprecatedLogs() throws Exception {
withThreadContext(threadContext -> {
threadContext.putHeader(Task.X_OPAQUE_ID, "someId");
threadContext.putHeader(Task.TRACE_ID, "someTraceId");

ParseField deprecatedField = new ParseField("new_name", "deprecated_name");
assertTrue(deprecatedField.match("deprecated_name", LoggingDeprecationHandler.INSTANCE));
Expand Down Expand Up @@ -208,6 +212,7 @@ public void testParseFieldEmittingDeprecatedLogs() throws Exception {
hasEntry("message", "Deprecated field [deprecated_name] used, expected [new_name] instead"),
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "deprecated_field_deprecated_name"),
hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"),
hasEntry(Task.TRACE_ID, "someTraceId"),
hasEntry("elasticsearch.event.category", "api")
),
// deprecation log for field deprecated_name2 (note it is not being throttled)
Expand All @@ -224,6 +229,7 @@ public void testParseFieldEmittingDeprecatedLogs() throws Exception {
hasEntry("message", "Deprecated field [deprecated_name2] used, expected [new_name] instead"),
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "deprecated_field_deprecated_name2"),
hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"),
hasEntry(Task.TRACE_ID, "someTraceId"),
hasEntry("elasticsearch.event.category", "api")
),
// compatible log line
Expand All @@ -240,6 +246,7 @@ public void testParseFieldEmittingDeprecatedLogs() throws Exception {
hasEntry("message", "Deprecated field [compatible_deprecated_name] used, expected [new_name] instead"),
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "deprecated_field_compatible_deprecated_name"),
hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"),
hasEntry(Task.TRACE_ID, "someTraceId"),
hasEntry("elasticsearch.event.category", "compatible_api")
)
)
Expand All @@ -255,6 +262,7 @@ public void testParseFieldEmittingDeprecatedLogs() throws Exception {
public void testDeprecatedMessage() throws Exception {
withThreadContext(threadContext -> {
threadContext.putHeader(Task.X_OPAQUE_ID, "someId");
threadContext.putHeader(Task.TRACE_ID, "someTraceId");
final DeprecationLogger testLogger = DeprecationLogger.getLogger("org.elasticsearch.test");
testLogger.deprecate(DeprecationCategory.OTHER, "someKey", "deprecated message1");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,10 @@ public ActionModule(Settings settings, IndexNameExpressionResolver indexNameExpr
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Set<RestHeaderDefinition> headers = Stream.concat(
actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
Stream.of(new RestHeaderDefinition(Task.X_OPAQUE_ID, false))
Stream.of(
new RestHeaderDefinition(Task.X_OPAQUE_ID, false),
new RestHeaderDefinition(Task.TRACE_PARENT, false)
)
).collect(Collectors.toSet());
UnaryOperator<RestHandler> restWrapper = null;
for (ActionPlugin plugin : actionPlugins) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ public EcsLayout build() {
private KeyValuePair[] additionalFields() {
return new KeyValuePair[] {
new KeyValuePair("event.dataset", dataset),
new KeyValuePair("trace.id", "%trace_id"),
new KeyValuePair("elasticsearch.cluster.uuid", "%cluster_id"),
new KeyValuePair("elasticsearch.node.id", "%node_id"),
new KeyValuePair("elasticsearch.node.name", "%ESnode_name"),
new KeyValuePair("elasticsearch.cluster.name", "${sys:es.logs.cluster_name}"), };
}
}

public String getDataset() {
return dataset;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.logging;

import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.pattern.ConverterKeys;
import org.apache.logging.log4j.core.pattern.LogEventPatternConverter;
import org.apache.logging.log4j.core.pattern.PatternConverter;
import org.elasticsearch.tasks.Task;

import java.util.Objects;

/**
* Pattern converter to format the trace id provided in the traceparent header into JSON fields <code>trace.id</code>.
*/
@Plugin(category = PatternConverter.CATEGORY, name = "TraceIdConverter")
@ConverterKeys({"trace_id"})
public final class TraceIdConverter extends LogEventPatternConverter {
/**
* Called by log4j2 to initialize this converter.
*/
public static TraceIdConverter newInstance(@SuppressWarnings("unused") final String[] options) {
return new TraceIdConverter();
}

public TraceIdConverter() {
super("trace_id", "trace_id");
}

public static String getTraceId() {
return HeaderWarning.THREAD_CONTEXT.stream()
.map(t -> t.<String>getHeader(Task.TRACE_ID))
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
}

/**
* Formats the trace.id into json fields.
*
* @param event - a log event is ignored in this method as it uses the clusterId value
* from <code>NodeAndClusterIdStateListener</code> to format
*/
@Override
public void format(LogEvent event, StringBuilder toAppendTo) {
String traceId = getTraceId();
if (traceId != null) {
toAppendTo.append(traceId);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,22 @@ public StoredContext stashContext() {
/**
* X-Opaque-ID should be preserved in a threadContext in order to propagate this across threads.
* This is needed so the DeprecationLogger in another thread can see the value of X-Opaque-ID provided by a user.
* The same is applied to Task.TRACE_ID.
* Otherwise when context is stash, it should be empty.
*/
if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID)) {
ThreadContextStruct threadContextStruct =
DEFAULT_CONTEXT.putHeaders(Map.of(Task.X_OPAQUE_ID, context.requestHeaders.get(Task.X_OPAQUE_ID)));

if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID) || context.requestHeaders.containsKey(Task.TRACE_ID)) {
Map<String, String> map = new HashMap<>(2, 1);
if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID)) {
map.put(Task.X_OPAQUE_ID, context.requestHeaders.get(Task.X_OPAQUE_ID));
}
if (context.requestHeaders.containsKey(Task.TRACE_ID)) {
map.put(Task.TRACE_ID, context.requestHeaders.get(Task.TRACE_ID));
}
ThreadContextStruct threadContextStruct = DEFAULT_CONTEXT.putHeaders(map);
threadLocal.set(threadContextStruct);
} else {
}
else {
threadLocal.set(DEFAULT_CONTEXT);
}
return () -> {
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ protected Node(final Environment initialEnvironment,
final Transport transport = networkModule.getTransportSupplier().get();
Set<String> taskHeaders = Stream.concat(
pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
Stream.of(Task.X_OPAQUE_ID)
Stream.of(Task.X_OPAQUE_ID, Task.TRACE_ID)
).collect(Collectors.toSet());
final TransportService transportService = newTransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
Expand Down
55 changes: 35 additions & 20 deletions server/src/main/java/org/elasticsearch/rest/RestController.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.rest.RestHandler.Route;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.usage.UsageService;

import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -334,29 +335,15 @@ private void sendContentTypeErrorMessage(@Nullable List<String> contentTypeHeade
}

private void tryAllHandlers(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) throws Exception {
for (final RestHeaderDefinition restHeader : headersToCopy) {
final String name = restHeader.getName();
final List<String> headerValues = request.getAllHeaderValues(name);
if (headerValues != null && headerValues.isEmpty() == false) {
final List<String> distinctHeaderValues = headerValues.stream().distinct().collect(Collectors.toList());
if (restHeader.isMultiValueAllowed() == false && distinctHeaderValues.size() > 1) {
channel.sendResponse(
BytesRestResponse.
createSimpleErrorResponse(channel, BAD_REQUEST, "multiple values for single-valued header [" + name + "]."));
return;
} else {
threadContext.putHeader(name, String.join(",", distinctHeaderValues));
}
}
}
// error_trace cannot be used when we disable detailed errors
// we consume the error_trace parameter first to ensure that it is always consumed
if (request.paramAsBoolean("error_trace", false) && channel.detailedErrorsEnabled() == false) {
channel.sendResponse(
BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, "error traces in responses are disabled."));
try {
copyRestHeaders(request, threadContext);
validateErrorTrace(request, channel);
} catch (IllegalArgumentException e) {
channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, e.getMessage()));
return;
}


final String rawPath = request.rawPath();
final String uri = request.uri();
final RestRequest.Method requestMethod;
Expand Down Expand Up @@ -392,6 +379,34 @@ private void tryAllHandlers(final RestRequest request, final RestChannel channel
handleBadRequest(uri, requestMethod, channel);
}

private void validateErrorTrace(RestRequest request, RestChannel channel) {
// error_trace cannot be used when we disable detailed errors
// we consume the error_trace parameter first to ensure that it is always consumed
if (request.paramAsBoolean("error_trace", false) && channel.detailedErrorsEnabled() == false) {
throw new IllegalArgumentException("error traces in responses are disabled.");
}
}

private void copyRestHeaders(RestRequest request, ThreadContext threadContext) throws IOException {
for (final RestHeaderDefinition restHeader : headersToCopy) {
final String name = restHeader.getName();
final List<String> headerValues = request.getAllHeaderValues(name);
if (headerValues != null && headerValues.isEmpty() == false) {
final List<String> distinctHeaderValues = headerValues.stream().distinct().collect(Collectors.toList());
if (restHeader.isMultiValueAllowed() == false && distinctHeaderValues.size() > 1) {
throw new IllegalArgumentException("multiple values for single-valued header [" + name + "].");
} else if (name.equals(Task.TRACE_PARENT)) {
String traceparent = distinctHeaderValues.get(0);
if (traceparent.length() >= 55) {
threadContext.putHeader(Task.TRACE_ID, traceparent.substring(3, 35));
}
} else {
threadContext.putHeader(name, String.join(",", distinctHeaderValues));
}
}
}
}

Iterator<MethodHandlers> getAllHandlers(@Nullable Map<String, String> requestParamsRef, String rawPath) {
final Supplier<Map<String, String>> paramsSupplier;
if (requestParamsRef == null) {
Expand Down
13 changes: 13 additions & 0 deletions server/src/main/java/org/elasticsearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@ public class Task {
*/
public static final String X_OPAQUE_ID = "X-Opaque-Id";

/**
* The request header which is contained in HTTP request. We parse trace.id from it and store it in thread context.
* TRACE_PARENT once parsed in RestController.tryAllHandler is not preserved
* has to be declared as a header copied over from http request.
*/
public static final String TRACE_PARENT = "traceparent";

/**
* Parsed part of traceparent. It is stored in thread context and emitted in logs.
* Has to be declared as a header copied over for tasks.
*/
public static final String TRACE_ID = "trace.id";

private final long id;

private final String type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.rest.RestHandler.Route;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpNodeClient;
import org.elasticsearch.test.rest.FakeRestRequest;
Expand Down Expand Up @@ -163,6 +164,38 @@ public void testRequestWithDisallowedMultiValuedHeader() {
assertTrue(channel.getSendResponseCalled());
}

public void testTraceParentAndTraceId() throws Exception {
final ThreadContext threadContext = client.threadPool().getThreadContext();
Set<RestHeaderDefinition> headers = new HashSet<>(Arrays.asList(new RestHeaderDefinition(Task.TRACE_PARENT, false)));
final RestController restController = new RestController(headers, null, null, circuitBreakerService, usageService);
Map<String, List<String>> restHeaders = new HashMap<>();
restHeaders.put(Task.TRACE_PARENT, Collections.singletonList("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"));
RestRequest fakeRequest = new FakeRestRequest.Builder(xContentRegistry()).withHeaders(restHeaders).build();
final RestController spyRestController = spy(restController);
when(spyRestController.getAllHandlers(null, fakeRequest.rawPath()))
.thenReturn(new Iterator<>() {
@Override
public boolean hasNext() {
return false;
}

@Override
public MethodHandlers next() {
return new MethodHandlers("/")
.addMethod(GET, RestApiVersion.current(), (request, channel, client) -> {
assertEquals("0af7651916cd43dd8448eb211c80319c", threadContext.getHeader(Task.TRACE_ID));
assertNull(threadContext.getHeader(Task.TRACE_PARENT));
});
}
});
AssertingChannel channel = new AssertingChannel(fakeRequest, false, RestStatus.BAD_REQUEST);
restController.dispatchRequest(fakeRequest, channel, threadContext);
// the rest controller relies on the caller to stash the context, so we should expect these values here as we didn't stash the
// context in this test
assertEquals("0af7651916cd43dd8448eb211c80319c", threadContext.getHeader(Task.TRACE_ID));
assertNull(threadContext.getHeader(Task.TRACE_PARENT));
}

public void testRequestWithDisallowedMultiValuedHeaderButSameValues() {
final ThreadContext threadContext = client.threadPool().getThreadContext();
Set<RestHeaderDefinition> headers = new HashSet<>(Arrays.asList(new RestHeaderDefinition("header.1", true),
Expand Down

0 comments on commit c412aae

Please sign in to comment.