Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce global checkpoint background sync #26591

Merged
merged 42 commits into from
Sep 21, 2017
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
c0e7443
Introduce global checkpoint background sync
jasontedor Aug 28, 2017
511f96e
Cleanup after test
jasontedor Sep 11, 2017
e1dc814
Recollect stats
jasontedor Sep 12, 2017
86ddf79
Merge branch 'master' into global-checkpoint-sync
jasontedor Sep 19, 2017
e0657a7
Add after-op sync
jasontedor Sep 20, 2017
b79b13f
Merge branch 'master' into global-checkpoint-sync
jasontedor Sep 20, 2017
f7a76cd
Iteration
jasontedor Sep 20, 2017
3881d76
Iteration
jasontedor Sep 20, 2017
89bbf84
Remove comments
jasontedor Sep 20, 2017
1082a31
Close it
jasontedor Sep 20, 2017
15453b4
Merge branch 'master' into global-checkpoint-sync
jasontedor Sep 20, 2017
aa0c62c
refresh needed
jasontedor Sep 20, 2017
721f725
remove ensure green
jasontedor Sep 20, 2017
9bc5155
remove background sync test setting
jasontedor Sep 20, 2017
63e9d80
Revert "remove background sync test setting"
jasontedor Sep 21, 2017
6ade720
test iteratoin
jasontedor Sep 21, 2017
354df1c
disable bwc tests
jasontedor Sep 21, 2017
79742ea
imports
jasontedor Sep 21, 2017
d9fc19d
Formatting of method
jasontedor Sep 21, 2017
afb082f
Remove leftover code
jasontedor Sep 21, 2017
9bf875e
State handling
jasontedor Sep 21, 2017
007d5c4
Logging on failed sync
jasontedor Sep 21, 2017
cb9373b
no fallthrough
jasontedor Sep 21, 2017
7e6d1bf
Setting
jasontedor Sep 21, 2017
f7295ac
Critical fix
jasontedor Sep 21, 2017
f237d88
fix comment
jasontedor Sep 21, 2017
e88b92c
Revert move, add javadocs
jasontedor Sep 21, 2017
030156d
Handle execute future
jasontedor Sep 21, 2017
0343372
Only check in-sync shards
jasontedor Sep 21, 2017
cf4e67b
Test iteration
jasontedor Sep 21, 2017
f82df30
More testing
jasontedor Sep 21, 2017
507806b
refactor shard creation
jasontedor Sep 21, 2017
530addd
handle closed
jasontedor Sep 21, 2017
f3b04dc
assertSeqNos
jasontedor Sep 21, 2017
5030a77
Add distruption test
jasontedor Sep 21, 2017
b640b10
revert formatting changes
jasontedor Sep 21, 2017
cccdec6
checkstyle
jasontedor Sep 21, 2017
d43f794
imports
jasontedor Sep 21, 2017
26e4c76
remove dead code from create index
jasontedor Sep 21, 2017
b80b728
everything in the right place
jasontedor Sep 21, 2017
b8adcce
revert formatting changes
jasontedor Sep 21, 2017
c041ea2
Merge branch 'master' into global-checkpoint-sync
jasontedor Sep 21, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ task verifyVersions {
* after the backport of the backcompat code is complete.
*/
allprojects {
ext.bwc_tests_enabled = true
ext.bwc_tests_enabled = false
}

task verifyBwcTestsEnabled {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
package org.elasticsearch.action.support.replication;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
Expand Down Expand Up @@ -55,6 +57,7 @@
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -108,12 +111,26 @@ public abstract class TransportReplicationAction<
protected final String transportReplicaAction;
protected final String transportPrimaryAction;

private final boolean syncGlobalCheckpointAfterOperation;

protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor) {
this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, request, replicaRequest, executor, false);
}


protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor,
boolean syncGlobalCheckpointAfterOperation) {
super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
this.transportService = transportService;
this.clusterService = clusterService;
Expand All @@ -126,6 +143,8 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
registerRequestHandlers(actionName, transportService, request, replicaRequest, executor);

this.transportOptions = transportOptions();

this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation;
}

protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<Request> request,
Expand All @@ -150,7 +169,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
new ReroutePhase((ReplicationTask) task, request, listener).run();
}

protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) {
protected ReplicationOperation.Replicas<ReplicaRequest> newReplicasProxy(long primaryTerm) {
return new ReplicasProxy(primaryTerm);
}

Expand Down Expand Up @@ -359,6 +378,17 @@ private ActionListener<Response> createResponseListener(final PrimaryShardRefere
return new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
if (syncGlobalCheckpointAfterOperation) {
try {
primaryShardReference.indexShard.maybeSyncGlobalCheckpoint("post-operation");
} catch (final Exception e) {
// only log non-closed exceptions
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
logger.info("post-operation global checkpoint sync failed", e);
// intentionally swallow, a missed global checkpoint sync should not fail this operation
}
}
}
primaryShardReference.close(); // release shard operation lock before responding to caller
setPhase(replicationTask, "finished");
try {
Expand All @@ -382,8 +412,8 @@ public void onFailure(Exception e) {
}

protected ReplicationOperation<Request, ReplicaRequest, PrimaryResult<ReplicaRequest, Response>> createReplicatedOperation(
Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
PrimaryShardReference primaryShardReference) {
Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
PrimaryShardReference primaryShardReference) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IDE gone rogue

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed b8adcce.

return new ReplicationOperation<>(request, primaryShardReference, listener,
newReplicasProxy(primaryTerm), logger, actionName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor) {
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, request, replicaRequest, executor);
indexNameExpressionResolver, request, replicaRequest, executor, true);
}

/** Syncs operation result to the translog or throws a shard not available failure */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
(tmpImd.getNumberOfReplicas() + 1) + "]");
}
// create the index here (on the master) to validate it can be created, as well as adding the mapping
final IndexService indexService = indicesService.createIndex(tmpImd, Collections.emptyList());
final IndexService indexService = indicesService.createIndex(tmpImd, Collections.emptyList(), s -> {});
createdIndex = indexService.index();
// now add the mappings
MapperService mapperService = indexService.mapperService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ ClusterState innerExecute(ClusterState currentState, Iterable<AliasAction> actio
if (indexService == null) {
// temporarily create the index and add mappings so we can parse the filter
try {
indexService = indicesService.createIndex(index, emptyList());
indexService = indicesService.createIndex(index, emptyList(), s -> {});
indicesToClose.add(index.getIndex());
} catch (IOException e) {
throw new ElasticsearchException("Failed to create temporary index for parsing the alias", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ private static void validateAndAddTemplate(final PutRequest request, IndexTempla
.build();

final IndexMetaData tmpIndexMetadata = IndexMetaData.builder(temporaryIndexName).settings(dummySettings).build();
IndexService dummyIndexService = indicesService.createIndex(tmpIndexMetadata, Collections.emptyList());
IndexService dummyIndexService = indicesService.createIndex(tmpIndexMetadata, Collections.emptyList(), s -> {});
createdIndex = dummyIndexService.index();

templateBuilder.order(request.order);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ ClusterState executeRefresh(final ClusterState currentState, final List<RefreshT
IndexService indexService = indicesService.indexService(indexMetaData.getIndex());
if (indexService == null) {
// we need to create the index here, and add the current mapping to it, so we can merge
indexService = indicesService.createIndex(indexMetaData, Collections.emptyList());
indexService = indicesService.createIndex(indexMetaData, Collections.emptyList(), s -> {});
removeIndex = true;
indexService.mapperService().merge(indexMetaData, MergeReason.MAPPING_RECOVERY, true);
}
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
Expand All @@ -40,6 +39,7 @@
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.BM25SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityService;
Expand Down Expand Up @@ -330,7 +330,8 @@ public IndexService newIndexService(
IndicesQueryCache indicesQueryCache,
MapperRegistry mapperRegistry,
IndicesFieldDataCache indicesFieldDataCache,
NamedWriteableRegistry namedWriteableRegistry)
NamedWriteableRegistry namedWriteableRegistry,
Consumer<ShardId> globalCheckpointSyncer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this and remove all changes to IndicesService.createIndex etc.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 26e4c76.

throws IOException {
final IndexEventListener eventListener = freeze();
IndexSearcherWrapperFactory searcherWrapperFactory = indexSearcherWrapper.get() == null
Expand Down
103 changes: 97 additions & 6 deletions core/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
Expand Down Expand Up @@ -82,6 +86,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

Expand Down Expand Up @@ -109,10 +114,11 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean deleted = new AtomicBoolean(false);
private final IndexSettings indexSettings;
private final List<IndexingOperationListener> indexingOperationListeners;
private final List<SearchOperationListener> searchOperationListeners;
private final List<IndexingOperationListener> indexingOperationListeners;
private volatile AsyncRefreshTask refreshTask;
private volatile AsyncTranslogFSync fsyncTask;
private volatile AsyncGlobalCheckpointTask globalCheckpointTask;

// don't convert to Setting<> and register... we only set this in tests and register via a plugin
private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval";
Expand Down Expand Up @@ -182,11 +188,12 @@ public IndexService(
this.engineFactory = engineFactory;
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
this.searcherWrapper = wrapperFactory.newWrapper(this);
this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners);
this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners);
this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners);
// kick off async ops for the first shard in this index
this.refreshTask = new AsyncRefreshTask(this);
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
rescheduleFsyncTask(indexSettings.getTranslogDurability());
}

Expand Down Expand Up @@ -268,7 +275,15 @@ public synchronized void close(final String reason, boolean delete) throws IOExc
}
}
} finally {
IOUtils.close(bitsetFilterCache, indexCache, indexFieldData, mapperService, refreshTask, fsyncTask, trimTranslogTask);
IOUtils.close(
bitsetFilterCache,
indexCache,
indexFieldData,
mapperService,
refreshTask,
fsyncTask,
trimTranslogTask,
globalCheckpointTask);
}
}
}
Expand All @@ -293,8 +308,7 @@ private long getAvgShardSizeInBytes() throws IOException {
}
}

