diff --git a/spring-core/src/main/java/org/springframework/core/CoroutinesUtils.java b/spring-core/src/main/java/org/springframework/core/CoroutinesUtils.java index c2724b507824..66917a8e7586 100644 --- a/spring-core/src/main/java/org/springframework/core/CoroutinesUtils.java +++ b/spring-core/src/main/java/org/springframework/core/CoroutinesUtils.java @@ -21,6 +21,7 @@ import java.util.Objects; import kotlin.Unit; +import kotlin.coroutines.CoroutineContext; import kotlin.jvm.JvmClassMappingKt; import kotlin.reflect.KClass; import kotlin.reflect.KClassifier; @@ -66,17 +67,39 @@ public static Deferred monoToDeferred(Mono source) { (scope, continuation) -> MonoKt.awaitSingleOrNull(source, continuation)); } + /** + * Invoke a suspending function and converts it to {@link Mono} or + * {@link Flux}. Uses an {@linkplain Dispatchers#getUnconfined() unconfined} + * dispatcher. + * @param method the suspending function to invoke + * @param target the target to invoke {@code method} on + * @param args the function arguments + * @return the method invocation result as reactive stream + */ + public static Publisher invokeSuspendingFunction(Method method, Object target, + Object... args) { + return invokeSuspendingFunction(Dispatchers.getUnconfined(), method, target, args); + } + /** * Invoke a suspending function and converts it to {@link Mono} or * {@link Flux}. + * @param context the coroutine context to use + * @param method the suspending function to invoke + * @param target the target to invoke {@code method} on + * @param args the function arguments + * @return the method invocation result as reactive stream + * @since 6.0 */ @SuppressWarnings("deprecation") - public static Publisher invokeSuspendingFunction(Method method, Object target, Object... args) { + public static Publisher invokeSuspendingFunction(CoroutineContext context, Method method, Object target, + Object... args) { + KFunction function = Objects.requireNonNull(ReflectJvmMapping.getKotlinFunction(method)); if (method.isAccessible() && !KCallablesJvm.isAccessible(function)) { KCallablesJvm.setAccessible(function, true); } - Mono mono = MonoKt.mono(Dispatchers.getUnconfined(), (scope, continuation) -> + Mono mono = MonoKt.mono(context, (scope, continuation) -> KCallables.callSuspend(function, getSuspendedFunctionArgs(target, args), continuation)) .filter(result -> !Objects.equals(result, Unit.INSTANCE)) .onErrorMap(InvocationTargetException.class, InvocationTargetException::getTargetException);