diff --git a/qa/logging-config/src/test/java/org/elasticsearch/common/logging/JsonLoggerTests.java b/qa/logging-config/src/test/java/org/elasticsearch/common/logging/JsonLoggerTests.java index 7e13ff0306983..3d8c5ff2301ec 100644 --- a/qa/logging-config/src/test/java/org/elasticsearch/common/logging/JsonLoggerTests.java +++ b/qa/logging-config/src/test/java/org/elasticsearch/common/logging/JsonLoggerTests.java @@ -115,6 +115,7 @@ public void testDeprecatedMessageWithoutXOpaqueId() throws IOException { public void testCompatibleLog() throws Exception { withThreadContext(threadContext -> { threadContext.putHeader(Task.X_OPAQUE_ID, "someId"); + threadContext.putHeader(Task.TRACE_ID, "someTraceId"); final DeprecationLogger testLogger = DeprecationLogger.getLogger("org.elasticsearch.test"); testLogger.deprecate(DeprecationCategory.OTHER,"someKey", "deprecated message1") .compatibleApiWarning("compatibleKey","compatible API message"); @@ -143,6 +144,7 @@ public void testCompatibleLog() throws Exception { hasEntry("message", "deprecated message1"), hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "someKey"), hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"), + hasEntry(Task.TRACE_ID, "someTraceId"), hasEntry("elasticsearch.event.category", "other") ), allOf( @@ -159,6 +161,7 @@ public void testCompatibleLog() throws Exception { hasEntry("message", "compatible API message"), hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "compatibleKey"), hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"), + hasEntry(Task.TRACE_ID, "someTraceId"), hasEntry("elasticsearch.event.category", "compatible_api") ) ) @@ -172,6 +175,7 @@ public void testCompatibleLog() throws Exception { public void testParseFieldEmittingDeprecatedLogs() throws Exception { withThreadContext(threadContext -> { threadContext.putHeader(Task.X_OPAQUE_ID, "someId"); + threadContext.putHeader(Task.TRACE_ID, "someTraceId"); ParseField deprecatedField = new ParseField("new_name", "deprecated_name"); assertTrue(deprecatedField.match("deprecated_name", LoggingDeprecationHandler.INSTANCE)); @@ -208,6 +212,7 @@ public void testParseFieldEmittingDeprecatedLogs() throws Exception { hasEntry("message", "Deprecated field [deprecated_name] used, expected [new_name] instead"), hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "deprecated_field_deprecated_name"), hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"), + hasEntry(Task.TRACE_ID, "someTraceId"), hasEntry("elasticsearch.event.category", "api") ), // deprecation log for field deprecated_name2 (note it is not being throttled) @@ -224,6 +229,7 @@ public void testParseFieldEmittingDeprecatedLogs() throws Exception { hasEntry("message", "Deprecated field [deprecated_name2] used, expected [new_name] instead"), hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "deprecated_field_deprecated_name2"), hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"), + hasEntry(Task.TRACE_ID, "someTraceId"), hasEntry("elasticsearch.event.category", "api") ), // compatible log line @@ -240,6 +246,7 @@ public void testParseFieldEmittingDeprecatedLogs() throws Exception { hasEntry("message", "Deprecated field [compatible_deprecated_name] used, expected [new_name] instead"), hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "deprecated_field_compatible_deprecated_name"), hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"), + hasEntry(Task.TRACE_ID, "someTraceId"), hasEntry("elasticsearch.event.category", "compatible_api") ) ) @@ -255,6 +262,7 @@ public void testParseFieldEmittingDeprecatedLogs() throws Exception { public void testDeprecatedMessage() throws Exception { withThreadContext(threadContext -> { threadContext.putHeader(Task.X_OPAQUE_ID, "someId"); + threadContext.putHeader(Task.TRACE_ID, "someTraceId"); final DeprecationLogger testLogger = DeprecationLogger.getLogger("org.elasticsearch.test"); testLogger.deprecate(DeprecationCategory.OTHER, "someKey", "deprecated message1"); diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 066334cc94423..ffae13f4fb51d 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -434,7 +434,10 @@ public ActionModule(Settings settings, IndexNameExpressionResolver indexNameExpr destructiveOperations = new DestructiveOperations(settings, clusterSettings); Set headers = Stream.concat( actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()), - Stream.of(new RestHeaderDefinition(Task.X_OPAQUE_ID, false)) + Stream.of( + new RestHeaderDefinition(Task.X_OPAQUE_ID, false), + new RestHeaderDefinition(Task.TRACE_PARENT, false) + ) ).collect(Collectors.toSet()); UnaryOperator restWrapper = null; for (ActionPlugin plugin : actionPlugins) { diff --git a/server/src/main/java/org/elasticsearch/common/logging/ECSJsonLayout.java b/server/src/main/java/org/elasticsearch/common/logging/ECSJsonLayout.java index 96597ec98d7eb..849bf96a5b13d 100644 --- a/server/src/main/java/org/elasticsearch/common/logging/ECSJsonLayout.java +++ b/server/src/main/java/org/elasticsearch/common/logging/ECSJsonLayout.java @@ -55,11 +55,12 @@ public EcsLayout build() { private KeyValuePair[] additionalFields() { return new KeyValuePair[] { new KeyValuePair("event.dataset", dataset), + new KeyValuePair("trace.id", "%trace_id"), new KeyValuePair("elasticsearch.cluster.uuid", "%cluster_id"), new KeyValuePair("elasticsearch.node.id", "%node_id"), new KeyValuePair("elasticsearch.node.name", "%ESnode_name"), new KeyValuePair("elasticsearch.cluster.name", "${sys:es.logs.cluster_name}"), }; - } + } public String getDataset() { return dataset; diff --git a/server/src/main/java/org/elasticsearch/common/logging/TraceIdConverter.java b/server/src/main/java/org/elasticsearch/common/logging/TraceIdConverter.java new file mode 100644 index 0000000000000..d57fa4a0ae1bd --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/logging/TraceIdConverter.java @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.common.logging; + +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.pattern.ConverterKeys; +import org.apache.logging.log4j.core.pattern.LogEventPatternConverter; +import org.apache.logging.log4j.core.pattern.PatternConverter; +import org.elasticsearch.tasks.Task; + +import java.util.Objects; + +/** + * Pattern converter to format the trace id provided in the traceparent header into JSON fields trace.id. + */ +@Plugin(category = PatternConverter.CATEGORY, name = "TraceIdConverter") +@ConverterKeys({"trace_id"}) +public final class TraceIdConverter extends LogEventPatternConverter { + /** + * Called by log4j2 to initialize this converter. + */ + public static TraceIdConverter newInstance(@SuppressWarnings("unused") final String[] options) { + return new TraceIdConverter(); + } + + public TraceIdConverter() { + super("trace_id", "trace_id"); + } + + public static String getTraceId() { + return HeaderWarning.THREAD_CONTEXT.stream() + .map(t -> t.getHeader(Task.TRACE_ID)) + .filter(Objects::nonNull) + .findFirst() + .orElse(null); + } + + /** + * Formats the trace.id into json fields. + * + * @param event - a log event is ignored in this method as it uses the clusterId value + * from NodeAndClusterIdStateListener to format + */ + @Override + public void format(LogEvent event, StringBuilder toAppendTo) { + String traceId = getTraceId(); + if (traceId != null) { + toAppendTo.append(traceId); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java index a2e2a3faf8aa0..400abbfa461c8 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java @@ -107,13 +107,22 @@ public StoredContext stashContext() { /** * X-Opaque-ID should be preserved in a threadContext in order to propagate this across threads. * This is needed so the DeprecationLogger in another thread can see the value of X-Opaque-ID provided by a user. + * The same is applied to Task.TRACE_ID. * Otherwise when context is stash, it should be empty. */ - if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID)) { - ThreadContextStruct threadContextStruct = - DEFAULT_CONTEXT.putHeaders(Map.of(Task.X_OPAQUE_ID, context.requestHeaders.get(Task.X_OPAQUE_ID))); + + if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID) || context.requestHeaders.containsKey(Task.TRACE_ID)) { + Map map = new HashMap<>(2, 1); + if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID)) { + map.put(Task.X_OPAQUE_ID, context.requestHeaders.get(Task.X_OPAQUE_ID)); + } + if (context.requestHeaders.containsKey(Task.TRACE_ID)) { + map.put(Task.TRACE_ID, context.requestHeaders.get(Task.TRACE_ID)); + } + ThreadContextStruct threadContextStruct = DEFAULT_CONTEXT.putHeaders(map); threadLocal.set(threadContextStruct); - } else { + } + else { threadLocal.set(DEFAULT_CONTEXT); } return () -> { diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index f84136c326737..eb1fb34316d8a 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -574,7 +574,7 @@ protected Node(final Environment initialEnvironment, final Transport transport = networkModule.getTransportSupplier().get(); Set taskHeaders = Stream.concat( pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()), - Stream.of(Task.X_OPAQUE_ID) + Stream.of(Task.X_OPAQUE_ID, Task.TRACE_ID) ).collect(Collectors.toSet()); final TransportService transportService = newTransportService(settings, transport, threadPool, networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders); diff --git a/server/src/main/java/org/elasticsearch/rest/RestController.java b/server/src/main/java/org/elasticsearch/rest/RestController.java index 4f5fc3a378296..f2d5ff9df6206 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestController.java +++ b/server/src/main/java/org/elasticsearch/rest/RestController.java @@ -29,6 +29,7 @@ import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.rest.RestHandler.Route; +import org.elasticsearch.tasks.Task; import org.elasticsearch.usage.UsageService; import java.io.ByteArrayOutputStream; @@ -334,29 +335,15 @@ private void sendContentTypeErrorMessage(@Nullable List contentTypeHeade } private void tryAllHandlers(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) throws Exception { - for (final RestHeaderDefinition restHeader : headersToCopy) { - final String name = restHeader.getName(); - final List headerValues = request.getAllHeaderValues(name); - if (headerValues != null && headerValues.isEmpty() == false) { - final List distinctHeaderValues = headerValues.stream().distinct().collect(Collectors.toList()); - if (restHeader.isMultiValueAllowed() == false && distinctHeaderValues.size() > 1) { - channel.sendResponse( - BytesRestResponse. - createSimpleErrorResponse(channel, BAD_REQUEST, "multiple values for single-valued header [" + name + "].")); - return; - } else { - threadContext.putHeader(name, String.join(",", distinctHeaderValues)); - } - } - } - // error_trace cannot be used when we disable detailed errors - // we consume the error_trace parameter first to ensure that it is always consumed - if (request.paramAsBoolean("error_trace", false) && channel.detailedErrorsEnabled() == false) { - channel.sendResponse( - BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, "error traces in responses are disabled.")); + try { + copyRestHeaders(request, threadContext); + validateErrorTrace(request, channel); + } catch (IllegalArgumentException e) { + channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, e.getMessage())); return; } + final String rawPath = request.rawPath(); final String uri = request.uri(); final RestRequest.Method requestMethod; @@ -392,6 +379,34 @@ private void tryAllHandlers(final RestRequest request, final RestChannel channel handleBadRequest(uri, requestMethod, channel); } + private void validateErrorTrace(RestRequest request, RestChannel channel) { + // error_trace cannot be used when we disable detailed errors + // we consume the error_trace parameter first to ensure that it is always consumed + if (request.paramAsBoolean("error_trace", false) && channel.detailedErrorsEnabled() == false) { + throw new IllegalArgumentException("error traces in responses are disabled."); + } + } + + private void copyRestHeaders(RestRequest request, ThreadContext threadContext) throws IOException { + for (final RestHeaderDefinition restHeader : headersToCopy) { + final String name = restHeader.getName(); + final List headerValues = request.getAllHeaderValues(name); + if (headerValues != null && headerValues.isEmpty() == false) { + final List distinctHeaderValues = headerValues.stream().distinct().collect(Collectors.toList()); + if (restHeader.isMultiValueAllowed() == false && distinctHeaderValues.size() > 1) { + throw new IllegalArgumentException("multiple values for single-valued header [" + name + "]."); + } else if (name.equals(Task.TRACE_PARENT)) { + String traceparent = distinctHeaderValues.get(0); + if (traceparent.length() >= 55) { + threadContext.putHeader(Task.TRACE_ID, traceparent.substring(3, 35)); + } + } else { + threadContext.putHeader(name, String.join(",", distinctHeaderValues)); + } + } + } + } + Iterator getAllHandlers(@Nullable Map requestParamsRef, String rawPath) { final Supplier> paramsSupplier; if (requestParamsRef == null) { diff --git a/server/src/main/java/org/elasticsearch/tasks/Task.java b/server/src/main/java/org/elasticsearch/tasks/Task.java index 9b0a77f5be9d3..b65669a110758 100644 --- a/server/src/main/java/org/elasticsearch/tasks/Task.java +++ b/server/src/main/java/org/elasticsearch/tasks/Task.java @@ -28,6 +28,19 @@ public class Task { */ public static final String X_OPAQUE_ID = "X-Opaque-Id"; + /** + * The request header which is contained in HTTP request. We parse trace.id from it and store it in thread context. + * TRACE_PARENT once parsed in RestController.tryAllHandler is not preserved + * has to be declared as a header copied over from http request. + */ + public static final String TRACE_PARENT = "traceparent"; + + /** + * Parsed part of traceparent. It is stored in thread context and emitted in logs. + * Has to be declared as a header copied over for tasks. + */ + public static final String TRACE_ID = "trace.id"; + private final long id; private final String type; diff --git a/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java b/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java index 2a548476ab7ea..0f2cadace00f2 100644 --- a/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java +++ b/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.http.HttpStats; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.rest.RestHandler.Route; +import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpNodeClient; import org.elasticsearch.test.rest.FakeRestRequest; @@ -163,6 +164,38 @@ public void testRequestWithDisallowedMultiValuedHeader() { assertTrue(channel.getSendResponseCalled()); } + public void testTraceParentAndTraceId() throws Exception { + final ThreadContext threadContext = client.threadPool().getThreadContext(); + Set headers = new HashSet<>(Arrays.asList(new RestHeaderDefinition(Task.TRACE_PARENT, false))); + final RestController restController = new RestController(headers, null, null, circuitBreakerService, usageService); + Map> restHeaders = new HashMap<>(); + restHeaders.put(Task.TRACE_PARENT, Collections.singletonList("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")); + RestRequest fakeRequest = new FakeRestRequest.Builder(xContentRegistry()).withHeaders(restHeaders).build(); + final RestController spyRestController = spy(restController); + when(spyRestController.getAllHandlers(null, fakeRequest.rawPath())) + .thenReturn(new Iterator<>() { + @Override + public boolean hasNext() { + return false; + } + + @Override + public MethodHandlers next() { + return new MethodHandlers("/") + .addMethod(GET, RestApiVersion.current(), (request, channel, client) -> { + assertEquals("0af7651916cd43dd8448eb211c80319c", threadContext.getHeader(Task.TRACE_ID)); + assertNull(threadContext.getHeader(Task.TRACE_PARENT)); + }); + } + }); + AssertingChannel channel = new AssertingChannel(fakeRequest, false, RestStatus.BAD_REQUEST); + restController.dispatchRequest(fakeRequest, channel, threadContext); + // the rest controller relies on the caller to stash the context, so we should expect these values here as we didn't stash the + // context in this test + assertEquals("0af7651916cd43dd8448eb211c80319c", threadContext.getHeader(Task.TRACE_ID)); + assertNull(threadContext.getHeader(Task.TRACE_PARENT)); + } + public void testRequestWithDisallowedMultiValuedHeaderButSameValues() { final ThreadContext threadContext = client.threadPool().getThreadContext(); Set headers = new HashSet<>(Arrays.asList(new RestHeaderDefinition("header.1", true),