diff --git a/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupport.java b/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupport.java index f0e89d921c70..3f6b1696433b 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupport.java +++ b/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupport.java @@ -38,6 +38,7 @@ import org.springframework.core.ReactiveAdapter; import org.springframework.core.ReactiveAdapterRegistry; import org.springframework.lang.Nullable; +import org.springframework.scheduling.SchedulingAwareRunnable; import org.springframework.scheduling.support.DefaultScheduledTaskObservationConvention; import org.springframework.scheduling.support.ScheduledTaskObservationContext; import org.springframework.scheduling.support.ScheduledTaskObservationConvention; @@ -120,8 +121,10 @@ public static Runnable createSubscriptionRunnable(Method method, Object targetBe boolean shouldBlock = (scheduled.fixedDelay() > 0 || StringUtils.hasText(scheduled.fixedDelayString())); Publisher publisher = getPublisherFor(method, targetBean); - Supplier contextSupplier = () -> new ScheduledTaskObservationContext(targetBean, method); - return new SubscribingRunnable(publisher, shouldBlock, subscriptionTrackerRegistry, observationRegistrySupplier, contextSupplier); + Supplier contextSupplier = + () -> new ScheduledTaskObservationContext(targetBean, method); + return new SubscribingRunnable(publisher, shouldBlock, scheduled.scheduler(), + subscriptionTrackerRegistry, observationRegistrySupplier, contextSupplier); } /** @@ -180,30 +183,43 @@ static Publisher getPublisherFor(Method method, Object bean) { * Utility implementation of {@code Runnable} that subscribes to a {@code Publisher} * or subscribes-then-blocks if {@code shouldBlock} is set to {@code true}. */ - static final class SubscribingRunnable implements Runnable { + static final class SubscribingRunnable implements SchedulingAwareRunnable { - private final Publisher publisher; + private static final ScheduledTaskObservationConvention DEFAULT_CONVENTION = + new DefaultScheduledTaskObservationConvention(); - private static final ScheduledTaskObservationConvention DEFAULT_CONVENTION = new DefaultScheduledTaskObservationConvention(); + private final Publisher publisher; final boolean shouldBlock; + @Nullable + private final String qualifier; + private final List subscriptionTrackerRegistry; final Supplier observationRegistrySupplier; final Supplier contextSupplier; - SubscribingRunnable(Publisher publisher, boolean shouldBlock, List subscriptionTrackerRegistry, - Supplier observationRegistrySupplier, Supplier contextSupplier) { + SubscribingRunnable(Publisher publisher, boolean shouldBlock, + @Nullable String qualifier, List subscriptionTrackerRegistry, + Supplier observationRegistrySupplier, + Supplier contextSupplier) { this.publisher = publisher; this.shouldBlock = shouldBlock; + this.qualifier = qualifier; this.subscriptionTrackerRegistry = subscriptionTrackerRegistry; this.observationRegistrySupplier = observationRegistrySupplier; this.contextSupplier = contextSupplier; } + @Override + @Nullable + public String getQualifier() { + return this.qualifier; + } + @Override public void run() { Observation observation = TASKS_SCHEDULED_EXECUTION.observation(null, DEFAULT_CONVENTION,