-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reworks FluxPublish internals to relay on predictable state machine #3538
Changes from 5 commits
5a428c5
7dd2b28
7284be7
931d470
e824062
1d93148
95185dc
714b633
e73d3cb
6d1c8a7
fb69884
58b0e42
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,8 @@ | |
import org.openjdk.jcstress.infra.results.IIIIII_Result; | ||
import reactor.core.scheduler.Schedulers; | ||
import reactor.util.annotation.Nullable; | ||
import org.openjdk.jcstress.infra.results.III_Result; | ||
import reactor.core.Disposable; | ||
|
||
import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE; | ||
|
||
|
@@ -445,4 +447,50 @@ public void arbiter(IIIIII_Result r) { | |
r.r6 = subscriber2.onErrorCalls.get(); | ||
} | ||
} | ||
|
||
// TODO: uncomment me. Proper discard is not supported yet since we dont have stable | ||
// downstream context available all the time. This should be uncommented once we have | ||
// an explicitly passed onDiscard handler | ||
// @JCStressTest | ||
// @Outcome(id = {"10, 1, 0"}, expect = ACCEPTABLE, desc = "all values and completion delivered") | ||
// @Outcome(id = {"10, 0, 1"}, expect = ACCEPTABLE, desc = "some values are delivered some dropped since overflow") | ||
// @State | ||
// public static class ConcurrentDisposeAndProduceStressTest { | ||
// | ||
// final Sinks.Many<Integer> producer = Sinks.unsafe().many().multicast().directAllOrNothing(); | ||
// | ||
// final ConnectableFlux<Integer> sharedSource = producer.asFlux().publish(5); | ||
// | ||
// final StressSubscriber<Integer> subscriber1 = new StressSubscriber<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no need for "1" suffix There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed |
||
// | ||
// final Disposable disposable; | ||
// | ||
// { | ||
// sharedSource.subscribe(subscriber1); | ||
// disposable = sharedSource.connect(); | ||
// } | ||
// | ||
// @Actor | ||
// public void subscribe1() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. consider changing the name to dispose There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed |
||
// disposable.dispose(); | ||
// } | ||
// | ||
// @Actor | ||
// public void subscribe2() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. consider changing the name to emitItems There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed |
||
// for (int i = 0; i < 10; i++) { | ||
// if (producer.tryEmitNext(i) != Sinks.EmitResult.OK) { | ||
// Operators.onDiscard(i, subscriber1.context); | ||
// } | ||
// } | ||
// | ||
// producer.tryEmitComplete(); | ||
// } | ||
// | ||
// @Arbiter | ||
// public void arbiter(III_Result r) { | ||
// r.r1 = subscriber1.onNextCalls.get() + subscriber1.onNextDiscarded.get(); | ||
// r.r2 = subscriber1.onCompleteCalls.get(); | ||
// r.r3 = subscriber1.onErrorCalls.get(); | ||
// } | ||
// } | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add tests that validate the changes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WDYM? For now discard is weak and the commented test is going to fail
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can uncomment the test once we have an explicit discard hook for hot producers. E.g.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I mean is just: do we have any way to show these changes improve anything? Is improper discarding the only flaw right now? It would seem there can be regular omissions to deliver items due to improper concurrency handling right now. The original issue for refCount had a test suggestion validating the delivery of signals is appropriate, can we use a similar test for publish?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, right. I wanted to perform jmh test. Will post results in a moment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chemicL added reasoning for this PR in the header.