Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reworks FluxPublish internals to relay on predictable state machine #3538

Merged
merged 12 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright (c) 2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;

@BenchmarkMode({Mode.AverageTime})
@Warmup(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Benchmark)
public class FluxPublishBenchmark {
@Param({"0", "10", "1000", "100000"})
int rangeSize;

Flux<Integer> source;

@Setup(Level.Invocation)
public void setup() {
source = Flux.range(0, rangeSize)
.hide()
.publish()
.autoConnect(Runtime.getRuntime()
.availableProcessors());
}


@State(Scope.Thread)
public static class JmhSubscriber<T> extends CountDownLatch implements CoreSubscriber<T> {

Blackhole blackhole;

Subscription s;

public JmhSubscriber() {
super(1);
}

@Override
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
this.s = s;
s.request(Long.MAX_VALUE);
}
}

@Override
public void onNext(T t) {
blackhole.consume(t);
}

@Override
public void onError(Throwable t) {
blackhole.consume(t);
countDown();
}

@Override
public void onComplete() {
countDown();
}
}

@SuppressWarnings("unused")
@Benchmark
@Threads(Threads.MAX)
public Object measureThroughput(Blackhole blackhole, JmhSubscriber<Integer> subscriber) throws InterruptedException {
subscriber.blackhole = blackhole;
source.subscribe(subscriber);
subscriber.await();
return subscriber;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.openjdk.jcstress.infra.results.IIIIII_Result;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.Nullable;
import org.openjdk.jcstress.infra.results.III_Result;
import reactor.core.Disposable;

import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE;

Expand Down Expand Up @@ -445,4 +447,50 @@ public void arbiter(IIIIII_Result r) {
r.r6 = subscriber2.onErrorCalls.get();
}
}

// TODO: uncomment me. Proper discard is not supported yet since we dont have stable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add tests that validate the changes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WDYM? For now discard is weak and the commented test is going to fail

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can uncomment the test once we have an explicit discard hook for hot producers. E.g.

 source.publish(discardedValue -> {..})
 ...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is just: do we have any way to show these changes improve anything? Is improper discarding the only flaw right now? It would seem there can be regular omissions to deliver items due to improper concurrency handling right now. The original issue for refCount had a test suggestion validating the delivery of signals is appropriate, can we use a similar test for publish?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, right. I wanted to perform jmh test. Will post results in a moment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chemicL added reasoning for this PR in the header.

// downstream context available all the time. This should be uncommented once we have
// an explicitly passed onDiscard handler
// @JCStressTest
// @Outcome(id = {"10, 1, 0"}, expect = ACCEPTABLE, desc = "all values and completion delivered")
// @Outcome(id = {"10, 0, 1"}, expect = ACCEPTABLE, desc = "some values are delivered some dropped since overflow")
// @State
// public static class ConcurrentDisposeAndProduceStressTest {
//
// final Sinks.Many<Integer> producer = Sinks.unsafe().many().multicast().directAllOrNothing();
//
// final ConnectableFlux<Integer> sharedSource = producer.asFlux().publish(5);
//
// final StressSubscriber<Integer> subscriber = new StressSubscriber<>();
//
// final Disposable disposable;
//
// {
// sharedSource.subscribe(subscriber);
// disposable = sharedSource.connect();
// }
//
// @Actor
// public void dispose() {
// disposable.dispose();
// }
//
// @Actor
// public void emitValues() {
// for (int i = 0; i < 10; i++) {
// if (producer.tryEmitNext(i) != Sinks.EmitResult.OK) {
// Operators.onDiscard(i, subscriber.context);
// }
// }
//
// producer.tryEmitComplete();
// }
//
// @Arbiter
// public void arbiter(III_Result r) {
// r.r1 = subscriber.onNextCalls.get() + subscriber.onNextDiscarded.get();
// r.r2 = subscriber.onCompleteCalls.get();
// r.r3 = subscriber.onErrorCalls.get();
// }
// }
}
Loading
Loading