Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reworks FluxPublish internals to relay on predictable state machine #3538

Merged
merged 12 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.openjdk.jcstress.infra.results.IIIIII_Result;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.Nullable;
import org.openjdk.jcstress.infra.results.III_Result;
import reactor.core.Disposable;

import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE;

Expand Down Expand Up @@ -445,4 +447,50 @@ public void arbiter(IIIIII_Result r) {
r.r6 = subscriber2.onErrorCalls.get();
}
}

// TODO: uncomment me. Proper discard is not supported yet since we dont have stable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add tests that validate the changes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WDYM? For now discard is weak and the commented test is going to fail

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can uncomment the test once we have an explicit discard hook for hot producers. E.g.

 source.publish(discardedValue -> {..})
 ...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is just: do we have any way to show these changes improve anything? Is improper discarding the only flaw right now? It would seem there can be regular omissions to deliver items due to improper concurrency handling right now. The original issue for refCount had a test suggestion validating the delivery of signals is appropriate, can we use a similar test for publish?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, right. I wanted to perform jmh test. Will post results in a moment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chemicL added reasoning for this PR in the header.

// downstream context available all the time. This should be uncommented once we have
// an explicitly passed onDiscard handler
// @JCStressTest
// @Outcome(id = {"10, 1, 0"}, expect = ACCEPTABLE, desc = "all values and completion delivered")
// @Outcome(id = {"10, 0, 1"}, expect = ACCEPTABLE, desc = "some values are delivered some dropped since overflow")
// @State
// public static class ConcurrentDisposeAndProduceStressTest {
//
// final Sinks.Many<Integer> producer = Sinks.unsafe().many().multicast().directAllOrNothing();
//
// final ConnectableFlux<Integer> sharedSource = producer.asFlux().publish(5);
//
// final StressSubscriber<Integer> subscriber1 = new StressSubscriber<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need for "1" suffix

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

//
// final Disposable disposable;
//
// {
// sharedSource.subscribe(subscriber1);
// disposable = sharedSource.connect();
// }
//
// @Actor
// public void subscribe1() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider changing the name to dispose

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

// disposable.dispose();
// }
//
// @Actor
// public void subscribe2() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider changing the name to emitItems

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

// for (int i = 0; i < 10; i++) {
// if (producer.tryEmitNext(i) != Sinks.EmitResult.OK) {
// Operators.onDiscard(i, subscriber1.context);
// }
// }
//
// producer.tryEmitComplete();
// }
//
// @Arbiter
// public void arbiter(III_Result r) {
// r.r1 = subscriber1.onNextCalls.get() + subscriber1.onNextDiscarded.get();
// r.r2 = subscriber1.onCompleteCalls.get();
// r.r3 = subscriber1.onErrorCalls.get();
// }
// }
}
Loading
Loading