-
Notifications
You must be signed in to change notification settings - Fork 24.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce global checkpoint background sync #26591
Changes from 3 commits
c0e7443
511f96e
e1dc814
86ddf79
e0657a7
b79b13f
f7a76cd
3881d76
89bbf84
1082a31
15453b4
aa0c62c
721f725
9bc5155
63e9d80
6ade720
354df1c
79742ea
d9fc19d
afb082f
9bf875e
007d5c4
cb9373b
7e6d1bf
f7295ac
f237d88
e88b92c
030156d
0343372
cf4e67b
f82df30
507806b
530addd
f3b04dc
5030a77
b640b10
cccdec6
d43f794
26e4c76
b80b728
b8adcce
c041ea2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -82,6 +82,7 @@ | |
import java.util.concurrent.ScheduledFuture; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.function.Consumer; | ||
import java.util.function.LongSupplier; | ||
import java.util.function.Supplier; | ||
|
||
|
@@ -109,10 +110,12 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust | |
private final AtomicBoolean closed = new AtomicBoolean(false); | ||
private final AtomicBoolean deleted = new AtomicBoolean(false); | ||
private final IndexSettings indexSettings; | ||
private final List<IndexingOperationListener> indexingOperationListeners; | ||
private final List<SearchOperationListener> searchOperationListeners; | ||
private final List<IndexingOperationListener> indexingOperationListeners; | ||
private final Consumer<ShardId> globalCheckpointSyncer; | ||
private volatile AsyncRefreshTask refreshTask; | ||
private volatile AsyncTranslogFSync fsyncTask; | ||
private volatile AsyncGlobalCheckpointTask globalCheckpointTask; | ||
|
||
// don't convert to Setting<> and register... we only set this in tests and register via a plugin | ||
private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval"; | ||
|
@@ -145,7 +148,8 @@ public IndexService( | |
IndicesFieldDataCache indicesFieldDataCache, | ||
List<SearchOperationListener> searchOperationListeners, | ||
List<IndexingOperationListener> indexingOperationListeners, | ||
NamedWriteableRegistry namedWriteableRegistry) throws IOException { | ||
NamedWriteableRegistry namedWriteableRegistry, | ||
Consumer<ShardId> globalCheckpointSyncer) throws IOException { | ||
super(indexSettings); | ||
this.indexSettings = indexSettings; | ||
this.xContentRegistry = xContentRegistry; | ||
|
@@ -182,11 +186,13 @@ public IndexService( | |
this.engineFactory = engineFactory; | ||
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE | ||
this.searcherWrapper = wrapperFactory.newWrapper(this); | ||
this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners); | ||
this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners); | ||
this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners); | ||
this.globalCheckpointSyncer = globalCheckpointSyncer; | ||
// kick off async ops for the first shard in this index | ||
this.refreshTask = new AsyncRefreshTask(this); | ||
this.trimTranslogTask = new AsyncTrimTranslogTask(this); | ||
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this); | ||
rescheduleFsyncTask(indexSettings.getTranslogDurability()); | ||
} | ||
|
||
|
@@ -268,7 +274,15 @@ public synchronized void close(final String reason, boolean delete) throws IOExc | |
} | ||
} | ||
} finally { | ||
IOUtils.close(bitsetFilterCache, indexCache, indexFieldData, mapperService, refreshTask, fsyncTask, trimTranslogTask); | ||
IOUtils.close( | ||
bitsetFilterCache, | ||
indexCache, | ||
indexFieldData, | ||
mapperService, | ||
refreshTask, | ||
fsyncTask, | ||
trimTranslogTask, | ||
globalCheckpointTask); | ||
} | ||
} | ||
} | ||
|
@@ -365,7 +379,7 @@ public synchronized IndexShard createShard(ShardRouting routing) throws IOExcept | |
indexShard = new IndexShard(routing, this.indexSettings, path, store, indexSortSupplier, | ||
indexCache, mapperService, similarityService, engineFactory, | ||
eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer, | ||
searchOperationListeners, indexingOperationListeners); | ||
searchOperationListeners, indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId)); | ||
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); | ||
eventListener.afterIndexShardCreated(indexShard); | ||
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); | ||
|
@@ -710,6 +724,30 @@ private void maybeTrimTranslog() { | |
} | ||
} | ||
|
||
private void syncGlobalCheckpoints() { | ||
for (final IndexShard shard : this.shards.values()) { | ||
if (shard.routingEntry().active() && shard.routingEntry().primary()) { | ||
switch (shard.state()) { | ||
case CREATED: | ||
case RECOVERING: | ||
case CLOSED: | ||
continue; | ||
case POST_RECOVERY: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. routing entry cannot be active while shard state is POST_RECOVERY ;-) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressed in 9bf875e. |
||
case STARTED: | ||
case RELOCATED: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in case its relocated, we won't get the operation permit, so I think we can just skip it (i.e. treat it same way as CLOSED). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressed in 9bf875e. |
||
try { | ||
shard.maybeSyncGlobalCheckpoint(); | ||
} catch (final AlreadyClosedException | IndexShardClosedException e) { | ||
// the shard was closed concurrently, continue | ||
} | ||
continue; | ||
default: | ||
throw new IllegalStateException("unknown state [" + shard.state() + "]"); | ||
} | ||
} | ||
} | ||
} | ||
|
||
abstract static class BaseAsyncTask implements Runnable, Closeable { | ||
protected final IndexService indexService; | ||
protected final ThreadPool threadPool; | ||
|
@@ -877,6 +915,39 @@ public String toString() { | |
} | ||
} | ||
|
||
private static final TimeValue GLOBAL_CHECKPOINT_SYNC_INTERVAL = TimeValue.timeValueSeconds(30); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of having this constant here, I would put the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, I pushed 7e6d1bf. |
||
|
||
/** | ||
* Background task that syncs the global checkpoint to replicas. | ||
*/ | ||
final class AsyncGlobalCheckpointTask extends BaseAsyncTask { | ||
|
||
AsyncGlobalCheckpointTask(final IndexService indexService) { | ||
// index.global_checkpoint_sync_interval is not a real setting, it is only registered in tests | ||
super( | ||
indexService, | ||
indexService | ||
.getIndexSettings() | ||
.getSettings() | ||
.getAsTime("index.global_checkpoint_sync.interval", GLOBAL_CHECKPOINT_SYNC_INTERVAL)); | ||
} | ||
|
||
@Override | ||
protected void runInternal() { | ||
indexService.syncGlobalCheckpoints(); | ||
} | ||
|
||
@Override | ||
protected String getThreadPool() { | ||
return ThreadPool.Names.GENERIC; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "global_checkpoint_sync"; | ||
} | ||
} | ||
|
||
AsyncRefreshTask getRefreshTask() { // for tests | ||
return refreshTask; | ||
} | ||
|
@@ -885,6 +956,10 @@ AsyncTranslogFSync getFsyncTask() { // for tests | |
return fsyncTask; | ||
} | ||
|
||
AsyncGlobalCheckpointTask getGlobalCheckpointTask() { | ||
return globalCheckpointTask; | ||
} | ||
|
||
/** | ||
* Clears the caches for the given shard id if the shard is still allocated on this node | ||
*/ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,7 +47,7 @@ | |
public class GlobalCheckpointSyncAction extends TransportReplicationAction< | ||
GlobalCheckpointSyncAction.Request, | ||
GlobalCheckpointSyncAction.Request, | ||
ReplicationResponse> implements IndexEventListener { | ||
ReplicationResponse> { | ||
|
||
public static String ACTION_NAME = "indices:admin/seq_no/global_checkpoint_sync"; | ||
|
||
|
@@ -76,6 +76,10 @@ public GlobalCheckpointSyncAction( | |
ThreadPool.Names.SAME); | ||
} | ||
|
||
public void updateGlobalCheckpointForShard(final ShardId shardId) { | ||
execute(new Request(shardId)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. execute returns a future, which we just ignore here. This means that we won't be able to log information if a gcp sync failed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pushed 030156d. |
||
} | ||
|
||
@Override | ||
protected ReplicationResponse newResponseInstance() { | ||
return new ReplicationResponse(); | ||
|
@@ -93,11 +97,6 @@ protected void sendReplicaRequest( | |
} | ||
} | ||
|
||
@Override | ||
public void onShardInactive(final IndexShard indexShard) { | ||
execute(new Request(indexShard.shardId())); | ||
} | ||
|
||
@Override | ||
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary( | ||
final Request request, final IndexShard indexShard) throws Exception { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,8 @@ | |
|
||
package org.elasticsearch.index.seqno; | ||
|
||
import com.carrotsearch.hppc.ObjectLongHashMap; | ||
import com.carrotsearch.hppc.ObjectLongMap; | ||
import org.elasticsearch.cluster.routing.AllocationId; | ||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable; | ||
import org.elasticsearch.cluster.routing.ShardRouting; | ||
|
@@ -496,6 +498,38 @@ public synchronized void updateLocalCheckpoint(final String allocationId, final | |
assert invariant(); | ||
} | ||
|
||
private ObjectLongMap<String> globalCheckpoints = new ObjectLongHashMap<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Left over from the first version, sorry. I removed this in afb082f. |
||
|
||
/** | ||
* Update the local knowledge of the global checkpoint for the specified allocation ID. | ||
* | ||
* @param allocationId the allocation ID to update the global checkpoint for | ||
* @param globalCheckpoint the global checkpoint | ||
*/ | ||
synchronized void updateGlobalCheckpointForShard(final String allocationId, final long globalCheckpoint) { | ||
assert primaryMode; | ||
assert handoffInProgress == false; | ||
final long current = globalCheckpoints.getOrDefault(allocationId, SequenceNumbers.UNASSIGNED_SEQ_NO); | ||
if (globalCheckpoint > current) { | ||
globalCheckpoints.put(allocationId, globalCheckpoint); | ||
} | ||
} | ||
|
||
/** | ||
* Returns the global checkpoints for all shards. | ||
* | ||
* @param allocationId the allocationId to use for the global checkpoint on the primary | ||
* | ||
* @return the global checkpoints for all shards | ||
*/ | ||
synchronized ObjectLongMap<String> getGlobalCheckpoints(final String allocationId) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can set the allocation id of the primary when creating the tracker in the constructor (we then also don't need to pass it anymore to the activatePrimaryMode method). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressed by #26630. |
||
assert primaryMode; | ||
assert handoffInProgress == false; | ||
final ObjectLongMap<String> copy = new ObjectLongHashMap<>(globalCheckpoints); | ||
copy.put(allocationId, globalCheckpoint); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. with the above suggestion (allocation id of primary set during construction), we can just have a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressed by #26666. |
||
return copy; | ||
} | ||
|
||
/** | ||
* Computes the global checkpoint based on the given local checkpoints. In case where there are entries preventing the | ||
* computation to happen (for example due to blocking), it returns the fallback value. | ||
|
@@ -585,6 +619,7 @@ public synchronized void completeRelocationHandoff() { | |
lcps.localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; | ||
} | ||
}); | ||
globalCheckpoints.clear(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what about removing map entries when replicas fail etc. and are removed by the cluster state (and cleaned from the localCheckpoints map)? Note that in the above scenario There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressed by #26666. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Left over from the first version, sorry. I removed this in afb082f. |
||
assert invariant(); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,7 +19,9 @@ | |
|
||
package org.elasticsearch.index.seqno; | ||
|
||
import com.carrotsearch.hppc.ObjectLongMap; | ||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable; | ||
import org.elasticsearch.common.collect.Tuple; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. revert There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Left over from the first version, sorry. I removed this in afb082f. |
||
import org.elasticsearch.index.IndexSettings; | ||
import org.elasticsearch.index.shard.AbstractIndexShardComponent; | ||
import org.elasticsearch.index.shard.ReplicationGroup; | ||
|
@@ -131,6 +133,16 @@ public void updateLocalCheckpointForShard(final String allocationId, final long | |
globalCheckpointTracker.updateLocalCheckpoint(allocationId, checkpoint); | ||
} | ||
|
||
/** | ||
* Update the local knowledge of the global checkpoint for the specified allocation ID. | ||
* | ||
* @param allocationId the allocation ID to update the global checkpoint for | ||
* @param globalCheckpoint the global checkpoint | ||
*/ | ||
public void updateGlobalCheckpointForShard(final String allocationId, final long globalCheckpoint) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this also be called from RecoverySourceHandler after calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressed by #26666. |
||
globalCheckpointTracker.updateGlobalCheckpointForShard(allocationId, globalCheckpoint); | ||
} | ||
|
||
/** | ||
* Called when the recovery process for a shard is ready to open the engine on the target shard. | ||
* See {@link GlobalCheckpointTracker#initiateTracking(String)} for details. | ||
|
@@ -179,6 +191,17 @@ public long getGlobalCheckpoint() { | |
return globalCheckpointTracker.getGlobalCheckpoint(); | ||
} | ||
|
||
/** | ||
* Returns the global checkpoints for all shards. | ||
* | ||
* @param allocationId the allocationId to use for the global checkpoint on the primary | ||
* | ||
* @return the global checkpoints for all shards | ||
*/ | ||
public ObjectLongMap<String> getGlobalCheckpoints(final String allocationId) { | ||
return globalCheckpointTracker.getGlobalCheckpoints(allocationId); | ||
} | ||
|
||
/** | ||
* Updates the global checkpoint on a replica shard after it has been updated by the primary. | ||
* | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this and remove all changes to IndicesService.createIndex etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed 26e4c76.