From 0b988cb1cef0339df81220b937e75fff5c7c9123 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Wed, 4 Oct 2023 21:14:17 +0300 Subject: [PATCH] ensures `addCap` always returns value with flag Signed-off-by: Oleh Dokuka --- .../reactor/core/publisher/FluxCreate.java | 2 +- .../core/publisher/FluxCreateTest.java | 21 +++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxCreate.java b/reactor-core/src/main/java/reactor/core/publisher/FluxCreate.java index 27564cfc37..d859259ef8 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxCreate.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxCreate.java @@ -652,7 +652,7 @@ static long addCap(BaseSink instance, long toAdd) { s = instance.requested; r = s & Long.MAX_VALUE; if (r == Long.MAX_VALUE) { - return Long.MAX_VALUE; + return s; } u = Operators.addCap(r, toAdd); if (REQUESTED.compareAndSet(instance, s, u | (s & Long.MIN_VALUE))) { diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxCreateTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxCreateTest.java index ddceb93d8e..16c602767b 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxCreateTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxCreateTest.java @@ -16,6 +16,7 @@ package reactor.core.publisher; +import java.time.Duration; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -57,6 +58,26 @@ class FluxCreateTest { + @Test + //https://github.com/reactor/reactor-core/issues/3569 + void ensuresRequestMaxPlusOneDoesNotFailOnNoRequestConsumer() { + Flux.create(sink -> { + sink.next("1"); + sink.next("2"); + sink.next("3"); + sink.complete(); + }) + .as(StepVerifier::create) + .expectNext("1") + .thenRequest(1) + .expectNext("2") + .thenRequest(1) + .expectNext("3") + .thenRequest(1) + .expectComplete() + .verify(Duration.ofMillis(1000)); + } + @Test //https://github.com/reactor/reactor-core/issues/1949 void ensuresConcurrentRequestAndSettingOnRequestAlwaysDeliversDemand() throws ExecutionException, InterruptedException {