Introduce global checkpoint background sync
It is the exciting return of the global checkpoint background
sync. Long, long ago, in a snapshot version far, far away, we had one
and only one global checkpoint sync: a background sync that fired
periodically and sent the global checkpoint from the primary shard to
the replicas so that they could update their local knowledge of the
global checkpoint. Later, as we sped ahead towards finalizing the
initial version of sequence IDs, we realized that the global
checkpoint updates needed to be inline. This means that on each
replication operation, the primary shard piggybacks the global
checkpoint onto the replication requests it sends to the replicas;
the replicas update their local knowledge of the global checkpoint
and reply with their local checkpoints (a minimal sketch of this
exchange follows the list of problems below). However, the global
checkpoint on the primary can then advance again, and the replicas
fall behind in their local knowledge of the global checkpoint. If no
further replication operation ever fires, the replicas are left
permanently behind. To account for this, we added one more sync that
fired when the primary shard fell idle. However, this has problems:
 - the shard idle timer defaults to five minutes, a long time to wait
   for the replicas to learn of the new global checkpoint
 - if a replica missed the sync, there was no follow-up sync to catch
   it up
 - there is an inherent race condition in which the primary shard can
   fall idle mid-operation (after having sent the replication requests
   to the replicas); in this case, there would never be a background
   sync after the operation completes
 - tying the global checkpoint sync to the idle timer was never natural
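
For illustration only, here is a minimal sketch of the inline exchange
described above; the class and field names are hypothetical data
carriers, not the actual transport request and response classes
touched by this change:

    // Hypothetical sketch: what each direction of the inline
    // (per-operation) sync carries. Not code from this commit.
    final class ReplicaRequestSketch {
        // the primary piggybacks its current global checkpoint on every replica request
        final long globalCheckpoint;

        ReplicaRequestSketch(final long globalCheckpoint) {
            this.globalCheckpoint = globalCheckpoint;
        }
    }

    final class ReplicaResponseSketch {
        // the replica replies with its local checkpoint so the primary can recompute the global checkpoint
        final long localCheckpoint;

        ReplicaResponseSketch(final long localCheckpoint) {
            this.localCheckpoint = localCheckpoint;
        }
    }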

To fix this, we make two additional changes so that the global
checkpoint is synced to the replicas. The first is a post-operation
sync that fires only if there are no operations in flight and at
least one replica is lagging. This gives us a chance to sync the
global checkpoint to the replicas immediately after an operation so
that they are always kept up to date. The second is the return of a
global checkpoint background sync that fires on a timer. This timer
fires every thirty seconds and is not configurable (for simplicity).
This background sync is smarter than the one we had previously in the
sense that it only sends a sync if the global checkpoint on at least
one replica lags that of the primary: when the timer fires, we compare
the global checkpoint on the primary to its knowledge of the global
checkpoint on the replicas, and only send a sync if some shard copy is
behind.
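
To make the lagging check concrete, here is a simplified, hypothetical
sketch of the decision behind IndexShard#maybeSyncGlobalCheckpoint as
used in this change. It relies only on what the diff below shows (the
primary's own global checkpoint and, via getInSyncGlobalCheckpoints,
its knowledge of the global checkpoint on each in-sync copy); the
class name, the plain java.util.Map, and the max-sequence-number test
for "no operations in flight" are illustrative assumptions, not code
taken from this commit.

    import java.util.Map;

    // Simplified sketch (not the code from this commit): fire a sync only
    // if at least one in-sync shard copy is known to lag the primary's
    // global checkpoint.
    final class GlobalCheckpointSyncDecision {

        // background variant: called when the thirty-second timer fires
        static boolean backgroundSyncNeeded(
                final long primaryGlobalCheckpoint, final Map<String, Long> knownGlobalCheckpoints) {
            // the primary compares its global checkpoint to its knowledge of each in-sync copy
            return knownGlobalCheckpoints.values().stream().anyMatch(known -> known < primaryGlobalCheckpoint);
        }

        // post-operation variant: additionally require that no operations are in
        // flight, approximated here (an assumption) by the maximum sequence number
        // having advanced no further than the global checkpoint
        static boolean postOperationSyncNeeded(
                final long maxSeqNo,
                final long primaryGlobalCheckpoint,
                final Map<String, Long> knownGlobalCheckpoints) {
            return maxSeqNo == primaryGlobalCheckpoint
                    && backgroundSyncNeeded(primaryGlobalCheckpoint, knownGlobalCheckpoints);
        }
    }

In the change itself, the background variant runs under a primary
operation permit from a thirty-second AsyncGlobalCheckpointTask in
IndexService, and the post-operation variant is triggered from
TransportReplicationAction when syncGlobalCheckpointAfterOperation is
set, which TransportWriteAction does, as the hunks below show.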

Relates #26591
jasontedor committed Sep 21, 2017
1 parent cab0023 commit 5c2572c
Showing 24 changed files with 675 additions and 173 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -175,7 +175,7 @@ task verifyVersions {
* after the backport of the backcompat code is complete.
*/
allprojects {
ext.bwc_tests_enabled = true
ext.bwc_tests_enabled = false
}

task verifyBwcTestsEnabled {
@@ -20,7 +20,9 @@
package org.elasticsearch.action.support.replication;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
@@ -55,6 +57,7 @@
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
@@ -108,12 +111,26 @@ public abstract class TransportReplicationAction<
protected final String transportReplicaAction;
protected final String transportPrimaryAction;

private final boolean syncGlobalCheckpointAfterOperation;

protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor) {
this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, request, replicaRequest, executor, false);
}


protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor,
boolean syncGlobalCheckpointAfterOperation) {
super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
this.transportService = transportService;
this.clusterService = clusterService;
@@ -126,6 +143,8 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
registerRequestHandlers(actionName, transportService, request, replicaRequest, executor);

this.transportOptions = transportOptions();

this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation;
}

protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<Request> request,
@@ -150,7 +169,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
new ReroutePhase((ReplicationTask) task, request, listener).run();
}

protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) {
protected ReplicationOperation.Replicas<ReplicaRequest> newReplicasProxy(long primaryTerm) {
return new ReplicasProxy(primaryTerm);
}

@@ -359,6 +378,17 @@ private ActionListener<Response> createResponseListener(final PrimaryShardRefere
return new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
if (syncGlobalCheckpointAfterOperation) {
try {
primaryShardReference.indexShard.maybeSyncGlobalCheckpoint("post-operation");
} catch (final Exception e) {
// only log non-closed exceptions
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
logger.info("post-operation global checkpoint sync failed", e);
// intentionally swallow, a missed global checkpoint sync should not fail this operation
}
}
}
primaryShardReference.close(); // release shard operation lock before responding to caller
setPhase(replicationTask, "finished");
try {
@@ -71,7 +71,7 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor) {
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, request, replicaRequest, executor);
indexNameExpressionResolver, request, replicaRequest, executor, true);
}

/** Syncs operation result to the translog or throws a shard not available failure */
@@ -22,7 +22,6 @@
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
@@ -40,6 +39,7 @@
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.BM25SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityService;
103 changes: 97 additions & 6 deletions core/src/main/java/org/elasticsearch/index/IndexService.java
@@ -25,11 +25,15 @@
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
@@ -82,6 +86,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

@@ -109,10 +114,11 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean deleted = new AtomicBoolean(false);
private final IndexSettings indexSettings;
private final List<IndexingOperationListener> indexingOperationListeners;
private final List<SearchOperationListener> searchOperationListeners;
private final List<IndexingOperationListener> indexingOperationListeners;
private volatile AsyncRefreshTask refreshTask;
private volatile AsyncTranslogFSync fsyncTask;
private volatile AsyncGlobalCheckpointTask globalCheckpointTask;

// don't convert to Setting<> and register... we only set this in tests and register via a plugin
private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval";
@@ -182,11 +188,12 @@ public IndexService(
this.engineFactory = engineFactory;
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
this.searcherWrapper = wrapperFactory.newWrapper(this);
this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners);
this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners);
this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners);
// kick off async ops for the first shard in this index
this.refreshTask = new AsyncRefreshTask(this);
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
rescheduleFsyncTask(indexSettings.getTranslogDurability());
}

@@ -268,7 +275,15 @@ public synchronized void close(final String reason, boolean delete) throws IOExc
}
}
} finally {
IOUtils.close(bitsetFilterCache, indexCache, indexFieldData, mapperService, refreshTask, fsyncTask, trimTranslogTask);
IOUtils.close(
bitsetFilterCache,
indexCache,
indexFieldData,
mapperService,
refreshTask,
fsyncTask,
trimTranslogTask,
globalCheckpointTask);
}
}
}
@@ -293,8 +308,7 @@ private long getAvgShardSizeInBytes() throws IOException {
}
}

public synchronized IndexShard createShard(ShardRouting routing) throws IOException {
final boolean primary = routing.primary();
public synchronized IndexShard createShard(ShardRouting routing, Consumer<ShardId> globalCheckpointSyncer) throws IOException {
/*
* TODO: we execute this in parallel but it's a synced method. Yet, we might
* be able to serialize the execution via the cluster state in the future. for now we just
@@ -365,7 +379,7 @@ public synchronized IndexShard createShard(ShardRouting routing) throws IOExcept
indexShard = new IndexShard(routing, this.indexSettings, path, store, indexSortSupplier,
indexCache, mapperService, similarityService, engineFactory,
eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
searchOperationListeners, indexingOperationListeners);
searchOperationListeners, indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId));
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
@@ -710,6 +724,44 @@ private void maybeTrimTranslog() {
}
}

private void maybeSyncGlobalCheckpoints() {
for (final IndexShard shard : this.shards.values()) {
if (shard.routingEntry().active() && shard.routingEntry().primary()) {
switch (shard.state()) {
case CLOSED:
case CREATED:
case RECOVERING:
case RELOCATED:
continue;
case POST_RECOVERY:
assert false : "shard " + shard.shardId() + " is in post-recovery but marked as active";
continue;
case STARTED:
try {
shard.acquirePrimaryOperationPermit(
ActionListener.wrap(
releasable -> {
try (Releasable ignored = releasable) {
shard.maybeSyncGlobalCheckpoint("background");
}
},
e -> {
if (!(e instanceof AlreadyClosedException || e instanceof IndexShardClosedException)) {
logger.info("failed to execute background global checkpoint sync", e);
}
}),
ThreadPool.Names.SAME);
} catch (final AlreadyClosedException | IndexShardClosedException e) {
// the shard was closed concurrently, continue
}
continue;
default:
throw new IllegalStateException("unknown state [" + shard.state() + "]");
}
}
}
}

abstract static class BaseAsyncTask implements Runnable, Closeable {
protected final IndexService indexService;
protected final ThreadPool threadPool;
@@ -877,6 +929,41 @@ public String toString() {
}
}

// this setting is intentionally not registered, it is only used in tests
public static final Setting<TimeValue> GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING =
Setting.timeSetting(
"index.global_checkpoint_sync.interval",
new TimeValue(30, TimeUnit.SECONDS),
new TimeValue(0, TimeUnit.MILLISECONDS),
Property.Dynamic,
Property.IndexScope);

/**
* Background task that syncs the global checkpoint to replicas.
*/
final class AsyncGlobalCheckpointTask extends BaseAsyncTask {

AsyncGlobalCheckpointTask(final IndexService indexService) {
// index.global_checkpoint_sync_interval is not a real setting, it is only registered in tests
super(indexService, GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings()));
}

@Override
protected void runInternal() {
indexService.maybeSyncGlobalCheckpoints();
}

@Override
protected String getThreadPool() {
return ThreadPool.Names.GENERIC;
}

@Override
public String toString() {
return "global_checkpoint_sync";
}
}

AsyncRefreshTask getRefreshTask() { // for tests
return refreshTask;
}
@@ -885,6 +972,10 @@ AsyncTranslogFSync getFsyncTask() { // for tests
return fsyncTask;
}

AsyncGlobalCheckpointTask getGlobalCheckpointTask() {
return globalCheckpointTask;
}

/**
* Clears the caches for the given shard id if the shard is still allocated on this node
*/
@@ -19,7 +19,10 @@

package org.elasticsearch.index.seqno;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationOperation;
@@ -34,6 +37,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
@@ -47,7 +51,7 @@
public class GlobalCheckpointSyncAction extends TransportReplicationAction<
GlobalCheckpointSyncAction.Request,
GlobalCheckpointSyncAction.Request,
ReplicationResponse> implements IndexEventListener {
ReplicationResponse> {

public static String ACTION_NAME = "indices:admin/seq_no/global_checkpoint_sync";

@@ -73,7 +77,17 @@ public GlobalCheckpointSyncAction(
indexNameExpressionResolver,
Request::new,
Request::new,
ThreadPool.Names.SAME);
ThreadPool.Names.MANAGEMENT);
}

public void updateGlobalCheckpointForShard(final ShardId shardId) {
execute(
new Request(shardId),
ActionListener.wrap(r -> {}, e -> {
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
logger.info(shardId + " global checkpoint sync failed", e);
}
}));
}

@Override
@@ -94,11 +108,6 @@ protected void sendReplicaRequest(
}
}

@Override
public void onShardInactive(final IndexShard indexShard) {
execute(new Request(indexShard.shardId()));
}

@Override
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
final Request request, final IndexShard indexShard) throws Exception {
@@ -209,13 +209,20 @@ public int hashCode() {
}
}

synchronized ObjectLongMap<String> getGlobalCheckpoints() {
/**
* Get the local knowledge of the global checkpoints for all in-sync allocation IDs.
*
* @return a map from allocation ID to the local knowledge of the global checkpoint for that allocation ID
*/
synchronized ObjectLongMap<String> getInSyncGlobalCheckpoints() {
assert primaryMode;
assert handoffInProgress == false;
final ObjectLongMap<String> globalCheckpoints = new ObjectLongHashMap<>(checkpoints.size());
for (final Map.Entry<String, CheckpointState> cps : checkpoints.entrySet()) {
globalCheckpoints.put(cps.getKey(), cps.getValue().globalCheckpoint);
}
final ObjectLongMap<String> globalCheckpoints = new ObjectLongHashMap<>(checkpoints.size()); // upper bound on the size
checkpoints
.entrySet()
.stream()
.filter(e -> e.getValue().inSync)
.forEach(e -> globalCheckpoints.put(e.getKey(), e.getValue().globalCheckpoint));
return globalCheckpoints;
}
