
Introduce global checkpoint listeners #32696

Merged (22 commits) on Aug 15, 2018

Conversation

jasontedor (Member):
This commit introduces the ability for global checkpoint listeners to be registered at the shard level. These listeners are notified when the global checkpoint is updated, and also when the shard closes. To encapsulate these listeners, we introduce a shard-level component that handles synchronization of notification and modifications to the collection of listeners.

Relates #32651

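As a rough illustration of the component described in the PR description, a minimal sketch might look like the following. This is hypothetical, simplified code: the names mirror the discussion, but it is not the actual Elasticsearch implementation, and the `UNASSIGNED_SEQ_NO` sentinel is stubbed as `-2`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

// Hypothetical, simplified sketch; not the actual Elasticsearch code.
@FunctionalInterface
interface GlobalCheckpointListener {
    // e is non-null only when the shard closed before the checkpoint advanced
    void accept(long globalCheckpoint, Exception e);
}

class GlobalCheckpointListeners {
    private final Executor executor;
    private List<GlobalCheckpointListener> listeners;
    private boolean closed;

    GlobalCheckpointListeners(final Executor executor) {
        this.executor = executor;
    }

    synchronized void add(final GlobalCheckpointListener listener) {
        if (closed) {
            // behave as if the listener was registered and then the shard closed
            executor.execute(() -> listener.accept(-2L, new IllegalStateException("shard closed")));
            return;
        }
        if (listeners == null) {
            listeners = new ArrayList<>();
        }
        listeners.add(listener);
    }

    synchronized void globalCheckpointUpdated(final long globalCheckpoint) {
        notifyListeners(globalCheckpoint, null);
    }

    synchronized void close() {
        closed = true;
        notifyListeners(-2L, new IllegalStateException("shard closed")); // -2 stands in for UNASSIGNED_SEQ_NO
    }

    private void notifyListeners(final long globalCheckpoint, final Exception e) {
        if (listeners == null) {
            return;
        }
        final List<GlobalCheckpointListener> current = listeners;
        listeners = null; // listeners are single-use and must re-register
        for (final GlobalCheckpointListener listener : current) {
            executor.execute(() -> listener.accept(globalCheckpoint, e));
        }
    }
}
```

Note the single-use semantics: a notified listener is dropped and must re-register, a point the reviewers probe below.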
jasontedor added the >enhancement, review, v7.0.0, :Distributed/Distributed, and v6.5.0 labels on Aug 8, 2018
elasticmachine (Collaborator):

Pinging @elastic/es-distributed

bleskes (Contributor) left a comment:

Thanks @jasontedor. I think we should change the API a bit (see my comment). I also wonder if you considered sharing code between this component and RefreshListeners, which is very similar in nature. I say "consider" as it may very well just end up in generics/boxing hell (as the underlying primitives are different).

*
* @param listener the listener
*/
synchronized void add(final GlobalCheckpointListener listener) {
bleskes (Contributor):

I wonder if we should have a parameter that indicates the global checkpoint last sampled by the component trying to register the listener. We can then immediately call the listener if the last global checkpoint this component was notified about (which needs to be captured) is higher. I think this would help avoid race conditions.
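Concretely, the suggested signature could look something like this (a hypothetical sketch with assumed class and field names, not the final code): the caller passes the checkpoint it last sampled, and the component fires the listener immediately if it already knows of a higher one, closing the window between sampling and registering.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.LongConsumer;

// Hypothetical sketch of add(currentGlobalCheckpoint, listener); names are
// assumptions for illustration only.
class CheckpointRegistry {
    private final Executor executor;
    private final List<LongConsumer> listeners = new ArrayList<>();
    private long lastKnownGlobalCheckpoint = -1L; // captured on every update

    CheckpointRegistry(final Executor executor) {
        this.executor = executor;
    }

    synchronized void add(final long currentGlobalCheckpoint, final LongConsumer listener) {
        if (lastKnownGlobalCheckpoint > currentGlobalCheckpoint) {
            // the caller's sample is stale: fire immediately instead of
            // registering, avoiding the sample-then-register race
            final long checkpoint = lastKnownGlobalCheckpoint;
            executor.execute(() -> listener.accept(checkpoint));
        } else {
            listeners.add(listener);
        }
    }

    synchronized void globalCheckpointUpdated(final long globalCheckpoint) {
        lastKnownGlobalCheckpoint = globalCheckpoint;
        final List<LongConsumer> current = new ArrayList<>(listeners);
        listeners.clear();
        for (final LongConsumer listener : current) {
            executor.execute(() -> listener.accept(globalCheckpoint));
        }
    }
}
```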

jasontedor (Member Author):

In my POC I was doing this at the API layer (transport layer), where we indeed have a request with the last known global checkpoint, but I agree it makes sense to move that here.

jasontedor (Member Author):

@bleskes It was the boxing/raw generics that I wanted to avoid indeed.

/**
* Callback when the global checkpoint is updated or the shard is closed. If the shard is closed, the value of the global checkpoint
* will be set to {@link org.elasticsearch.index.seqno.SequenceNumbers#UNASSIGNED_SEQ_NO} and the exception will be non-null. If the
* global checkpoint is updated, the exception will be null.
bleskes (Contributor):

Nit: I wonder if we should have an onFailure method here for all kinds of failures and send the IndexShardClosedException down that route. The downside, of course, is that people wouldn't be able to pass method references, but the method won't need to start with if (e != null), etc.

jasontedor (Member Author):

I prefer to use a functional interface to enable the use of lambda expressions.

bleskes (Contributor) left a comment:

Thx @jasontedor . I left some more comments.

*/
synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpointListener listener) {
if (closed) {
throw new IllegalStateException("can not listen for global checkpoint changes on a closed shard [" + shardId + "]");
bleskes (Contributor):

Can we throw an AlreadyClosedException like everywhere else?

bleskes (Contributor):

Scratch that. I think we should be consistent and pass an IndexShardClosedException to the listener in that case (as if it was registered and then we closed).

jasontedor (Member Author):

I pushed 50a9a6c.


/**
* Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the
* listener will fire immediately on the calling thread.
bleskes (Contributor):

The code runs the listener via the executor, which isn't in line with what the comment says. Which behavior do you intend to keep?

jasontedor (Member Author):

Yeah, the doc went out of sync with the intent. I pushed 22d13a8.

*/
void globalCheckpointUpdated(final long globalCheckpoint) {
assert globalCheckpoint >= NO_OPS_PERFORMED;
lastKnownGlobalCheckpoint = globalCheckpoint;
bleskes (Contributor):

I think this has to be synchronized to avoid race conditions with add. Say the add method has already sampled lastKnownGlobalCheckpoint and concluded that it has to add a listener. It then goes on to update the listeners array but hasn't done so yet (it's still null). notifyListeners is called here and runs the listeners != null test before the add method made it non-null. In that case I think we miss the listener.

I may be missing something here, but my point is that this is too complex IMO. I don't see why we can't just make this a simple, fully synchronized method. The listeners are called on an executor anyway.

jasontedor (Member Author):

Indeed. I pushed 22d13a8.

…listeners

* elastic/master: (58 commits)
  [ML] Partition-wise maximum scores (elastic#32748)
  [DOCS] XContentBuilder#bytes method removed, using BytesReference.bytes(docBuilder) (elastic#32771)
  HLRC: migration get assistance API (elastic#32744)
  Add a task to run forbiddenapis using cli (elastic#32076)
  [Kerberos] Add debug log statement for exceptions (elastic#32663)
  Make x-pack core pull transport-nio (elastic#32757)
  Painless: Clean Up Whitelist Names (elastic#32791)
  Cat apis: Fix index creation time to use strict date format (elastic#32510)
  Clear Job#finished_time when it is opened (elastic#32605) (elastic#32755)
  Test: Only sniff host metadata for node_selectors (elastic#32750)
  Update scripted metric docs to use `state` variable (elastic#32695)
  Painless: Clean up PainlessCast (elastic#32754)
  [TEST] Certificate NONE not allowed in FIPS JVM (elastic#32753)
  [ML] Refactor ProcessCtrl into Autodetect and Normalizer builders (elastic#32720)
  Access build tools resources (elastic#32201)
  Tests: Disable rolling upgrade tests with system key on fips JVM (elastic#32775)
  HLRC: Ban LoggingDeprecationHandler (elastic#32756)
  Fix test reproducability in AbstractBuilderTestCase setup (elastic#32403)
  Only require java<version>_home env var if needed
  Tests: Muted ScriptDocValuesDatesTests.testJodaTimeBwc
  ...
jasontedor (Member Author):

@ywelsch Would you take over reviewing this PR?

ywelsch (Contributor) left a comment:

Left an initial question

void globalCheckpointUpdated(final long globalCheckpoint) {
assert globalCheckpoint >= NO_OPS_PERFORMED;
synchronized (this) {
lastKnownGlobalCheckpoint = globalCheckpoint;
ywelsch (Contributor):

the add method assumes that the global checkpoint is always strictly increasing. Add an assertion here that this is so?

jasontedor (Member Author):

I pushed 48089fb.

final List<GlobalCheckpointListener> currentListeners;
synchronized (this) {
currentListeners = listeners;
listeners = null;
ywelsch (Contributor):

This looks as if the listeners are only notified once and then need to re-register if they want more events? I can see why this kind of behavior makes sense for refresh listeners, with refresh being an expensive operation that ensures that all events registered before the refresh will now see the changes they're waiting for. With global checkpoints it's less clear to me, as they can potentially be updated many times per second, so wouldn't you want to stay registered to receive events? If not, will this lead to a storm of re-register events? I think I need to better understand the integration point here, i.e., how this API will be used.

jasontedor (Member Author):

@ywelsch See #32651. The intended usage is in CCR where a remote cluster will, when the remote cluster is fully caught up, (remotely) attach a single-use listener to the local cluster for the next global checkpoint change. When the global checkpoint is updated, the listener will be invoked which will return a response to the remote cluster that will act as letting the remote cluster know that there are now additional changes to be fetched.

ywelsch (Contributor):

Ok, given this behavior, I wonder if we can make the listener interface simpler (same as Boaz's concern). As the listeners are only notified once and then need to re-register if they want more events, I wonder if it's simpler to just signal an UNASSIGNED_SEQ_NO (or the lastKnownGlobalCheckpoint) on close, and then have the caller fail on a repeated call to the add method (by throwing the exception directly in that method rather than relaying it to the listener). This means the listener can remain a functional interface (just a LongConsumer). WDYT?

jasontedor (Member Author):

@ywelsch Personally I do not buy the argument that

        @Override
        protected void asyncShardOperation(
                final Request request, final ShardId shardId, final ActionListener<Response> listener) throws IOException {
            final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
            final IndexShard indexShard = indexService.getShard(shardId.id());
            indexShard.addGlobalCheckpointListener(
                    request.getGlobalCheckpoint(),
                    (g, e) -> {
                        if (g != UNASSIGNED_SEQ_NO) {
                            listener.onResponse(new Response(g));
                        } else {
                            listener.onFailure(e);
                        }
            });
        }

is less clean than

        @Override
        protected void asyncShardOperation(
                final Request request, final ShardId shardId, final ActionListener<Response> listener) throws IOException {
            final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
            final IndexShard indexShard = indexService.getShard(shardId.id());
            indexShard.addGlobalCheckpointListener(
                    request.getGlobalCheckpoint(),
                    g -> {
                        if (g != UNASSIGNED_SEQ_NO) {
                            listener.onResponse(new Response(g));
                        } else {
                            listener.onFailure(new IndexShardClosedException(shardId));
                        }
            });
        }

We still have to have a check in one form or the other whether or not closing has been signaled to us, so there's always going to be an if check. At least, that is what I read the comment from @bleskes as arguing:

The down side is of course that people wouldn't be able to pass a method references, but the method won't need to start with if (e != null) etc.

In fact, I would argue the approach I have taken is cleaner as it's the shard telling us that we are closed rather than it being signaled indirectly through the value of the global checkpoint passed in the callback. Sure the actual interface is simpler but I prefer the explicit approach. So I think we should either stick with what I have, or have an onClosed callback and lose the ability for the interface to be a functional interface.

Regarding throwing on attempting to register a listener on a closed shard, that was my preferred approach too but @bleskes thought otherwise.

ywelsch (Contributor):

There is also still the option of adding an onClosed method to the interface with a default no-op implementation. It would remain a functional interface, and if most tests don't care about the shard-closed case, they can treat it like one. I'll leave the decision to you.
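The default-method option could be sketched like this (hypothetical names; not the code ultimately merged, since the author chose to keep the two-argument callback): the interface stays functional for lambdas while production callers override onClosed.

```java
// Hypothetical sketch of the default-method option; names are illustrative.
@FunctionalInterface
interface CheckpointListener {
    void onNewCheckpoint(long globalCheckpoint);

    // default no-op keeps the interface functional: lambdas implement only
    // onNewCheckpoint, and callers that care about closing override this
    default void onClosed(Exception e) {
    }
}
```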

jasontedor (Member Author):

It would not be a functional interface for the purposes of the production use-case for this that I have in mind (we have to handle the shard closed event). Thanks, I will leave as-is.

jasontedor dismissed bleskes's stale review on August 14, 2018 14:04:

@ywelsch is taking over the review

ywelsch (Contributor) left a comment:

I've left two smaller questions around testing and one ask around simplifying synchronization. Looks good otherwise

* Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the
* listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners. If the
* shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global
* checkpoint listeners.
ywelsch (Contributor):

can you mention here that the listener will be deregistered on notification?

jasontedor (Member Author):

I pushed 5b63507.

notifyListeners(globalCheckpoint, null);
}

private void notifyListeners(final long globalCheckpoint, final IndexShardClosedException e) {
ywelsch (Contributor):

For simplicity, let's move this under the mutex everywhere (and add assert Thread.holdsLock(this) to the beginning of the method). It looks like whenever we call this method, we already go under a mutex for a quick operation beforehand. In case no one is using the listener functionality, this will therefore amount to the same overhead. In case this infrastructure is used, listeners should not fly in by the millions per second, so listeners will mostly be null and there should be no overhead. Let's optimize this in the future if we see any issue. I was scratching my head for a bit to check whether the concurrency is correct here (and I believe it is), but it's not worth the complexity IMHO.
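The suggested shape can be sketched as follows (a hypothetical, stripped-down illustration, not the actual code): every caller of notifyListeners already holds the component's monitor, and the method asserts it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: notifyListeners always runs under the mutex, with an
// assertion that catches any future unsynchronized caller.
class Notifier {
    private List<Runnable> listeners = new ArrayList<>();

    synchronized void add(final Runnable listener) {
        listeners.add(listener);
    }

    synchronized void globalCheckpointUpdated() {
        notifyListeners();
    }

    synchronized void close() {
        notifyListeners();
    }

    private void notifyListeners() {
        assert Thread.holdsLock(this); // every caller must hold the monitor
        final List<Runnable> current = listeners;
        listeners = new ArrayList<>(); // listeners are single-use
        current.forEach(Runnable::run);
    }
}
```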

jasontedor (Member Author):

Sure. I pushed 88dee76.

assertThat(count.get(), equalTo(1));
}

public void testConcurrency() throws BrokenBarrierException, InterruptedException {
ywelsch (Contributor):

Can you also add closing to this test? So that each listener gets notified exactly once, even under concurrent closing.

jasontedor (Member Author):

Good idea, this caught an issue. I pushed 9533679.

final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger);
final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED);
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.get());
final CyclicBarrier barrier = new CyclicBarrier(3);
ywelsch (Contributor):

Why 3 here? How do we ensure we run every thread to completion? (I have a hard time counting the barrier.await() calls here.)

jasontedor (Member Author):

There are three threads: the thread repeatedly firing updates, the thread repeatedly adding listeners, and the main test thread. We use the barrier to synchronize their start (they all wait on the barrier, then go) and to signal when the updating and adding threads are done (they all wait on the barrier again). So within the updating thread we have:

wait on barrier
do the updating loop
wait on barrier

while within the adding thread we have

wait on barrier
do the adding loop
wait on barrier

and the main test thread does two waits

wait on barrier
wait on barrier

The first wait corresponds to the top wait of the updating and adding threads, and the second wait corresponds to the bottom wait of the updating and adding threads.

Does that help?

jasontedor (Member Author):

Maybe it is best to look at it like this:

Updating thread:

wait on barrier
do the updating loop
wait on barrier

Adding thread:

wait on barrier
do the adding loop
wait on barrier

Main test thread:

wait on barrier
no-op
wait on barrier

ywelsch (Contributor):

ok, thanks for the explanation, makes sense. Can you add a comment to that effect?

jasontedor (Member Author):

@ywelsch I pushed 07dab4f.

jasontedor (Member Author):

@ywelsch This is ready for another round from you.

…listeners

* elastic/master:
  Watcher: Remove unused hipchat render method (elastic#32211)
  Watcher: Remove extraneous auth classes (elastic#32300)
  Watcher: migrate PagerDuty v1 events API to v2 API (elastic#32285)
  [TEST] Select free port for Minio (elastic#32837)
  MINOR: Remove `IndexTemplateFilter` (elastic#32841)
  Core: Add java time version of rounding classes (elastic#32641)
  Aggregations/HL Rest client fix: missing scores (elastic#32774)
  HLRC: Add Delete License API (elastic#32586)
  INGEST: Create Index Before Pipeline Execute (elastic#32786)
  Fix NOOP bulk updates (elastic#32819)
  Remove client connections from TcpTransport (elastic#31886)
  Increase logging testRetentionPolicyChangeDuringRecovery
  AwaitsFix case-functions.sql-spec
  Mute security-cli tests in FIPS JVM (elastic#32812)
  SCRIPTING: Support BucketAggScript return null (elastic#32811)
  Unmute WildFly tests in FIPS JVM (elastic#32814)
  [TEST] Force a stop to save rollup state before continuing (elastic#32787)
  [test] disable packaging tests for suse boxes
  Mute IndicesRequestIT#testBulk
  [ML][DOCS] Refer to rules feature as custom rules (elastic#32785)
ywelsch (Contributor) left a comment:

LGTM


jasontedor merged commit 068d03f into elastic:master on Aug 15, 2018
jasontedor deleted the global-checkpoint-listeners branch on August 15, 2018 16:22
jasontedor added a commit that referenced this pull request Aug 15, 2018
jasontedor added a commit that referenced this pull request Aug 15, 2018
* elastic/master:
  Revert "cluster formation DSL - Gradle integration -  part 2 (#32028)" (#32876)
  cluster formation DSL - Gradle integration -  part 2 (#32028)
  Introduce global checkpoint listeners (#32696)
  Move connection profile into connection manager (#32858)
  [ML] Temporarily disabling rolling-upgrade tests
  Use generic AcknowledgedResponse instead of extended classes (#32859)
  [ML] Removing old per-partition normalization code (#32816)
  Use JDK 10 for 6.4 BWC builds (#32866)
  Removed flaky test. Looks like randomisation makes these assertions unreliable.
  [test] mute IndexShardTests.testDocStats
  Introduce the dissect library (#32297)
  Security: remove password hash bootstrap check (#32440)
  Move validation to server for put user requests (#32471)
  [ML] Add high level REST client docs for ML put job endpoint (#32843)
  Test: Fix forbidden uses in test framework (#32824)
  Painless: Change fqn_only to no_import (#32817)
  [test] mute testSearchWithSignificantTermsAgg
  Watcher: Remove unused hipchat render method (#32211)
  Watcher: Remove extraneous auth classes (#32300)
  Watcher: migrate PagerDuty v1 events API to v2 API (#32285)
jasontedor added a commit that referenced this pull request Aug 15, 2018
* 6.x: (96 commits)
  Introduce global checkpoint listeners (#32696)
  Use JDK 10 for 6.4 BWC builds (#32866)
  Remove unused imports - follow up to removal of test in issue 32855
  Removed flaky test. Looks like randomisation makes these assertions unreliable. This test is superfluous - it was added to address #32770 but it later turned out there was an existing test that just required a fix to provide the missing test coverage.
  [test] mute IndexShardTests.testDocStats
  Test: Fix forbidden uses in test framework (#32824)
  Security: remove password hash bootstrap check (#32440)
  Move validation to server for put user requests (#32471)
  [ML] Add high level REST client docs for ML put job endpoint (#32843)
  Painless: Change fqn_only to no_import (#32817)
  [test] mute testSearchWithSignificantTermsAgg
  Backport: CompletableContext class to avoid throwable (#32829)
  [TEST] Select free port for Minio (#32837)
  SCRIPTING: Support BucketAggScript return null (#32811) (#32833)
  HLRC: Add Delete License API (#32586)
  Aggregations/HL Rest client fix: missing scores (#32774)
  HLRC: migration get assistance API (#32744)
  Fix NOOP bulk updates (#32819)
  Increase logging testRetentionPolicyChangeDuringRecovery
  AwaitsFix case-functions.sql-spec
  ...
@jimczi jimczi added v7.0.0-beta1 and removed v7.0.0 labels Feb 7, 2019