From 5c2572c2bbdb03ff1402a8ce2186eb9270f822e4 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 21 Sep 2017 15:34:13 -0400 Subject: [PATCH] Introduce global checkpoint background sync It is the exciting return of the global checkpoint background sync. Long, long ago, in snapshot version far, far away we had and only had a global checkpoint background sync. This sync would fire periodically and send the global checkpoint from the primary shard to the replicas so that they could update their local knowledge of the global checkpoint. Later in time, as we sped ahead towards finalizing the initial version of sequence IDs, we realized that we need the global checkpoint updates to be inline. This means that on a replication operation, the primary shard would piggy back the global checkpoint with the replication operation to the replicas. The replicas would update their local knowledge of the global checkpoint and reply with their local checkpoint. However, this could allow the global checkpoint on the primary to advance again and the replicas would fall behind in their local knowledge of the global checkpoint. If another replication operation never fired, then the replicas would be permanently behind. To account for this, we added one more sync that would fire when the primary shard fell idle. However, this has problems: - the shard idle timer defaults to five minutes, a long time to wait for the replicas to learn of the new global checkpoint - if a replica missed the sync, there was no follow-up sync to catch them up - there is an inherent race condition where the primary shard could fall idle mid-operation (after having sent the replication request to the replicas); in this case, there would never be a background sync after the operation completes - tying the global checkpoint sync to the idle timer was never natural To fix this, we add two additional changes for the global checkpoint to be synced to the replicas. The first is that we add a post-operation sync that only fires if there are no operations in flight and there is a lagging replica. This gives us a chance to sync the global checkpoint to the replicas immediately after an operation so that they are always kept up to date. The second is that we add back a global checkpoint background sync that fires on a timer. This timer fires every thirty seconds, and is not configurable (for simplicity). This background sync is smarter than what we had previously in the sense that it only sends a sync if the global checkpoint on at least one replica is lagging that of the primary. When the timer fires, we can compare the global checkpoint on the primary to its knowledge of the global checkpoint on the replicas and only send a sync if there is a shard behind. Relates #26591 --- build.gradle | 2 +- .../TransportReplicationAction.java | 32 ++- .../replication/TransportWriteAction.java | 2 +- .../org/elasticsearch/index/IndexModule.java | 2 +- .../org/elasticsearch/index/IndexService.java | 103 ++++++++- .../seqno/GlobalCheckpointSyncAction.java | 23 +- .../index/seqno/GlobalCheckpointTracker.java | 17 +- .../index/seqno/SequenceNumbersService.java | 9 +- .../elasticsearch/index/shard/IndexShard.java | 73 +++++- .../elasticsearch/indices/IndicesService.java | 35 +-- .../cluster/IndicesClusterStateService.java | 42 ++-- .../discovery/ClusterDisruptionIT.java | 2 + .../ESIndexLevelReplicationTestCase.java | 8 +- .../index/seqno/GlobalCheckpointSyncIT.java | 210 ++++++++++++++++++ .../index/shard/IndexShardIT.java | 2 +- .../index/shard/IndexShardTests.java | 65 +++++- ...dicesLifecycleListenerSingleNodeTests.java | 2 +- ...actIndicesClusterStateServiceTestCase.java | 3 +- ...ClusterStateServiceRandomUpdatesTests.java | 28 +-- .../elasticsearch/recovery/RelocationIT.java | 79 ++----- .../elasticsearch/backwards/IndexingIT.java | 1 - .../index/shard/IndexShardTestCase.java | 47 ++-- .../elasticsearch/test/ESIntegTestCase.java | 51 +++++ .../test/InternalSettingsPlugin.java | 10 +- 24 files changed, 675 insertions(+), 173 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java diff --git a/build.gradle b/build.gradle index 601debbc3dd14..209773f631b8e 100644 --- a/build.gradle +++ b/build.gradle @@ -175,7 +175,7 @@ task verifyVersions { * after the backport of the backcompat code is complete. */ allprojects { - ext.bwc_tests_enabled = true + ext.bwc_tests_enabled = false } task verifyBwcTestsEnabled { diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 516554d92e8cd..77f7ff1d4460a 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -20,7 +20,9 @@ package org.elasticsearch.action.support.replication; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; @@ -55,6 +57,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; @@ -108,12 +111,26 @@ public abstract class TransportReplicationAction< protected final String transportReplicaAction; protected final String transportPrimaryAction; + private final boolean syncGlobalCheckpointAfterOperation; + protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, Supplier replicaRequest, String executor) { + this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, + indexNameExpressionResolver, request, replicaRequest, executor, false); + } + + + protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService, + ClusterService clusterService, IndicesService indicesService, + ThreadPool threadPool, ShardStateAction shardStateAction, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, + Supplier replicaRequest, String executor, + boolean syncGlobalCheckpointAfterOperation) { super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager()); this.transportService = transportService; this.clusterService = clusterService; @@ -126,6 +143,8 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans registerRequestHandlers(actionName, transportService, request, replicaRequest, executor); this.transportOptions = transportOptions(); + + this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation; } protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier request, @@ -150,7 +169,7 @@ protected void doExecute(Task task, Request request, ActionListener li new ReroutePhase((ReplicationTask) task, request, listener).run(); } - protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) { + protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) { return new ReplicasProxy(primaryTerm); } @@ -359,6 +378,17 @@ private ActionListener createResponseListener(final PrimaryShardRefere return new ActionListener() { @Override public void onResponse(Response response) { + if (syncGlobalCheckpointAfterOperation) { + try { + primaryShardReference.indexShard.maybeSyncGlobalCheckpoint("post-operation"); + } catch (final Exception e) { + // only log non-closed exceptions + if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { + logger.info("post-operation global checkpoint sync failed", e); + // intentionally swallow, a missed global checkpoint sync should not fail this operation + } + } + } primaryShardReference.close(); // release shard operation lock before responding to caller setPhase(replicationTask, "finished"); try { diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 31c72108ecf65..ec3dcd94d3084 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -71,7 +71,7 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, Supplier replicaRequest, String executor) { super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - indexNameExpressionResolver, request, replicaRequest, executor); + indexNameExpressionResolver, request, replicaRequest, executor, true); } /** Syncs operation result to the translog or throws a shard not available failure */ diff --git a/core/src/main/java/org/elasticsearch/index/IndexModule.java b/core/src/main/java/org/elasticsearch/index/IndexModule.java index 02b1f84776887..1097071368aab 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexModule.java +++ b/core/src/main/java/org/elasticsearch/index/IndexModule.java @@ -22,7 +22,6 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.client.Client; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.TriFunction; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -40,6 +39,7 @@ import org.elasticsearch.index.shard.IndexSearcherWrapper; import org.elasticsearch.index.shard.IndexingOperationListener; import org.elasticsearch.index.shard.SearchOperationListener; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.similarity.BM25SimilarityProvider; import org.elasticsearch.index.similarity.SimilarityProvider; import org.elasticsearch.index.similarity.SimilarityService; diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index a4d03929cbb57..ae5ea432855af 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -25,11 +25,15 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.Accountable; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; @@ -82,6 +86,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -109,10 +114,11 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean deleted = new AtomicBoolean(false); private final IndexSettings indexSettings; - private final List indexingOperationListeners; private final List searchOperationListeners; + private final List indexingOperationListeners; private volatile AsyncRefreshTask refreshTask; private volatile AsyncTranslogFSync fsyncTask; + private volatile AsyncGlobalCheckpointTask globalCheckpointTask; // don't convert to Setting<> and register... we only set this in tests and register via a plugin private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval"; @@ -182,11 +188,12 @@ public IndexService( this.engineFactory = engineFactory; // initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE this.searcherWrapper = wrapperFactory.newWrapper(this); - this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners); this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners); + this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners); // kick off async ops for the first shard in this index this.refreshTask = new AsyncRefreshTask(this); this.trimTranslogTask = new AsyncTrimTranslogTask(this); + this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this); rescheduleFsyncTask(indexSettings.getTranslogDurability()); } @@ -268,7 +275,15 @@ public synchronized void close(final String reason, boolean delete) throws IOExc } } } finally { - IOUtils.close(bitsetFilterCache, indexCache, indexFieldData, mapperService, refreshTask, fsyncTask, trimTranslogTask); + IOUtils.close( + bitsetFilterCache, + indexCache, + indexFieldData, + mapperService, + refreshTask, + fsyncTask, + trimTranslogTask, + globalCheckpointTask); } } } @@ -293,8 +308,7 @@ private long getAvgShardSizeInBytes() throws IOException { } } - public synchronized IndexShard createShard(ShardRouting routing) throws IOException { - final boolean primary = routing.primary(); + public synchronized IndexShard createShard(ShardRouting routing, Consumer globalCheckpointSyncer) throws IOException { /* * TODO: we execute this in parallel but it's a synced method. Yet, we might * be able to serialize the execution via the cluster state in the future. for now we just @@ -365,7 +379,7 @@ public synchronized IndexShard createShard(ShardRouting routing) throws IOExcept indexShard = new IndexShard(routing, this.indexSettings, path, store, indexSortSupplier, indexCache, mapperService, similarityService, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer, - searchOperationListeners, indexingOperationListeners); + searchOperationListeners, indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId)); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); @@ -710,6 +724,44 @@ private void maybeTrimTranslog() { } } + private void maybeSyncGlobalCheckpoints() { + for (final IndexShard shard : this.shards.values()) { + if (shard.routingEntry().active() && shard.routingEntry().primary()) { + switch (shard.state()) { + case CLOSED: + case CREATED: + case RECOVERING: + case RELOCATED: + continue; + case POST_RECOVERY: + assert false : "shard " + shard.shardId() + " is in post-recovery but marked as active"; + continue; + case STARTED: + try { + shard.acquirePrimaryOperationPermit( + ActionListener.wrap( + releasable -> { + try (Releasable ignored = releasable) { + shard.maybeSyncGlobalCheckpoint("background"); + } + }, + e -> { + if (!(e instanceof AlreadyClosedException || e instanceof IndexShardClosedException)) { + logger.info("failed to execute background global checkpoint sync", e); + } + }), + ThreadPool.Names.SAME); + } catch (final AlreadyClosedException | IndexShardClosedException e) { + // the shard was closed concurrently, continue + } + continue; + default: + throw new IllegalStateException("unknown state [" + shard.state() + "]"); + } + } + } + } + abstract static class BaseAsyncTask implements Runnable, Closeable { protected final IndexService indexService; protected final ThreadPool threadPool; @@ -877,6 +929,41 @@ public String toString() { } } + // this setting is intentionally not registered, it is only used in tests + public static final Setting GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING = + Setting.timeSetting( + "index.global_checkpoint_sync.interval", + new TimeValue(30, TimeUnit.SECONDS), + new TimeValue(0, TimeUnit.MILLISECONDS), + Property.Dynamic, + Property.IndexScope); + + /** + * Background task that syncs the global checkpoint to replicas. + */ + final class AsyncGlobalCheckpointTask extends BaseAsyncTask { + + AsyncGlobalCheckpointTask(final IndexService indexService) { + // index.global_checkpoint_sync_interval is not a real setting, it is only registered in tests + super(indexService, GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings())); + } + + @Override + protected void runInternal() { + indexService.maybeSyncGlobalCheckpoints(); + } + + @Override + protected String getThreadPool() { + return ThreadPool.Names.GENERIC; + } + + @Override + public String toString() { + return "global_checkpoint_sync"; + } + } + AsyncRefreshTask getRefreshTask() { // for tests return refreshTask; } @@ -885,6 +972,10 @@ AsyncTranslogFSync getFsyncTask() { // for tests return fsyncTask; } + AsyncGlobalCheckpointTask getGlobalCheckpointTask() { + return globalCheckpointTask; + } + /** * Clears the caches for the given shard id if the shard is still allocated on this node */ diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index e6d8b8e8d3ff8..60b61ccefa51c 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -19,7 +19,10 @@ package org.elasticsearch.index.seqno; +import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.ReplicationOperation; @@ -34,6 +37,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; @@ -47,7 +51,7 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction< GlobalCheckpointSyncAction.Request, GlobalCheckpointSyncAction.Request, - ReplicationResponse> implements IndexEventListener { + ReplicationResponse> { public static String ACTION_NAME = "indices:admin/seq_no/global_checkpoint_sync"; @@ -73,7 +77,17 @@ public GlobalCheckpointSyncAction( indexNameExpressionResolver, Request::new, Request::new, - ThreadPool.Names.SAME); + ThreadPool.Names.MANAGEMENT); + } + + public void updateGlobalCheckpointForShard(final ShardId shardId) { + execute( + new Request(shardId), + ActionListener.wrap(r -> {}, e -> { + if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { + logger.info(shardId + " global checkpoint sync failed", e); + } + })); } @Override @@ -94,11 +108,6 @@ protected void sendReplicaRequest( } } - @Override - public void onShardInactive(final IndexShard indexShard) { - execute(new Request(indexShard.shardId())); - } - @Override protected PrimaryResult shardOperationOnPrimary( final Request request, final IndexShard indexShard) throws Exception { diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java index 34c731edaaf8b..d2b53aac1a045 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java @@ -209,13 +209,20 @@ public int hashCode() { } } - synchronized ObjectLongMap getGlobalCheckpoints() { + /** + * Get the local knowledge of the global checkpoints for all in-sync allocation IDs. + * + * @return a map from allocation ID to the local knowledge of the global checkpoint for that allocation ID + */ + synchronized ObjectLongMap getInSyncGlobalCheckpoints() { assert primaryMode; assert handoffInProgress == false; - final ObjectLongMap globalCheckpoints = new ObjectLongHashMap<>(checkpoints.size()); - for (final Map.Entry cps : checkpoints.entrySet()) { - globalCheckpoints.put(cps.getKey(), cps.getValue().globalCheckpoint); - } + final ObjectLongMap globalCheckpoints = new ObjectLongHashMap<>(checkpoints.size()); // upper bound on the size + checkpoints + .entrySet() + .stream() + .filter(e -> e.getValue().inSync) + .forEach(e -> globalCheckpoints.put(e.getKey(), e.getValue().globalCheckpoint)); return globalCheckpoints; } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index 760fbe0a5fc07..1c8911a0cd886 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -138,8 +138,13 @@ public void updateGlobalCheckpointForShard(final String allocationId, final long globalCheckpointTracker.updateGlobalCheckpointForShard(allocationId, globalCheckpoint); } - public ObjectLongMap getGlobalCheckpoints() { - return globalCheckpointTracker.getGlobalCheckpoints(); + /** + * Get the local knowledge of the global checkpoints for all in-sync allocation IDs. + * + * @return a map from allocation ID to the local knowledge of the global checkpoint for that allocation ID + */ + public ObjectLongMap getInSyncGlobalCheckpoints() { + return globalCheckpointTracker.getInSyncGlobalCheckpoints(); } /** diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f4a771a3b3f4f..edd37aa5c1739 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -156,6 +156,7 @@ import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static org.elasticsearch.index.mapper.SourceToParse.source; @@ -197,6 +198,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl protected final EngineFactory engineFactory; private final IndexingOperationListener indexingOperationListeners; + private final Runnable globalCheckpointSyncer; + + Runnable getGlobalCheckpointSyncer() { + return globalCheckpointSyncer; + } @Nullable private RecoveryState recoveryState; @@ -233,11 +239,24 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl */ private final RefreshListeners refreshListeners; - public IndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardPath path, Store store, - Supplier indexSortSupplier, IndexCache indexCache, MapperService mapperService, SimilarityService similarityService, - @Nullable EngineFactory engineFactory, - IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, ThreadPool threadPool, BigArrays bigArrays, - Engine.Warmer warmer, List searchOperationListener, List listeners) throws IOException { + public IndexShard( + ShardRouting shardRouting, + IndexSettings indexSettings, + ShardPath path, + Store store, + Supplier indexSortSupplier, + IndexCache indexCache, + MapperService mapperService, + SimilarityService similarityService, + @Nullable EngineFactory engineFactory, + IndexEventListener indexEventListener, + IndexSearcherWrapper indexSearcherWrapper, + ThreadPool threadPool, + BigArrays bigArrays, + Engine.Warmer warmer, + List searchOperationListener, + List listeners, + Runnable globalCheckpointSyncer) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); this.shardRouting = shardRouting; @@ -257,6 +276,7 @@ public IndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardP final List listenersList = new ArrayList<>(listeners); listenersList.add(internalIndexingStats); this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(listenersList, logger); + this.globalCheckpointSyncer = globalCheckpointSyncer; final List searchListenersList = new ArrayList<>(searchOperationListener); searchListenersList.add(searchStats); this.searchOperationListener = new SearchOperationListener.CompositeListener(searchListenersList, logger); @@ -1723,11 +1743,6 @@ public void initiateTracking(final String allocationId) { public void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException { verifyPrimary(); getEngine().seqNoService().markAllocationIdAsInSync(allocationId, localCheckpoint); - /* - * We could have blocked so long waiting for the replica to catch up that we fell idle and there will not be a background sync to - * the replica; mark our self as active to force a future background sync. - */ - active.compareAndSet(false, true); } /** @@ -1748,10 +1763,44 @@ public long getGlobalCheckpoint() { return getEngine().seqNoService().getGlobalCheckpoint(); } - public ObjectLongMap getGlobalCheckpoints() { + /** + * Get the local knowledge of the global checkpoints for all in-sync allocation IDs. + * + * @return a map from allocation ID to the local knowledge of the global checkpoint for that allocation ID + */ + public ObjectLongMap getInSyncGlobalCheckpoints() { + verifyPrimary(); + verifyNotClosed(); + return getEngine().seqNoService().getInSyncGlobalCheckpoints(); + } + + /** + * Syncs the global checkpoint to the replicas if the global checkpoint on at least one replica is behind the global checkpoint on the + * primary. + */ + public void maybeSyncGlobalCheckpoint(final String reason) { verifyPrimary(); verifyNotClosed(); - return getEngine().seqNoService().getGlobalCheckpoints(); + if (state == IndexShardState.RELOCATED) { + return; + } + // only sync if there are not operations in flight + final SeqNoStats stats = getEngine().seqNoService().stats(); + if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint()) { + final ObjectLongMap globalCheckpoints = getInSyncGlobalCheckpoints(); + final String allocationId = routingEntry().allocationId().getId(); + assert globalCheckpoints.containsKey(allocationId); + final long globalCheckpoint = globalCheckpoints.get(allocationId); + final boolean syncNeeded = + StreamSupport + .stream(globalCheckpoints.values().spliterator(), false) + .anyMatch(v -> v.value < globalCheckpoint); + // only sync if there is a shard lagging the primary + if (syncNeeded) { + logger.trace("syncing global checkpoint for [{}]", reason); + globalCheckpointSyncer.run(); + } + } } /** diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesService.java b/core/src/main/java/org/elasticsearch/indices/IndicesService.java index 7370e63e8c3b0..b5503de59e5ac 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -43,7 +43,6 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -375,12 +374,15 @@ public IndexService indexServiceSafe(Index index) { /** * Creates a new {@link IndexService} for the given metadata. - * @param indexMetaData the index metadata to create the index for - * @param builtInListeners a list of built-in lifecycle {@link IndexEventListener} that should should be used along side with the per-index listeners + * + * @param indexMetaData the index metadata to create the index for + * @param builtInListeners a list of built-in lifecycle {@link IndexEventListener} that should should be used along side with the + * per-index listeners * @throws ResourceAlreadyExistsException if the index already exists. */ @Override - public synchronized IndexService createIndex(IndexMetaData indexMetaData, List builtInListeners) throws IOException { + public synchronized IndexService createIndex( + final IndexMetaData indexMetaData, final List builtInListeners) throws IOException { ensureChangesAllowed(); if (indexMetaData.getIndexUUID().equals(IndexMetaData.INDEX_UUID_NA_VALUE)) { throw new IllegalArgumentException("index must have a real UUID found value: [" + indexMetaData.getIndexUUID() + "]"); @@ -399,13 +401,13 @@ public void onStoreClosed(ShardId shardId) { finalListeners.add(onStoreClose); finalListeners.add(oldShardsStats); final IndexService indexService = - createIndexService( - "create index", - indexMetaData, - indicesQueryCache, - indicesFieldDataCache, - finalListeners, - indexingMemoryController); + createIndexService( + "create index", + indexMetaData, + indicesQueryCache, + indicesFieldDataCache, + finalListeners, + indexingMemoryController); boolean success = false; try { indexService.getIndexEventListener().afterIndexCreated(indexService); @@ -423,7 +425,8 @@ public void onStoreClosed(ShardId shardId) { * This creates a new IndexService without registering it */ private synchronized IndexService createIndexService(final String reason, - IndexMetaData indexMetaData, IndicesQueryCache indicesQueryCache, + IndexMetaData indexMetaData, + IndicesQueryCache indicesQueryCache, IndicesFieldDataCache indicesFieldDataCache, List builtInListeners, IndexingOperationListener... indexingOperationListeners) throws IOException { @@ -454,7 +457,8 @@ private synchronized IndexService createIndexService(final String reason, indicesQueryCache, mapperRegistry, indicesFieldDataCache, - namedWriteableRegistry); + namedWriteableRegistry + ); } /** @@ -499,10 +503,11 @@ public synchronized void verifyIndexMetadata(IndexMetaData metaData, IndexMetaDa @Override public IndexShard createShard(ShardRouting shardRouting, RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService, PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService, - Consumer onShardFailure) throws IOException { + Consumer onShardFailure, + Consumer globalCheckpointSyncer) throws IOException { ensureChangesAllowed(); IndexService indexService = indexService(shardRouting.index()); - IndexShard indexShard = indexService.createShard(shardRouting); + IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, (type, mapping) -> { diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 3c1ee5b841293..5aa8b5f3ee1b3 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -118,35 +118,44 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final boolean sendRefreshMapping; private final List buildInIndexListener; private final PrimaryReplicaSyncer primaryReplicaSyncer; + private final Consumer globalCheckpointSyncer; @Inject - public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService, - ThreadPool threadPool, PeerRecoveryTargetService recoveryTargetService, + public IndicesClusterStateService(Settings settings, + IndicesService indicesService, + ClusterService clusterService, + ThreadPool threadPool, + PeerRecoveryTargetService recoveryTargetService, ShardStateAction shardStateAction, NodeMappingRefreshAction nodeMappingRefreshAction, RepositoriesService repositoriesService, - SearchService searchService, SyncedFlushService syncedFlushService, - PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService, - GlobalCheckpointSyncAction globalCheckpointSyncAction, - PrimaryReplicaSyncer primaryReplicaSyncer) { + SearchService searchService, + SyncedFlushService syncedFlushService, + PeerRecoverySourceService peerRecoverySourceService, + SnapshotShardsService snapshotShardsService, + PrimaryReplicaSyncer primaryReplicaSyncer, + GlobalCheckpointSyncAction globalCheckpointSyncAction) { this(settings, (AllocatedIndices>) indicesService, clusterService, threadPool, recoveryTargetService, shardStateAction, nodeMappingRefreshAction, repositoriesService, searchService, syncedFlushService, peerRecoverySourceService, - snapshotShardsService, globalCheckpointSyncAction, primaryReplicaSyncer); + snapshotShardsService, primaryReplicaSyncer, globalCheckpointSyncAction::updateGlobalCheckpointForShard); } // for tests IndicesClusterStateService(Settings settings, AllocatedIndices> indicesService, ClusterService clusterService, - ThreadPool threadPool, PeerRecoveryTargetService recoveryTargetService, + ThreadPool threadPool, + PeerRecoveryTargetService recoveryTargetService, ShardStateAction shardStateAction, NodeMappingRefreshAction nodeMappingRefreshAction, RepositoriesService repositoriesService, - SearchService searchService, SyncedFlushService syncedFlushService, - PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService, - GlobalCheckpointSyncAction globalCheckpointSyncAction, - PrimaryReplicaSyncer primaryReplicaSyncer) { + SearchService searchService, + SyncedFlushService syncedFlushService, + PeerRecoverySourceService peerRecoverySourceService, + SnapshotShardsService snapshotShardsService, + PrimaryReplicaSyncer primaryReplicaSyncer, + Consumer globalCheckpointSyncer) { super(settings); this.buildInIndexListener = Arrays.asList( @@ -154,8 +163,7 @@ public IndicesClusterStateService(Settings settings, IndicesService indicesServi recoveryTargetService, searchService, syncedFlushService, - snapshotShardsService, - globalCheckpointSyncAction); + snapshotShardsService); this.indicesService = indicesService; this.clusterService = clusterService; this.threadPool = threadPool; @@ -164,6 +172,7 @@ public IndicesClusterStateService(Settings settings, IndicesService indicesServi this.nodeMappingRefreshAction = nodeMappingRefreshAction; this.repositoriesService = repositoriesService; this.primaryReplicaSyncer = primaryReplicaSyncer; + this.globalCheckpointSyncer = globalCheckpointSyncer; this.sendRefreshMapping = this.settings.getAsBoolean("indices.cluster.send_refresh_mapping", true); } @@ -541,7 +550,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR logger.debug("{} creating shard", shardRouting.shardId()); RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode); indicesService.createShard(shardRouting, recoveryState, recoveryTargetService, new RecoveryListener(shardRouting), - repositoriesService, failedShardHandler); + repositoriesService, failedShardHandler, globalCheckpointSyncer); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); } @@ -830,7 +839,8 @@ U createIndex(IndexMetaData indexMetaData, */ T createShard(ShardRouting shardRouting, RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService, PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService, - Consumer onShardFailure) throws IOException; + Consumer onShardFailure, + Consumer globalCheckpointSyncer) throws IOException; /** * Returns shard for the specified id if it exists otherwise returns null. diff --git a/core/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java b/core/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java index 38c9bcb72459f..0701d9d10e3ac 100644 --- a/core/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java @@ -215,6 +215,8 @@ public void testAckedIndexing() throws Exception { } }, 30, TimeUnit.SECONDS); + assertSeqNos(); + logger.info("done validating (iteration [{}])", iter); } } finally { diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 3faa2da7b4ffe..7e8949cd15fbf 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -141,7 +141,7 @@ protected class ReplicationGroup implements AutoCloseable, Iterable ReplicationGroup(final IndexMetaData indexMetaData) throws IOException { final ShardRouting primaryRouting = this.createShardRouting("s0", true); - primary = newShard(primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting)); + primary = newShard(primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting), () -> {}); replicas = new ArrayList<>(); this.indexMetaData = indexMetaData; updateAllocationIDsOnPrimary(); @@ -238,7 +238,7 @@ public void startPrimary() throws IOException { public IndexShard addReplica() throws IOException { final ShardRouting replicaRouting = createShardRouting("s" + replicaId.incrementAndGet(), false); final IndexShard replica = - newShard(replicaRouting, indexMetaData, null, getEngineFactory(replicaRouting)); + newShard(replicaRouting, indexMetaData, null, getEngineFactory(replicaRouting), () -> {}); addReplica(replica); return replica; } @@ -259,8 +259,8 @@ public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardP false, ShardRoutingState.INITIALIZING, RecoverySource.PeerRecoverySource.INSTANCE); - final IndexShard newReplica = newShard(shardRouting, shardPath, indexMetaData, null, - getEngineFactory(shardRouting)); + final IndexShard newReplica = + newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {}); replicas.add(newReplica); updateAllocationIDsOnPrimary(); return newReplica; diff --git a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java new file mode 100644 index 0000000000000..b2c828cb73f0c --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java @@ -0,0 +1,210 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.seqno; + +import org.elasticsearch.action.admin.indices.stats.IndexShardStats; +import org.elasticsearch.action.admin.indices.stats.IndexStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalSettingsPlugin; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.hamcrest.Matchers.equalTo; + +public class GlobalCheckpointSyncIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Stream.concat( + super.nodePlugins().stream(), + Stream.of(InternalSettingsPlugin.class, MockTransportService.TestPlugin.class)) + .collect(Collectors.toList()); + } + + public void testPostOperationGlobalCheckpointSync() throws Exception { + // set the sync interval high so it does not execute during this test + runGlobalCheckpointSyncTest(TimeValue.timeValueHours(24), client -> {}, client -> {}); + } + + /* + * This test swallows the post-operation global checkpoint syncs, and then restores the ability to send these requests at the end of the + * test so that a background sync can fire and sync the global checkpoint. + */ + public void testBackgroundGlobalCheckpointSync() throws Exception { + runGlobalCheckpointSyncTest( + TimeValue.timeValueSeconds(randomIntBetween(1, 3)), + client -> { + // prevent global checkpoint syncs between all nodes + final DiscoveryNodes nodes = client.admin().cluster().prepareState().get().getState().getNodes(); + for (final DiscoveryNode node : nodes) { + for (final DiscoveryNode other : nodes) { + if (node == other) { + continue; + } + final MockTransportService senderTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, node.getName()); + final MockTransportService receiverTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, other.getName()); + + senderTransportService.addDelegate(receiverTransportService, + new MockTransportService.DelegateTransport(senderTransportService.original()) { + @Override + protected void sendRequest( + final Connection connection, + final long requestId, + final String action, + final TransportRequest request, + final TransportRequestOptions options) throws IOException { + if ("indices:admin/seq_no/global_checkpoint_sync[r]".equals(action)) { + throw new IllegalStateException("blocking indices:admin/seq_no/global_checkpoint_sync[r]"); + } else { + super.sendRequest(connection, requestId, action, request, options); + } + } + }); + } + } + }, + client -> { + // restore global checkpoint syncs between all nodes + final DiscoveryNodes nodes = client.admin().cluster().prepareState().get().getState().getNodes(); + for (final DiscoveryNode node : nodes) { + for (final DiscoveryNode other : nodes) { + if (node == other) { + continue; + } + final MockTransportService senderTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, node.getName()); + final MockTransportService receiverTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, other.getName()); + senderTransportService.clearRule(receiverTransportService); + } + } + }); + } + + private void runGlobalCheckpointSyncTest( + final TimeValue globalCheckpointSyncInterval, + final Consumer beforeIndexing, + final Consumer afterIndexing) throws Exception { + final int numberOfReplicas = randomIntBetween(1, 4); + internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); + prepareCreate( + "test", + Settings.builder() + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), globalCheckpointSyncInterval) + .put("index.number_of_replicas", numberOfReplicas)) + .get(); + if (randomBoolean()) { + ensureGreen(); + } + + beforeIndexing.accept(client()); + + final int numberOfDocuments = randomIntBetween(0, 256); + + final int numberOfThreads = randomIntBetween(1, 4); + final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); + + // start concurrent indexing threads + final List threads = new ArrayList<>(numberOfThreads); + for (int i = 0; i < numberOfThreads; i++) { + final int index = i; + final Thread thread = new Thread(() -> { + try { + barrier.await(); + } catch (BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + for (int j = 0; j < numberOfDocuments; j++) { + final String id = Integer.toString(index * numberOfDocuments + j); + client().prepareIndex("test", "test", id).setSource("{\"foo\": " + id + "}", XContentType.JSON).get(); + } + try { + barrier.await(); + } catch (BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + }); + threads.add(thread); + thread.start(); + } + + // synchronize the start of the threads + barrier.await(); + + // wait for the threads to finish + barrier.await(); + + afterIndexing.accept(client()); + + assertBusy(() -> { + final IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get(); + final IndexStats indexStats = stats.getIndex("test"); + for (final IndexShardStats indexShardStats : indexStats.getIndexShards().values()) { + Optional maybePrimary = + Stream.of(indexShardStats.getShards()) + .filter(s -> s.getShardRouting().active() && s.getShardRouting().primary()) + .findFirst(); + if (!maybePrimary.isPresent()) { + continue; + } + final ShardStats primary = maybePrimary.get(); + final SeqNoStats primarySeqNoStats = primary.getSeqNoStats(); + for (final ShardStats shardStats : indexShardStats) { + final SeqNoStats seqNoStats = shardStats.getSeqNoStats(); + if (seqNoStats == null) { + // the shard is initializing + continue; + } + assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint())); + } + } + }); + + for (final Thread thread : threads) { + thread.join(); + } + } + +} diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 2346ba290ae4e..ed5f31f4ff3da 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -539,7 +539,7 @@ public static final IndexShard newIndexShard(IndexService indexService, IndexSha IndexShard newShard = new IndexShard(initializingShardRouting, indexService.getIndexSettings(), shard.shardPath(), shard.store(), indexService.getIndexSortSupplier(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, - indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners)); + indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners), () -> {}); return newShard; } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index b275da702b083..c2864ea68b8e7 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -136,6 +136,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static java.util.Collections.max; import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex; import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; @@ -711,6 +712,66 @@ private void finish() { closeShards(indexShard); } + public void testGlobalCheckpointSync() throws IOException { + // create the primary shard with a callback that sets a boolean when the global checkpoint sync is invoked + final ShardId shardId = new ShardId("index", "_na_", 0); + final ShardRouting shardRouting = + TestShardRouting.newShardRouting( + shardId, + randomAlphaOfLength(8), + true, + ShardRoutingState.INITIALIZING, + RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE); + final Settings settings = Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + final IndexMetaData.Builder indexMetadata = IndexMetaData.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1); + final AtomicBoolean synced = new AtomicBoolean(); + final IndexShard primaryShard = newShard(shardRouting, indexMetadata.build(), null, null, () -> { synced.set(true); }); + // add a replicas + recoverShardFromStore(primaryShard); + final IndexShard replicaShard = newShard(shardId, false); + recoverReplica(replicaShard, primaryShard); + final int maxSeqNo = randomIntBetween(0, 128); + for (int i = 0; i < maxSeqNo; i++) { + primaryShard.getEngine().seqNoService().generateSeqNo(); + } + final long checkpoint = rarely() ? maxSeqNo - scaledRandomIntBetween(0, maxSeqNo) : maxSeqNo; + + // set up local checkpoints on the shard copies + primaryShard.updateLocalCheckpointForShard(shardRouting.allocationId().getId(), checkpoint); + final int replicaLocalCheckpoint = randomIntBetween(0, Math.toIntExact(checkpoint)); + final String replicaAllocationId = replicaShard.routingEntry().allocationId().getId(); + primaryShard.updateLocalCheckpointForShard(replicaAllocationId, replicaLocalCheckpoint); + + // initialize the local knowledge on the primary of the global checkpoint on the replica shards + final int replicaGlobalCheckpoint = Math.toIntExact(primaryShard.getGlobalCheckpoint()); + primaryShard.updateGlobalCheckpointForShard( + replicaAllocationId, + randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), replicaGlobalCheckpoint)); + + // simulate a background maybe sync; it should only run if the knowledge on the replica of the global checkpoint lags the primary + primaryShard.maybeSyncGlobalCheckpoint("test"); + assertThat( + synced.get(), + equalTo(maxSeqNo == primaryShard.getGlobalCheckpoint() && (replicaGlobalCheckpoint < checkpoint))); + + // simulate that the background sync advanced the global checkpoint on the replica + primaryShard.updateGlobalCheckpointForShard(replicaAllocationId, primaryShard.getGlobalCheckpoint()); + + // reset our boolean so that we can assert after another simulated maybe sync + synced.set(false); + + primaryShard.maybeSyncGlobalCheckpoint("test"); + + // this time there should not be a sync since all the replica copies are caught up with the primary + assertFalse(synced.get()); + + closeShards(replicaShard, primaryShard); + } + public void testRestoreLocalCheckpointTrackerFromTranslogOnPromotion() throws IOException, InterruptedException { final IndexShard indexShard = newStartedShard(false); final int operations = 1024 - scaledRandomIntBetween(0, 1024); @@ -1678,7 +1739,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException { closeShards(shard); IndexShard newShard = newShard( ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE), - shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null); + shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null, () -> {}); recoverShardFromStore(newShard); @@ -1824,7 +1885,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException { closeShards(shard); IndexShard newShard = newShard( ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE), - shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null); + shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null, () -> {}); recoverShardFromStore(newShard); diff --git a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 0dc760d63bfe7..787c6c815dc52 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -130,7 +130,7 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem newRouting = newRouting.moveToUnassigned(unassignedInfo) .updateUnassigned(unassignedInfo, RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE); newRouting = ShardRoutingHelper.initialize(newRouting, nodeId); - IndexShard shard = index.createShard(newRouting); + IndexShard shard = index.createShard(newRouting, s -> {}); IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(5, counter.get()); final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 208e7443c7daf..35bbc497838f2 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -226,7 +226,8 @@ public MockIndexShard createShard(ShardRouting shardRouting, RecoveryState recov PeerRecoveryTargetService recoveryTargetService, PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService, - Consumer onShardFailure) throws IOException { + Consumer onShardFailure, + Consumer globalCheckpointSyncer) throws IOException { failRandomly(); MockIndexService indexService = indexService(recoveryState.getShardId().getIndex()); MockIndexShard indexShard = indexService.createShard(shardRouting); diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index a356693213f35..bc5a5b95b958a 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -410,20 +410,20 @@ private IndicesClusterStateService createIndicesClusterStateService(DiscoveryNod final ShardStateAction shardStateAction = mock(ShardStateAction.class); final PrimaryReplicaSyncer primaryReplicaSyncer = mock(PrimaryReplicaSyncer.class); return new IndicesClusterStateService( - settings, - indicesService, - clusterService, - threadPool, - recoveryTargetService, - shardStateAction, - null, - repositoriesService, - null, - null, - null, - null, - null, - primaryReplicaSyncer); + settings, + indicesService, + clusterService, + threadPool, + recoveryTargetService, + shardStateAction, + null, + repositoriesService, + null, + null, + null, + null, + primaryReplicaSyncer, + s -> {}); } private class RecordingIndicesService extends MockIndicesService { diff --git a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java index 04e1a846bd64f..1c7032fa02e87 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -20,22 +20,16 @@ package org.elasticsearch.recovery; import com.carrotsearch.hppc.IntHashSet; -import com.carrotsearch.hppc.ObjectLongMap; import com.carrotsearch.hppc.procedures.IntProcedure; import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.util.English; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.action.admin.indices.stats.IndexShardStats; -import org.elasticsearch.action.admin.indices.stats.IndexStats; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; -import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; @@ -47,13 +41,10 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.seqno.SeqNoStats; -import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest; import org.elasticsearch.plugins.Plugin; @@ -63,6 +54,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.test.MockIndexEventListener; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; @@ -81,7 +73,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; @@ -104,48 +95,13 @@ public class RelocationIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return Arrays.asList(MockTransportService.TestPlugin.class, MockIndexEventListener.TestPlugin.class); + return Arrays.asList(InternalSettingsPlugin.class, MockTransportService.TestPlugin.class, MockIndexEventListener.TestPlugin.class); } @Override protected void beforeIndexDeletion() throws Exception { super.beforeIndexDeletion(); - assertBusy(() -> { - IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get(); - for (IndexStats indexStats : stats.getIndices().values()) { - for (IndexShardStats indexShardStats : indexStats.getIndexShards().values()) { - Optional maybePrimary = Stream.of(indexShardStats.getShards()) - .filter(s -> s.getShardRouting().active() && s.getShardRouting().primary()) - .findFirst(); - if (maybePrimary.isPresent() == false) { - continue; - } - ShardStats primary = maybePrimary.get(); - final SeqNoStats primarySeqNoStats = primary.getSeqNoStats(); - final ShardRouting primaryShardRouting = primary.getShardRouting(); - assertThat(primaryShardRouting + " should have set the global checkpoint", - primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO))); - final DiscoveryNode node = clusterService().state().nodes().get(primaryShardRouting.currentNodeId()); - final IndicesService indicesService = - internalCluster().getInstance(IndicesService.class, node.getName()); - final IndexShard indexShard = indicesService.getShardOrNull(primaryShardRouting.shardId()); - final ObjectLongMap globalCheckpoints = indexShard.getGlobalCheckpoints(); - for (ShardStats shardStats : indexShardStats) { - final SeqNoStats seqNoStats = shardStats.getSeqNoStats(); - assertThat(shardStats.getShardRouting() + " local checkpoint mismatch", - seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint())); - assertThat(shardStats.getShardRouting() + " global checkpoint mismatch", - seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint())); - assertThat(shardStats.getShardRouting() + " max seq no mismatch", - seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo())); - // the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard - assertThat( - seqNoStats.getGlobalCheckpoint(), - equalTo(globalCheckpoints.get(shardStats.getShardRouting().allocationId().getId()))); - } - } - } - }); + assertSeqNos(); } public void testSimpleRelocationNoIndexing() { @@ -301,11 +257,14 @@ public void testRelocationWhileRefreshing() throws Exception { nodes[0] = internalCluster().startNode(); logger.info("--> creating test index ..."); - prepareCreate("test", Settings.builder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", numberOfReplicas) - .put("index.refresh_interval", -1) // we want to control refreshes c - ).get(); + prepareCreate( + "test", + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", numberOfReplicas) + .put("index.refresh_interval", -1) // we want to control refreshes + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms")) + .get(); for (int i = 1; i < numberOfNodes; i++) { logger.info("--> starting [node_{}] ...", i); @@ -383,9 +342,6 @@ public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardSt } - // refresh is a replication action so this forces a global checkpoint sync which is needed as these are asserted on in tear down - client().admin().indices().prepareRefresh("test").get(); - } public void testCancellationCleansTempFiles() throws Exception { @@ -481,11 +437,12 @@ public void testIndexAndRelocateConcurrently() throws ExecutionException, Interr logger.info("red nodes: {}", (Object)redNodes); ensureStableCluster(halfNodes * 2); - assertAcked(prepareCreate("test", Settings.builder() - .put("index.routing.allocation.exclude.color", "blue") - .put(indexSettings()) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(halfNodes - 1)) - )); + final Settings.Builder settings = Settings.builder() + .put("index.routing.allocation.exclude.color", "blue") + .put(indexSettings()) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(halfNodes - 1)) + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms"); + assertAcked(prepareCreate("test", settings)); assertAllShardsOnNodes("test", redNodes); int numDocs = randomIntBetween(100, 150); ArrayList ids = new ArrayList<>(); @@ -526,8 +483,6 @@ public void testIndexAndRelocateConcurrently() throws ExecutionException, Interr assertSearchHits(afterRelocation, ids.toArray(new String[ids.size()])); } - // refresh is a replication action so this forces a global checkpoint sync which is needed as these are asserted on in tear down - client().admin().indices().prepareRefresh("test").get(); } class RecoveryCorruption extends MockTransportService.DelegateTransport { diff --git a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java index 1bbc53cf8aeb9..c9be8214cbeed 100644 --- a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -236,7 +236,6 @@ public void testSeqNoCheckpoints() throws Exception { logger.info("indexing [{}] docs after moving primary", numberOfDocsAfterMovingPrimary); numDocsOnNewPrimary += indexDocs(index, numDocs, numberOfDocsAfterMovingPrimary); numDocs += numberOfDocsAfterMovingPrimary; - assertOK(client().performRequest("POST", index + "/_refresh")); // this forces a global checkpoint sync assertSeqNoOnShards(index, nodes, numDocsOnNewPrimary, newNodeClient); /* * Dropping the number of replicas to zero, and then increasing it to one triggers a recovery thus exercising any BWC-logic in diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 00ae39b6b356f..a448444edce1c 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -160,7 +160,9 @@ protected IndexShard newShard(boolean primary) throws IOException { * @param shardRouting the {@link ShardRouting} to use for this shard * @param listeners an optional set of listeners to add to the shard */ - protected IndexShard newShard(ShardRouting shardRouting, IndexingOperationListener... listeners) throws IOException { + protected IndexShard newShard( + final ShardRouting shardRouting, + final IndexingOperationListener... listeners) throws IOException { assert shardRouting.initializing() : shardRouting; Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) @@ -197,9 +199,7 @@ protected IndexShard newShard(ShardId shardId, boolean primary, IndexingOperatio */ protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, IndexMetaData indexMetaData, @Nullable IndexSearcherWrapper searcherWrapper) throws IOException { - ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, nodeId, primary, ShardRoutingState.INITIALIZING, - primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE); - return newShard(shardRouting, indexMetaData, searcherWrapper, null); + return newShard(shardId, primary, nodeId, indexMetaData, searcherWrapper, () -> {}); } /** @@ -211,11 +211,10 @@ protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, I * (ready to recover from another shard) */ protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, IndexMetaData indexMetaData, - Runnable globalCheckpointSyncer, - @Nullable IndexSearcherWrapper searcherWrapper) throws IOException { + @Nullable IndexSearcherWrapper searcherWrapper, Runnable globalCheckpointSyncer) throws IOException { ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, nodeId, primary, ShardRoutingState.INITIALIZING, primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE); - return newShard(shardRouting, indexMetaData, searcherWrapper, null); + return newShard(shardRouting, indexMetaData, searcherWrapper, null, globalCheckpointSyncer); } @@ -229,40 +228,45 @@ protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, I */ protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, IndexingOperationListener... listeners) throws IOException { - return newShard(routing, indexMetaData, null, null, listeners); + return newShard(routing, indexMetaData, null, null, () -> {}, listeners); } /** * creates a new initializing shard. The shard will will be put in its proper path under the * current node id the shard is assigned to. - * @param routing shard routing to use + * @param routing shard routing to use * @param indexMetaData indexMetaData for the shard, including any mapping * @param indexSearcherWrapper an optional wrapper to be used during searchers + * @param globalCheckpointSyncer callback for syncing global checkpoints * @param listeners an optional set of listeners to add to the shard */ protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, @Nullable IndexSearcherWrapper indexSearcherWrapper, @Nullable EngineFactory engineFactory, + Runnable globalCheckpointSyncer, IndexingOperationListener... listeners) throws IOException { // add node id as name to settings for proper logging final ShardId shardId = routing.shardId(); final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir()); ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); - return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, engineFactory, listeners); + return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, engineFactory, globalCheckpointSyncer, listeners); } /** * creates a new initializing shard. - * @param routing shard routing to use - * @param shardPath path to use for shard data - * @param indexMetaData indexMetaData for the shard, including any mapping - * @param indexSearcherWrapper an optional wrapper to be used during searchers - * @param listeners an optional set of listeners to add to the shard + * + * @param routing shard routing to use + * @param shardPath path to use for shard data + * @param indexMetaData indexMetaData for the shard, including any mapping + * @param indexSearcherWrapper an optional wrapper to be used during searchers + * @param globalCheckpointSyncer callback for syncing global checkpoints + * @param listeners an optional set of listeners to add to the shard */ protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMetaData indexMetaData, @Nullable IndexSearcherWrapper indexSearcherWrapper, @Nullable EngineFactory engineFactory, + Runnable globalCheckpointSyncer, IndexingOperationListener... listeners) throws IOException { final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build(); final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings); @@ -279,9 +283,9 @@ protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMe }; final Engine.Warmer warmer = searcher -> { }; - indexShard = new IndexShard(routing, indexSettings, shardPath, store, () ->null, indexCache, mapperService, similarityService, + indexShard = new IndexShard(routing, indexSettings, shardPath, store, () -> null, indexCache, mapperService, similarityService, engineFactory, indexEventListener, indexSearcherWrapper, threadPool, - BigArrays.NON_RECYCLING_INSTANCE, warmer, Collections.emptyList(), Arrays.asList(listeners)); + BigArrays.NON_RECYCLING_INSTANCE, warmer, Collections.emptyList(), Arrays.asList(listeners), globalCheckpointSyncer); success = true; } finally { if (success == false) { @@ -311,7 +315,14 @@ protected IndexShard reinitShard(IndexShard current, IndexingOperationListener.. */ protected IndexShard reinitShard(IndexShard current, ShardRouting routing, IndexingOperationListener... listeners) throws IOException { closeShards(current); - return newShard(routing, current.shardPath(), current.indexSettings().getIndexMetaData(), null, current.engineFactory, listeners); + return newShard( + routing, + current.shardPath(), + current.indexSettings().getIndexMetaData(), + null, + current.engineFactory, + current.getGlobalCheckpointSyncer(), + listeners); } /** diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 49ed6ab25afc9..50df2d428a18a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -19,6 +19,7 @@ package org.elasticsearch.test; +import com.carrotsearch.hppc.ObjectLongMap; import com.carrotsearch.randomizedtesting.RandomizedContext; import com.carrotsearch.randomizedtesting.annotations.TestGroup; import com.carrotsearch.randomizedtesting.generators.RandomNumbers; @@ -49,6 +50,10 @@ import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; import org.elasticsearch.action.admin.indices.segments.ShardSegments; +import org.elasticsearch.action.admin.indices.stats.IndexShardStats; +import org.elasticsearch.action.admin.indices.stats.IndexStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; @@ -69,6 +74,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; @@ -115,6 +121,9 @@ import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.mapper.DocumentMapper; +import org.elasticsearch.index.seqno.SeqNoStats; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesQueryCache; import org.elasticsearch.indices.IndicesRequestCache; @@ -163,6 +172,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; @@ -193,6 +203,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.startsWith; @@ -2196,4 +2207,44 @@ public static Index resolveIndex(String index) { String uuid = getIndexResponse.getSettings().get(index).get(IndexMetaData.SETTING_INDEX_UUID); return new Index(index, uuid); } + + protected void assertSeqNos() throws Exception { + assertBusy(() -> { + IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get(); + for (IndexStats indexStats : stats.getIndices().values()) { + for (IndexShardStats indexShardStats : indexStats.getIndexShards().values()) { + Optional maybePrimary = Stream.of(indexShardStats.getShards()) + .filter(s -> s.getShardRouting().active() && s.getShardRouting().primary()) + .findFirst(); + if (maybePrimary.isPresent() == false) { + continue; + } + ShardStats primary = maybePrimary.get(); + final SeqNoStats primarySeqNoStats = primary.getSeqNoStats(); + final ShardRouting primaryShardRouting = primary.getShardRouting(); + assertThat(primaryShardRouting + " should have set the global checkpoint", + primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO))); + final DiscoveryNode node = clusterService().state().nodes().get(primaryShardRouting.currentNodeId()); + final IndicesService indicesService = + internalCluster().getInstance(IndicesService.class, node.getName()); + final IndexShard indexShard = indicesService.getShardOrNull(primaryShardRouting.shardId()); + final ObjectLongMap globalCheckpoints = indexShard.getInSyncGlobalCheckpoints(); + for (ShardStats shardStats : indexShardStats) { + final SeqNoStats seqNoStats = shardStats.getSeqNoStats(); + assertThat(shardStats.getShardRouting() + " local checkpoint mismatch", + seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint())); + assertThat(shardStats.getShardRouting() + " global checkpoint mismatch", + seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint())); + assertThat(shardStats.getShardRouting() + " max seq no mismatch", + seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo())); + // the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard + assertThat( + seqNoStats.getGlobalCheckpoint(), + equalTo(globalCheckpoints.get(shardStats.getShardRouting().allocationId().getId()))); + } + } + } + }); + } + } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java b/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java index 12920a5f1504e..e1c555b811064 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexService; import org.elasticsearch.plugins.Plugin; import java.util.Arrays; @@ -44,7 +45,12 @@ public final class InternalSettingsPlugin extends Plugin { @Override public List> getSettings() { - return Arrays.asList(VERSION_CREATED, MERGE_ENABLED, - INDEX_CREATION_DATE_SETTING, PROVIDED_NAME_SETTING, TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING); + return Arrays.asList( + VERSION_CREATED, + MERGE_ENABLED, + INDEX_CREATION_DATE_SETTING, + PROVIDED_NAME_SETTING, + TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING, + IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING); } }