You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Use case 1 (real world scenario): I need to split the given INPUT into several 'paths' using different filters, process each path, and then merge paths back again. With this pattern, the INPUT flux is 'processed' just once.
The following chain should not block:
REF_COUNTED = some INPUT Flux -> publish -> refCount(2)
A = apply transformation to REF_COUNTED
B = apply transformation to REF_COUNTED
Flux.merge(A,B) -> blockLast
Use case 2 (just another way to reproduce the problem): split the input into several paths, read results for each path in another thread.
Main Thread:
REF_COUNTED = some INPUT Flux -> publish -> refCount(2)
A = apply transformation to REF_COUNTED
B = apply transformation to REF_COUNTED
Thread1: A -> blockLast
Thread2: B -> blockLast
Actual Behavior
While the above example works fine for simple 'transformations', it blocks forever if some 'transformation' involves another thread.
E.g. A = "some Mono, finishing in another thread" -> flatMapMany(REF_COUNTED)
Steps to Reproduce
importorg.junit.Assert;
importorg.junit.Test;
importreactor.core.CoreSubscriber;
importreactor.core.publisher.Flux;
importreactor.core.publisher.Mono;
importreactor.test.StepVerifier;
importjava.time.Duration;
importjava.util.Collections;
importjava.util.HashMap;
importjava.util.Map;
importjava.util.function.Function;
publicclassRefCountFluxTest {
privatestaticfinalintTEST_ATTEMPTS = 100_000;
@TestpublicvoidwillFail() {
performTest(Function.identity());
//java.lang.AssertionError: expectation "expectNext(2)" failed (expected: onNext(2); actual: onComplete())//java.lang.AssertionError: expectation "expectNext(1)" failed (expected value: 1; actual value: 2)//java.lang.AssertionError: VerifySubscriber timed out on ...
}
@TestpublicvoidwillNotFail() {
performTest(SynchronizedSubscribeFlux::new);
}
/** * Flux.just(1,2) -> refCount(2) -> Flux.merge(odd,even) */privatevoidtryReproduce(Function<Flux<Integer>, Flux<Integer>> modifyInput) {
Flux<Integer> input = Flux.just(1, 2).publish().refCount(2);
Flux<Integer> modifiedInput = modifyInput.apply(input); //either original input or with fix applied//Mono<Long> delay = Mono.fromCompletionStage(CompletableFuture.supplyAsync(() -> 1L));Mono<Long> delay = Mono.delay(Duration.ofNanos(1));
Flux<Integer> odd = delay.flatMapMany(unused -> modifiedInput.filter(v -> v % 2 == 1)); //this will make subscribe from another threadFlux<Integer> even = modifiedInput.filter(v -> v % 2 == 0);
Flux<Integer> merged = Flux.merge(odd, even);
merged.as(StepVerifier::create).expectNext(1).expectNext(2).expectComplete().verify(Duration.ofSeconds(5));
}
//simulation of the suggested fix (see synchronized keyword)publicstaticclassSynchronizedSubscribeFlux<T> extendsFlux<T> {
privatefinalFlux<T> delegate;
publicSynchronizedSubscribeFlux(Flux<T> delegate) {
this.delegate = delegate;
}
@Overridepublicvoidsubscribe(CoreSubscriber<? superT> actual) {
synchronized (this) {
delegate.subscribe(actual);
}
}
}
//test runner code - collects errors and asserts they don't existprivatevoidperformTest(Function<Flux<Integer>, Flux<Integer>> modifyInput) {
Map<String, Integer> errorStats = performTestReportStats(modifyInput);
Assert.assertEquals(Collections.emptyMap(), errorStats);
}
privateMap<String, Integer> performTestReportStats(Function<Flux<Integer>, Flux<Integer>> modifyInput) {
Map<String, Integer> errors = newHashMap<>();
//calling tryReproduce several times and collecting errors. however, if timeout happens, exit earlyfor (inti = 0; i < TEST_ATTEMPTS; i++) {
try {
tryReproduce(modifyInput);
} catch (AssertionErrort) {
Stringmessage = t.getMessage();
booleantimeout = message.contains("VerifySubscriber timed out on ");
if (timeout) {
message = "VerifySubscriber timed out on";
}
errors.compute(message, (k, v) -> v == null ? 1 : v + 1);
if (timeout) {
//if there was timeout no need to wait for another onebreak;
}
}
}
returnerrors;
}
}
Possible Solution
Make the whole method reactor.core.publisher.FluxRefCount#subscribe synchronized . The test above shows it would help.
Your Environment
Reactor version(s) used: 3.5.6
Other relevant libraries versions (eg. netty, ...): none
JVM version (java -version): 1.8.0_201
OS and version (eg uname -a): Windows 11
The text was updated successfully, but these errors were encountered:
@gowa Thanks for finding this! This indeed is a bug.
However the operator it selves is implemented incorrectly so the problem is winder and can not be just solved by moving subscribe statement into synchronize block.
I will be reworking it fundamentally.
Thanks,
Oleh
Expected Behavior
Use case 1 (real world scenario): I need to split the given INPUT into several 'paths' using different filters, process each path, and then merge paths back again. With this pattern, the INPUT flux is 'processed' just once.
The following chain should not block:
REF_COUNTED = some INPUT Flux -> publish -> refCount(2)
A = apply transformation to REF_COUNTED
B = apply transformation to REF_COUNTED
Flux.merge(A,B) -> blockLast
Use case 2 (just another way to reproduce the problem): split the input into several paths, read results for each path in another thread.
Main Thread:
REF_COUNTED = some INPUT Flux -> publish -> refCount(2)
A = apply transformation to REF_COUNTED
B = apply transformation to REF_COUNTED
Thread1: A -> blockLast
Thread2: B -> blockLast
Actual Behavior
While the above example works fine for simple 'transformations', it blocks forever if some 'transformation' involves another thread.
E.g. A = "some Mono, finishing in another thread" -> flatMapMany(REF_COUNTED)
Steps to Reproduce
Possible Solution
Make the whole method reactor.core.publisher.FluxRefCount#subscribe synchronized . The test above shows it would help.
Your Environment
netty
, ...): nonejava -version
): 1.8.0_201uname -a
): Windows 11The text was updated successfully, but these errors were encountered: