diff --git a/core/src/main/java/org/elasticsearch/ElasticsearchException.java b/core/src/main/java/org/elasticsearch/ElasticsearchException.java index b020dc6ea1f5b..c94494b412141 100644 --- a/core/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/core/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.transport.TcpTransport; @@ -765,8 +766,7 @@ private enum ElasticsearchExceptionHandle { org.elasticsearch.search.SearchContextMissingException::new, 24, UNKNOWN_VERSION_ADDED), GENERAL_SCRIPT_EXCEPTION(org.elasticsearch.script.GeneralScriptException.class, org.elasticsearch.script.GeneralScriptException::new, 25, UNKNOWN_VERSION_ADDED), - BATCH_OPERATION_EXCEPTION(org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException.class, - org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException::new, 26, UNKNOWN_VERSION_ADDED), + // 26 was BatchOperationException SNAPSHOT_CREATION_EXCEPTION(org.elasticsearch.snapshots.SnapshotCreationException.class, org.elasticsearch.snapshots.SnapshotCreationException::new, 27, UNKNOWN_VERSION_ADDED), DELETE_FAILED_ENGINE_EXCEPTION(org.elasticsearch.index.engine.DeleteFailedEngineException.class, // deprecated in 6.0, remove in 7.0 diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 295f559da1698..f4b5e12de9dc0 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -63,7 +63,6 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.ParseContext.Document; import org.elasticsearch.index.mapper.ParsedDocument; -import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.ShardId; @@ -1015,7 +1014,7 @@ public long startTime() { abstract String id(); - abstract TYPE operationType(); + public abstract TYPE operationType(); } public static class Index extends Operation { @@ -1057,7 +1056,7 @@ public String id() { } @Override - TYPE operationType() { + public TYPE operationType() { return TYPE.INDEX; } @@ -1133,7 +1132,7 @@ public String id() { } @Override - TYPE operationType() { + public TYPE operationType() { return TYPE.DELETE; } @@ -1183,7 +1182,7 @@ String id() { } @Override - TYPE operationType() { + public TYPE operationType() { return TYPE.NO_OP; } diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index d22a93273c7e1..2c7afb00d1528 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -27,6 +27,7 @@ import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.Sort; import org.apache.lucene.search.similarities.Similarity; +import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -36,12 +37,15 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.TranslogRecoveryPerformer; +import org.elasticsearch.index.shard.TranslogOpToEngineOpConverter; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.IndexingMemoryController; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; + /* * Holds all the configuration that is used to create an {@link Engine}. * Once {@link Engine} has been created with this object, changes to this @@ -49,7 +53,6 @@ */ public final class EngineConfig { private final ShardId shardId; - private final TranslogRecoveryPerformer translogRecoveryPerformer; private final IndexSettings indexSettings; private final ByteSizeValue indexingBufferSize; private volatile boolean enableGcDeletes = true; @@ -70,6 +73,9 @@ public final class EngineConfig { private final ReferenceManager.RefreshListener refreshListeners; @Nullable private final Sort indexSort; + private final TranslogOpToEngineOpConverter translogOpToEngineOpConverter; + private final CheckedBiConsumer operationApplier; + private final RecoveryState.Translog translogStats; /** * Index setting to change the low level lucene codec used for writing new segments. @@ -112,9 +118,11 @@ public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool, IndexSettings indexSettings, Engine.Warmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy, MergePolicy mergePolicy, Analyzer analyzer, Similarity similarity, CodecService codecService, Engine.EventListener eventListener, - TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, + QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig, TimeValue flushMergesAfter, ReferenceManager.RefreshListener refreshListeners, - Sort indexSort) { + Sort indexSort, TranslogOpToEngineOpConverter translogOpToEngineOpConverter, + CheckedBiConsumer operationApplier, + RecoveryState.Translog translogStats) { if (openMode == null) { throw new IllegalArgumentException("openMode must not be null"); } @@ -134,7 +142,6 @@ public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool, // there are not too many shards allocated to this node. Instead, IndexingMemoryController periodically checks // and refreshes the most heap-consuming shards when total indexing heap usage across all shards is too high: indexingBufferSize = new ByteSizeValue(256, ByteSizeUnit.MB); - this.translogRecoveryPerformer = translogRecoveryPerformer; this.queryCache = queryCache; this.queryCachingPolicy = queryCachingPolicy; this.translogConfig = translogConfig; @@ -142,6 +149,9 @@ public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool, this.openMode = openMode; this.refreshListeners = refreshListeners; this.indexSort = indexSort; + this.translogOpToEngineOpConverter = translogOpToEngineOpConverter; + this.operationApplier = operationApplier; + this.translogStats = translogStats; } /** @@ -262,15 +272,6 @@ public Similarity getSimilarity() { return similarity; } - /** - * Returns the {@link org.elasticsearch.index.shard.TranslogRecoveryPerformer} for this engine. This class is used - * to apply transaction log operations to the engine. It encapsulates all the logic to transfer the translog entry into - * an indexing operation. - */ - public TranslogRecoveryPerformer getTranslogRecoveryPerformer() { - return translogRecoveryPerformer; - } - /** * Return the cache to use for queries. */ @@ -340,4 +341,26 @@ public boolean isAutoGeneratedIDsOptimizationEnabled() { public Sort getIndexSort() { return indexSort; } + + /** + * Return a converter to turn Translog operations into Engine operations. + * Used during translog recovery, see also {@link Engine#recoverFromTranslog()} + */ + public TranslogOpToEngineOpConverter getTranslogOpToEngineOpConverter() { + return translogOpToEngineOpConverter; + } + + /** + * Returns applier that applies operation to the engine. Used during translog recovery, see also {@link Engine#recoverFromTranslog()} + */ + public CheckedBiConsumer getOperationApplier() { + return operationApplier; + } + + /** + * Returns statistics object for the translog. Used during translog recovery, see also {@link Engine#recoverFromTranslog()} + */ + public RecoveryState.Translog getTranslogStats() { + return translogStats; + } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 03007d96b9f23..908536c68f78a 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -42,6 +42,7 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.InfoStream; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; @@ -70,10 +71,10 @@ import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.index.translog.TranslogCorruptedException; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -277,7 +278,7 @@ public InternalEngine recoverFromTranslog() throws IOException { throw new IllegalStateException("Engine has already been recovered"); } try { - recoverFromTranslog(engineConfig.getTranslogRecoveryPerformer()); + recoverFromTranslogInternal(); } catch (Exception e) { try { pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush @@ -293,12 +294,31 @@ public InternalEngine recoverFromTranslog() throws IOException { return this; } - private void recoverFromTranslog(TranslogRecoveryPerformer handler) throws IOException { + private void recoverFromTranslogInternal() throws IOException { Translog.TranslogGeneration translogGeneration = translog.getGeneration(); - final int opsRecovered; + int opsRecovered = 0; try { Translog.Snapshot snapshot = translog.newSnapshot(); - opsRecovered = handler.recoveryFromSnapshot(this, snapshot); + Translog.Operation operation; + config().getTranslogStats().totalOperations(snapshot.totalOperations()); + config().getTranslogStats().totalOperationsOnStart(snapshot.totalOperations()); + while ((operation = snapshot.next()) != null) { + try { + logger.trace("[translog] recover op {}", operation); + Operation engineOp = config().getTranslogOpToEngineOpConverter() + .convertToEngineOp(operation, Operation.Origin.LOCAL_TRANSLOG_RECOVERY); + config().getOperationApplier().accept(this, engineOp); + opsRecovered++; + config().getTranslogStats().incrementRecoveredOperations(); + } catch (ElasticsearchException e) { + if (e.status() == RestStatus.BAD_REQUEST) { + // mainly for MapperParsingException and Failure to detect xcontent + logger.info("ignoring recovery of a corrupt translog entry", e); + } else { + throw e; + } + } + } } catch (Exception e) { throw new EngineException(shardId, "failed to recover from translog", e); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 557b5996668bb..b1b874ca8d01a 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -19,7 +19,6 @@ package org.elasticsearch.index.shard; -import org.apache.logging.log4j.Logger; import org.apache.lucene.index.CheckIndex; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexOptions; @@ -167,6 +166,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private final IndexEventListener indexEventListener; private final QueryCachingPolicy cachingPolicy; private final Supplier indexSortSupplier; + private final TranslogOpToEngineOpConverter translogOpToEngineOpConverter; /** * How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this @@ -261,6 +261,7 @@ public IndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardP this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP); this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays); + this.translogOpToEngineOpConverter = new TranslogOpToEngineOpConverter(shardId, mapperService, logger); // the query cache is a node-level thing, however we want the most popular filters // to be computed on a per-shard basis if (IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.get(settings)) { @@ -552,6 +553,34 @@ static Engine.Index prepareIndex(DocumentMapperForType docMapper, SourceToParse return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry); } + public Engine.Result applyOperation(Engine.Operation operation) throws IOException { + return applyOperation(getEngine(), operation); + } + + private Engine.Result applyOperation(Engine engine, Engine.Operation operation) throws IOException { + switch (operation.operationType()) { + case INDEX: + Engine.Index engineIndex = (Engine.Index) operation; + return index(engine, engineIndex); + case DELETE: + final Engine.Delete engineDelete = (Engine.Delete) operation; + return delete(engine, engineDelete); + case NO_OP: + final Engine.NoOp engineNoOp = (Engine.NoOp) operation; + return noOp(engine, engineNoOp); + default: + throw new IllegalStateException("No operation defined for [" + operation + "]"); + } + } + + private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) { + active.set(true); + if (logger.isTraceEnabled()) { + logger.trace("noop (seq# [{}])", noOp.seqNo()); + } + return engine.noOp(noOp); + } + public Engine.IndexResult index(Engine.Index index) throws IOException { ensureWriteAllowed(index); Engine engine = getEngine(); @@ -1013,21 +1042,8 @@ public void prepareForIndexRecovery() { assert currentEngineReference.get() == null; } - /** - * Applies all operations in the iterable to the current engine and returns the number of operations applied. - * This operation will stop applying operations once an operation failed to apply. - * Note: This method is typically used in peer recovery to replay remote transaction log entries. - */ - public int performBatchRecovery(Iterable operations) { - if (state != IndexShardState.RECOVERING) { - throw new IndexShardNotRecoveringException(shardId, state); - } - // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive, - // we still invoke any onShardInactive listeners ... we won't sync'd flush in this case because we only do that on primary and this - // is a replica - active.set(true); - Engine engine = getEngine(); - return engine.config().getTranslogRecoveryPerformer().performBatchRecovery(engine, operations); + public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) { + return translogOpToEngineOpConverter.convertToEngineOp(operation, origin); } /** @@ -1835,12 +1851,12 @@ private DocumentMapperForType docMapper(String type) { } private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) { - final IndexShardRecoveryPerformer translogRecoveryPerformer = new IndexShardRecoveryPerformer(shardId, mapperService, logger); Sort indexSort = indexSortSupplier.get(); return new EngineConfig(openMode, shardId, threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(), - mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, - IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners, indexSort); + mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, indexCache.query(), cachingPolicy, translogConfig, + IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners, indexSort, + translogOpToEngineOpConverter, this::applyOperation, recoveryState.getTranslog()); } /** @@ -1953,6 +1969,11 @@ public final void sync(Translog.Location location, Consumer syncListe translogSyncProcessor.put(location, syncListener); } + public final void sync() throws IOException { + verifyNotClosed(); + getEngine().getTranslog().sync(); + } + /** * Returns the current translog durability mode */ @@ -2084,36 +2105,4 @@ public void addRefreshListener(Translog.Location location, Consumer lis refreshListeners.addOrNotify(location, listener); } - private class IndexShardRecoveryPerformer extends TranslogRecoveryPerformer { - - protected IndexShardRecoveryPerformer(ShardId shardId, MapperService mapperService, Logger logger) { - super(shardId, mapperService, logger); - } - - @Override - protected void operationProcessed() { - assert recoveryState != null; - recoveryState.getTranslog().incrementRecoveredOperations(); - } - - @Override - public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throws IOException { - assert recoveryState != null; - RecoveryState.Translog translogStats = recoveryState.getTranslog(); - translogStats.totalOperations(snapshot.totalOperations()); - translogStats.totalOperationsOnStart(snapshot.totalOperations()); - return super.recoveryFromSnapshot(engine, snapshot); - } - - @Override - protected void index(Engine engine, Engine.Index engineIndex) throws IOException { - IndexShard.this.index(engine, engineIndex); - } - - @Override - protected void delete(Engine engine, Engine.Delete engineDelete) throws IOException { - IndexShard.this.delete(engine, engineDelete); - } - } - } diff --git a/core/src/main/java/org/elasticsearch/index/shard/TranslogOpToEngineOpConverter.java b/core/src/main/java/org/elasticsearch/index/shard/TranslogOpToEngineOpConverter.java new file mode 100644 index 0000000000000..954b512f3bb53 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/shard/TranslogOpToEngineOpConverter.java @@ -0,0 +1,78 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.shard; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.mapper.DocumentMapperForType; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.translog.Translog; + +import static org.elasticsearch.index.mapper.SourceToParse.source; + +/** + * The TranslogOpToEngineOpConverter encapsulates all the logic needed to transform a translog entry into an + * indexing operation including source parsing and field creation from the source. + */ +public class TranslogOpToEngineOpConverter { + private final MapperService mapperService; + private final Logger logger; + private final ShardId shardId; + + protected TranslogOpToEngineOpConverter(ShardId shardId, MapperService mapperService, Logger logger) { + this.shardId = shardId; + this.mapperService = mapperService; + this.logger = logger; + } + + protected DocumentMapperForType docMapper(String type) { + return mapperService.documentMapperWithAutoCreate(type); // protected for testing + } + + public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) { + switch (operation.opType()) { + case INDEX: + Translog.Index index = (Translog.Index) operation; + // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all + // autoGeneratedID docs that are coming from the primary are updated correctly. + Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()), + source(shardId.getIndexName(), index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source())) + .routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(), + index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin, index.getAutoGeneratedIdTimestamp(), true); + return engineIndex; + case DELETE: + Translog.Delete delete = (Translog.Delete) operation; + final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), + delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(), + origin, System.nanoTime()); + return engineDelete; + case NO_OP: + final Translog.NoOp noOp = (Translog.NoOp) operation; + final long seqNo = noOp.seqNo(); + final long primaryTerm = noOp.primaryTerm(); + final String reason = noOp.reason(); + final Engine.NoOp engineNoOp = + new Engine.NoOp(seqNo, primaryTerm, origin, System.nanoTime(), reason); + return engineNoOp; + default: + throw new IllegalStateException("No operation defined for [" + operation + "]"); + } + } +} diff --git a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java deleted file mode 100644 index 668e957ae52ec..0000000000000 --- a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.index.shard; - -import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.mapper.DocumentMapperForType; -import org.elasticsearch.index.mapper.MapperException; -import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.mapper.Mapping; -import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.rest.RestStatus; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import static org.elasticsearch.index.mapper.SourceToParse.source; - -/** - * The TranslogRecoveryPerformer encapsulates all the logic needed to transform a translog entry into an - * indexing operation including source parsing and field creation from the source. - */ -public class TranslogRecoveryPerformer { - private final MapperService mapperService; - private final Logger logger; - private final Map recoveredTypes = new HashMap<>(); - private final ShardId shardId; - - protected TranslogRecoveryPerformer(ShardId shardId, MapperService mapperService, Logger logger) { - this.shardId = shardId; - this.mapperService = mapperService; - this.logger = logger; - } - - protected DocumentMapperForType docMapper(String type) { - return mapperService.documentMapperWithAutoCreate(type); // protected for testing - } - - /** - * Applies all operations in the iterable to the current engine and returns the number of operations applied. - * This operation will stop applying operations once an operation failed to apply. - * - * Throws a {@link MapperException} to be thrown if a mapping update is encountered. - */ - int performBatchRecovery(Engine engine, Iterable operations) { - int numOps = 0; - try { - for (Translog.Operation operation : operations) { - performRecoveryOperation(engine, operation, false, Engine.Operation.Origin.PEER_RECOVERY); - numOps++; - } - engine.getTranslog().sync(); - } catch (Exception e) { - throw new BatchOperationException(shardId, "failed to apply batch translog operation", numOps, e); - } - return numOps; - } - - public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throws IOException { - Translog.Operation operation; - int opsRecovered = 0; - while ((operation = snapshot.next()) != null) { - try { - performRecoveryOperation(engine, operation, true, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY); - opsRecovered++; - } catch (ElasticsearchException e) { - if (e.status() == RestStatus.BAD_REQUEST) { - // mainly for MapperParsingException and Failure to detect xcontent - logger.info("ignoring recovery of a corrupt translog entry", e); - } else { - throw e; - } - } - } - - return opsRecovered; - } - - public static class BatchOperationException extends ElasticsearchException { - - private final int completedOperations; - - public BatchOperationException(ShardId shardId, String msg, int completedOperations, Throwable cause) { - super(msg, cause); - setShard(shardId); - this.completedOperations = completedOperations; - } - - public BatchOperationException(StreamInput in) throws IOException{ - super(in); - completedOperations = in.readInt(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeInt(completedOperations); - } - - /** the number of successful operations performed before the exception was thrown */ - public int completedOperations() { - return completedOperations; - } - } - - private void maybeAddMappingUpdate(String type, Mapping update, String docId, boolean allowMappingUpdates) { - if (update == null) { - return; - } - if (allowMappingUpdates == false) { - throw new MapperException("mapping updates are not allowed (type: [" + type + "], id: [" + docId + "])"); - } - Mapping currentUpdate = recoveredTypes.get(type); - if (currentUpdate == null) { - recoveredTypes.put(type, update); - } else { - currentUpdate = currentUpdate.merge(update, false); - } - } - - /** - * Performs a single recovery operation. - * - * @param allowMappingUpdates true if mapping update should be accepted (but collected). Setting it to false will - * cause a {@link MapperException} to be thrown if an update - * is encountered. - */ - private void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates, Engine.Operation.Origin origin) throws IOException { - switch (operation.opType()) { - case INDEX: - Translog.Index index = (Translog.Index) operation; - // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all - // autoGeneratedID docs that are coming from the primary are updated correctly. - Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()), - source(shardId.getIndexName(), index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source())) - .routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(), - index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin, index.getAutoGeneratedIdTimestamp(), true); - maybeAddMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate(), engineIndex.id(), allowMappingUpdates); - logger.trace("[translog] recover [index] op [({}, {})] of [{}][{}]", index.seqNo(), index.primaryTerm(), index.type(), index.id()); - index(engine, engineIndex); - break; - case DELETE: - Translog.Delete delete = (Translog.Delete) operation; - logger.trace("[translog] recover [delete] op [({}, {})] of [{}][{}]", delete.seqNo(), delete.primaryTerm(), delete.type(), delete.id()); - final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), - delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(), - origin, System.nanoTime()); - delete(engine, engineDelete); - break; - case NO_OP: - final Translog.NoOp noOp = (Translog.NoOp) operation; - final long seqNo = noOp.seqNo(); - final long primaryTerm = noOp.primaryTerm(); - final String reason = noOp.reason(); - logger.trace("[translog] recover [no_op] op [({}, {})] of [{}]", seqNo, primaryTerm, reason); - final Engine.NoOp engineNoOp = - new Engine.NoOp(seqNo, primaryTerm, origin, System.nanoTime(), reason); - noOp(engine, engineNoOp); - break; - default: - throw new IllegalStateException("No operation defined for [" + operation + "]"); - } - operationProcessed(); - } - - protected void index(Engine engine, Engine.Index engineIndex) throws IOException { - engine.index(engineIndex); - } - - protected void delete(Engine engine, Engine.Delete engineDelete) throws IOException { - engine.delete(engineDelete); - } - - protected void noOp(Engine engine, Engine.NoOp engineNoOp) { - engine.noOp(engineNoOp); - } - - /** - * Called once for every processed operation by this recovery performer. - * This can be used to get progress information on the translog execution. - */ - protected void operationProcessed() { - // noop - } - - - /** - * Returns the recovered types modifying the mapping during the recovery - */ - public Map getRecoveredTypes() { - return recoveredTypes; - } -} diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 8435fe4ee1e80..db789df087238 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -48,7 +48,6 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; -import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef; @@ -423,22 +422,12 @@ public void messageReceived(final RecoveryTranslogOperationsRequest request, fin try { recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps()); channel.sendResponse(new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint())); - } catch (TranslogRecoveryPerformer.BatchOperationException exception) { - MapperException mapperException = (MapperException) ExceptionsHelper.unwrap(exception, MapperException.class); - if (mapperException == null) { - throw exception; - } + } catch (MapperException exception) { // in very rare cases a translog replay from primary is processed before a mapping update on this node // which causes local mapping changes since the mapping (clusterstate) might not have arrived on this node. // we want to wait until these mappings are processed but also need to do some maintenance and roll back the // number of processed (completed) operations in this batch to ensure accounting is correct. - logger.trace( - (Supplier) () -> new ParameterizedMessage( - "delaying recovery due to missing mapping changes (rolling back stats for [{}] ops)", - exception.completedOperations()), - exception); - final RecoveryState.Translog translog = recoveryTarget.state().getTranslog(); - translog.decrementRecoveredOperations(exception.completedOperations()); // do the maintainance and rollback competed ops + logger.trace("delaying recovery due to missing mapping changes", exception); // we do not need to use a timeout here since the entire recovery mechanism has an inactivity protection (it will be // canceled) observer.waitForNextChange(new ClusterStateObserver.Listener() { diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 4c8c31779ac0d..3906627fd1fdf 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -509,7 +509,7 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl logger.trace("no translog operations to send"); } - final CancellableThreads.Interruptable sendBatch = + final CancellableThreads.IOInterruptable sendBatch = () -> targetLocalCheckpoint.set(recoveryTarget.indexTranslogOperations(operations, expectedTotalOps)); // send operations in batches @@ -535,7 +535,7 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl // check if this request is past bytes threshold, and if so, send it off if (size >= chunkSizeInBytes) { - cancellableThreads.execute(sendBatch); + cancellableThreads.executeIO(sendBatch); logger.trace("sent batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps); ops = 0; size = 0; @@ -545,7 +545,7 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl if (!operations.isEmpty() || totalSentOps == 0) { // send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint - cancellableThreads.execute(sendBatch); + cancellableThreads.executeIO(sendBatch); } assert expectedTotalOps == skippedOps + totalSentOps diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 18557b5c7b84c..4838cea9bb9c2 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -40,9 +40,12 @@ import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardNotRecoveringException; +import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; @@ -375,12 +378,26 @@ public void ensureClusterStateVersion(long clusterStateVersion) { } @Override - public long indexTranslogOperations( - List operations, int totalTranslogOps) throws TranslogRecoveryPerformer.BatchOperationException { + public long indexTranslogOperations(List operations, int totalTranslogOps) throws MapperException, IOException { final RecoveryState.Translog translog = state().getTranslog(); translog.totalOperations(totalTranslogOps); assert indexShard().recoveryState() == state(); - indexShard().performBatchRecovery(operations); + int completedOps = 0; + if (indexShard().state() != IndexShardState.RECOVERING) { + throw new IndexShardNotRecoveringException(shardId, indexShard().state()); + } + for (Translog.Operation op : operations) { + Engine.Operation engineOp = indexShard().convertToEngineOp(op, Engine.Operation.Origin.PEER_RECOVERY); + if (engineOp instanceof Engine.Index && ((Engine.Index) engineOp).parsedDoc().dynamicMappingsUpdate() != null) { + translog.decrementRecoveredOperations(completedOps); // clean-up stats + throw new MapperException("mapping updates are not allowed (type: [" + engineOp.type() + "], id: [" + + ((Engine.Index) engineOp).id() + "])"); + } + completedOps++; + indexShard().applyOperation(engineOp); + translog.incrementRecoveredOperations(); + } + indexShard().sync(); return indexShard().getLocalCheckpoint(); } @@ -476,5 +493,4 @@ public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesR Path translogLocation() { return indexShard().shardPath().resolveTranslog(); } - } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 38f412fed734a..42cf1bc1ce19d 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -56,7 +56,7 @@ public interface RecoveryTargetHandler { * * @return the local checkpoint on the target shard */ - long indexTranslogOperations(List operations, int totalTranslogOps); + long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException; /** * Notifies the target of the files it is going to receive diff --git a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index 764a6d3b3512d..4add6bce90071 100644 --- a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -62,10 +62,10 @@ import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.indices.IndexTemplateMissingException; import org.elasticsearch.indices.InvalidIndexTemplateException; import org.elasticsearch.indices.recovery.RecoverFilesRecoveryException; +import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException; @@ -334,22 +334,6 @@ public void testRecoverFilesRecoveryException() throws IOException { assertTrue(ex.getCause() instanceof NullPointerException); } - public void testBatchOperationException() throws IOException { - ShardId id = new ShardId("foo", "_na_", 1); - TranslogRecoveryPerformer.BatchOperationException ex = serialize( - new TranslogRecoveryPerformer.BatchOperationException(id, "batched the fucker", 666, null)); - assertEquals(ex.getShardId(), id); - assertEquals(666, ex.completedOperations()); - assertEquals("batched the fucker", ex.getMessage()); - assertNull(ex.getCause()); - - ex = serialize(new TranslogRecoveryPerformer.BatchOperationException(null, "batched the fucker", -1, new NullPointerException())); - assertNull(ex.getShardId()); - assertEquals(-1, ex.completedOperations()); - assertEquals("batched the fucker", ex.getMessage()); - assertTrue(ex.getCause() instanceof NullPointerException); - } - public void testInvalidIndexTemplateException() throws IOException { InvalidIndexTemplateException ex = serialize(new InvalidIndexTemplateException("foo", "bar")); assertEquals(ex.getMessage(), "index_template [foo] invalid, cause [bar]"); @@ -702,7 +686,7 @@ public void testIds() { ids.put(23, org.elasticsearch.index.shard.IndexShardStartedException.class); ids.put(24, org.elasticsearch.search.SearchContextMissingException.class); ids.put(25, org.elasticsearch.script.GeneralScriptException.class); - ids.put(26, org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException.class); + ids.put(26, null); ids.put(27, org.elasticsearch.snapshots.SnapshotCreationException.class); ids.put(28, org.elasticsearch.index.engine.DeleteFailedEngineException.class); //deprecated in 6.0 ids.put(29, org.elasticsearch.index.engine.DocumentMissingException.class); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 9f4a33ada6e81..f7891946792e9 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -75,6 +75,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; @@ -122,7 +123,7 @@ import org.elasticsearch.index.shard.IndexSearcherWrapper; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardUtils; -import org.elasticsearch.index.shard.TranslogRecoveryPerformer; +import org.elasticsearch.index.shard.TranslogOpToEngineOpConverter; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.DirectoryUtils; @@ -131,6 +132,7 @@ import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.IndicesModule; import org.elasticsearch.indices.mapper.MapperRegistry; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; @@ -153,6 +155,7 @@ import java.util.Base64; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; @@ -261,9 +264,9 @@ public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode) { public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, Analyzer analyzer) { return new EngineConfig(openMode, config.getShardId(), config.getThreadPool(), config.getIndexSettings(), config.getWarmer(), config.getStore(), config.getDeletionPolicy(), config.getMergePolicy(), analyzer, config.getSimilarity(), - new CodecService(null, logger), config.getEventListener(), config.getTranslogRecoveryPerformer(), config.getQueryCache(), + new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(), - config.getIndexSort()); + config.getIndexSort(), config.getTranslogOpToEngineOpConverter(), config.getOperationApplier(), new RecoveryState.Translog()); } @Override @@ -440,11 +443,12 @@ public void onFailedEngine(String reason, @Nullable Exception e) { // we don't need to notify anybody in this test } }; + TranslogHandler handler = new TranslogHandler(xContentRegistry(), IndexSettingsModule.newIndexSettings(shardId.getIndexName(), + indexSettings.getSettings()), logger); EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store, deletionPolicy, mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, - new TranslogHandler(xContentRegistry(), shardId.getIndexName(), indexSettings.getSettings(), logger), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, - TimeValue.timeValueMinutes(5), refreshListener, indexSort); + TimeValue.timeValueMinutes(5), refreshListener, indexSort, handler, handler, new RecoveryState.Translog()); return config; } @@ -2618,7 +2622,7 @@ public void testTranslogReplay() throws IOException { } assertVisibleCount(engine, numDocs); - TranslogHandler parser = (TranslogHandler) engine.config().getTranslogRecoveryPerformer(); + TranslogHandler parser = (TranslogHandler) engine.config().getTranslogOpToEngineOpConverter(); parser.mappingUpdate = dynamicUpdate(); engine.close(); @@ -2626,8 +2630,8 @@ public void testTranslogReplay() throws IOException { engine.recoverFromTranslog(); assertVisibleCount(engine, numDocs, false); - parser = (TranslogHandler) engine.config().getTranslogRecoveryPerformer(); - assertEquals(numDocs, parser.recoveredOps.get()); + parser = (TranslogHandler) engine.config().getTranslogOpToEngineOpConverter(); + assertEquals(numDocs, engine.config().getTranslogStats().recoveredOperations()); if (parser.mappingUpdate != null) { assertEquals(1, parser.getRecoveredTypes().size()); assertTrue(parser.getRecoveredTypes().containsKey("test")); @@ -2638,8 +2642,7 @@ public void testTranslogReplay() throws IOException { engine.close(); engine = createEngine(store, primaryTranslogDir); assertVisibleCount(engine, numDocs, false); - parser = (TranslogHandler) engine.config().getTranslogRecoveryPerformer(); - assertEquals(0, parser.recoveredOps.get()); + assertEquals(0, engine.config().getTranslogStats().recoveredOperations()); final boolean flush = randomBoolean(); int randomId = randomIntBetween(numDocs + 1, numDocs + 10); @@ -2667,8 +2670,7 @@ public void testTranslogReplay() throws IOException { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), numDocs + 1); assertThat(topDocs.totalHits, equalTo(numDocs + 1)); } - parser = (TranslogHandler) engine.config().getTranslogRecoveryPerformer(); - assertEquals(flush ? 1 : 2, parser.recoveredOps.get()); + assertEquals(flush ? 1 : 2, engine.config().getTranslogStats().recoveredOperations()); engine.delete(new Engine.Delete("test", Integer.toString(randomId), newUid(doc))); if (randomBoolean()) { engine.refresh("test"); @@ -2682,23 +2684,21 @@ public void testTranslogReplay() throws IOException { } } - public static class TranslogHandler extends TranslogRecoveryPerformer { + public static class TranslogHandler extends TranslogOpToEngineOpConverter + implements CheckedBiConsumer { private final MapperService mapperService; public Mapping mappingUpdate = null; + private final Map recoveredTypes = new HashMap<>(); - public final AtomicInteger recoveredOps = new AtomicInteger(0); - - public TranslogHandler(NamedXContentRegistry xContentRegistry, String indexName, Settings settings, Logger logger) { + public TranslogHandler(NamedXContentRegistry xContentRegistry, IndexSettings indexSettings, Logger logger) { super(new ShardId("test", "_na_", 0), null, logger); - Index index = new Index(indexName, "_na_"); - IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings); NamedAnalyzer defaultAnalyzer = new NamedAnalyzer("default", AnalyzerScope.INDEX, new StandardAnalyzer()); IndexAnalyzers indexAnalyzers = new IndexAnalyzers(indexSettings, defaultAnalyzer, defaultAnalyzer, defaultAnalyzer, Collections.emptyMap(), Collections.emptyMap()); SimilarityService similarityService = new SimilarityService(indexSettings, Collections.emptyMap()); MapperRegistry mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry(); mapperService = new MapperService(indexSettings, indexAnalyzers, xContentRegistry, similarityService, mapperRegistry, - () -> null); + () -> null); } @Override @@ -2709,8 +2709,32 @@ protected DocumentMapperForType docMapper(String type) { } @Override - protected void operationProcessed() { - recoveredOps.incrementAndGet(); + public void accept(Engine engine, Engine.Operation operation) throws IOException { + switch (operation.operationType()) { + case INDEX: + Engine.Index engineIndex = (Engine.Index) operation; + Mapping update = engineIndex.parsedDoc().dynamicMappingsUpdate(); + if (engineIndex.parsedDoc().dynamicMappingsUpdate() != null) { + recoveredTypes.compute(engineIndex.type(), (k, mapping) -> mapping == null ? update : mapping.merge(update, false)); + } + engine.index(engineIndex); + break; + case DELETE: + engine.delete((Engine.Delete) operation); + break; + case NO_OP: + engine.noOp((Engine.NoOp) operation); + break; + default: + throw new IllegalStateException("No operation defined for [" + operation + "]"); + } + } + + /** + * Returns the recovered types modifying the mapping during the recovery + */ + public Map getRecoveredTypes() { + return recoveredTypes; } } @@ -2740,9 +2764,10 @@ public void testRecoverFromForeignTranslog() throws IOException { EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, threadPool, config.getIndexSettings(), null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getAnalyzer(), - config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getTranslogRecoveryPerformer(), + config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, - TimeValue.timeValueMinutes(5), config.getRefreshListeners(), null); + TimeValue.timeValueMinutes(5), config.getRefreshListeners(), null, + config.getTranslogOpToEngineOpConverter(), config.getOperationApplier(), config.getTranslogStats()); try { InternalEngine internalEngine = new InternalEngine(brokenConfig); diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index ddd69c084950f..1c7705d534afb 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -333,7 +333,8 @@ protected EngineFactory getEngineFactory(final ShardRouting routing) { replica, (indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) { @Override - public long indexTranslogOperations(final List operations, final int totalTranslogOps) { + public long indexTranslogOperations(final List operations, final int totalTranslogOps) + throws IOException { // index a doc which is not part of the snapshot, but also does not complete on replica replicaEngineFactory.latchIndexers(); threadPool.generic().submit(() -> { @@ -445,7 +446,7 @@ private void blockIfNeeded(RecoveryState.Stage currentStage) { } @Override - public long indexTranslogOperations(List operations, int totalTranslogOps) { + public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { if (hasBlocked() == false) { blockIfNeeded(RecoveryState.Stage.TRANSLOG); } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index a113132351b78..53bdea9498a70 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1431,7 +1431,7 @@ public void testTranslogRecoverySyncsTranslog() throws IOException { new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { }) { @Override - public long indexTranslogOperations(List operations, int totalTranslogOps) { + public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps); assertFalse(replica.getTranslog().syncNeeded()); return localCheckpoint; @@ -1488,7 +1488,7 @@ public void prepareForTranslogOperations(int totalTranslogOps) throws IOExceptio } @Override - public long indexTranslogOperations(List operations, int totalTranslogOps) { + public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps); // Shard should now be active since we did recover: assertTrue(replica.isActive()); diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 7ddd229a1172d..b8ef684708020 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -43,7 +43,6 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.InternalEngine; -import org.elasticsearch.index.engine.InternalEngineTests.TranslogHandler; import org.elasticsearch.index.fieldvisitor.SingleFieldsVisitor; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.ParseContext.Document; @@ -117,12 +116,11 @@ public void onFailedEngine(String reason, @Nullable Exception e) { // we don't need to notify anybody in this test } }; - TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), shardId.getIndexName(), Settings.EMPTY, logger); EngineConfig config = new EngineConfig(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, shardId, threadPool, indexSettings, null, store, new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), newMergePolicy(), iwc.getAnalyzer(), - iwc.getSimilarity(), new CodecService(null, logger), eventListener, translogHandler, + iwc.getSimilarity(), new CodecService(null, logger), eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, - TimeValue.timeValueMinutes(5), listeners, null); + TimeValue.timeValueMinutes(5), listeners, null, null, null, null); engine = new InternalEngine(config); listeners.setTranslog(engine.getTranslog()); }