Skip to content

Commit

Permalink
Instrument Scheduled methods for observability
Browse files Browse the repository at this point in the history
This commit enhances the `ScheduledAnnotationBeanPostProcessor` to
instrument `@Scheduled` methods declared on beans. This will create
`"tasks.scheduled.execution"` observations for each execution of a
scheduled method. This supports both blocking and reactive variants.

By default, observations are no-ops; developers must configure the
current `ObservationRegistry` on the `ScheduledTaskRegistrar` by using a
`SchedulingConfigurer`.

Closes gh-29883
  • Loading branch information
bclozel committed Jun 19, 2023
1 parent 842569c commit 09cb844
Show file tree
Hide file tree
Showing 14 changed files with 899 additions and 23 deletions.
32 changes: 30 additions & 2 deletions framework-docs/modules/ROOT/pages/integration/observability.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ As outlined xref:integration/observability.adoc[at the beginning of this section
|===
|Observation name |Description

|xref:integration/observability.adoc#http-client[`"http.client.requests"`]
|xref:integration/observability.adoc#observability.http-client[`"http.client.requests"`]
|Time spent for HTTP client exchanges

|xref:integration/observability.adoc#http-server[`"http.server.requests"`]
|xref:integration/observability.adoc#observability.http-server[`"http.server.requests"`]
|Processing time for HTTP server exchanges at the Framework level

|xref:integration/observability.adoc#observability.tasks-scheduled[`"tasks.scheduled.execution"`]
|Processing time for an execution of a `@Scheduled` task
|===

NOTE: Observations are using Micrometer's official naming convention, but Metrics names will be automatically converted
Expand Down Expand Up @@ -79,6 +82,31 @@ include-code::./ServerRequestObservationFilter[]

You can configure `ObservationFilter` instances on the `ObservationRegistry`.

[[observability.tasks-scheduled]]
== @Scheduled tasks instrumentation

An Observation is created for xref:integration/scheduling.adoc#scheduling-enable-annotation-support[each execution of an `@Scheduled` task].
Applications need to configure the `ObservationRegistry` on the `ScheduledTaskRegistrar` to enable the recording of observations.
This can be done by declaring a `SchedulingConfigurer` bean that sets the observation registry:

include-code::./ObservationSchedulingConfigurer[]

It is using the `org.springframework.scheduling.config.DefaultScheduledTaskObservationConvention` by default, backed by the `ScheduledTaskObservationContext`.
You can configure a custom implementation on the `ObservationRegistry` directly.
During the execution of the scheduled method, the current observation is restored in the `ThreadLocal` context or the Reactor context (if the scheduled method returns a `Mono` or `Flux` type).

By default, the following `KeyValues` are created:

.Low cardinality Keys
[cols="a,a"]
|===
|Name | Description
|`exception` _(required)_|Name of the exception thrown during the execution, or `KeyValue#NONE_VALUE`} if no exception happened.
|`method.name` _(required)_|Name of Java `Method` that is scheduled for execution.
|`outcome` _(required)_|Outcome of the method execution. Can be `"SUCCESS"`, `"ERROR"` or `"UNKNOWN"` (if for example the operation was cancelled during execution.
|`target.type` _(required)_|Simple class name of the bean instance that holds the scheduled method.
|===


[[observability.http-server]]
== HTTP Server instrumentation
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.docs.integration.observability.tasksscheduled;


import io.micrometer.observation.ObservationRegistry;

import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;

public class ObservationSchedulingConfigurer implements SchedulingConfigurer {

private final ObservationRegistry observationRegistry;

public ObservationSchedulingConfigurer(ObservationRegistry observationRegistry) {
this.observationRegistry = observationRegistry;
}

@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setObservationRegistry(this.observationRegistry);
}

}
3 changes: 3 additions & 0 deletions spring-context/spring-context.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies {
api(project(":spring-beans"))
api(project(":spring-core"))
api(project(":spring-expression"))
api("io.micrometer:micrometer-observation")
optional(project(":spring-instrument"))
optional("jakarta.annotation:jakarta.annotation-api")
optional("jakarta.ejb:jakarta.ejb-api")
Expand Down Expand Up @@ -41,6 +42,8 @@ dependencies {
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
testImplementation("io.reactivex.rxjava3:rxjava")
testImplementation('io.micrometer:context-propagation')
testImplementation("io.micrometer:micrometer-observation-test")
testRuntimeOnly("jakarta.xml.bind:jakarta.xml.bind-api")
testRuntimeOnly("org.glassfish:jakarta.el")
// Substitute for javax.management:jmxremote_optional:1.0.1_04 (not available on Maven Central)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,10 @@ protected void processScheduled(Scheduled scheduled, Method method, Object bean)
* accordingly. The Runnable can represent either a synchronous method invocation
* (see {@link #processScheduledSync(Scheduled, Method, Object)}) or an asynchronous
* one (see {@link #processScheduledAsync(Scheduled, Method, Object)}).
* @param scheduled the {@code @Scheduled} annotation
* @param runnable the runnable to be scheduled
* @param method the method that the annotation has been declared on
* @param bean the target bean instance
*/
protected void processScheduledTask(Scheduled scheduled, Runnable runnable, Method method, Object bean) {
try {
Expand Down Expand Up @@ -578,6 +582,7 @@ protected void processScheduledAsync(Scheduled scheduled, Method method, Object
Runnable task;
try {
task = ScheduledAnnotationReactiveSupport.createSubscriptionRunnable(method, bean, scheduled,
this.registrar::getObservationRegistry,
this.reactiveSubscriptions.computeIfAbsent(bean, k -> new CopyOnWriteArrayList<>()));
}
catch (IllegalArgumentException ex) {
Expand All @@ -598,7 +603,7 @@ protected void processScheduledAsync(Scheduled scheduled, Method method, Object
protected Runnable createRunnable(Object target, Method method) {
Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
return new ScheduledMethodRunnable(target, invocableMethod);
return new ScheduledMethodRunnable(target, invocableMethod, this.registrar::getObservationRegistry);
}

private static Duration toDuration(long value, TimeUnit timeUnit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Supplier;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
Expand All @@ -34,16 +38,22 @@
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.config.DefaultScheduledTaskObservationConvention;
import org.springframework.scheduling.config.ScheduledTaskObservationContext;
import org.springframework.scheduling.config.ScheduledTaskObservationConvention;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;

import static org.springframework.scheduling.config.ScheduledTaskObservationDocumentation.TASKS_SCHEDULED_EXECUTION;

/**
* Helper class for @{@link ScheduledAnnotationBeanPostProcessor} to support reactive
* cases without a dependency on optional classes.
*
* @author Simon Baslé
* @author Brian Clozel
* @since 6.1
*/
abstract class ScheduledAnnotationReactiveSupport {
Expand Down Expand Up @@ -157,11 +167,12 @@ static Publisher<?> getPublisherFor(Method method, Object bean) {
* delay is applied until the next iteration).
*/
static Runnable createSubscriptionRunnable(Method method, Object targetBean, Scheduled scheduled,
List<Runnable> subscriptionTrackerRegistry) {
Supplier<ObservationRegistry> observationRegistrySupplier, List<Runnable> subscriptionTrackerRegistry) {

boolean shouldBlock = (scheduled.fixedDelay() > 0 || StringUtils.hasText(scheduled.fixedDelayString()));
Publisher<?> publisher = getPublisherFor(method, targetBean);
return new SubscribingRunnable(publisher, shouldBlock, subscriptionTrackerRegistry);
Supplier<ScheduledTaskObservationContext> contextSupplier = () -> new ScheduledTaskObservationContext(targetBean, method);
return new SubscribingRunnable(publisher, shouldBlock, subscriptionTrackerRegistry, observationRegistrySupplier, contextSupplier);
}


Expand All @@ -173,23 +184,33 @@ static final class SubscribingRunnable implements Runnable {

private final Publisher<?> publisher;

private static final ScheduledTaskObservationConvention DEFAULT_CONVENTION = new DefaultScheduledTaskObservationConvention();

final boolean shouldBlock;

private final List<Runnable> subscriptionTrackerRegistry;

SubscribingRunnable(Publisher<?> publisher, boolean shouldBlock, List<Runnable> subscriptionTrackerRegistry) {
final Supplier<ObservationRegistry> observationRegistrySupplier;

final Supplier<ScheduledTaskObservationContext> contextSupplier;

SubscribingRunnable(Publisher<?> publisher, boolean shouldBlock, List<Runnable> subscriptionTrackerRegistry,
Supplier<ObservationRegistry> observationRegistrySupplier, Supplier<ScheduledTaskObservationContext> contextSupplier) {
this.publisher = publisher;
this.shouldBlock = shouldBlock;
this.subscriptionTrackerRegistry = subscriptionTrackerRegistry;
this.observationRegistrySupplier = observationRegistrySupplier;
this.contextSupplier = contextSupplier;
}

@Override
public void run() {
Observation observation = TASKS_SCHEDULED_EXECUTION.observation(null, DEFAULT_CONVENTION,
this.contextSupplier, this.observationRegistrySupplier.get());
if (this.shouldBlock) {
CountDownLatch latch = new CountDownLatch(1);
TrackingSubscriber subscriber = new TrackingSubscriber(this.subscriptionTrackerRegistry, latch);
this.subscriptionTrackerRegistry.add(subscriber);
this.publisher.subscribe(subscriber);
TrackingSubscriber subscriber = new TrackingSubscriber(this.subscriptionTrackerRegistry, observation, latch);
subscribe(subscriber, observation);
try {
latch.await();
}
Expand All @@ -198,8 +219,19 @@ public void run() {
}
}
else {
TrackingSubscriber subscriber = new TrackingSubscriber(this.subscriptionTrackerRegistry);
this.subscriptionTrackerRegistry.add(subscriber);
TrackingSubscriber subscriber = new TrackingSubscriber(this.subscriptionTrackerRegistry, observation);
subscribe(subscriber, observation);
}
}

private void subscribe(TrackingSubscriber subscriber, Observation observation) {
this.subscriptionTrackerRegistry.add(subscriber);
if (reactorPresent) {
Flux.from(this.publisher)
.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, observation))
.subscribe(subscriber);
}
else {
this.publisher.subscribe(subscriber);
}
}
Expand All @@ -215,6 +247,8 @@ private static final class TrackingSubscriber implements Subscriber<Object>, Run

private final List<Runnable> subscriptionTrackerRegistry;

private final Observation observation;

@Nullable
private final CountDownLatch blockingLatch;

Expand All @@ -225,19 +259,21 @@ private static final class TrackingSubscriber implements Subscriber<Object>, Run
@Nullable
private Subscription subscription;

TrackingSubscriber(List<Runnable> subscriptionTrackerRegistry) {
this(subscriptionTrackerRegistry, null);
TrackingSubscriber(List<Runnable> subscriptionTrackerRegistry, Observation observation) {
this(subscriptionTrackerRegistry, observation, null);
}

TrackingSubscriber(List<Runnable> subscriptionTrackerRegistry, @Nullable CountDownLatch latch) {
TrackingSubscriber(List<Runnable> subscriptionTrackerRegistry, Observation observation, @Nullable CountDownLatch latch) {
this.subscriptionTrackerRegistry = subscriptionTrackerRegistry;
this.observation = observation;
this.blockingLatch = latch;
}

@Override
public void run() {
if (this.subscription != null) {
this.subscription.cancel();
this.observation.stop();
}
if (this.blockingLatch != null) {
this.blockingLatch.countDown();
Expand All @@ -247,6 +283,7 @@ public void run() {
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.observation.start();
subscription.request(Integer.MAX_VALUE);
}

Expand All @@ -259,6 +296,8 @@ public void onNext(Object obj) {
public void onError(Throwable ex) {
this.subscriptionTrackerRegistry.remove(this);
logger.warn("Unexpected error occurred in scheduled reactive task", ex);
this.observation.error(ex);
this.observation.stop();
if (this.blockingLatch != null) {
this.blockingLatch.countDown();
}
Expand All @@ -267,6 +306,10 @@ public void onError(Throwable ex) {
@Override
public void onComplete() {
this.subscriptionTrackerRegistry.remove(this);
if (this.observation.getContext() instanceof ScheduledTaskObservationContext context) {
context.setComplete(true);
}
this.observation.stop();
if (this.blockingLatch != null) {
this.blockingLatch.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.scheduling.config;

import io.micrometer.common.KeyValue;
import io.micrometer.common.KeyValues;

import org.springframework.util.StringUtils;

import static org.springframework.scheduling.config.ScheduledTaskObservationDocumentation.LowCardinalityKeyNames;

/**
* Default implementation for {@link ScheduledTaskObservationConvention}.
* @author Brian Clozel
* @since 6.1.0
*/
public class DefaultScheduledTaskObservationConvention implements ScheduledTaskObservationConvention {

private static final String DEFAULT_NAME = "tasks.scheduled.execution";

private static final KeyValue EXCEPTION_NONE = KeyValue.of(LowCardinalityKeyNames.EXCEPTION, KeyValue.NONE_VALUE);

private static final KeyValue OUTCOME_SUCCESS = KeyValue.of(LowCardinalityKeyNames.OUTCOME, "SUCCESS");

private static final KeyValue OUTCOME_ERROR = KeyValue.of(LowCardinalityKeyNames.OUTCOME, "ERROR");

private static final KeyValue OUTCOME_UNKNOWN = KeyValue.of(LowCardinalityKeyNames.OUTCOME, "UNKNOWN");

@Override
public String getName() {
return DEFAULT_NAME;
}

@Override
public String getContextualName(ScheduledTaskObservationContext context) {
return "task " + StringUtils.uncapitalize(context.getTargetClass().getSimpleName())
+ "." + context.getMethod().getName();
}

@Override
public KeyValues getLowCardinalityKeyValues(ScheduledTaskObservationContext context) {
return KeyValues.of(exception(context), methodName(context), outcome(context), targetType(context));
}

protected KeyValue exception(ScheduledTaskObservationContext context) {
if (context.getError() != null) {
return KeyValue.of(LowCardinalityKeyNames.EXCEPTION, context.getError().getClass().getSimpleName());
}
return EXCEPTION_NONE;
}

protected KeyValue methodName(ScheduledTaskObservationContext context) {
return KeyValue.of(LowCardinalityKeyNames.METHOD_NAME, context.getMethod().getName());
}

protected KeyValue outcome(ScheduledTaskObservationContext context) {
if (context.getError() != null) {
return OUTCOME_ERROR;
}
else if (!context.isComplete()) {
return OUTCOME_UNKNOWN;
}
return OUTCOME_SUCCESS;
}

protected KeyValue targetType(ScheduledTaskObservationContext context) {
return KeyValue.of(LowCardinalityKeyNames.TARGET_TYPE, context.getTargetClass().getSimpleName());
}

}
Loading

0 comments on commit 09cb844

Please sign in to comment.