public synchronized IndexShard createShard(ShardRouting routing) throws IOException {
final boolean primary = routing.primary();
public synchronized IndexShard createShard(ShardRouting routing, Consumer<ShardId> globalCheckpointSyncer) throws IOException {
/*
* TODO: we execute this in parallel but it's a synced method. Yet, we might
* be able to serialize the execution via the cluster state in the future. for now we just
Expand Down Expand Up @@ -365,7 +379,7 @@ public synchronized IndexShard createShard(ShardRouting routing) throws IOExcept
indexShard = new IndexShard(routing, this.indexSettings, path, store, indexSortSupplier,
indexCache, mapperService, similarityService, engineFactory,
eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
searchOperationListeners, indexingOperationListeners);
searchOperationListeners, indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId));
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
Expand Down Expand Up @@ -710,6 +724,44 @@ private void maybeTrimTranslog() {
}
}

private void maybeSyncGlobalCheckpoints() {
for (final IndexShard shard : this.shards.values()) {
if (shard.routingEntry().active() && shard.routingEntry().primary()) {
switch (shard.state()) {
case CLOSED:
case CREATED:
case RECOVERING:
case RELOCATED:
continue;
case POST_RECOVERY:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

routing entry cannot be active while shard state is POST_RECOVERY ;-)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 9bf875e.

assert false : "shard " + shard.shardId() + " is in post-recovery but marked as active";
continue;
case STARTED:
try {
shard.acquirePrimaryOperationPermit(
ActionListener.wrap(
releasable -> {
try (Releasable ignored = releasable) {
shard.maybeSyncGlobalCheckpoint("background");
}
},
e -> {
if (!(e instanceof AlreadyClosedException || e instanceof IndexShardClosedException)) {
logger.info("failed to execute background global checkpoint sync", e);
}
}),
ThreadPool.Names.SAME);
} catch (final AlreadyClosedException | IndexShardClosedException e) {
// the shard was closed concurrently, continue
}
continue;
default:
throw new IllegalStateException("unknown state [" + shard.state() + "]");
}
}
}
}

abstract static class BaseAsyncTask implements Runnable, Closeable {
protected final IndexService indexService;
protected final ThreadPool threadPool;
Expand Down Expand Up @@ -877,6 +929,41 @@ public String toString() {
}
}

// this setting is intentionally not registered, it is only used in tests
public static final Setting<TimeValue> GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING =
Setting.timeSetting(
"index.global_checkpoint_sync.interval",
new TimeValue(30, TimeUnit.SECONDS),
new TimeValue(0, TimeUnit.MILLISECONDS),
Property.Dynamic,
Property.IndexScope);

/**
* Background task that syncs the global checkpoint to replicas.
*/
final class AsyncGlobalCheckpointTask extends BaseAsyncTask {

AsyncGlobalCheckpointTask(final IndexService indexService) {
// index.global_checkpoint_sync_interval is not a real setting, it is only registered in tests
super(indexService, GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings()));
}

@Override
protected void runInternal() {
indexService.maybeSyncGlobalCheckpoints();
}

@Override
protected String getThreadPool() {
return ThreadPool.Names.GENERIC;
}

@Override
public String toString() {
return "global_checkpoint_sync";
}
}

AsyncRefreshTask getRefreshTask() { // for tests
return refreshTask;
}
Expand All @@ -885,6 +972,10 @@ AsyncTranslogFSync getFsyncTask() { // for tests
return fsyncTask;
}

AsyncGlobalCheckpointTask getGlobalCheckpointTask() {
return globalCheckpointTask;
}

/**
* Clears the caches for the given shard id if the shard is still allocated on this node
*/
Expand Down
Loading