Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce global checkpoint background sync #26591

Merged
merged 42 commits into from
Sep 21, 2017
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
c0e7443
Introduce global checkpoint background sync
jasontedor Aug 28, 2017
511f96e
Cleanup after test
jasontedor Sep 11, 2017
e1dc814
Recollect stats
jasontedor Sep 12, 2017
86ddf79
Merge branch 'master' into global-checkpoint-sync
jasontedor Sep 19, 2017
e0657a7
Add after-op sync
jasontedor Sep 20, 2017
b79b13f
Merge branch 'master' into global-checkpoint-sync
jasontedor Sep 20, 2017
f7a76cd
Iteration
jasontedor Sep 20, 2017
3881d76
Iteration
jasontedor Sep 20, 2017
89bbf84
Remove comments
jasontedor Sep 20, 2017
1082a31
Close it
jasontedor Sep 20, 2017
15453b4
Merge branch 'master' into global-checkpoint-sync
jasontedor Sep 20, 2017
aa0c62c
refresh needed
jasontedor Sep 20, 2017
721f725
remove ensure green
jasontedor Sep 20, 2017
9bc5155
remove background sync test setting
jasontedor Sep 20, 2017
63e9d80
Revert "remove background sync test setting"
jasontedor Sep 21, 2017
6ade720
test iteratoin
jasontedor Sep 21, 2017
354df1c
disable bwc tests
jasontedor Sep 21, 2017
79742ea
imports
jasontedor Sep 21, 2017
d9fc19d
Formatting of method
jasontedor Sep 21, 2017
afb082f
Remove leftover code
jasontedor Sep 21, 2017
9bf875e
State handling
jasontedor Sep 21, 2017
007d5c4
Logging on failed sync
jasontedor Sep 21, 2017
cb9373b
no fallthrough
jasontedor Sep 21, 2017
7e6d1bf
Setting
jasontedor Sep 21, 2017
f7295ac
Critical fix
jasontedor Sep 21, 2017
f237d88
fix comment
jasontedor Sep 21, 2017
e88b92c
Revert move, add javadocs
jasontedor Sep 21, 2017
030156d
Handle execute future
jasontedor Sep 21, 2017
0343372
Only check in-sync shards
jasontedor Sep 21, 2017
cf4e67b
Test iteration
jasontedor Sep 21, 2017
f82df30
More testing
jasontedor Sep 21, 2017
507806b
refactor shard creation
jasontedor Sep 21, 2017
530addd
handle closed
jasontedor Sep 21, 2017
f3b04dc
assertSeqNos
jasontedor Sep 21, 2017
5030a77
Add distruption test
jasontedor Sep 21, 2017
b640b10
revert formatting changes
jasontedor Sep 21, 2017
cccdec6
checkstyle
jasontedor Sep 21, 2017
d43f794
imports
jasontedor Sep 21, 2017
26e4c76
remove dead code from create index
jasontedor Sep 21, 2017
b80b728
everything in the right place
jasontedor Sep 21, 2017
b8adcce
revert formatting changes
jasontedor Sep 21, 2017
c041ea2
Merge branch 'master' into global-checkpoint-sync
jasontedor Sep 21, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ public void onResponse(ReplicaResponse response) {
successfulShards.incrementAndGet();
try {
primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), globalCheckpoint);
} catch (final AlreadyClosedException e) {
// okay, the index was deleted or this shard was never activated after a relocation; fall through and finish normally
} catch (final Exception e) {
Expand Down Expand Up @@ -315,6 +316,14 @@ public interface Primary<
*/
void updateLocalCheckpointForShard(String allocationId, long checkpoint);

/**
* Update the local knowledge of the global checkpoint for the specified allocation ID.
*
* @param allocationId the allocation ID to update the global checkpoint for
* @param globalCheckpoint the global checkpoint
*/
void updateGlobalCheckpointForShard(String allocationId, long globalCheckpoint);

/**
* Returns the local checkpoint on the primary shard.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,11 @@ public void updateLocalCheckpointForShard(String allocationId, long checkpoint)
indexShard.updateLocalCheckpointForShard(allocationId, checkpoint);
}

@Override
public void updateGlobalCheckpointForShard(final String allocationId, final long globalCheckpoint) {
indexShard.updateGlobalCheckpointForShard(allocationId, globalCheckpoint);
}

@Override
public long localCheckpoint() {
return indexShard.getLocalCheckpoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
(tmpImd.getNumberOfReplicas() + 1) + "]");
}
// create the index here (on the master) to validate it can be created, as well as adding the mapping
final IndexService indexService = indicesService.createIndex(tmpImd, Collections.emptyList());
final IndexService indexService = indicesService.createIndex(tmpImd, Collections.emptyList(), s -> {});
createdIndex = indexService.index();
// now add the mappings
MapperService mapperService = indexService.mapperService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ ClusterState innerExecute(ClusterState currentState, Iterable<AliasAction> actio
if (indexService == null) {
// temporarily create the index and add mappings so we can parse the filter
try {
indexService = indicesService.createIndex(index, emptyList());
indexService = indicesService.createIndex(index, emptyList(), s -> {});
indicesToClose.add(index.getIndex());
} catch (IOException e) {
throw new ElasticsearchException("Failed to create temporary index for parsing the alias", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ private static void validateAndAddTemplate(final PutRequest request, IndexTempla
.build();

final IndexMetaData tmpIndexMetadata = IndexMetaData.builder(temporaryIndexName).settings(dummySettings).build();
IndexService dummyIndexService = indicesService.createIndex(tmpIndexMetadata, Collections.emptyList());
IndexService dummyIndexService = indicesService.createIndex(tmpIndexMetadata, Collections.emptyList(), s -> {});
createdIndex = dummyIndexService.index();

templateBuilder.order(request.order);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ ClusterState executeRefresh(final ClusterState currentState, final List<RefreshT
IndexService indexService = indicesService.indexService(indexMetaData.getIndex());
if (indexService == null) {
// we need to create the index here, and add the current mapping to it, so we can merge
indexService = indicesService.createIndex(indexMetaData, Collections.emptyList());
indexService = indicesService.createIndex(indexMetaData, Collections.emptyList(), s -> {});
removeIndex = true;
indexService.mapperService().merge(indexMetaData, MergeReason.MAPPING_RECOVERY, true);
}
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.BM25SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityService;
Expand Down Expand Up @@ -330,7 +331,8 @@ public IndexService newIndexService(
IndicesQueryCache indicesQueryCache,
MapperRegistry mapperRegistry,
IndicesFieldDataCache indicesFieldDataCache,
NamedWriteableRegistry namedWriteableRegistry)
NamedWriteableRegistry namedWriteableRegistry,
Consumer<ShardId> globalCheckpointSyncer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this and remove all changes to IndicesService.createIndex etc.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 26e4c76.

throws IOException {
final IndexEventListener eventListener = freeze();
IndexSearcherWrapperFactory searcherWrapperFactory = indexSearcherWrapper.get() == null
Expand Down Expand Up @@ -365,7 +367,7 @@ public IndexService newIndexService(
new SimilarityService(indexSettings, scriptService, similarities),
shardStoreDeleter, analysisRegistry, engineFactory.get(), circuitBreakerService, bigArrays, threadPool, scriptService,
client, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry,
indicesFieldDataCache, searchOperationListeners, indexOperationListeners, namedWriteableRegistry);
indicesFieldDataCache, searchOperationListeners, indexOperationListeners, namedWriteableRegistry, globalCheckpointSyncer);
}

/**
Expand Down
85 changes: 80 additions & 5 deletions core/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

Expand Down Expand Up @@ -109,10 +110,12 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean deleted = new AtomicBoolean(false);
private final IndexSettings indexSettings;
private final List<IndexingOperationListener> indexingOperationListeners;
private final List<SearchOperationListener> searchOperationListeners;
private final List<IndexingOperationListener> indexingOperationListeners;
private final Consumer<ShardId> globalCheckpointSyncer;
private volatile AsyncRefreshTask refreshTask;
private volatile AsyncTranslogFSync fsyncTask;
private volatile AsyncGlobalCheckpointTask globalCheckpointTask;

// don't convert to Setting<> and register... we only set this in tests and register via a plugin
private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval";
Expand Down Expand Up @@ -145,7 +148,8 @@ public IndexService(
IndicesFieldDataCache indicesFieldDataCache,
List<SearchOperationListener> searchOperationListeners,
List<IndexingOperationListener> indexingOperationListeners,
NamedWriteableRegistry namedWriteableRegistry) throws IOException {
NamedWriteableRegistry namedWriteableRegistry,
Consumer<ShardId> globalCheckpointSyncer) throws IOException {
super(indexSettings);
this.indexSettings = indexSettings;
this.xContentRegistry = xContentRegistry;
Expand Down Expand Up @@ -182,11 +186,13 @@ public IndexService(
this.engineFactory = engineFactory;
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
this.searcherWrapper = wrapperFactory.newWrapper(this);
this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners);
this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners);
this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners);
this.globalCheckpointSyncer = globalCheckpointSyncer;
// kick off async ops for the first shard in this index
this.refreshTask = new AsyncRefreshTask(this);
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
rescheduleFsyncTask(indexSettings.getTranslogDurability());
}

Expand Down Expand Up @@ -268,7 +274,15 @@ public synchronized void close(final String reason, boolean delete) throws IOExc
}
}
} finally {
IOUtils.close(bitsetFilterCache, indexCache, indexFieldData, mapperService, refreshTask, fsyncTask, trimTranslogTask);
IOUtils.close(
bitsetFilterCache,
indexCache,
indexFieldData,
mapperService,
refreshTask,
fsyncTask,
trimTranslogTask,
globalCheckpointTask);
}
}
}
Expand Down Expand Up @@ -365,7 +379,7 @@ public synchronized IndexShard createShard(ShardRouting routing) throws IOExcept
indexShard = new IndexShard(routing, this.indexSettings, path, store, indexSortSupplier,
indexCache, mapperService, similarityService, engineFactory,
eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
searchOperationListeners, indexingOperationListeners);
searchOperationListeners, indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId));
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
Expand Down Expand Up @@ -710,6 +724,30 @@ private void maybeTrimTranslog() {
}
}

private void syncGlobalCheckpoints() {
for (final IndexShard shard : this.shards.values()) {
if (shard.routingEntry().active() && shard.routingEntry().primary()) {
switch (shard.state()) {
case CREATED:
case RECOVERING:
case CLOSED:
continue;
case POST_RECOVERY:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

routing entry cannot be active while shard state is POST_RECOVERY ;-)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 9bf875e.

case STARTED:
case RELOCATED:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in case its relocated, we won't get the operation permit, so I think we can just skip it (i.e. treat it same way as CLOSED).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 9bf875e.

try {
shard.maybeSyncGlobalCheckpoint();
} catch (final AlreadyClosedException | IndexShardClosedException e) {
// the shard was closed concurrently, continue
}
continue;
default:
throw new IllegalStateException("unknown state [" + shard.state() + "]");
}
}
}
}

abstract static class BaseAsyncTask implements Runnable, Closeable {
protected final IndexService indexService;
protected final ThreadPool threadPool;
Expand Down Expand Up @@ -877,6 +915,39 @@ public String toString() {
}
}

private static final TimeValue GLOBAL_CHECKPOINT_SYNC_INTERVAL = TimeValue.timeValueSeconds(30);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of having this constant here, I would put the Setting<TimeValue> constant here, even if the setting is not registered (and add comment here).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I pushed 7e6d1bf.


/**
* Background task that syncs the global checkpoint to replicas.
*/
final class AsyncGlobalCheckpointTask extends BaseAsyncTask {

AsyncGlobalCheckpointTask(final IndexService indexService) {
// index.global_checkpoint_sync_interval is not a real setting, it is only registered in tests
super(
indexService,
indexService
.getIndexSettings()
.getSettings()
.getAsTime("index.global_checkpoint_sync.interval", GLOBAL_CHECKPOINT_SYNC_INTERVAL));
}

@Override
protected void runInternal() {
indexService.syncGlobalCheckpoints();
}

@Override
protected String getThreadPool() {
return ThreadPool.Names.GENERIC;
}

@Override
public String toString() {
return "global_checkpoint_sync";
}
}

AsyncRefreshTask getRefreshTask() { // for tests
return refreshTask;
}
Expand All @@ -885,6 +956,10 @@ AsyncTranslogFSync getFsyncTask() { // for tests
return fsyncTask;
}

AsyncGlobalCheckpointTask getGlobalCheckpointTask() {
return globalCheckpointTask;
}

/**
* Clears the caches for the given shard id if the shard is still allocated on this node
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
public class GlobalCheckpointSyncAction extends TransportReplicationAction<
GlobalCheckpointSyncAction.Request,
GlobalCheckpointSyncAction.Request,
ReplicationResponse> implements IndexEventListener {
ReplicationResponse> {

public static String ACTION_NAME = "indices:admin/seq_no/global_checkpoint_sync";

Expand Down Expand Up @@ -76,6 +76,10 @@ public GlobalCheckpointSyncAction(
ThreadPool.Names.SAME);
}

public void updateGlobalCheckpointForShard(final ShardId shardId) {
execute(new Request(shardId));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

execute returns a future, which we just ignore here. This means that we won't be able to log information if a gcp sync failed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 030156d.

}

@Override
protected ReplicationResponse newResponseInstance() {
return new ReplicationResponse();
Expand All @@ -93,11 +97,6 @@ protected void sendReplicaRequest(
}
}

@Override
public void onShardInactive(final IndexShard indexShard) {
execute(new Request(indexShard.shardId()));
}

@Override
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
final Request request, final IndexShard indexShard) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.index.seqno;

import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectLongMap;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -496,6 +498,38 @@ public synchronized void updateLocalCheckpoint(final String allocationId, final
assert invariant();
}

private ObjectLongMap<String> globalCheckpoints = new ObjectLongHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left over from the first version, sorry. I removed this in afb082f.


/**
* Update the local knowledge of the global checkpoint for the specified allocation ID.
*
* @param allocationId the allocation ID to update the global checkpoint for
* @param globalCheckpoint the global checkpoint
*/
synchronized void updateGlobalCheckpointForShard(final String allocationId, final long globalCheckpoint) {
assert primaryMode;
assert handoffInProgress == false;
final long current = globalCheckpoints.getOrDefault(allocationId, SequenceNumbers.UNASSIGNED_SEQ_NO);
if (globalCheckpoint > current) {
globalCheckpoints.put(allocationId, globalCheckpoint);
}
}

/**
* Returns the global checkpoints for all shards.
*
* @param allocationId the allocationId to use for the global checkpoint on the primary
*
* @return the global checkpoints for all shards
*/
synchronized ObjectLongMap<String> getGlobalCheckpoints(final String allocationId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can set the allocation id of the primary when creating the tracker in the constructor (we then also don't need to pass it anymore to the activatePrimaryMode method).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed by #26630.

assert primaryMode;
assert handoffInProgress == false;
final ObjectLongMap<String> copy = new ObjectLongHashMap<>(globalCheckpoints);
copy.put(allocationId, globalCheckpoint);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with the above suggestion (allocation id of primary set during construction), we can just have a globalCheckpoints map that replaces the current globalCheckpoint variable. The global checkpoint of the primary is then tracked in the same map as the other shard copies. An alternative is to generalize "localCheckPoints" to just "checkPoints" and put the global checkpoint info into that same map entry used for the local checkpoint tracking. This would also allow that info to be transferred during primary relocation handoff (neat?).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed by #26666.

return copy;
}

/**
* Computes the global checkpoint based on the given local checkpoints. In case where there are entries preventing the
* computation to happen (for example due to blocking), it returns the fallback value.
Expand Down Expand Up @@ -585,6 +619,7 @@ public synchronized void completeRelocationHandoff() {
lcps.localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
});
globalCheckpoints.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about removing map entries when replicas fail etc. and are removed by the cluster state (and cleaned from the localCheckpoints map)?

Note that in the above scenario maybeSyncGlobalCheckpoint will execute the sync again and again (also a good test case?).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed by #26666.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left over from the first version, sorry. I removed this in afb082f.

assert invariant();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

package org.elasticsearch.index.seqno;

import com.carrotsearch.hppc.ObjectLongMap;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.common.collect.Tuple;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left over from the first version, sorry. I removed this in afb082f.

import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ReplicationGroup;
Expand Down Expand Up @@ -131,6 +133,16 @@ public void updateLocalCheckpointForShard(final String allocationId, final long
globalCheckpointTracker.updateLocalCheckpoint(allocationId, checkpoint);
}

/**
* Update the local knowledge of the global checkpoint for the specified allocation ID.
*
* @param allocationId the allocation ID to update the global checkpoint for
* @param globalCheckpoint the global checkpoint
*/
public void updateGlobalCheckpointForShard(final String allocationId, final long globalCheckpoint) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this also be called from RecoverySourceHandler after calling recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint())?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed by #26666.

globalCheckpointTracker.updateGlobalCheckpointForShard(allocationId, globalCheckpoint);
}

/**
* Called when the recovery process for a shard is ready to open the engine on the target shard.
* See {@link GlobalCheckpointTracker#initiateTracking(String)} for details.
Expand Down Expand Up @@ -179,6 +191,17 @@ public long getGlobalCheckpoint() {
return globalCheckpointTracker.getGlobalCheckpoint();
}

/**
* Returns the global checkpoints for all shards.
*
* @param allocationId the allocationId to use for the global checkpoint on the primary
*
* @return the global checkpoints for all shards
*/
public ObjectLongMap<String> getGlobalCheckpoints(final String allocationId) {
return globalCheckpointTracker.getGlobalCheckpoints(allocationId);
}

/**
* Updates the global checkpoint on a replica shard after it has been updated by the primary.
*
Expand Down
Loading