refCount gets hung : race conditions in FluxRefCount#subscribe #3487

Closed
gowa opened this issue Jun 2, 2023 · 2 comments
Labels
type/bug A general bug

Comments

@gowa

gowa commented Jun 2, 2023

Expected Behavior

Use case 1 (real-world scenario): I need to split a given INPUT into several 'paths' using different filters, process each path, and then merge the paths back together. With this pattern, the INPUT flux is 'processed' only once.

The following chain should not block:
REF_COUNTED = some INPUT Flux -> publish -> refCount(2)
A = apply transformation to REF_COUNTED
B = apply transformation to REF_COUNTED
Flux.merge(A,B) -> blockLast

Use case 2 (just another way to reproduce the problem): split the input into several paths and read the results of each path on a separate thread.
Main Thread:
REF_COUNTED = some INPUT Flux -> publish -> refCount(2)
A = apply transformation to REF_COUNTED
B = apply transformation to REF_COUNTED
Thread1: A -> blockLast
Thread2: B -> blockLast

Actual Behavior

While the above works fine for simple 'transformations', it blocks forever (or, depending on timing, delivers wrong results) if one of the 'transformations' involves another thread.
For example: A = "some Mono that completes on another thread" -> flatMapMany(REF_COUNTED)
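To make the failure mode concrete, here is a simplified, hypothetical model in plain Java. It is not Reactor's FluxRefCount code, and the names NaiveRefCount, register, attach and connect are made up for illustration. It assumes an operator that counts a subscriber under a lock but attaches it to the shared source only afterwards; replaying one unlucky interleaving of the two paths step by step shows how the path that subscribes from another thread can miss every item:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of publish().refCount(2). This is NOT Reactor's
// FluxRefCount; it only models one assumed ordering problem: a subscriber is
// counted under a lock but attached to the shared source afterwards.
class LostSubscriberDemo {

    static final class NaiveRefCount {
        private final List<Integer> source;
        private final int minSubscribers;
        private final List<Consumer<Integer>> attached = new ArrayList<>();
        private int count;

        NaiveRefCount(List<Integer> source, int minSubscribers) {
            this.source = source;
            this.minSubscribers = minSubscribers;
        }

        // Step 1 (locked): count the subscriber; true means "you must connect".
        synchronized boolean register() {
            return ++count == minSubscribers;
        }

        // Step 2: attach the subscriber. In the assumed buggy ordering this is a
        // separate critical section, leaving a window after register().
        synchronized void attach(Consumer<Integer> subscriber) {
            attached.add(subscriber);
        }

        // Step 3: run the source once, emitting to whoever is attached right now.
        void connect() {
            List<Consumer<Integer>> snapshot;
            synchronized (this) {
                snapshot = new ArrayList<>(attached);
            }
            for (Integer item : source) {
                for (Consumer<Integer> s : snapshot) {
                    s.accept(item);
                }
            }
        }
    }

    public static void main(String[] args) {
        NaiveRefCount refCount = new NaiveRefCount(Arrays.asList(1, 2), 2);
        List<Integer> odd = new ArrayList<>();   // path A, subscribed from another thread
        List<Integer> even = new ArrayList<>();  // path B

        // One unlucky interleaving of the two subscribing threads, replayed in order:
        boolean aConnects = refCount.register();                 // A counted: 1 of 2
        boolean bConnects = refCount.register();                 // B counted: 2 of 2
        refCount.attach(v -> { if (v % 2 == 0) even.add(v); }); // B attached
        if (bConnects) {
            refCount.connect();                                  // source runs; A is not attached yet
        }
        refCount.attach(v -> { if (v % 2 == 1) odd.add(v); });  // A attaches too late
        if (aConnects) {
            refCount.connect();                                  // never runs: A was not 2nd
        }

        System.out.println("odd=" + odd + " even=" + even);     // prints "odd=[] even=[2]"
    }
}
```

In this model the odd path never sees 1, which is the same kind of lost-item result the test below reports. Whether Reactor 3.5.6 loses items through exactly this window is an assumption; the sketch only shows that such a window is enough to produce the symptom.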

Steps to Reproduce

import org.junit.Assert;
import org.junit.Test;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class RefCountFluxTest {
    private static final int TEST_ATTEMPTS = 100_000;

    @Test
    public void willFail() {
        performTest(Function.identity());
        //java.lang.AssertionError: expectation "expectNext(2)" failed (expected: onNext(2); actual: onComplete())
        //java.lang.AssertionError: expectation "expectNext(1)" failed (expected value: 1; actual value: 2)
        //java.lang.AssertionError: VerifySubscriber timed out on ...
    }

    @Test
    public void willNotFail() {
        performTest(SynchronizedSubscribeFlux::new);
    }

    /**
     * Flux.just(1,2) -> refCount(2) -> Flux.merge(odd,even)
     */
    private void tryReproduce(Function<Flux<Integer>, Flux<Integer>> modifyInput) {
        Flux<Integer> input = Flux.just(1, 2).publish().refCount(2);

        Flux<Integer> modifiedInput = modifyInput.apply(input); //either original input or with fix applied

        //Mono<Long> delay = Mono.fromCompletionStage(CompletableFuture.supplyAsync(() -> 1L));
        Mono<Long> delay = Mono.delay(Duration.ofNanos(1));

        Flux<Integer> odd = delay.flatMapMany(unused -> modifiedInput.filter(v -> v % 2 == 1)); //this will make subscribe from another thread
        Flux<Integer> even = modifiedInput.filter(v -> v % 2 == 0);

        Flux<Integer> merged = Flux.merge(odd, even);

        merged.as(StepVerifier::create).expectNext(1).expectNext(2).expectComplete().verify(Duration.ofSeconds(5));
    }

    //simulation of the suggested fix (see synchronized keyword)
    public static class SynchronizedSubscribeFlux<T> extends Flux<T> {
        private final Flux<T> delegate;

        public SynchronizedSubscribeFlux(Flux<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void subscribe(CoreSubscriber<? super T> actual) {
            synchronized (this) {
                delegate.subscribe(actual);
            }
        }
    }

    //test runner code - collects errors and asserts they don't exist
    private void performTest(Function<Flux<Integer>, Flux<Integer>> modifyInput) {
        Map<String, Integer> errorStats = performTestReportStats(modifyInput);
        Assert.assertEquals(Collections.emptyMap(), errorStats);
    }

    private Map<String, Integer> performTestReportStats(Function<Flux<Integer>, Flux<Integer>> modifyInput) {
        Map<String, Integer> errors = new HashMap<>();

        //calling tryReproduce several times and collecting errors. however, if timeout happens, exit early
        for (int i = 0; i < TEST_ATTEMPTS; i++) {
            try {
                tryReproduce(modifyInput);
            } catch (AssertionError t) {
                String message = t.getMessage();
                boolean timeout = message.contains("VerifySubscriber timed out on ");
                if (timeout) {
                    message = "VerifySubscriber timed out on";
                }

                errors.compute(message, (k, v) -> v == null ? 1 : v + 1);

                if (timeout) {
                    //if there was timeout no need to wait for another one
                    break;
                }
            }
        }

        return errors;
    }
}

Possible Solution

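Make the whole reactor.core.publisher.FluxRefCount#subscribe method synchronized. The willNotFail test above simulates this fix by wrapping the ref-counted Flux in SynchronizedSubscribeFlux, and suggests that it would help.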
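A similarly hypothetical plain-Java model of the suggested fix (again not Reactor code; LockedRefCount, subscriber and connections are illustrative names): attaching and counting a subscriber happen inside one synchronized block, so the subscriber that reaches the threshold can only connect after every counted subscriber is already attached. The demo races two threads against each other many times and checks that each path sees its item and that the source runs exactly once:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical model of the suggested fix, NOT Reactor's FluxRefCount: a
// subscriber is attached and counted inside the same synchronized block.
class LockedRefCountDemo {

    static final class LockedRefCount {
        private final List<Integer> source;
        private final int minSubscribers;
        private final List<Consumer<Integer>> attached = new ArrayList<>();
        private int count;
        final AtomicInteger connections = new AtomicInteger(); // how often the source ran

        LockedRefCount(List<Integer> source, int minSubscribers) {
            this.source = source;
            this.minSubscribers = minSubscribers;
        }

        void subscribe(Consumer<Integer> subscriber) {
            List<Consumer<Integer>> snapshot = null;
            synchronized (this) {
                attached.add(subscriber);              // attach and count atomically,
                if (++count == minSubscribers) {       // so no counted subscriber is missing
                    snapshot = new ArrayList<>(attached);
                }
            }
            if (snapshot != null) {                    // connect outside the lock
                connections.incrementAndGet();
                for (Integer item : source) {
                    for (Consumer<Integer> s : snapshot) {
                        s.accept(item);
                    }
                }
            }
        }
    }

    // Creates (but does not start) a thread that waits for the start signal, then subscribes.
    static Thread subscriber(CountDownLatch start, LockedRefCount refCount, Consumer<Integer> path) {
        return new Thread(() -> {
            try {
                start.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            refCount.subscribe(path);
        });
    }

    public static void main(String[] args) throws InterruptedException {
        for (int attempt = 0; attempt < 1_000; attempt++) {
            LockedRefCount refCount = new LockedRefCount(Arrays.asList(1, 2), 2);
            List<Integer> odd = Collections.synchronizedList(new ArrayList<>());
            List<Integer> even = Collections.synchronizedList(new ArrayList<>());
            CountDownLatch start = new CountDownLatch(1);

            Thread a = subscriber(start, refCount, v -> { if (v % 2 == 1) odd.add(v); });
            Thread b = subscriber(start, refCount, v -> { if (v % 2 == 0) even.add(v); });
            a.start();
            b.start();
            start.countDown(); // release both subscribers at once to provoke a race
            a.join();
            b.join();

            if (!odd.equals(Arrays.asList(1)) || !even.equals(Arrays.asList(2))
                    || refCount.connections.get() != 1) {
                throw new AssertionError("attempt " + attempt + ": odd=" + odd + " even=" + even);
            }
        }
        System.out.println("all attempts passed");
    }
}
```

The model deliberately ignores cancellation and reconnection, so it only illustrates the count/attach ordering and does not address any wider issues in the real operator.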

Your Environment

  • Reactor version(s) used: 3.5.6
  • Other relevant libraries versions (eg. netty, ...): none
  • JVM version (java -version): 1.8.0_201
  • OS and version (eg uname -a): Windows 11
@reactorbot added the ❓need-triage label Jun 2, 2023
@gowa
Author

gowa commented Jun 3, 2023

#3488 has better tests with correct expected results, and it moves just one line in FluxRefCount onSubscribe into the synchronized section.

@OlegDokuka added the type/bug label and removed the ❓need-triage label Jun 13, 2023
@OlegDokuka
Contributor

@gowa Thanks for finding this! This is indeed a bug.

However, the operator itself is implemented incorrectly, so the problem is wider and cannot be solved just by moving the subscribe statement into the synchronized block.

I will be reworking it fundamentally.
Thanks,
Oleh
