diff --git a/libs/telemetry/build.gradle b/libs/telemetry/build.gradle index ce94698836b4f..7f4325fa3d6fe 100644 --- a/libs/telemetry/build.gradle +++ b/libs/telemetry/build.gradle @@ -10,6 +10,9 @@ */ dependencies { + // logging + api "org.apache.logging.log4j:log4j-api:${versions.log4j}" + testImplementation "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}" testImplementation "junit:junit:${versions.junit}" testImplementation "org.hamcrest:hamcrest:${versions.hamcrest}" @@ -21,3 +24,18 @@ dependencies { tasks.named('forbiddenApisMain').configure { replaceSignatureFiles 'jdk-signatures' } + +thirdPartyAudit { + ignoreMissingClasses( + 'org.osgi.framework.Bundle', + 'org.osgi.framework.BundleActivator', + 'org.osgi.framework.BundleContext', + 'org.osgi.framework.BundleEvent', + 'org.osgi.framework.FrameworkUtil', + 'org.osgi.framework.ServiceReference', + 'org.osgi.framework.ServiceRegistration', + 'org.osgi.framework.SynchronousBundleListener', + 'org.osgi.framework.wiring.BundleWire', + 'org.osgi.framework.wiring.BundleWiring' + ) +} diff --git a/libs/telemetry/licenses/log4j-api-2.20.0.jar.sha1 b/libs/telemetry/licenses/log4j-api-2.20.0.jar.sha1 new file mode 100644 index 0000000000000..37154d9861ac0 --- /dev/null +++ b/libs/telemetry/licenses/log4j-api-2.20.0.jar.sha1 @@ -0,0 +1 @@ +1fe6082e660daf07c689a89c94dc0f49c26b44bb \ No newline at end of file diff --git a/libs/telemetry/licenses/log4j-api-LICENSE.txt b/libs/telemetry/licenses/log4j-api-LICENSE.txt new file mode 100644 index 0000000000000..6279e5206de13 --- /dev/null +++ b/libs/telemetry/licenses/log4j-api-LICENSE.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 1999-2005 The Apache Software Foundation + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/libs/telemetry/licenses/log4j-api-NOTICE.txt b/libs/telemetry/licenses/log4j-api-NOTICE.txt new file mode 100644 index 0000000000000..0375732360047 --- /dev/null +++ b/libs/telemetry/licenses/log4j-api-NOTICE.txt @@ -0,0 +1,5 @@ +Apache log4j +Copyright 2007 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). \ No newline at end of file diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/DiagnosticSpan.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/DiagnosticSpan.java new file mode 100644 index 0000000000000..c41f28599e38a --- /dev/null +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/DiagnosticSpan.java @@ -0,0 +1,204 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.diagnostics; + +import org.opensearch.telemetry.metrics.MetricPoint; +import org.opensearch.telemetry.tracing.Span; + +import java.util.HashMap; +import java.util.Map; + +/** + * Represents a diagnostic span that wraps an underlying span implementation. + * Provides additional functionality to store metrics and baggage for retrieving the span attributes. + */ +public class DiagnosticSpan implements Span { + + private final Span delegateSpan; + private final Map baggage; + private final Map metricMap; + + private final static String SPAN_NAME = "span_name"; + + /** + * Constructs a DiagnosticSpan object with the specified underlying span. + * + * @param span the underlying span to wrap + */ + public DiagnosticSpan(Span span) { + this.delegateSpan = span; + this.baggage = new HashMap<>(); + this.metricMap = new HashMap<>(); + baggage.put(SPAN_NAME, span.getSpanName()); + } + + /** + * Removes the metric associated with the given ID from the metric map. + * + * @param id the ID of the metric to remove + * @return the removed metric, or null if no metric was found for the given ID + */ + public MetricPoint removeMetric(String id) { + return metricMap.remove(id); + } + + /** + * Associates the specified metric with the given ID in the metric map. + * + * @param id the ID of the metric + * @param metric the metric to be associated with the ID + */ + public void putMetric(String id, MetricPoint metric) { + metricMap.put(id, metric); + } + + /** + * Ends the diagnostic span by calling the endSpan method of the underlying span. + */ + @Override + public void endSpan() { + delegateSpan.endSpan(); + } + + /** + * Returns the parent span of the diagnostic span. + * + * @return the parent span of the diagnostic span. + */ + @Override + public Span getParentSpan() { + return delegateSpan.getParentSpan(); + } + + /** + * Returns the name of the diagnostic span. + * + * @return the name of the diagnostic span. + */ + @Override + public String getSpanName() { + return delegateSpan.getSpanName(); + } + + /** + * Adds a string type attribute to the diagnostic span's baggage and the underlying span. + * + * @param key the key of the attribute. + * @param value the value of the attribute. + */ + @Override + public void addAttribute(String key, String value) { + baggage.put(key, value); + delegateSpan.addAttribute(key, value); + } + + /** + * Adds a long type attribute to the diagnostic span's baggage and the underlying span. + * + * @param key the key of the attribute. + * @param value the value of the attribute. + */ + @Override + public void addAttribute(String key, Long value) { + baggage.put(key, value); + delegateSpan.addAttribute(key, value); + } + + /** + * Adds a double type attribute to the diagnostic span's baggage and the underlying span. + * + * @param key the key of the attribute. + * @param value the value of the attribute. + */ + @Override + public void addAttribute(String key, Double value) { + baggage.put(key, value); + delegateSpan.addAttribute(key, value); + } + + /** + * Adds a boolean type attribute to the diagnostic span's baggage and the underlying span. + * + * @param key the key of the attribute. + * @param value the value of the attribute. + */ + @Override + public void addAttribute(String key, Boolean value) { + baggage.put(key, value); + delegateSpan.addAttribute(key, value); + } + + /** + * Returns the attributes in the diagnostic span's baggage. + * + * @return the attributes in the diagnostic span's baggage. + */ + public Map getAttributes() { + return baggage; + } + + /** + * Records an error in the diagnostic span by calling the setError method of the underlying span. + * + * @param exception the exception to be recorded. + */ + @Override + public void setError(Exception exception) { + delegateSpan.setError(exception); + } + + /** + * Adds an event to the diagnostic span by calling the addEvent method of the underlying span. + * + * @param event the name of the event. + */ + @Override + public void addEvent(String event) { + delegateSpan.addEvent(event); + } + + /** + * Returns the trace ID of the diagnostic span. + * + * @return the trace ID of the diagnostic span. + */ + @Override + public String getTraceId() { + return delegateSpan.getTraceId(); + } + + /** + * Returns the span ID of the diagnostic span. + * + * @return the span ID of the diagnostic span. + */ + @Override + public String getSpanId() { + return delegateSpan.getSpanId(); + } + + /** + * Checks if the diagnostic span has ended. + * + * @return true if the diagnostic span has ended, false otherwise. + */ + @Override + public boolean hasEnded() { + return delegateSpan.hasEnded(); + } + + /** + * Returns the original underlying span. + * + * @return the original underlying span. + */ + public Span unwrap() { + return delegateSpan; + } +} diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/DiagnosticsEventListener.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/DiagnosticsEventListener.java new file mode 100644 index 0000000000000..8c6b9f388733d --- /dev/null +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/DiagnosticsEventListener.java @@ -0,0 +1,152 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.diagnostics; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.telemetry.metrics.Measurement; +import org.opensearch.telemetry.metrics.MetricPoint; +import org.opensearch.telemetry.metrics.MetricEmitter; +import org.opensearch.telemetry.tracing.listeners.TraceEventListener; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.telemetry.tracing.listeners.TraceEventsService; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * One of the pre-defined TraceEventListener for recording the thread usage using {@link ThreadResourceRecorder} and + * emitting metrics using {@link MetricEmitter}. + * When a {@link Tracer} is wrapped with {@link TraceEventsService#wrapAndSetTracer(Tracer)} + * and all Runnable associated with a trace are wrapped with {@link TraceEventsService#wrapRunnable(Runnable)}, + * this class records the resource consumption of the complete trace using provided {@link ThreadResourceRecorder} and emits corresponding metrics using + * {@link MetricEmitter}. + * The span created by {@link org.opensearch.telemetry.tracing.TracingTelemetry#createSpan(String, Span)} must be wrapped with {@link DiagnosticSpan} + * using {@link TraceEventsService#wrapWithDiagnosticSpan(Span)} + */ +public class DiagnosticsEventListener implements TraceEventListener { + private static final Logger logger = LogManager.getLogger(DiagnosticsEventListener.class); + private final ThreadResourceRecorder threadResourceRecorder; + private final MetricEmitter metricEmitter; + + /** + * Key used to store the start time of a span in the span attributes (timestamp in milliseconds). + */ + public final static String START_SPAN_TIME = "start_span_time"; + + /** + * Key used to store the elapsed time of a span in the span attributes (duration in milliseconds). + */ + public final static String ELAPSED_TIME = "elapsed_time"; + + /** + * Constructs a new DiagnosticsTraceEventListener with the specified tracer, thread resource recorder, + * and metric emitter. + * + * @param threadResourceRecorder the thread resource recorder responsible for recording resource usage + * @param metricEmitter the metric emitter used for emitting diagnostic metrics + */ + public DiagnosticsEventListener(ThreadResourceRecorder threadResourceRecorder, MetricEmitter metricEmitter) { + this.threadResourceRecorder = threadResourceRecorder; + this.metricEmitter = metricEmitter; + } + + /** + * Called when a span is started. It starts recording resources for the associated thread. + * + * @param span the current span + * @param t the thread which started the span + */ + @Override + public void onSpanStart(Span span, Thread t) { + if (!ensureDiagnosticSpan(span)) { + return; + } + DiagnosticSpan diagnosticSpan = (DiagnosticSpan) span; + threadResourceRecorder.startRecording(diagnosticSpan, t, true); + diagnosticSpan.putMetric(START_SPAN_TIME, new MetricPoint(Collections.emptyMap(), null, System.currentTimeMillis())); + } + + /** + * Called when a span is completed for a thread. It emits the metric reported by + * {@link ThreadResourceRecorder#endRecording} + * + * @param span the current span + * @param t the thread which completed the span + */ + @Override + public void onSpanComplete(Span span, Thread t) { + if (!ensureDiagnosticSpan(span)) { + return; + } + DiagnosticSpan diagnosticSpan = (DiagnosticSpan) span; + MetricPoint diffMetric = threadResourceRecorder.endRecording(diagnosticSpan, t, true); + metricEmitter.emitMetric(addElapsedTimeMeasurement(diagnosticSpan, diffMetric)); + } + + /** + * Called when a runnable is started within a span. + * It starts recording resource usage for the thread. + * + * @param span the current span + * @param t the thread for which the runnable is started + */ + @Override + public void onRunnableStart(Span span, Thread t) { + if (!ensureDiagnosticSpan(span)) { + return; + } + threadResourceRecorder.startRecording((DiagnosticSpan) span, t, false); + } + + /** + * Called when a runnable is finished by a thread within a span. It emits the metric reported by + * {@link ThreadResourceRecorder#endRecording} + * + * @param span the current span + * @param t the thread for which the runnable is finished + */ + @Override + public void onRunnableComplete(Span span, Thread t) { + if (!ensureDiagnosticSpan(span)) { + return; + } + metricEmitter.emitMetric(threadResourceRecorder.endRecording((DiagnosticSpan) span, t, false)); + } + + /** + * Check if TraceEventListener is enabled + * TODO - replace with operation based logic + * @param span the current span + * @return true is this event listener is active + */ + @Override + public boolean isEnabled(Span span) { + return span instanceof DiagnosticSpan; + } + + private boolean ensureDiagnosticSpan(Span span) { + if (span instanceof DiagnosticSpan) { + return true; + } else { + logger.debug("Non diagnostic span detected while processing DiagnosticEventListener for span {} {}", span, new Throwable()); + return false; + } + } + + private MetricPoint addElapsedTimeMeasurement(DiagnosticSpan span, MetricPoint diffMetric) { + long elapsedTime = System.currentTimeMillis() - span.removeMetric(START_SPAN_TIME).getObservationTime(); + Measurement elapsedTimeMeasurement = new Measurement<>(ELAPSED_TIME, elapsedTime); + Map> diffMeasurements = new HashMap<>(diffMetric.getMeasurements()); + diffMeasurements.put(ELAPSED_TIME, elapsedTimeMeasurement); + return new MetricPoint(diffMeasurements, span.getAttributes(), diffMetric.getObservationTime()); + } +} diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/ThreadResourceObserver.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/ThreadResourceObserver.java new file mode 100644 index 0000000000000..022b2d48b9579 --- /dev/null +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/ThreadResourceObserver.java @@ -0,0 +1,24 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.diagnostics; + +import org.opensearch.telemetry.metrics.MetricPoint; + +/** + * Reports observed {@link MetricPoint} of a resource. It is supposed to be a singleton class + */ +public interface ThreadResourceObserver { + /** + * Observed gauge associated with a resource + * Multiple threads can call observe at same time, ensure the thread safety among shared state if any + * @param t thread to be observed + * @return observed usage + */ + MetricPoint observe(Thread t); +} diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/ThreadResourceRecorder.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/ThreadResourceRecorder.java new file mode 100644 index 0000000000000..6da1f23693e34 --- /dev/null +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/ThreadResourceRecorder.java @@ -0,0 +1,85 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.diagnostics; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.telemetry.metrics.MetricPoint; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.listeners.TraceEventListener; + +/** + * Records the {@link MetricPoint} for a {@link ThreadResourceObserver}. + * It abstracts out the diff logic between gauge {@link MetricPoint} reported by {@link ThreadResourceRecorder#startRecording} and + * {@link ThreadResourceRecorder#endRecording} when endRecording is invoked. + * Implementation of this class should be thread-safe. + * It maintains the state between {@link #startRecording(DiagnosticSpan, Thread, boolean)} and {@link #endRecording(DiagnosticSpan, Thread, boolean)} + * using {@link DiagnosticSpan#putMetric(String, MetricPoint)} assuming {@link Span} context propagation is taken care by tracing framework. + * @param the type of ThreadResourceObserver + */ +public abstract class ThreadResourceRecorder { + + private static final Logger logger = LogManager.getLogger(ThreadResourceRecorder.class); + + private final T observer; + + /** + * Constructs a ThreadResourceRecorder with the specified ThreadResourceObserver. + * + * @param observer the ThreadResourceObserver responsible for observing the thread's resource + */ + public ThreadResourceRecorder(T observer) { + this.observer = observer; + } + + /** + * Starts recording the metric for the given Span and thread. + * The observation is obtained from the associated ThreadResourceObserver. + * + * @param span the DiagnosticSpan to record the metric for + * @param t the thread for which to record the metric + * @param startSpanEvent true if it is invoked as a result of start span trace event + */ + public void startRecording(DiagnosticSpan span, Thread t, boolean startSpanEvent) { + MetricPoint observation = observer.observe(t); + span.putMetric(String.valueOf(t.getId()), observation); + } + + /** + * Ends recording the metric for the given Span and thread. + * The start metric is retrieved from the DiagnosticSpan and the end metric is obtained + * from the associated ThreadResourceObserver. + * The computed diff metric is returned. + * + * @param span the DiagnosticSpan to end the recording for + * @param t the thread for which to end the recording + * @param endSpan a flag indicating whether its invoked as a result of {@link TraceEventListener#onSpanComplete(Span, Thread)} + * @return the computed diff metric between the start and end metrics + */ + public MetricPoint endRecording(DiagnosticSpan span, Thread t, boolean endSpan) { + MetricPoint startMetric = span.removeMetric(String.valueOf(t.getId())); + MetricPoint endMetric = observer.observe(t); + if (startMetric == null) { + logger.debug("Start metric is missing for span:{} {}", span, new Throwable()); + // this scenario should never happen. We don't throw an exception instead return zero usage + return computeDiff(endMetric, endMetric); + } + return computeDiff(startMetric, endMetric); + } + + /** + * Computes the diff metric between the start and end metrics. + * Subclasses should implement this method to define the specific diff logic. + * + * @param startMetric the start metric + * @param endMetric the end metric + * @return the computed diff metric + */ + protected abstract MetricPoint computeDiff(MetricPoint startMetric, MetricPoint endMetric); +} diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/package-info.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/package-info.java new file mode 100644 index 0000000000000..69ecbafc3a437 --- /dev/null +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Contains all diagnostics related class and abstract implementations + */ +package org.opensearch.telemetry.diagnostics; diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/Measurement.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/Measurement.java new file mode 100644 index 0000000000000..79dd6b953be80 --- /dev/null +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/Measurement.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.metrics; + +/** + * Represents a measurement with a name and a corresponding value. + * + * @param the type of the value + */ +public class Measurement { + String name; + T value; + + /** + * Constructs a new Measurement with the specified name and value. + * + * @param name the name of the measurement + * @param value the value of the measurement + */ + public Measurement(String name, T value) { + this.name = name; + this.value = value; + } + + /** + * Returns the name of the measurement. + * + * @return the name of the measurement + */ + public String getName() { + return name; + } + + /** + * Returns the value of the measurement. + * + * @return the value of the measurement + */ + public T getValue() { + return value; + } +} diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/MetricEmitter.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/MetricEmitter.java new file mode 100644 index 0000000000000..8357a322f2472 --- /dev/null +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/MetricEmitter.java @@ -0,0 +1,23 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.metrics; + +/** + * Consumers of this interface can implement custom metric emission logic. For example, it can be integrated + * with OpenTelemetry meters or can be used just for printing on console for testing purposes + */ +public interface MetricEmitter { + + /** + * Emits the provided metric according to the custom implementation logic. + * + * @param metric the metric to emit + */ + void emitMetric(MetricPoint metric); +} diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/MetricPoint.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/MetricPoint.java new file mode 100644 index 0000000000000..62af88b0c6a3a --- /dev/null +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/MetricPoint.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.metrics; + +import java.util.Map; + +/** + * Represents a metric point with a list of measurements, attributes, and their observation time. + */ +public class MetricPoint { + private final Map> measurements; + private final long observationTime; + private final Map attributes; + + /** + * Constructs a new Metric with the specified measurements, attributes, and observation time. + * + * @param measurements the measurements associated with the metric + * @param attributes the attributes associated with the metric + * @param observationTime the observation time of the metric in milliseconds + * @throws IllegalArgumentException if any of the input parameters are null + */ + public MetricPoint(Map> measurements, Map attributes, long observationTime) { + if (measurements == null) { + throw new IllegalArgumentException("Measurements cannot be null"); + } + + this.measurements = measurements; + this.attributes = attributes; + this.observationTime = observationTime; + } + + /** + * Returns the observation time of the metric. + * + * @return the observation time in milliseconds + */ + public long getObservationTime() { + return observationTime; + } + + /** + * Returns the measurement associated with the specified name. + * + * @param name the name of the measurement + * @return the measurement object, or null if not found + */ + public Measurement getMeasurement(String name) { + return measurements.get(name); + } + + /** + * Returns an unmodifiable map of all the measurements associated with the metric. + * + * @return an unmodifiable map of measurement names to measurement objects + */ + public Map> getMeasurements() { + return measurements; + } + + /** + * Returns an unmodifiable map of the attributes associated with the metric. + * + * @return an unmodifiable map of attribute keys to attribute values + */ + public Map getAttributes() { + return attributes; + } + +} diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/AbstractSpan.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/AbstractSpan.java index 150a32b14d0f8..7e2eaf23f11ca 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/AbstractSpan.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/AbstractSpan.java @@ -19,8 +19,9 @@ public abstract class AbstractSpan implements Span { * name of the span */ private final String spanName; + /** - * span's parent span + * The parent span of this span. */ private final Span parentSpan; @@ -34,14 +35,23 @@ protected AbstractSpan(String spanName, Span parentSpan) { this.parentSpan = parentSpan; } + /** + * Gets the parent span of this span. + * + * @return the parent span of this span, or {@code null} if there's no parent. + */ @Override public Span getParentSpan() { return parentSpan; } + /** + * Gets the name of the span. + * + * @return the name of the span. + */ @Override public String getSpanName() { return spanName; } - } diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java index ea59eec645420..dfa125f3f3b0a 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java @@ -10,6 +10,8 @@ import java.io.Closeable; import java.io.IOException; +import java.util.Collections; +import java.util.Map; /** * @@ -37,9 +39,19 @@ public DefaultTracer(TracingTelemetry tracingTelemetry, TracerContextStorage attributes) { Span span = createSpan(spanName, getCurrentSpan()); setCurrentSpanInContext(span); addDefaultAttributes(span); + if (attributes != null) { + for (String name : attributes.keySet()) { + span.addAttribute(name, attributes.get(name)); + } + } return new DefaultSpanScope(span, (scopeSpan) -> endSpan(scopeSpan)); } @@ -48,8 +60,8 @@ public void close() throws IOException { ((Closeable) tracingTelemetry).close(); } - // Visible for testing - Span getCurrentSpan() { + @Override + public Span getCurrentSpan() { return tracerContextStorage.get(TracerContextStorage.CURRENT_SPAN); } diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Span.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Span.java index 6cb1c8234f3de..9038ce5ed1663 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Span.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Span.java @@ -90,4 +90,10 @@ public interface Span { */ String getSpanId(); + /** + * Checks if the span has ended + * @return true if span has ended + */ + boolean hasEnded(); + } diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java index d422b58aa0a9f..10c80d3f1afaf 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java @@ -9,6 +9,7 @@ package org.opensearch.telemetry.tracing; import java.io.Closeable; +import java.util.Map; /** * Tracer is the interface used to create a {@link Span} @@ -26,4 +27,18 @@ public interface Tracer extends Closeable { */ SpanScope startSpan(String spanName); + /** + * Starts the {@link Span} with given name + * + * @param spanName span name + * @param attributes initial attributes + * @return scope of the span, must be closed with explicit close or with try-with-resource + */ + SpanScope startSpan(String spanName, Map attributes); + + /** + * Get the current span. Should return null if there is no active span + */ + Span getCurrentSpan(); + } diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/RunnableEventListener.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/RunnableEventListener.java new file mode 100644 index 0000000000000..a38f5e916987c --- /dev/null +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/RunnableEventListener.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing.listeners; + +import org.opensearch.telemetry.tracing.Span; + +/** + * The RunnableEventListener interface defines the contract for listeners that handle events related to the execution + * of Runnables in a traced environment. + */ +public interface RunnableEventListener { + + /** + * Called when a runnable starts executing on a given span + * + * @param span the current span associated with the execution context + * null span implies there is no active span + * @param t the thread executing the runnable + */ + void onRunnableStart(final Span span, Thread t); + + /** + * Called when a runnable completes execution on a given span + * + * @param span the current span associated with the execution context + * null span implies there is no active span + * @param t the thread executing the runnable + */ + void onRunnableComplete(final Span span, Thread t); + + /** + * Checks whether the listener is enabled for the given span. + * TODO - replace with operation based flag + * @param span the span associated with the execution context + * @return true if the listener is enabled for the span, false otherwise + */ + boolean isEnabled(final Span span); +} diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/SpanEventListener.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/SpanEventListener.java new file mode 100644 index 0000000000000..b72533f01aa9c --- /dev/null +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/SpanEventListener.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing.listeners; + +import org.opensearch.telemetry.tracing.Span; + +/** + * The TraceEventListener interface defines the contract for listeners that handle events related to trace. + */ +public interface SpanEventListener { + + /** + * Called when a span starts. + * + * @param span the span that has started + * @param t the thread associated with the span + */ + void onSpanStart(final Span span, Thread t); + + /** + * Called when a span completes. + * + * @param span the span that has completed + * @param t the thread associated with the span + */ + void onSpanComplete(final Span span, Thread t); + + /** + * Checks whether the listener is enabled for the given span. + * TODO - replace with operation based flag + * @param span the span to check + * @return true if the listener is enabled for the span, false otherwise + */ + boolean isEnabled(final Span span); +} diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/TraceEventListener.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/TraceEventListener.java new file mode 100644 index 0000000000000..4fe5b572bc117 --- /dev/null +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/TraceEventListener.java @@ -0,0 +1,14 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing.listeners; + +/** + * Combines both RunnableEventListener and SpanEventListener. Usually both are used in conjunction. + */ +public interface TraceEventListener extends RunnableEventListener, SpanEventListener {} diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/TraceEventsRunnable.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/TraceEventsRunnable.java new file mode 100644 index 0000000000000..22c7fd591ad46 --- /dev/null +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/TraceEventsRunnable.java @@ -0,0 +1,107 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing.listeners; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.telemetry.tracing.Span; + +/** + * Runnable implementation that wraps another Runnable and adds trace event listener functionality. + */ +public class TraceEventsRunnable implements Runnable { + private static final Logger logger = LogManager.getLogger(TraceEventsRunnable.class); + + private final Runnable delegate; + private final TraceEventsService traceEventsService; + + /** + * Constructs a TraceEventsRunnable with the provided delegate Runnable. + * Tracer is used to get current span information and + * {@link RunnableEventListener} events are invoked for all traceEventListeners. + * @param delegate the underlying Runnable to be executed + * @param traceEventsService traceEventListenerService + */ + TraceEventsRunnable(Runnable delegate, TraceEventsService traceEventsService) { + this.delegate = delegate; + this.traceEventsService = traceEventsService; + } + + /** + * Wraps the delegate runnable run method with {@link TraceEventListener#onRunnableStart} and + * {@link TraceEventListener#onRunnableComplete} + */ + @Override + public void run() { + try { + invokeOnRunnableStart(traceEventsService); + } catch (Exception e) { + logger.debug("Error in onRunnableStart", e); + } finally { + delegate.run(); + } + try { + invokeOnRunnableComplete(traceEventsService); + } catch (Exception e) { + logger.debug("Error in onRunnableEnd", e); + } + } + + /** + * Unwraps and returns the underlying Runnable instance. + * + * @return the underlying Runnable instance + */ + public Runnable unwrap() { + return delegate; + } + + /** + * Invokes all registered trace event listeners registered with traceEventsService onRunnable start event. + * Exposing this function as invoking runnable events can be useful for other consumers. + * @param traceEventsService trace events service + */ + public static void invokeOnRunnableStart(TraceEventsService traceEventsService) { + if (traceEventsService.isTracingEnabled()) { + Span span = traceEventsService.getTracer().getCurrentSpan(); + // repeat it for all the spans in the hierarchy + while (span != null) { + if (!span.hasEnded()) { + Span finalSpan = span; + traceEventsService.executeListeners( + span, + traceEventListener -> traceEventListener.onRunnableStart(finalSpan, Thread.currentThread()) + ); + } + span = span.getParentSpan(); + } + } + } + + /** + * Invokes all registered trace event listeners registered with traceEventsService OnRunnable complete event. + * Exposing this function as invoking runnable events can be useful for other consumers. + * @param traceEventsService trace events service + */ + public static void invokeOnRunnableComplete(TraceEventsService traceEventsService) { + if (traceEventsService.isTracingEnabled()) { + Span span = traceEventsService.getTracer().getCurrentSpan(); + while (span != null) { + if (!span.hasEnded()) { + Span finalSpan = span; + traceEventsService.executeListeners( + span, + traceEventListener -> traceEventListener.onRunnableComplete(finalSpan, Thread.currentThread()) + ); + } + span = span.getParentSpan(); + } + } + } +} diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/TraceEventsService.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/TraceEventsService.java new file mode 100644 index 0000000000000..faf5f61ffcedf --- /dev/null +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/TraceEventsService.java @@ -0,0 +1,221 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing.listeners; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.telemetry.diagnostics.DiagnosticSpan; +import org.opensearch.telemetry.diagnostics.DiagnosticsEventListener; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.Tracer; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.unmodifiableMap; + +/** + * The TraceEventService manages trace event listeners and provides their registration and de-registration functionality. + * + * It also provides wrapper utility to wrap {@link Tracer} using {@link #wrapAndSetTracer(Tracer)}, Runnable using {@link #wrapRunnable(Runnable)} + * and {@link Span} using {@link #wrapWithDiagnosticSpan(Span)}. + * + * This is the core service for trace event listeners and must be instantiated at application start. + * Once the telemetry and tracing is instantiated, this service should be used to wrap Tracer, Span and Runnables. Wrap will not have any effect until + * trace event listeners are registered using {@link #registerTraceEventListener} and tracing is set as enabled using {@link #setTracingEnabled(boolean)}. + * + * The application must ensure that this service is updated with the latest set of TraceEventListener and latest value of {@link #tracingEnabled} and {@link #diagnosisEnabled} + * are set, or it may produce undesirable results. + */ +public class TraceEventsService { + + private volatile Map traceEventListeners; + private volatile Tracer tracer; + + private volatile boolean tracingEnabled; + + private volatile boolean diagnosisEnabled; + + private static final Logger logger = LogManager.getLogger(TraceEventsService.class); + + /** + * Constructs a new TraceEventService with the specified tracer. + * + */ + public TraceEventsService() { + traceEventListeners = emptyMap(); + this.tracingEnabled = false; + this.diagnosisEnabled = false; + } + + /** + * Registers a trace event listener with the specified name. + * + * @param name the name of the trace event listener + * @param traceEventListener the trace event listener to be registered + */ + public synchronized void registerTraceEventListener(String name, TraceEventListener traceEventListener) { + HashMap newTraceEventListeners = new HashMap<>(traceEventListeners); + newTraceEventListeners.put(name, traceEventListener); + traceEventListeners = unmodifiableMap(newTraceEventListeners); + } + + /** + * de-registers the trace event listener with the specified name. + * + * @param name the name of the trace event listener to be deregistered + */ + public synchronized void deregisterTraceEventListener(String name) { + HashMap newTraceEventListeners = new HashMap<>(traceEventListeners); + newTraceEventListeners.remove(name); + traceEventListeners = unmodifiableMap(newTraceEventListeners); + } + + /** + * Returns a map of all the registered trace event listeners. + * + * @return a map of trace event listeners, where the keys are the listener names and the values are the listener objects + */ + public Map getTraceEventListeners() { + return traceEventListeners; + } + + /** + * Returns the tracer associated with the TraceEventService. + * + * @return the tracer object + */ + public Tracer getTracer() { + return tracer; + } + + /** + * Set the diagnosis enabled post which any {@link org.opensearch.telemetry.diagnostics.DiagnosticsEventListener} comes into effect. + * @param diagnosisEnabled true to enable + */ + public synchronized void setDiagnosisEnabled(boolean diagnosisEnabled) { + this.diagnosisEnabled = diagnosisEnabled; + } + + /** + * true is diagnosis is enabled in the service + */ + public boolean isDiagnosisEnabled() { + return diagnosisEnabled; + } + + /** + * Set the tracing enable + * @param tracingEnabled true to enable + */ + public synchronized void setTracingEnabled(boolean tracingEnabled) { + this.tracingEnabled = tracingEnabled; + } + + /** + * Checks if tracing is enabled. + * @return true if enabled. + */ + public boolean isTracingEnabled() { + return tracingEnabled; + } + + /** + * Wraps the given Runnable with trace event listeners registered with {@link TraceEventsService} + * Note: Runnable should be wrapped using this method only after thread context has been restored so that when + * {@link TraceEventsRunnable#run()} is called, it has the right thread context with current Span information. + * @param runnable the Runnable to wrap + * @return the wrapped TraceEventsRunnable + */ + public Runnable wrapRunnable(Runnable runnable) { + if (runnable instanceof TraceEventsRunnable) { + return runnable; + } + if (tracingEnabled) { + return new TraceEventsRunnable(runnable, this); + } else { + return runnable; + } + } + + /** + * Unwraps the given TraceEventsRunnable to retrieve the original Runnable. + * Note: Runnable should be unwrapped using this method before context is stashed so that when + * {@link TraceEventsRunnable} delegate is complete and {@link TraceEventListener#onRunnableComplete(Span, Thread)} is called, + * it has the right context with current Span information. + * @param runnableWrapper the TraceEventsRunnable to unwrap + * @return the original Runnable + */ + public Runnable unwrapRunnable(Runnable runnableWrapper) { + if (runnableWrapper instanceof TraceEventsRunnable) { + return ((TraceEventsRunnable) runnableWrapper).unwrap(); + } else { + return runnableWrapper; + } + } + + /** + * Wraps the given Tracer with trace event listeners. + * + * @param tracer the Tracer to wrap + * @return the wrapped TracerWrapper + */ + public synchronized TracerWrapper wrapAndSetTracer(Tracer tracer) { + TracerWrapper tracerWrapper = new TracerWrapper(tracer, this); + this.tracer = tracerWrapper; + return tracerWrapper; + } + + /** + * Unwraps the given TracerWrapper to retrieve the original Tracer. + * + * @param tracerWrapper the TracerWrapper to unwrap + * @return the original Tracer + */ + public Tracer unwrapTracer(TracerWrapper tracerWrapper) { + return tracerWrapper.unwrap(); + } + + /** + * Wraps the given Span with diagnostic capabilities. + * Ideally, {@link Tracer#startSpan} should be wrapped using this wrapper. + * @param span the Span to wrap + * @return the wrapped DiagnosticSpan + */ + public static DiagnosticSpan wrapWithDiagnosticSpan(Span span) { + return new DiagnosticSpan(span); + } + + /** + * Invokes the provided event for all TraceEventListener registered with the service + * @param span associated span + * @param listenerMethod the listener method to be invoked + */ + public void executeListeners(Span span, Consumer listenerMethod) { + if (span == null || traceEventListeners == null) { + return; + } + for (TraceEventListener traceEventListener : traceEventListeners.values()) { + try { + if (!traceEventListener.isEnabled(span)) { + continue; + } + if (traceEventListener instanceof DiagnosticsEventListener && !diagnosisEnabled) { + continue; + } + listenerMethod.accept(traceEventListener); + } catch (Exception e) { + // failing trace event listener shouldn't impact the application + logger.debug("Error for TraceEventListener: {} {}", traceEventListener.getClass().getName(), e); + } + } + } +} diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/TracerWrapper.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/TracerWrapper.java new file mode 100644 index 0000000000000..64ff17ba32eff --- /dev/null +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/TracerWrapper.java @@ -0,0 +1,168 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing.listeners; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanScope; +import org.opensearch.telemetry.tracing.Tracer; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +/** + * This class invokes all events associated with {@link SpanEventListener}. + * The TracerWrapper acts as a wrapper around an underlying Tracer implementation and + * provides additional functionality to manage TraceEventListeners and trace-related settings. + * + * @opensearch.internal + */ +public class TracerWrapper implements Tracer { + + private static final Logger logger = LogManager.getLogger(TracerWrapper.class); + + private final Tracer tracer; + + private final TraceEventsService traceEventsService; + + /** + * Constructs a TracerWrapper with the provided TraceEventService + * + * @param delegate the underlying Tracer implementation + * @param traceEventsService traceEventListenerService + */ + TracerWrapper(Tracer delegate, TraceEventsService traceEventsService) { + assert delegate != null; + this.tracer = delegate; + this.traceEventsService = traceEventsService; + } + + /** + * Starts a new span with the specified name and no attributes. + * + * @param spanName the name of the new span + * @return the created SpanScope for the new span + */ + @Override + public SpanScope startSpan(String spanName) { + return this.startSpan(spanName, Collections.emptyMap()); + } + + /** + * Starts a new span with the specified name and attributes. + * + * @param spanName the name of the new span + * @param attributes the attributes to be associated with the new span + * @return the created SpanScope for the new span + */ + @Override + public SpanScope startSpan(String spanName, Map attributes) { + SpanScope scope = tracer.startSpan(spanName, attributes); + if (!traceEventsService.isTracingEnabled()) { + return scope; + } + Span span = tracer.getCurrentSpan(); + try { + traceEventsService.executeListeners(span, traceEventListener -> traceEventListener.onSpanStart(span, Thread.currentThread())); + return new SpanScopeWrapper(span, scope, traceEventsService); + } catch (Exception e) { + // failing silently + logger.debug("Exception while invoking TraceEventListener for span {} {}", span, e); + } + return scope; + } + + /** + * Retrieves the current active span. + * + * @return the current active span + */ + @Override + public Span getCurrentSpan() { + return tracer.getCurrentSpan(); + } + + /** + * Closes the TracerWrapper and releases any resources associated with it. + * + * @throws IOException if an I/O error occurs while closing the TracerWrapper + */ + @Override + public void close() throws IOException { + tracer.close(); + } + + /** + * Unwraps and returns the underlying Tracer instance. + * + * @return the underlying Tracer instance + */ + public Tracer unwrap() { + return tracer; + } + + private static class SpanScopeWrapper implements SpanScope { + private final SpanScope scope; + private final Span span; + private final TraceEventsService traceEventsService; + + SpanScopeWrapper(Span span, SpanScope delegate, TraceEventsService traceEventsService) { + this.span = span; + this.scope = delegate; + this.traceEventsService = traceEventsService; + } + + @Override + public void addSpanAttribute(String key, String value) { + scope.addSpanAttribute(key, value); + } + + @Override + public void addSpanAttribute(String key, long value) { + scope.addSpanAttribute(key, value); + } + + @Override + public void addSpanAttribute(String key, double value) { + scope.addSpanAttribute(key, value); + } + + @Override + public void addSpanAttribute(String key, boolean value) { + scope.addSpanAttribute(key, value); + } + + @Override + public void addSpanEvent(String event) { + scope.addSpanEvent(event); + } + + @Override + public void setError(Exception exception) { + scope.setError(exception); + } + + @Override + public void close() { + scope.close(); + try { + if (traceEventsService.isTracingEnabled()) { + traceEventsService.executeListeners( + span, + traceEventListener -> traceEventListener.onSpanComplete(span, Thread.currentThread()) + ); + } + } catch (Exception e) { + logger.debug("Exception on Scope close while invoking TraceEventListener for span:{} {}", span, e); + } + } + } +} diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/package-info.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/package-info.java new file mode 100644 index 0000000000000..f1c67e74b6284 --- /dev/null +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Contains all TraceEvents generic listeners interfaces and TraceEventService + */ +package org.opensearch.telemetry.tracing.listeners; diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopSpanScope.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopSpanScope.java index a1d16d1d80d00..7ceeb303a61ff 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopSpanScope.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopSpanScope.java @@ -11,7 +11,10 @@ import org.opensearch.telemetry.tracing.SpanScope; /** - * No-op implementation of SpanScope + * No-op implementation of {@link SpanScope}. + * + *

This class is used as a placeholder when actual span-scoping is not required. + * It provides empty implementations for all methods in the {@link SpanScope} interface. * * @opensearch.internal */ @@ -22,36 +25,73 @@ public final class NoopSpanScope implements SpanScope { */ public NoopSpanScope() {} + /** + * No-op implementation of {@link SpanScope#addSpanAttribute(String, String)}. + * + * @param key attribute key + * @param value attribute value + */ @Override public void addSpanAttribute(String key, String value) { } + /** + * No-op implementation of {@link SpanScope#addSpanAttribute(String, long)}. + * + * @param key attribute key + * @param value attribute value + */ @Override public void addSpanAttribute(String key, long value) { } + /** + * No-op implementation of {@link SpanScope#addSpanAttribute(String, double)}. + * + * @param key attribute key + * @param value attribute value + */ @Override public void addSpanAttribute(String key, double value) { } + /** + * No-op implementation of {@link SpanScope#addSpanAttribute(String, boolean)}. + * + * @param key attribute key + * @param value attribute value + */ @Override public void addSpanAttribute(String key, boolean value) { } + /** + * No-op implementation of {@link SpanScope#addSpanEvent(String)}. + * + * @param event event name + */ @Override public void addSpanEvent(String event) { } + /** + * No-op implementation of {@link SpanScope#setError(Exception)}. + * + * @param exception exception to be recorded + */ @Override public void setError(Exception exception) { } + /** + * No-op implementation of {@link SpanScope#close()}. + */ @Override public void close() { diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java index a1768d7d59116..4d59ad2a3d186 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java @@ -8,28 +8,74 @@ package org.opensearch.telemetry.tracing.noop; +import org.opensearch.telemetry.tracing.Span; import org.opensearch.telemetry.tracing.SpanScope; import org.opensearch.telemetry.tracing.Tracer; +import java.util.Map; + /** - * No-op implementation of Tracer + * No-op implementation of Tracer. + * This class provides a Tracer implementation that does nothing and is used as a placeholder + * when actual tracing functionality is not required. * * @opensearch.internal */ public class NoopTracer implements Tracer { /** - * No-op Tracer instance + * The singleton instance of the NoopTracer. */ public static final Tracer INSTANCE = new NoopTracer(); + /** + * Private constructor for the NoopTracer to prevent external instantiation. + * Use the provided INSTANCE constant to access the singleton instance. + */ private NoopTracer() {} + /** + * Starts a new no-op span with the given spanName. + * The method always returns a {@link SpanScope#NO_OP} instance, indicating that no actual + * span-scoping is required. + * + * @param spanName the name of the no-op span. + * @return a {@link SpanScope#NO_OP} instance. + */ @Override public SpanScope startSpan(String spanName) { return SpanScope.NO_OP; } + /** + * Starts a new no-op span with the given spanName and attributes. + * The method always returns a {@link SpanScope#NO_OP} instance, indicating that no actual + * span-scoping is required. + * + * @param spanName the name of the no-op span. + * @param attributes a map of attributes to be associated with the no-op span. + * @return a {@link SpanScope#NO_OP} instance. + */ + @Override + public SpanScope startSpan(String spanName, Map attributes) { + return SpanScope.NO_OP; + } + + /** + * Returns the current no-op span. + * Since the NoopTracer does not perform any actual tracing, this method always returns null. + * + * @return null, as there is no current span in the NoopTracer. + */ + @Override + public Span getCurrentSpan() { + return null; + } + + /** + * Closes the NoopTracer. + * Since the NoopTracer does not have any resources that need to be released, this method does nothing. + */ @Override public void close() { diff --git a/libs/telemetry/src/test/java/org/opensearch/telemetry/diagnostics/DiagnosticsEventListenerTests.java b/libs/telemetry/src/test/java/org/opensearch/telemetry/diagnostics/DiagnosticsEventListenerTests.java new file mode 100644 index 0000000000000..26cf64ee348e3 --- /dev/null +++ b/libs/telemetry/src/test/java/org/opensearch/telemetry/diagnostics/DiagnosticsEventListenerTests.java @@ -0,0 +1,92 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.diagnostics; + +import org.mockito.ArgumentCaptor; +import org.opensearch.telemetry.metrics.Measurement; +import org.opensearch.telemetry.metrics.MetricEmitter; +import org.opensearch.telemetry.metrics.MetricPoint; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Collections; + +import org.mockito.Mockito; + +import static org.mockito.ArgumentMatchers.any; +import static org.opensearch.telemetry.diagnostics.DiagnosticsEventListener.ELAPSED_TIME; + +public class DiagnosticsEventListenerTests extends OpenSearchTestCase { + + private ThreadResourceRecorder threadResourceRecorder; + private MetricEmitter metricEmitter; + private Span span; + private DiagnosticSpan diagnosticSpan; + private DiagnosticsEventListener diagnosticsEventListener; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadResourceRecorder = Mockito.mock(ThreadResourceRecorder.class); + metricEmitter = Mockito.mock(MetricEmitter.class); + span = Mockito.mock(Span.class); + diagnosticSpan = Mockito.mock(DiagnosticSpan.class); + diagnosticsEventListener = new DiagnosticsEventListener(threadResourceRecorder, metricEmitter); + } + + public void testOnSpanStart() { + Thread t = Thread.currentThread(); + diagnosticsEventListener.onSpanStart(diagnosticSpan, t); + // Verify expected interactions + Mockito.verify(threadResourceRecorder).startRecording(Mockito.eq(diagnosticSpan), Mockito.eq(t), Mockito.eq(true)); + Mockito.verify(diagnosticSpan).putMetric(Mockito.eq(DiagnosticsEventListener.START_SPAN_TIME), any(MetricPoint.class)); + } + + public void testOnSpanComplete() { + Thread t = Thread.currentThread(); + MetricPoint diffMetric = new MetricPoint(Collections.emptyMap(), null, System.currentTimeMillis()); + MetricPoint startMetric = new MetricPoint(Collections.emptyMap(), null, System.currentTimeMillis()); + Mockito.when(threadResourceRecorder.endRecording(any(DiagnosticSpan.class), Mockito.eq(t), Mockito.eq(true))) + .thenReturn(diffMetric); + Mockito.when(diagnosticSpan.removeMetric(Mockito.anyString())).thenReturn(startMetric); + ArgumentCaptor metricCaptor = ArgumentCaptor.forClass(MetricPoint.class); + diagnosticsEventListener.onSpanComplete(diagnosticSpan, t); + Mockito.verify(metricEmitter).emitMetric(metricCaptor.capture()); + + // Check if diffMetric contains "elapsed_time" measurement + MetricPoint emittedMetric = metricCaptor.getValue(); + Measurement elapsedTimeMeasurement = emittedMetric.getMeasurement(ELAPSED_TIME); + assertNotNull(elapsedTimeMeasurement); + } + + public void testOnRunnableStart() { + Thread t = Thread.currentThread(); + diagnosticsEventListener.onRunnableStart(diagnosticSpan, t); + Mockito.verify(threadResourceRecorder).startRecording(Mockito.eq(diagnosticSpan), Mockito.eq(t), Mockito.eq(false)); + } + + public void testOnRunnableComplete() { + Thread t = Thread.currentThread(); + MetricPoint diffMetric = new MetricPoint(Collections.emptyMap(), null, System.currentTimeMillis()); + Mockito.when(threadResourceRecorder.endRecording(any(DiagnosticSpan.class), Mockito.eq(t), Mockito.eq(false))) + .thenReturn(diffMetric); + + diagnosticsEventListener.onRunnableComplete(diagnosticSpan, t); + + Mockito.verify(metricEmitter).emitMetric(Mockito.eq(diffMetric)); + } + + public void testIsEnabled() { + boolean isEnabled = diagnosticsEventListener.isEnabled(diagnosticSpan); + assertTrue(isEnabled); + + isEnabled = diagnosticsEventListener.isEnabled(span); + assertFalse(isEnabled); + } +} diff --git a/libs/telemetry/src/test/java/org/opensearch/telemetry/diagnostics/ThreadResourceRecorderTests.java b/libs/telemetry/src/test/java/org/opensearch/telemetry/diagnostics/ThreadResourceRecorderTests.java new file mode 100644 index 0000000000000..5f72e63d3ec09 --- /dev/null +++ b/libs/telemetry/src/test/java/org/opensearch/telemetry/diagnostics/ThreadResourceRecorderTests.java @@ -0,0 +1,68 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.diagnostics; + +import org.mockito.Mockito; + +import java.util.Collections; + +import org.opensearch.telemetry.metrics.MetricPoint; +import org.opensearch.test.OpenSearchTestCase; + +public class ThreadResourceRecorderTests extends OpenSearchTestCase { + + private ThreadResourceObserver observer; + private DiagnosticSpan span; + private Thread thread; + + private static class TestThreadResourceRecorder extends ThreadResourceRecorder { + public TestThreadResourceRecorder(ThreadResourceObserver observer) { + super(observer); + } + + @Override + protected MetricPoint computeDiff(MetricPoint startMetric, MetricPoint endMetric) { + // We simply return a new MetricPoint object with the same values as endMetric + return new MetricPoint(endMetric.getMeasurements(), null, endMetric.getObservationTime()); + } + } + + @Override + public void setUp() throws Exception { + super.setUp(); + observer = Mockito.mock(ThreadResourceObserver.class); + span = Mockito.mock(DiagnosticSpan.class); + thread = Mockito.mock(Thread.class); + } + + public void testStartRecording() { + MetricPoint observation = new MetricPoint(Collections.emptyMap(), null, System.currentTimeMillis()); + Mockito.when(observer.observe(thread)).thenReturn(observation); + + ThreadResourceRecorder recorder = new TestThreadResourceRecorder(observer); + + recorder.startRecording(span, thread, true); + + Mockito.verify(span).putMetric(Mockito.eq(String.valueOf(thread.getId())), Mockito.eq(observation)); + } + + public void testEndRecording() { + MetricPoint startMetric = new MetricPoint(Collections.emptyMap(), null, System.currentTimeMillis()); + MetricPoint endMetric = new MetricPoint(Collections.emptyMap(), null, System.currentTimeMillis() + 1000); + Mockito.when(observer.observe(thread)).thenReturn(endMetric); + + ThreadResourceRecorder recorder = new TestThreadResourceRecorder(observer); + + Mockito.when(span.removeMetric(String.valueOf(thread.getId()))).thenReturn(startMetric); + + MetricPoint diffMetric = recorder.endRecording(span, thread, true); + + assertEquals(endMetric.getObservationTime(), diffMetric.getObservationTime()); + } +} diff --git a/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/listeners/TraceEventsRunnableTests.java b/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/listeners/TraceEventsRunnableTests.java new file mode 100644 index 0000000000000..dceec72250390 --- /dev/null +++ b/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/listeners/TraceEventsRunnableTests.java @@ -0,0 +1,115 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing.listeners; + +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.test.OpenSearchTestCase; + +import org.mockito.Mockito; + +public class TraceEventsRunnableTests extends OpenSearchTestCase { + + private Tracer tracer; + private TraceEventsService traceEventsService; + private TraceEventListener traceEventListener1; + private TraceEventListener traceEventListener2; + private Span span; + private Thread currentThread; + private Runnable delegate; + + @Override + public void setUp() throws Exception { + super.setUp(); + + tracer = Mockito.mock(Tracer.class); + traceEventsService = Mockito.spy(new TraceEventsService()); // Use spy here + traceEventListener1 = Mockito.mock(TraceEventListener.class); + traceEventListener2 = Mockito.mock(TraceEventListener.class); + span = Mockito.mock(Span.class); + currentThread = Mockito.mock(Thread.class); + delegate = Mockito.mock(Runnable.class); + + Mockito.when(traceEventsService.getTracer()).thenReturn(tracer); + Mockito.when(tracer.getCurrentSpan()).thenReturn(span); + Mockito.when(span.getParentSpan()).thenReturn(null); + + traceEventsService.registerTraceEventListener("listener1", traceEventListener1); + traceEventsService.registerTraceEventListener("listener2", traceEventListener2); + Mockito.when(traceEventListener1.isEnabled(Mockito.any(Span.class))).thenReturn(true); + Mockito.when(traceEventListener2.isEnabled(Mockito.any(Span.class))).thenReturn(true); + + traceEventsService.setTracingEnabled(true); + } + + public void testRun_InvokeOnRunnableStartAndOnRunnableComplete() { + Span span1 = Mockito.mock(Span.class); + Span span2 = Mockito.mock(Span.class); + Mockito.when(traceEventsService.getTracer().getCurrentSpan()).thenReturn(span1, span1); + Mockito.when(span1.hasEnded()).thenReturn(false); + Mockito.when(span2.hasEnded()).thenReturn(false); + Mockito.when(span1.getParentSpan()).thenReturn(span2); + Mockito.when(span2.getParentSpan()).thenReturn(null); + + TraceEventsRunnable traceEventsRunnable = new TraceEventsRunnable(delegate, traceEventsService); + + traceEventsRunnable.run(); + + Mockito.verify(traceEventListener1, Mockito.times(2)).onRunnableStart(Mockito.any(Span.class), Mockito.any(Thread.class)); + Mockito.verify(traceEventListener2, Mockito.times(2)).onRunnableStart(Mockito.any(Span.class), Mockito.any(Thread.class)); + Mockito.verify(traceEventListener1, Mockito.times(2)).onRunnableComplete(Mockito.any(Span.class), Mockito.any(Thread.class)); + Mockito.verify(traceEventListener2, Mockito.times(2)).onRunnableComplete(Mockito.any(Span.class), Mockito.any(Thread.class)); + + // Ensure that delegate.run() was invoked + Mockito.verify(delegate).run(); + } + + public void testRun_TracingNotEnabled_NoInteractionsWithListeners() { + Mockito.when(traceEventsService.isTracingEnabled()).thenReturn(false); + + TraceEventsRunnable traceEventsRunnable = new TraceEventsRunnable(delegate, traceEventsService); + + traceEventsRunnable.run(); + + // Verify that no interactions with listeners occurred + Mockito.verifyNoInteractions(traceEventListener1); + Mockito.verifyNoInteractions(traceEventListener2); + } + + public void testRun_ExceptionInOnRunnableStart_NoImpactOnExecution() { + Mockito.doThrow(new RuntimeException("Listener 1 exception")) + .when(traceEventListener1) + .onRunnableStart(Mockito.eq(span), Mockito.eq(currentThread)); + TraceEventsRunnable traceEventsRunnable = new TraceEventsRunnable(delegate, traceEventsService); + traceEventsRunnable.run(); + + // Ensure that delegate.run() was invoked + Mockito.verify(delegate).run(); + } + + public void testRun_ExceptionInOnRunnableComplete_NoImpactOnExecution() { + // trace event listener to throw an exception in onRunnableComplete + Mockito.doThrow(new RuntimeException("Listener 1 exception")) + .when(traceEventListener1) + .onRunnableComplete(Mockito.eq(span), Mockito.eq(currentThread)); + TraceEventsRunnable traceEventsRunnable = new TraceEventsRunnable(delegate, traceEventsService); + traceEventsRunnable.run(); + + // Verify that onRunnableStart was called for the listener despite the exception + Mockito.verify(traceEventListener1).onRunnableStart(Mockito.any(Span.class), Mockito.any(Thread.class)); + Mockito.verify(delegate).run(); + } + + public void testUnwrap() { + TraceEventsRunnable traceEventsRunnable = new TraceEventsRunnable(delegate, traceEventsService); + + Runnable unwrappedRunnable = traceEventsRunnable.unwrap(); + assertSame(delegate, unwrappedRunnable); + } +} diff --git a/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/listeners/TraceEventsServiceTests.java b/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/listeners/TraceEventsServiceTests.java new file mode 100644 index 0000000000000..0d0773f5aef53 --- /dev/null +++ b/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/listeners/TraceEventsServiceTests.java @@ -0,0 +1,135 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing.listeners; + +import org.opensearch.telemetry.diagnostics.DiagnosticSpan; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Map; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.doThrow; + +public class TraceEventsServiceTests extends OpenSearchTestCase { + + private Tracer tracer; + private TraceEventsService traceEventsService; + private TraceEventListener traceEventListener1; + private TraceEventListener traceEventListener2; + private Span span; + private Thread thread; + private Runnable runnable; + private DiagnosticSpan diagnosticSpan; + + @Override + public void setUp() throws Exception { + super.setUp(); + + tracer = mock(Tracer.class); + traceEventsService = new TraceEventsService(); + traceEventListener1 = mock(TraceEventListener.class); + traceEventListener2 = mock(TraceEventListener.class); + span = mock(Span.class); + thread = mock(Thread.class); + runnable = mock(Runnable.class); + diagnosticSpan = mock(DiagnosticSpan.class); + + traceEventsService.registerTraceEventListener("listener1", traceEventListener1); + traceEventsService.registerTraceEventListener("listener2", traceEventListener2); + } + + public void testRegisterAndDeregisterTraceEventListener() { + Map traceEventListeners = traceEventsService.getTraceEventListeners(); + assertEquals(2, traceEventListeners.size()); + + // Deregister traceEventListener1 + traceEventsService.deregisterTraceEventListener("listener1"); + traceEventListeners = traceEventsService.getTraceEventListeners(); + assertEquals(1, traceEventListeners.size()); + assertNull(traceEventListeners.get("listener1")); + assertNotNull(traceEventListeners.get("listener2")); + } + + public void testWrapRunnable() { + // Tracing is not enabled, the original runnable should be returned + traceEventsService.setTracingEnabled(false); + Runnable wrappedRunnable = traceEventsService.wrapRunnable(runnable); + assertSame(runnable, wrappedRunnable); + + // Tracing is enabled, the wrapped TraceEventsRunnable should be returned + traceEventsService.setTracingEnabled(true); + wrappedRunnable = traceEventsService.wrapRunnable(runnable); + assertTrue(wrappedRunnable instanceof TraceEventsRunnable); + } + + public void testUnwrapRunnable() { + // Runnable is not wrapped, should return the same runnable + traceEventsService.setTracingEnabled(false); + Runnable unwrappedRunnable = traceEventsService.unwrapRunnable(runnable); + assertSame(runnable, unwrappedRunnable); + + // Runnable is wrapped, should return the original runnable from TraceEventsRunnable + traceEventsService.setTracingEnabled(true); + Runnable wrappedRunnable = traceEventsService.wrapRunnable(runnable); + unwrappedRunnable = traceEventsService.unwrapRunnable(wrappedRunnable); + assertSame(runnable, unwrappedRunnable); + } + + public void testWrapAndSetTracer() { + traceEventsService.setTracingEnabled(true); + traceEventsService.wrapAndSetTracer(tracer); + assertTrue(traceEventsService.getTracer() instanceof TracerWrapper); + } + + public void testUnwrapTracer() { + traceEventsService.setTracingEnabled(true); + TracerWrapper wrappedTracer = traceEventsService.wrapAndSetTracer(tracer); + Tracer unwrappedTracer = traceEventsService.unwrapTracer(wrappedTracer); + assertSame(tracer, unwrappedTracer); + } + + public void testExecuteListeners() { + when(traceEventListener1.isEnabled(any(Span.class))).thenReturn(true); + when(traceEventListener2.isEnabled(any(Span.class))).thenReturn(false); + + traceEventsService.setTracingEnabled(true); + traceEventsService.setDiagnosisEnabled(true); + + traceEventsService.executeListeners(span, traceEventListener -> traceEventListener.onRunnableStart(span, thread)); + traceEventsService.executeListeners(span, traceEventListener -> traceEventListener.onRunnableComplete(span, thread)); + + // Verify listener1 is invoked once for onRunnableStart and onRunnableComplete + verify(traceEventListener1).onRunnableStart(eq(span), eq(thread)); + verify(traceEventListener1).onRunnableComplete(eq(span), eq(thread)); + + // Verify listener2 is not invoked + verify(traceEventListener2, never()).onRunnableStart(any(Span.class), any(Thread.class)); + verify(traceEventListener2, never()).onRunnableComplete(any(Span.class), any(Thread.class)); + } + + public void testExecuteListeners_ExceptionInListener() { + doThrow(new RuntimeException("Listener 1 exception")).when(traceEventListener1).isEnabled(any()); + + traceEventsService.setTracingEnabled(true); + traceEventsService.setDiagnosisEnabled(true); + + traceEventsService.executeListeners(span, traceEventListener -> traceEventListener.isEnabled(span)); + + // Verify that both listeners were called despite the exception in listener1 + verify(traceEventListener1).isEnabled(span); + verify(traceEventListener2).isEnabled(span); + } +} diff --git a/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/listeners/TracerWrapperTests.java b/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/listeners/TracerWrapperTests.java new file mode 100644 index 0000000000000..bc2838c8d71da --- /dev/null +++ b/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/listeners/TracerWrapperTests.java @@ -0,0 +1,90 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing.listeners; + +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanScope; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Collections; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.anyMap; +import static org.mockito.Mockito.verifyNoInteractions; + +public class TracerWrapperTests extends OpenSearchTestCase { + + private Tracer tracer; + private TraceEventsService traceEventsService; + private TraceEventListener traceEventListener1; + private TraceEventListener traceEventListener2; + private Span span; + private SpanScope spanScope; + + @Override + public void setUp() throws Exception { + super.setUp(); + + tracer = mock(Tracer.class); + traceEventsService = spy(new TraceEventsService()); + traceEventListener1 = mock(TraceEventListener.class); + traceEventListener2 = mock(TraceEventListener.class); + span = mock(Span.class); + spanScope = mock(SpanScope.class); + + when(tracer.startSpan(anyString(), anyMap())).thenReturn(spanScope); + when(tracer.getCurrentSpan()).thenReturn(span); + when(traceEventsService.isTracingEnabled()).thenReturn(true); + when(traceEventsService.getTracer()).thenReturn(tracer); + + traceEventsService.registerTraceEventListener("listener1", traceEventListener1); + traceEventsService.registerTraceEventListener("listener2", traceEventListener2); + } + + public void testStartSpan_WithTracingEnabled_InvokeOnSpanStartAndOnSpanComplete() { + TracerWrapper tracerWrapper = new TracerWrapper(tracer, traceEventsService); + when(traceEventListener1.isEnabled(any(Span.class))).thenReturn(true); + when(traceEventListener2.isEnabled(any(Span.class))).thenReturn(true); + + SpanScope scope = tracerWrapper.startSpan("test_span", Collections.emptyMap()); + + verify(traceEventListener1).onSpanStart(eq(span), any(Thread.class)); + verify(traceEventListener2).onSpanStart(eq(span), any(Thread.class)); + + scope.close(); + + verify(traceEventListener1).onSpanComplete(eq(span), any(Thread.class)); + verify(traceEventListener2).onSpanComplete(eq(span), any(Thread.class)); + } + + public void testStartSpan_WithTracingDisabled_NoInteractionsWithListeners() { + when(traceEventsService.isTracingEnabled()).thenReturn(false); + + TracerWrapper tracerWrapper = new TracerWrapper(tracer, traceEventsService); + SpanScope scope = tracerWrapper.startSpan("test_span", Collections.emptyMap()); + + scope.close(); + + verifyNoInteractions(traceEventListener1); + verifyNoInteractions(traceEventListener2); + } + + public void testUnwrap() { + TracerWrapper tracerWrapper = new TracerWrapper(tracer, traceEventsService); + Tracer unwrappedTracer = tracerWrapper.unwrap(); + assertSame(tracer, unwrappedTracer); + } +} diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetryPlugin.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetryPlugin.java index a1ca3adf4d2a2..37f0debe3c685 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetryPlugin.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetryPlugin.java @@ -16,9 +16,12 @@ import org.opensearch.telemetry.tracing.OTelResourceProvider; import org.opensearch.telemetry.tracing.OTelTelemetry; import org.opensearch.telemetry.tracing.OTelTracingTelemetry; +import org.opensearch.telemetry.tracing.listeners.TraceEventListener; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; /** @@ -58,6 +61,11 @@ public String getName() { return OTEL_TRACER_NAME; } + @Override + public Map getTraceEventListeners(Telemetry telemetry) { + return Collections.emptyMap(); + } + private Telemetry telemetry() { return new OTelTelemetry(new OTelTracingTelemetry(OTelResourceProvider.get(settings)), new MetricsTelemetry() { }); diff --git a/server/src/main/java/org/opensearch/plugins/TelemetryPlugin.java b/server/src/main/java/org/opensearch/plugins/TelemetryPlugin.java index 33dc9b7a0c843..047671f69098a 100644 --- a/server/src/main/java/org/opensearch/plugins/TelemetryPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/TelemetryPlugin.java @@ -10,7 +10,11 @@ import org.opensearch.telemetry.Telemetry; import org.opensearch.telemetry.TelemetrySettings; +import org.opensearch.telemetry.diagnostics.DiagnosticsEventListener; +import org.opensearch.telemetry.tracing.listeners.TraceEventListener; +import java.util.Collections; +import java.util.Map; import java.util.Optional; /** @@ -22,4 +26,6 @@ public interface TelemetryPlugin { String getName(); + Map getTraceEventListeners(Telemetry telemetry); + } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java b/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java index 0ba9a8ea5fd88..47dac885343bc 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java @@ -12,6 +12,7 @@ import org.opensearch.telemetry.tracing.noop.NoopTracer; import java.io.IOException; +import java.util.Map; /** * Wrapper implementation of Tracer. This delegates call to right tracer based on the tracer settings @@ -36,8 +37,18 @@ public WrappedTracer(TelemetrySettings telemetrySettings, Tracer defaultTracer) @Override public SpanScope startSpan(String spanName) { + return startSpan(spanName, null); + } + + @Override + public SpanScope startSpan(String spanName, Map attributes) { Tracer delegateTracer = getDelegateTracer(); - return delegateTracer.startSpan(spanName); + return delegateTracer.startSpan(spanName, attributes); + } + + @Override + public Span getCurrentSpan() { + return getDelegateTracer().getCurrentSpan(); } @Override diff --git a/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetryPlugin.java b/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetryPlugin.java index 41cc5c1e77a34..17225f3a930fa 100644 --- a/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetryPlugin.java +++ b/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetryPlugin.java @@ -8,11 +8,14 @@ package org.opensearch.test.telemetry; +import java.util.Collections; +import java.util.Map; import java.util.Optional; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.TelemetryPlugin; import org.opensearch.telemetry.Telemetry; import org.opensearch.telemetry.TelemetrySettings; +import org.opensearch.telemetry.tracing.listeners.TraceEventListener; /** * Mock {@link TelemetryPlugin} implementation for testing. @@ -36,4 +39,9 @@ public Optional getTelemetry(TelemetrySettings settings) { public String getName() { return MOCK_TRACER_NAME; } + + @Override + public Map getTraceEventListeners(Telemetry telemetry) { + return Collections.emptyMap(); + } }