From 26ec89173bc3a008070e081879e049f2eeecb614 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 7 Jun 2017 17:11:27 +0200 Subject: [PATCH] Remove TranslogRecoveryPerformer (#24858) Splits TranslogRecoveryPerformer into three parts: - the translog operation to engine operation converter - the operation performer (that indexes the operation into the engine) - the translog statistics (for which there is already RecoveryState.Translog) This makes it possible for peer recovery to use the same IndexShard interface as bulk shard requests (i.e. Engine operations instead of Translog operations). It also pushes the "fail on bad mapping" logic outside of IndexShard. Future pull requests could unify the BulkShard and peer recovery path even more. --- .../elasticsearch/ElasticsearchException.java | 3 +- .../elasticsearch/index/engine/Engine.java | 8 +- .../index/engine/EngineConfig.java | 32 +-- .../index/engine/InternalEngine.java | 7 +- .../elasticsearch/index/shard/IndexShard.java | 119 ++++++---- .../shard/TranslogOpToEngineOpConverter.java | 73 ++++++ .../shard/TranslogRecoveryPerformer.java | 214 ------------------ .../index/translog/Translog.java | 15 +- .../recovery/PeerRecoveryTargetService.java | 17 +- .../recovery/RecoverySourceHandler.java | 6 +- .../indices/recovery/RecoveryTarget.java | 31 ++- .../recovery/RecoveryTargetHandler.java | 2 +- .../ExceptionSerializationTests.java | 20 +- .../index/engine/InternalEngineTests.java | 87 +++++-- .../RecoveryDuringReplicationTests.java | 5 +- .../index/shard/IndexShardTests.java | 112 ++++++++- .../index/shard/RefreshListenersTests.java | 6 +- .../index/shard/IndexShardTestCase.java | 2 +- 18 files changed, 390 insertions(+), 369 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/index/shard/TranslogOpToEngineOpConverter.java delete mode 100644 core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java diff --git a/core/src/main/java/org/elasticsearch/ElasticsearchException.java 
b/core/src/main/java/org/elasticsearch/ElasticsearchException.java index b020dc6ea1f5b..ae006045e3d47 100644 --- a/core/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/core/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -765,8 +765,7 @@ private enum ElasticsearchExceptionHandle { org.elasticsearch.search.SearchContextMissingException::new, 24, UNKNOWN_VERSION_ADDED), GENERAL_SCRIPT_EXCEPTION(org.elasticsearch.script.GeneralScriptException.class, org.elasticsearch.script.GeneralScriptException::new, 25, UNKNOWN_VERSION_ADDED), - BATCH_OPERATION_EXCEPTION(org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException.class, - org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException::new, 26, UNKNOWN_VERSION_ADDED), + // 26 was BatchOperationException SNAPSHOT_CREATION_EXCEPTION(org.elasticsearch.snapshots.SnapshotCreationException.class, org.elasticsearch.snapshots.SnapshotCreationException::new, 27, UNKNOWN_VERSION_ADDED), DELETE_FAILED_ENGINE_EXCEPTION(org.elasticsearch.index.engine.DeleteFailedEngineException.class, // deprecated in 6.0, remove in 7.0 diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 0242445de5d7f..7763c8d04a4e7 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1008,7 +1008,7 @@ public long startTime() { abstract String id(); - abstract TYPE operationType(); + public abstract TYPE operationType(); } public static class Index extends Operation { @@ -1050,7 +1050,7 @@ public String id() { } @Override - TYPE operationType() { + public TYPE operationType() { return TYPE.INDEX; } @@ -1126,7 +1126,7 @@ public String id() { } @Override - TYPE operationType() { + public TYPE operationType() { return TYPE.DELETE; } @@ -1176,7 +1176,7 @@ String id() { } @Override - TYPE operationType() { + public 
TYPE operationType() { return TYPE.NO_OP; } diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 5016c0fcb4fa4..d7019c77321da 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -35,12 +35,13 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.IndexingMemoryController; import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; import java.util.List; /* @@ -50,7 +51,6 @@ */ public final class EngineConfig { private final ShardId shardId; - private final TranslogRecoveryPerformer translogRecoveryPerformer; private final IndexSettings indexSettings; private final ByteSizeValue indexingBufferSize; private volatile boolean enableGcDeletes = true; @@ -70,6 +70,7 @@ public final class EngineConfig { private final List refreshListeners; @Nullable private final Sort indexSort; + private final TranslogRecoveryRunner translogRecoveryRunner; /** * Index setting to change the low level lucene codec used for writing new segments. 
@@ -112,9 +113,9 @@ public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool, IndexSettings indexSettings, Engine.Warmer warmer, Store store, MergePolicy mergePolicy, Analyzer analyzer, Similarity similarity, CodecService codecService, Engine.EventListener eventListener, - TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, + QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig, TimeValue flushMergesAfter, List refreshListeners, - Sort indexSort) { + Sort indexSort, TranslogRecoveryRunner translogRecoveryRunner) { if (openMode == null) { throw new IllegalArgumentException("openMode must not be null"); } @@ -133,7 +134,6 @@ public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool, // there are not too many shards allocated to this node. Instead, IndexingMemoryController periodically checks // and refreshes the most heap-consuming shards when total indexing heap usage across all shards is too high: indexingBufferSize = new ByteSizeValue(256, ByteSizeUnit.MB); - this.translogRecoveryPerformer = translogRecoveryPerformer; this.queryCache = queryCache; this.queryCachingPolicy = queryCachingPolicy; this.translogConfig = translogConfig; @@ -141,6 +141,7 @@ public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool, this.openMode = openMode; this.refreshListeners = refreshListeners; this.indexSort = indexSort; + this.translogRecoveryRunner = translogRecoveryRunner; } /** @@ -253,15 +254,6 @@ public Similarity getSimilarity() { return similarity; } - /** - * Returns the {@link org.elasticsearch.index.shard.TranslogRecoveryPerformer} for this engine. This class is used - * to apply transaction log operations to the engine. It encapsulates all the logic to transfer the translog entry into - * an indexing operation. 
- */ - public TranslogRecoveryPerformer getTranslogRecoveryPerformer() { - return translogRecoveryPerformer; - } - /** * Return the cache to use for queries. */ @@ -297,6 +289,18 @@ public OpenMode getOpenMode() { return openMode; } + @FunctionalInterface + public interface TranslogRecoveryRunner { + int run(Engine engine, Translog.Snapshot snapshot) throws IOException; + } + + /** + * Returns a runner that implements the translog recovery from the given snapshot + */ + public TranslogRecoveryRunner getTranslogRecoveryRunner() { + return translogRecoveryRunner; + } + /** * Engine open mode defines how the engine should be opened or in other words what the engine should expect * to recover from. We either create a brand new engine with a new index and translog or we recover from an existing index. diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 5db0249320ed8..8c0481d686f41 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -72,7 +72,6 @@ import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.index.translog.TranslogCorruptedException; @@ -286,7 +285,7 @@ public InternalEngine recoverFromTranslog() throws IOException { throw new IllegalStateException("Engine has already been recovered"); } try { - recoverFromTranslog(engineConfig.getTranslogRecoveryPerformer()); + recoverFromTranslogInternal(); } catch (Exception e) { try { pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush @@ -302,12 
+301,12 @@ public InternalEngine recoverFromTranslog() throws IOException { return this; } - private void recoverFromTranslog(TranslogRecoveryPerformer handler) throws IOException { + private void recoverFromTranslogInternal() throws IOException { Translog.TranslogGeneration translogGeneration = translog.getGeneration(); final int opsRecovered; try { Translog.Snapshot snapshot = translog.newSnapshot(); - opsRecovered = handler.recoveryFromSnapshot(this, snapshot); + opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot); } catch (Exception e) { throw new EngineException(shardId, "failed to recover from translog", e); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 8a733de505ec9..83edd73350b10 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -19,7 +19,6 @@ package org.elasticsearch.index.shard; -import org.apache.logging.log4j.Logger; import org.apache.lucene.index.CheckIndex; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexOptions; @@ -116,6 +115,7 @@ import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.suggest.completion.CompletionFieldStats; import org.elasticsearch.search.suggest.completion.CompletionStats; import org.elasticsearch.threadpool.ThreadPool; @@ -167,6 +167,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private final IndexEventListener indexEventListener; private final QueryCachingPolicy cachingPolicy; private final Supplier indexSortSupplier; + private final TranslogOpToEngineOpConverter translogOpToEngineOpConverter; /** * How many bytes we are currently moving to disk, via 
either IndexWriter.flush or refresh. IndexingMemoryController polls this @@ -259,6 +260,7 @@ public IndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardP this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP); this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays); + this.translogOpToEngineOpConverter = new TranslogOpToEngineOpConverter(shardId, mapperService); // the query cache is a node-level thing, however we want the most popular filters // to be computed on a per-shard basis if (IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.get(settings)) { @@ -571,6 +573,37 @@ static Engine.Index prepareIndex(DocumentMapperForType docMapper, SourceToParse return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry); } + /** + * Applies an engine operation to the shard, which can be either an index, delete or noop operation. 
+ */ + public Engine.Result applyOperation(Engine.Operation operation) throws IOException { + return applyOperation(getEngine(), operation); + } + + private Engine.Result applyOperation(Engine engine, Engine.Operation operation) throws IOException { + switch (operation.operationType()) { + case INDEX: + Engine.Index engineIndex = (Engine.Index) operation; + return index(engine, engineIndex); + case DELETE: + final Engine.Delete engineDelete = (Engine.Delete) operation; + return delete(engine, engineDelete); + case NO_OP: + final Engine.NoOp engineNoOp = (Engine.NoOp) operation; + return noOp(engine, engineNoOp); + default: + throw new IllegalStateException("No operation defined for [" + operation + "]"); + } + } + + private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) { + active.set(true); + if (logger.isTraceEnabled()) { + logger.trace("noop (seq# [{}])", noOp.seqNo()); + } + return engine.noOp(noOp); + } + public Engine.IndexResult index(Engine.Index index) throws IOException { ensureWriteAllowed(index); Engine engine = getEngine(); @@ -1019,21 +1052,33 @@ public void prepareForIndexRecovery() { assert currentEngineReference.get() == null; } - /** - * Applies all operations in the iterable to the current engine and returns the number of operations applied. - * This operation will stop applying operations once an operation failed to apply. - * Note: This method is typically used in peer recovery to replay remote transaction log entries. 
- */ - public int performBatchRecovery(Iterable operations) { - if (state != IndexShardState.RECOVERING) { - throw new IndexShardNotRecoveringException(shardId, state); + public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) { + return translogOpToEngineOpConverter.convertToEngineOp(operation, origin); + } + + // package-private for testing + int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException { + recoveryState.getTranslog().totalOperations(snapshot.totalOperations()); + recoveryState.getTranslog().totalOperationsOnStart(snapshot.totalOperations()); + int opsRecovered = 0; + Translog.Operation operation; + while ((operation = snapshot.next()) != null) { + try { + logger.trace("[translog] recover op {}", operation); + Engine.Operation engineOp = convertToEngineOp(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY); + applyOperation(engine, engineOp); + opsRecovered++; + recoveryState.getTranslog().incrementRecoveredOperations(); + } catch (ElasticsearchException e) { + if (e.status() == RestStatus.BAD_REQUEST) { + // mainly for MapperParsingException and Failure to detect xcontent + logger.info("ignoring recovery of a corrupt translog entry", e); + } else { + throw e; + } + } } - // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive, - // we still invoke any onShardInactive listeners ... 
we won't sync'd flush in this case because we only do that on primary and this - // is a replica - active.set(true); - Engine engine = getEngine(); - return engine.config().getTranslogRecoveryPerformer().performBatchRecovery(engine, operations); + return opsRecovered; } /** @@ -1841,13 +1886,14 @@ private DocumentMapperForType docMapper(String type) { } private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) { - final IndexShardRecoveryPerformer translogRecoveryPerformer = new IndexShardRecoveryPerformer(shardId, mapperService, logger); Sort indexSort = indexSortSupplier.get(); return new EngineConfig(openMode, shardId, threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(), - mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, + mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, + indexCache.query(), cachingPolicy, translogConfig, IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), - Arrays.asList(refreshListeners, new RefreshMetricUpdater(refreshMetric)), indexSort); + Arrays.asList(refreshListeners, new RefreshMetricUpdater(refreshMetric)), indexSort, + this::runTranslogRecovery); } /** @@ -1960,6 +2006,11 @@ public final void sync(Translog.Location location, Consumer syncListe translogSyncProcessor.put(location, syncListener); } + public final void sync() throws IOException { + verifyNotClosed(); + getEngine().getTranslog().sync(); + } + /** * Returns the current translog durability mode */ @@ -2091,38 +2142,6 @@ public void addRefreshListener(Translog.Location location, Consumer lis refreshListeners.addOrNotify(location, listener); } - private class IndexShardRecoveryPerformer extends TranslogRecoveryPerformer { - - protected IndexShardRecoveryPerformer(ShardId shardId, MapperService mapperService, 
Logger logger) { - super(shardId, mapperService, logger); - } - - @Override - protected void operationProcessed() { - assert recoveryState != null; - recoveryState.getTranslog().incrementRecoveredOperations(); - } - - @Override - public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throws IOException { - assert recoveryState != null; - RecoveryState.Translog translogStats = recoveryState.getTranslog(); - translogStats.totalOperations(snapshot.totalOperations()); - translogStats.totalOperationsOnStart(snapshot.totalOperations()); - return super.recoveryFromSnapshot(engine, snapshot); - } - - @Override - protected void index(Engine engine, Engine.Index engineIndex) throws IOException { - IndexShard.this.index(engine, engineIndex); - } - - @Override - protected void delete(Engine engine, Engine.Delete engineDelete) throws IOException { - IndexShard.this.delete(engine, engineDelete); - } - } - private static class RefreshMetricUpdater implements ReferenceManager.RefreshListener { private final MeanMetric refreshMetric; diff --git a/core/src/main/java/org/elasticsearch/index/shard/TranslogOpToEngineOpConverter.java b/core/src/main/java/org/elasticsearch/index/shard/TranslogOpToEngineOpConverter.java new file mode 100644 index 0000000000000..372e8f4e25a61 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/shard/TranslogOpToEngineOpConverter.java @@ -0,0 +1,73 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.shard; + +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.mapper.DocumentMapperForType; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.translog.Translog; + +import static org.elasticsearch.index.mapper.SourceToParse.source; + +/** + * The TranslogOpToEngineOpConverter encapsulates all the logic needed to transform a translog entry into an + * indexing operation including source parsing and field creation from the source. + */ +public class TranslogOpToEngineOpConverter { + private final MapperService mapperService; + private final ShardId shardId; + + protected TranslogOpToEngineOpConverter(ShardId shardId, MapperService mapperService) { + this.shardId = shardId; + this.mapperService = mapperService; + } + + protected DocumentMapperForType docMapper(String type) { + return mapperService.documentMapperWithAutoCreate(type); // protected for testing + } + + public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) { + switch (operation.opType()) { + case INDEX: + final Translog.Index index = (Translog.Index) operation; + // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all + // autoGeneratedID docs that are coming from the primary are updated correctly. 
+ final Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()), + source(shardId.getIndexName(), index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source())) + .routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(), + index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin, + index.getAutoGeneratedIdTimestamp(), true); + return engineIndex; + case DELETE: + final Translog.Delete delete = (Translog.Delete) operation; + final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), + delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(), + origin, System.nanoTime()); + return engineDelete; + case NO_OP: + final Translog.NoOp noOp = (Translog.NoOp) operation; + final Engine.NoOp engineNoOp = + new Engine.NoOp(noOp.seqNo(), noOp.primaryTerm(), origin, System.nanoTime(), noOp.reason()); + return engineNoOp; + default: + throw new IllegalStateException("No operation defined for [" + operation + "]"); + } + } +} diff --git a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java deleted file mode 100644 index 668e957ae52ec..0000000000000 --- a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.index.shard; - -import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.mapper.DocumentMapperForType; -import org.elasticsearch.index.mapper.MapperException; -import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.mapper.Mapping; -import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.rest.RestStatus; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import static org.elasticsearch.index.mapper.SourceToParse.source; - -/** - * The TranslogRecoveryPerformer encapsulates all the logic needed to transform a translog entry into an - * indexing operation including source parsing and field creation from the source. 
- */ -public class TranslogRecoveryPerformer { - private final MapperService mapperService; - private final Logger logger; - private final Map recoveredTypes = new HashMap<>(); - private final ShardId shardId; - - protected TranslogRecoveryPerformer(ShardId shardId, MapperService mapperService, Logger logger) { - this.shardId = shardId; - this.mapperService = mapperService; - this.logger = logger; - } - - protected DocumentMapperForType docMapper(String type) { - return mapperService.documentMapperWithAutoCreate(type); // protected for testing - } - - /** - * Applies all operations in the iterable to the current engine and returns the number of operations applied. - * This operation will stop applying operations once an operation failed to apply. - * - * Throws a {@link MapperException} to be thrown if a mapping update is encountered. - */ - int performBatchRecovery(Engine engine, Iterable operations) { - int numOps = 0; - try { - for (Translog.Operation operation : operations) { - performRecoveryOperation(engine, operation, false, Engine.Operation.Origin.PEER_RECOVERY); - numOps++; - } - engine.getTranslog().sync(); - } catch (Exception e) { - throw new BatchOperationException(shardId, "failed to apply batch translog operation", numOps, e); - } - return numOps; - } - - public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throws IOException { - Translog.Operation operation; - int opsRecovered = 0; - while ((operation = snapshot.next()) != null) { - try { - performRecoveryOperation(engine, operation, true, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY); - opsRecovered++; - } catch (ElasticsearchException e) { - if (e.status() == RestStatus.BAD_REQUEST) { - // mainly for MapperParsingException and Failure to detect xcontent - logger.info("ignoring recovery of a corrupt translog entry", e); - } else { - throw e; - } - } - } - - return opsRecovered; - } - - public static class BatchOperationException extends ElasticsearchException { - - private 
final int completedOperations; - - public BatchOperationException(ShardId shardId, String msg, int completedOperations, Throwable cause) { - super(msg, cause); - setShard(shardId); - this.completedOperations = completedOperations; - } - - public BatchOperationException(StreamInput in) throws IOException{ - super(in); - completedOperations = in.readInt(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeInt(completedOperations); - } - - /** the number of successful operations performed before the exception was thrown */ - public int completedOperations() { - return completedOperations; - } - } - - private void maybeAddMappingUpdate(String type, Mapping update, String docId, boolean allowMappingUpdates) { - if (update == null) { - return; - } - if (allowMappingUpdates == false) { - throw new MapperException("mapping updates are not allowed (type: [" + type + "], id: [" + docId + "])"); - } - Mapping currentUpdate = recoveredTypes.get(type); - if (currentUpdate == null) { - recoveredTypes.put(type, update); - } else { - currentUpdate = currentUpdate.merge(update, false); - } - } - - /** - * Performs a single recovery operation. - * - * @param allowMappingUpdates true if mapping update should be accepted (but collected). Setting it to false will - * cause a {@link MapperException} to be thrown if an update - * is encountered. - */ - private void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates, Engine.Operation.Origin origin) throws IOException { - switch (operation.opType()) { - case INDEX: - Translog.Index index = (Translog.Index) operation; - // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all - // autoGeneratedID docs that are coming from the primary are updated correctly. 
- Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()), - source(shardId.getIndexName(), index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source())) - .routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(), - index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin, index.getAutoGeneratedIdTimestamp(), true); - maybeAddMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate(), engineIndex.id(), allowMappingUpdates); - logger.trace("[translog] recover [index] op [({}, {})] of [{}][{}]", index.seqNo(), index.primaryTerm(), index.type(), index.id()); - index(engine, engineIndex); - break; - case DELETE: - Translog.Delete delete = (Translog.Delete) operation; - logger.trace("[translog] recover [delete] op [({}, {})] of [{}][{}]", delete.seqNo(), delete.primaryTerm(), delete.type(), delete.id()); - final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), - delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(), - origin, System.nanoTime()); - delete(engine, engineDelete); - break; - case NO_OP: - final Translog.NoOp noOp = (Translog.NoOp) operation; - final long seqNo = noOp.seqNo(); - final long primaryTerm = noOp.primaryTerm(); - final String reason = noOp.reason(); - logger.trace("[translog] recover [no_op] op [({}, {})] of [{}]", seqNo, primaryTerm, reason); - final Engine.NoOp engineNoOp = - new Engine.NoOp(seqNo, primaryTerm, origin, System.nanoTime(), reason); - noOp(engine, engineNoOp); - break; - default: - throw new IllegalStateException("No operation defined for [" + operation + "]"); - } - operationProcessed(); - } - - protected void index(Engine engine, Engine.Index engineIndex) throws IOException { - engine.index(engineIndex); - } - - protected void delete(Engine engine, Engine.Delete engineDelete) throws IOException { - 
engine.delete(engineDelete); - } - - protected void noOp(Engine engine, Engine.NoOp engineNoOp) { - engine.noOp(engineNoOp); - } - - /** - * Called once for every processed operation by this recovery performer. - * This can be used to get progress information on the translog execution. - */ - protected void operationProcessed() { - // noop - } - - - /** - * Returns the recovered types modifying the mapping during the recovery - */ - public Map getRecoveredTypes() { - return recoveredTypes; - } -} diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 1314504e397ec..c351f0346236e 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -916,15 +916,20 @@ public Index(Engine.Index index, Engine.IndexResult indexResult) { } public Index(String type, String id, long seqNo, byte[] source) { + this(type, id, seqNo, Versions.MATCH_ANY, VersionType.INTERNAL, source, null, null, -1); + } + + public Index(String type, String id, long seqNo, long version, VersionType versionType, byte[] source, String routing, + String parent, long autoGeneratedIdTimestamp) { this.type = type; this.id = id; this.source = new BytesArray(source); this.seqNo = seqNo; - version = Versions.MATCH_ANY; - versionType = VersionType.INTERNAL; - routing = null; - parent = null; - autoGeneratedIdTimestamp = -1; + this.version = version; + this.versionType = versionType; + this.routing = routing; + this.parent = parent; + this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp; } @Override diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 8435fe4ee1e80..4823edcc2f119 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ 
b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -48,7 +48,6 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; -import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef; @@ -423,22 +422,10 @@ public void messageReceived(final RecoveryTranslogOperationsRequest request, fin try { recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps()); channel.sendResponse(new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint())); - } catch (TranslogRecoveryPerformer.BatchOperationException exception) { - MapperException mapperException = (MapperException) ExceptionsHelper.unwrap(exception, MapperException.class); - if (mapperException == null) { - throw exception; - } + } catch (MapperException exception) { // in very rare cases a translog replay from primary is processed before a mapping update on this node // which causes local mapping changes since the mapping (clusterstate) might not have arrived on this node. - // we want to wait until these mappings are processed but also need to do some maintenance and roll back the - // number of processed (completed) operations in this batch to ensure accounting is correct. 
- logger.trace( - (Supplier) () -> new ParameterizedMessage( - "delaying recovery due to missing mapping changes (rolling back stats for [{}] ops)", - exception.completedOperations()), - exception); - final RecoveryState.Translog translog = recoveryTarget.state().getTranslog(); - translog.decrementRecoveredOperations(exception.completedOperations()); // do the maintainance and rollback competed ops + logger.debug("delaying recovery due to missing mapping changes", exception); // we do not need to use a timeout here since the entire recovery mechanism has an inactivity protection (it will be // canceled) observer.waitForNextChange(new ClusterStateObserver.Listener() { diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 5c7787999da88..8abd3a05d8e68 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -510,7 +510,7 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl logger.trace("no translog operations to send"); } - final CancellableThreads.Interruptable sendBatch = + final CancellableThreads.IOInterruptable sendBatch = () -> targetLocalCheckpoint.set(recoveryTarget.indexTranslogOperations(operations, expectedTotalOps)); // send operations in batches @@ -536,7 +536,7 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl // check if this request is past bytes threshold, and if so, send it off if (size >= chunkSizeInBytes) { - cancellableThreads.execute(sendBatch); + cancellableThreads.executeIO(sendBatch); logger.trace("sent batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps); ops = 0; size = 0; @@ -546,7 +546,7 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl if 
(!operations.isEmpty() || totalSentOps == 0) { // send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint - cancellableThreads.execute(sendBatch); + cancellableThreads.executeIO(sendBatch); } assert expectedTotalOps == skippedOps + totalSentOps diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 8f554ed14a093..6a465f111150c 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -39,9 +39,12 @@ import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardNotRecoveringException; +import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; @@ -58,6 +61,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import java.util.function.LongConsumer; /** @@ -375,12 +379,30 @@ public void ensureClusterStateVersion(long clusterStateVersion) { } @Override - public long indexTranslogOperations( - List operations, int totalTranslogOps) throws TranslogRecoveryPerformer.BatchOperationException { + public long indexTranslogOperations(List operations, int totalTranslogOps) throws 
MapperException, IOException { final RecoveryState.Translog translog = state().getTranslog(); translog.totalOperations(totalTranslogOps); assert indexShard().recoveryState() == state(); - indexShard().performBatchRecovery(operations); + if (indexShard().state() != IndexShardState.RECOVERING) { + throw new IndexShardNotRecoveringException(shardId, indexShard().state()); + } + // first convert all translog operations to engine operations to check for mapping updates + List engineOps = operations.stream().map( + op -> { + Engine.Operation engineOp = indexShard().convertToEngineOp(op, Engine.Operation.Origin.PEER_RECOVERY); + if (engineOp instanceof Engine.Index && ((Engine.Index) engineOp).parsedDoc().dynamicMappingsUpdate() != null) { + throw new MapperException("mapping updates are not allowed (type: [" + engineOp.type() + "], id: [" + + ((Engine.Index) engineOp).id() + "])"); + } + return engineOp; + } + ).collect(Collectors.toList()); + // actually apply engine operations + for (Engine.Operation engineOp : engineOps) { + indexShard().applyOperation(engineOp); + translog.incrementRecoveredOperations(); + } + indexShard().sync(); return indexShard().getLocalCheckpoint(); } @@ -476,5 +498,4 @@ public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesR Path translogLocation() { return indexShard().shardPath().resolveTranslog(); } - } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 38f412fed734a..42cf1bc1ce19d 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -56,7 +56,7 @@ public interface RecoveryTargetHandler { * * @return the local checkpoint on the target shard */ - long indexTranslogOperations(List operations, int totalTranslogOps); + long indexTranslogOperations(List 
operations, int totalTranslogOps) throws IOException; /** * Notifies the target of the files it is going to receive diff --git a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index 764a6d3b3512d..4add6bce90071 100644 --- a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -62,10 +62,10 @@ import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.indices.IndexTemplateMissingException; import org.elasticsearch.indices.InvalidIndexTemplateException; import org.elasticsearch.indices.recovery.RecoverFilesRecoveryException; +import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException; @@ -334,22 +334,6 @@ public void testRecoverFilesRecoveryException() throws IOException { assertTrue(ex.getCause() instanceof NullPointerException); } - public void testBatchOperationException() throws IOException { - ShardId id = new ShardId("foo", "_na_", 1); - TranslogRecoveryPerformer.BatchOperationException ex = serialize( - new TranslogRecoveryPerformer.BatchOperationException(id, "batched the fucker", 666, null)); - assertEquals(ex.getShardId(), id); - assertEquals(666, ex.completedOperations()); - assertEquals("batched the fucker", ex.getMessage()); - assertNull(ex.getCause()); - - ex = serialize(new TranslogRecoveryPerformer.BatchOperationException(null, "batched the fucker", -1, new NullPointerException())); - assertNull(ex.getShardId()); - assertEquals(-1, ex.completedOperations()); - assertEquals("batched the fucker", 
ex.getMessage()); - assertTrue(ex.getCause() instanceof NullPointerException); - } - public void testInvalidIndexTemplateException() throws IOException { InvalidIndexTemplateException ex = serialize(new InvalidIndexTemplateException("foo", "bar")); assertEquals(ex.getMessage(), "index_template [foo] invalid, cause [bar]"); @@ -702,7 +686,7 @@ public void testIds() { ids.put(23, org.elasticsearch.index.shard.IndexShardStartedException.class); ids.put(24, org.elasticsearch.search.SearchContextMissingException.class); ids.put(25, org.elasticsearch.script.GeneralScriptException.class); - ids.put(26, org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException.class); + ids.put(26, null); ids.put(27, org.elasticsearch.snapshots.SnapshotCreationException.class); ids.put(28, org.elasticsearch.index.engine.DeleteFailedEngineException.class); //deprecated in 6.0 ids.put(29, org.elasticsearch.index.engine.DocumentMissingException.class); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f363b31044746..bb9ec29f1ada9 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -119,7 +119,7 @@ import org.elasticsearch.index.shard.IndexSearcherWrapper; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardUtils; -import org.elasticsearch.index.shard.TranslogRecoveryPerformer; +import org.elasticsearch.index.shard.TranslogOpToEngineOpConverter; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.DirectoryUtils; @@ -151,6 +151,7 @@ import java.util.Base64; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; 
import java.util.List; @@ -261,9 +262,9 @@ public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode) { public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, Analyzer analyzer) { return new EngineConfig(openMode, config.getShardId(), config.getThreadPool(), config.getIndexSettings(), config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(), - new CodecService(null, logger), config.getEventListener(), config.getTranslogRecoveryPerformer(), config.getQueryCache(), + new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), config.getTranslogConfig(), - config.getFlushMergesAfter(), config.getRefreshListeners(), config.getIndexSort()); + config.getFlushMergesAfter(), config.getRefreshListeners(), config.getIndexSort(), config.getTranslogRecoveryRunner()); } @Override @@ -430,13 +431,14 @@ public void onFailedEngine(String reason, @Nullable Exception e) { // we don't need to notify anybody in this test } }; + final TranslogHandler handler = new TranslogHandler(xContentRegistry(), IndexSettingsModule.newIndexSettings(shardId.getIndexName(), + indexSettings.getSettings())); final List refreshListenerList = refreshListener == null ? 
emptyList() : Collections.singletonList(refreshListener); EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store, - mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, - new TranslogHandler(xContentRegistry(), shardId.getIndexName(), indexSettings.getSettings(), logger), + mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, - TimeValue.timeValueMinutes(5), refreshListenerList, indexSort); + TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler); return config; } @@ -2614,7 +2616,7 @@ public void testTranslogReplay() throws IOException { } assertVisibleCount(engine, numDocs); - TranslogHandler parser = (TranslogHandler) engine.config().getTranslogRecoveryPerformer(); + TranslogHandler parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner(); parser.mappingUpdate = dynamicUpdate(); engine.close(); @@ -2622,8 +2624,8 @@ public void testTranslogReplay() throws IOException { engine.recoverFromTranslog(); assertVisibleCount(engine, numDocs, false); - parser = (TranslogHandler) engine.config().getTranslogRecoveryPerformer(); - assertEquals(numDocs, parser.recoveredOps.get()); + parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner(); + assertEquals(numDocs, parser.appliedOperations.get()); if (parser.mappingUpdate != null) { assertEquals(1, parser.getRecoveredTypes().size()); assertTrue(parser.getRecoveredTypes().containsKey("test")); @@ -2634,8 +2636,8 @@ public void testTranslogReplay() throws IOException { engine.close(); engine = createEngine(store, primaryTranslogDir); assertVisibleCount(engine, numDocs, false); - parser = (TranslogHandler) engine.config().getTranslogRecoveryPerformer(); - assertEquals(0, parser.recoveredOps.get()); + parser = (TranslogHandler) 
engine.config().getTranslogRecoveryRunner(); + assertEquals(0, parser.appliedOperations.get()); final boolean flush = randomBoolean(); int randomId = randomIntBetween(numDocs + 1, numDocs + 10); @@ -2663,8 +2665,8 @@ public void testTranslogReplay() throws IOException { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), numDocs + 1); assertThat(topDocs.totalHits, equalTo(numDocs + 1)); } - parser = (TranslogHandler) engine.config().getTranslogRecoveryPerformer(); - assertEquals(flush ? 1 : 2, parser.recoveredOps.get()); + parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner(); + assertEquals(flush ? 1 : 2, parser.appliedOperations.get()); engine.delete(new Engine.Delete("test", Integer.toString(randomId), newUid(doc))); if (randomBoolean()) { engine.refresh("test"); @@ -2678,23 +2680,22 @@ public void testTranslogReplay() throws IOException { } } - public static class TranslogHandler extends TranslogRecoveryPerformer { + public static class TranslogHandler extends TranslogOpToEngineOpConverter + implements EngineConfig.TranslogRecoveryRunner { private final MapperService mapperService; public Mapping mappingUpdate = null; + private final Map recoveredTypes = new HashMap<>(); + private final AtomicLong appliedOperations = new AtomicLong(); - public final AtomicInteger recoveredOps = new AtomicInteger(0); - - public TranslogHandler(NamedXContentRegistry xContentRegistry, String indexName, Settings settings, Logger logger) { - super(new ShardId("test", "_na_", 0), null, logger); - Index index = new Index(indexName, "_na_"); - IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings); + public TranslogHandler(NamedXContentRegistry xContentRegistry, IndexSettings indexSettings) { + super(new ShardId("test", "_na_", 0), null); NamedAnalyzer defaultAnalyzer = new NamedAnalyzer("default", AnalyzerScope.INDEX, new StandardAnalyzer()); IndexAnalyzers indexAnalyzers = new IndexAnalyzers(indexSettings, 
defaultAnalyzer, defaultAnalyzer, defaultAnalyzer, Collections.emptyMap(), Collections.emptyMap()); SimilarityService similarityService = new SimilarityService(indexSettings, Collections.emptyMap()); MapperRegistry mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry(); mapperService = new MapperService(indexSettings, indexAnalyzers, xContentRegistry, similarityService, mapperRegistry, - () -> null); + () -> null); } @Override @@ -2704,9 +2705,44 @@ protected DocumentMapperForType docMapper(String type) { return new DocumentMapperForType(b.build(mapperService), mappingUpdate); } + private void applyOperation(Engine engine, Engine.Operation operation) throws IOException { + switch (operation.operationType()) { + case INDEX: + Engine.Index engineIndex = (Engine.Index) operation; + Mapping update = engineIndex.parsedDoc().dynamicMappingsUpdate(); + if (engineIndex.parsedDoc().dynamicMappingsUpdate() != null) { + recoveredTypes.compute(engineIndex.type(), (k, mapping) -> mapping == null ? 
update : mapping.merge(update, false)); + } + engine.index(engineIndex); + break; + case DELETE: + engine.delete((Engine.Delete) operation); + break; + case NO_OP: + engine.noOp((Engine.NoOp) operation); + break; + default: + throw new IllegalStateException("No operation defined for [" + operation + "]"); + } + } + + /** + * Returns the recovered types modifying the mapping during the recovery + */ + public Map getRecoveredTypes() { + return recoveredTypes; + } + @Override - protected void operationProcessed() { - recoveredOps.incrementAndGet(); + public int run(Engine engine, Translog.Snapshot snapshot) throws IOException { + int opsRecovered = 0; + Translog.Operation operation; + while ((operation = snapshot.next()) != null) { + applyOperation(engine, convertToEngineOp(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY)); + opsRecovered++; + appliedOperations.incrementAndGet(); + } + return opsRecovered; } } @@ -2736,9 +2772,10 @@ public void testRecoverFromForeignTranslog() throws IOException { EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), - config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getTranslogRecoveryPerformer(), + config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, - TimeValue.timeValueMinutes(5), config.getRefreshListeners(), null); + TimeValue.timeValueMinutes(5), config.getRefreshListeners(), null, + config.getTranslogRecoveryRunner()); try { InternalEngine internalEngine = new InternalEngine(brokenConfig); diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 
ddd69c084950f..1c7705d534afb 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -333,7 +333,8 @@ protected EngineFactory getEngineFactory(final ShardRouting routing) { replica, (indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) { @Override - public long indexTranslogOperations(final List operations, final int totalTranslogOps) { + public long indexTranslogOperations(final List operations, final int totalTranslogOps) + throws IOException { // index a doc which is not part of the snapshot, but also does not complete on replica replicaEngineFactory.latchIndexers(); threadPool.generic().submit(() -> { @@ -445,7 +446,7 @@ private void blockIfNeeded(RecoveryState.Stage currentStage) { } @Override - public long indexTranslogOperations(List operations, int totalTranslogOps) { + public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { if (hasBlocked() == false) { blockIfNeeded(RecoveryState.Stage.TRANSLOG); } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index ebde407d33d2b..b299168ce622f 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -102,12 +102,14 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.nio.charset.Charset; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -1627,7 +1629,7 @@ public void testTranslogRecoverySyncsTranslog() throws IOException { new RecoveryTarget(shard, 
discoveryNode, recoveryListener, aLong -> { }) { @Override - public long indexTranslogOperations(List operations, int totalTranslogOps) { + public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps); assertFalse(replica.getTranslog().syncNeeded()); return localCheckpoint; @@ -1637,6 +1639,112 @@ public long indexTranslogOperations(List operations, int tot closeShards(primary, replica); } + public void testRecoverFromTranslog() throws IOException { + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + List operations = new ArrayList<>(); + int numTotalEntries = randomIntBetween(0, 10); + int numCorruptEntries = 0; + for (int i = 0; i < numTotalEntries; i++) { + if (randomBoolean()) { + operations.add(new Translog.Index("test", "1", 0, 1, VersionType.INTERNAL, + "{\"foo\" : \"bar\"}".getBytes(Charset.forName("UTF-8")), null, null, -1)); + } else { + // corrupt entry + operations.add(new Translog.Index("test", "2", 1, 1, VersionType.INTERNAL, + "{\"foo\" : \"bar}".getBytes(Charset.forName("UTF-8")), null, null, -1)); + numCorruptEntries++; + } + } + + Iterator iterator = operations.iterator(); + Translog.Snapshot snapshot = new Translog.Snapshot() { + + @Override + public int totalOperations() { + return numTotalEntries; + } + + @Override + public Translog.Operation next() throws IOException { + return iterator.hasNext() ? 
iterator.next() : null; + } + }; + primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), + getFakeDiscoNode(primary.routingEntry().currentNodeId()), + null)); + primary.recoverFromStore(); + + primary.runTranslogRecovery(primary.getEngine(), snapshot); + assertThat(primary.recoveryState().getTranslog().totalOperationsOnStart(), equalTo(numTotalEntries)); + assertThat(primary.recoveryState().getTranslog().totalOperations(), equalTo(numTotalEntries)); + assertThat(primary.recoveryState().getTranslog().recoveredOperations(), equalTo(numTotalEntries - numCorruptEntries)); + + closeShards(primary); + } + + public void testTranslogOpToEngineOpConverter() throws IOException { + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + TranslogOpToEngineOpConverter converter = new TranslogOpToEngineOpConverter(primary.shardId(), primary.mapperService()); + + Engine.Operation.Origin origin = randomFrom(Engine.Operation.Origin.values()); + // convert index op + Translog.Index translogIndexOp = new Translog.Index(randomAlphaOfLength(10), randomAlphaOfLength(10), randomNonNegativeLong(), + randomNonNegativeLong(), randomFrom(VersionType.values()), "{\"foo\" : \"bar\"}".getBytes(Charset.forName("UTF-8")), + randomAlphaOfLength(5), randomAlphaOfLength(5), randomLong()); + Engine.Index engineIndexOp = (Engine.Index) converter.convertToEngineOp(translogIndexOp, origin); + assertEquals(engineIndexOp.origin(), origin); + assertEquals(engineIndexOp.primaryTerm(), translogIndexOp.primaryTerm()); + 
assertEquals(engineIndexOp.seqNo(), translogIndexOp.seqNo()); + assertEquals(engineIndexOp.version(), translogIndexOp.version()); + assertEquals(engineIndexOp.versionType(), translogIndexOp.versionType().versionTypeForReplicationAndRecovery()); + assertEquals(engineIndexOp.id(), translogIndexOp.id()); + assertEquals(engineIndexOp.type(), translogIndexOp.type()); + assertEquals(engineIndexOp.getAutoGeneratedIdTimestamp(), translogIndexOp.getAutoGeneratedIdTimestamp()); + assertEquals(engineIndexOp.parent(), translogIndexOp.parent()); + assertEquals(engineIndexOp.routing(), translogIndexOp.routing()); + assertEquals(engineIndexOp.source(), translogIndexOp.source()); + + // convert delete op + Translog.Delete translogDeleteOp = new Translog.Delete(randomAlphaOfLength(5), randomAlphaOfLength(5), + new Term(randomAlphaOfLength(5), randomAlphaOfLength(5)), randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), randomFrom(VersionType.values())); + Engine.Delete engineDeleteOp = (Engine.Delete) converter.convertToEngineOp(translogDeleteOp, origin); + assertEquals(engineDeleteOp.origin(), origin); + assertEquals(engineDeleteOp.primaryTerm(), translogDeleteOp.primaryTerm()); + assertEquals(engineDeleteOp.seqNo(), translogDeleteOp.seqNo()); + assertEquals(engineDeleteOp.version(), translogDeleteOp.version()); + assertEquals(engineDeleteOp.versionType(), translogDeleteOp.versionType().versionTypeForReplicationAndRecovery()); + assertEquals(engineDeleteOp.id(), translogDeleteOp.id()); + assertEquals(engineDeleteOp.type(), translogDeleteOp.type()); + assertEquals(engineDeleteOp.uid(), translogDeleteOp.uid()); + + // convert noop + Translog.NoOp translogNoOp = new Translog.NoOp(randomNonNegativeLong(), randomNonNegativeLong(), randomAlphaOfLength(5)); + Engine.NoOp engineNoOp = (Engine.NoOp) converter.convertToEngineOp(translogNoOp, origin); + assertEquals(engineNoOp.origin(), origin); + assertEquals(engineNoOp.primaryTerm(), translogNoOp.primaryTerm()); + 
assertEquals(engineNoOp.seqNo(), translogNoOp.seqNo()); + assertEquals(engineNoOp.reason(), translogNoOp.reason()); + + closeShards(primary); + } + public void testShardActiveDuringInternalRecovery() throws IOException { IndexShard shard = newStartedShard(true); indexDoc(shard, "type", "0"); @@ -1684,7 +1792,7 @@ public void prepareForTranslogOperations(int totalTranslogOps) throws IOExceptio } @Override - public long indexTranslogOperations(List operations, int totalTranslogOps) { + public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps); // Shard should now be active since we did recover: assertTrue(replica.isActive()); diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 7c396fd6693bd..6b5bd57aed9c2 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -41,7 +41,6 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.InternalEngine; -import org.elasticsearch.index.engine.InternalEngineTests.TranslogHandler; import org.elasticsearch.index.fieldvisitor.SingleFieldsVisitor; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.ParseContext.Document; @@ -116,12 +115,11 @@ public void onFailedEngine(String reason, @Nullable Exception e) { // we don't need to notify anybody in this test } }; - TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), shardId.getIndexName(), Settings.EMPTY, logger); EngineConfig config = new EngineConfig(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, shardId, threadPool, indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), - 
iwc.getSimilarity(), new CodecService(null, logger), eventListener, translogHandler, + iwc.getSimilarity(), new CodecService(null, logger), eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, - TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), null); + TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), null, null); engine = new InternalEngine(config); listeners.setTranslog(engine.getTranslog()); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 515e01c0409e3..a4d587b4835d7 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -365,7 +365,7 @@ protected void recoveryEmptyReplica(IndexShard replica) throws IOException { } } - private DiscoveryNode getFakeDiscoNode(String id) { + protected DiscoveryNode getFakeDiscoNode(String id) { return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT); }