Skip to content

Commit

Permalink
Remove TranslogRecoveryPerformer (#24858)
Browse files Browse the repository at this point in the history
Splits TranslogRecoveryPerformer into three parts:
- the translog operation to engine operation converter
- the operation performer (that indexes the operation into the engine)
- the translog statistics (for which there is already RecoveryState.Translog)
This makes it possible for peer recovery to use the same IndexShard interface as bulk shard requests (i.e. Engine operations instead of Translog operations). It also pushes the "fail on bad mapping" logic outside of IndexShard. Future pull requests could unify the BulkShard and peer recovery path even more.
  • Loading branch information
ywelsch authored Jun 7, 2017
1 parent c8bf7ec commit 26ec891
Show file tree
Hide file tree
Showing 18 changed files with 390 additions and 369 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -765,8 +765,7 @@ private enum ElasticsearchExceptionHandle {
org.elasticsearch.search.SearchContextMissingException::new, 24, UNKNOWN_VERSION_ADDED),
GENERAL_SCRIPT_EXCEPTION(org.elasticsearch.script.GeneralScriptException.class,
org.elasticsearch.script.GeneralScriptException::new, 25, UNKNOWN_VERSION_ADDED),
BATCH_OPERATION_EXCEPTION(org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException.class,
org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException::new, 26, UNKNOWN_VERSION_ADDED),
// 26 was BatchOperationException
SNAPSHOT_CREATION_EXCEPTION(org.elasticsearch.snapshots.SnapshotCreationException.class,
org.elasticsearch.snapshots.SnapshotCreationException::new, 27, UNKNOWN_VERSION_ADDED),
DELETE_FAILED_ENGINE_EXCEPTION(org.elasticsearch.index.engine.DeleteFailedEngineException.class, // deprecated in 6.0, remove in 7.0
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1008,7 +1008,7 @@ public long startTime() {

abstract String id();

abstract TYPE operationType();
public abstract TYPE operationType();
}

public static class Index extends Operation {
Expand Down Expand Up @@ -1050,7 +1050,7 @@ public String id() {
}

@Override
TYPE operationType() {
public TYPE operationType() {
return TYPE.INDEX;
}

Expand Down Expand Up @@ -1126,7 +1126,7 @@ public String id() {
}

@Override
TYPE operationType() {
public TYPE operationType() {
return TYPE.DELETE;
}

Expand Down Expand Up @@ -1176,7 +1176,7 @@ String id() {
}

@Override
TYPE operationType() {
public TYPE operationType() {
return TYPE.NO_OP;
}

Expand Down
32 changes: 18 additions & 14 deletions core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.List;

/*
Expand All @@ -50,7 +51,6 @@
*/
public final class EngineConfig {
private final ShardId shardId;
private final TranslogRecoveryPerformer translogRecoveryPerformer;
private final IndexSettings indexSettings;
private final ByteSizeValue indexingBufferSize;
private volatile boolean enableGcDeletes = true;
Expand All @@ -70,6 +70,7 @@ public final class EngineConfig {
private final List<ReferenceManager.RefreshListener> refreshListeners;
@Nullable
private final Sort indexSort;
private final TranslogRecoveryRunner translogRecoveryRunner;

/**
* Index setting to change the low level lucene codec used for writing new segments.
Expand Down Expand Up @@ -112,9 +113,9 @@ public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
IndexSettings indexSettings, Engine.Warmer warmer, Store store,
MergePolicy mergePolicy, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
TranslogConfig translogConfig, TimeValue flushMergesAfter, List<ReferenceManager.RefreshListener> refreshListeners,
Sort indexSort) {
Sort indexSort, TranslogRecoveryRunner translogRecoveryRunner) {
if (openMode == null) {
throw new IllegalArgumentException("openMode must not be null");
}
Expand All @@ -133,14 +134,14 @@ public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
// there are not too many shards allocated to this node. Instead, IndexingMemoryController periodically checks
// and refreshes the most heap-consuming shards when total indexing heap usage across all shards is too high:
indexingBufferSize = new ByteSizeValue(256, ByteSizeUnit.MB);
this.translogRecoveryPerformer = translogRecoveryPerformer;
this.queryCache = queryCache;
this.queryCachingPolicy = queryCachingPolicy;
this.translogConfig = translogConfig;
this.flushMergesAfter = flushMergesAfter;
this.openMode = openMode;
this.refreshListeners = refreshListeners;
this.indexSort = indexSort;
this.translogRecoveryRunner = translogRecoveryRunner;
}

/**
Expand Down Expand Up @@ -253,15 +254,6 @@ public Similarity getSimilarity() {
return similarity;
}

/**
* Returns the {@link org.elasticsearch.index.shard.TranslogRecoveryPerformer} for this engine. This class is used
* to apply transaction log operations to the engine. It encapsulates all the logic to transfer the translog entry into
* an indexing operation.
*/
public TranslogRecoveryPerformer getTranslogRecoveryPerformer() {
return translogRecoveryPerformer;
}

/**
* Return the cache to use for queries.
*/
Expand Down Expand Up @@ -297,6 +289,18 @@ public OpenMode getOpenMode() {
return openMode;
}

/**
 * Callback the engine invokes to replay the operations of a translog snapshot.
 * The engine obtains it from its {@code EngineConfig} and passes itself together with the snapshot to replay.
 */
@FunctionalInterface
public interface TranslogRecoveryRunner {
/**
 * Replays the given translog snapshot against the given engine.
 *
 * @param engine   the engine the snapshot is replayed into
 * @param snapshot the translog snapshot to read operations from
 * @return a count of operations; callers shown in this change treat it as the number of operations recovered
 * @throws IOException declared for implementations that read the snapshot or apply operations
 */
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
}

/**
 * Gives access to the {@link TranslogRecoveryRunner} this configuration was constructed with.
 * The runner carries out translog recovery from a given snapshot.
 */
public TranslogRecoveryRunner getTranslogRecoveryRunner() {
    return this.translogRecoveryRunner;
}

/**
* Engine open mode defines how the engine should be opened or in other words what the engine should expect
* to recover from. We either create a brand new engine with a new index and translog or we recover from an existing index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogCorruptedException;
Expand Down Expand Up @@ -286,7 +285,7 @@ public InternalEngine recoverFromTranslog() throws IOException {
throw new IllegalStateException("Engine has already been recovered");
}
try {
recoverFromTranslog(engineConfig.getTranslogRecoveryPerformer());
recoverFromTranslogInternal();
} catch (Exception e) {
try {
pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush
Expand All @@ -302,12 +301,12 @@ public InternalEngine recoverFromTranslog() throws IOException {
return this;
}

private void recoverFromTranslog(TranslogRecoveryPerformer handler) throws IOException {
private void recoverFromTranslogInternal() throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
try {
Translog.Snapshot snapshot = translog.newSnapshot();
opsRecovered = handler.recoveryFromSnapshot(this, snapshot);
opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot);
} catch (Exception e) {
throw new EngineException(shardId, "failed to recover from translog", e);
}
Expand Down
119 changes: 69 additions & 50 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.index.shard;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexOptions;
Expand Down Expand Up @@ -116,6 +115,7 @@
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.suggest.completion.CompletionFieldStats;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -167,6 +167,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final IndexEventListener indexEventListener;
private final QueryCachingPolicy cachingPolicy;
private final Supplier<Sort> indexSortSupplier;
private final TranslogOpToEngineOpConverter translogOpToEngineOpConverter;

/**
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
Expand Down Expand Up @@ -259,6 +260,7 @@ public IndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardP

this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays);
this.translogOpToEngineOpConverter = new TranslogOpToEngineOpConverter(shardId, mapperService);
// the query cache is a node-level thing, however we want the most popular filters
// to be computed on a per-shard basis
if (IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.get(settings)) {
Expand Down Expand Up @@ -571,6 +573,37 @@ static Engine.Index prepareIndex(DocumentMapperForType docMapper, SourceToParse
return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry);
}

/**
 * Applies the given engine operation to this shard, using the engine returned by {@code getEngine()}.
 * The operation may be an index, a delete or a no-op operation.
 *
 * @param operation the engine operation to apply
 * @return the result produced by the engine for that operation
 * @throws IOException if applying the operation fails with an I/O error
 */
public Engine.Result applyOperation(Engine.Operation operation) throws IOException {
    final Engine currentEngine = getEngine();
    return applyOperation(currentEngine, operation);
}

/**
 * Dispatches the operation to the matching index / delete / no-op handler based on its operation type.
 *
 * @param engine    the engine the operation is applied to
 * @param operation the operation to apply
 * @return the result returned by the selected handler
 * @throws IllegalStateException if the operation type is none of INDEX, DELETE or NO_OP
 */
private Engine.Result applyOperation(Engine engine, Engine.Operation operation) throws IOException {
    switch (operation.operationType()) {
        case INDEX:
            return index(engine, (Engine.Index) operation);
        case DELETE:
            return delete(engine, (Engine.Delete) operation);
        case NO_OP:
            return noOp(engine, (Engine.NoOp) operation);
        default:
            throw new IllegalStateException("No operation defined for [" + operation + "]");
    }
}

/**
 * Flags the shard as active, traces the sequence number when trace logging is enabled,
 * and hands the no-op over to the given engine.
 *
 * @param engine the engine that executes the no-op
 * @param noOp   the no-op operation to execute
 * @return the engine's result for the no-op
 */
private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) {
    active.set(true);
    if (logger.isTraceEnabled()) {
        logger.trace("noop (seq# [{}])", noOp.seqNo());
    }
    final Engine.NoOpResult noOpResult = engine.noOp(noOp);
    return noOpResult;
}

public Engine.IndexResult index(Engine.Index index) throws IOException {
ensureWriteAllowed(index);
Engine engine = getEngine();
Expand Down Expand Up @@ -1019,21 +1052,33 @@ public void prepareForIndexRecovery() {
assert currentEngineReference.get() == null;
}

/**
* Applies all operations in the iterable to the current engine and returns the number of operations applied.
* This operation will stop applying operations once an operation failed to apply.
* Note: This method is typically used in peer recovery to replay remote transaction log entries.
*/
public int performBatchRecovery(Iterable<Translog.Operation> operations) {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
/**
 * Converts a translog operation into an engine operation by delegating to this shard's
 * {@code TranslogOpToEngineOpConverter}.
 *
 * @param operation the translog operation to convert
 * @param origin    the origin passed through to the converter for the resulting engine operation
 * @return the engine operation produced by the converter
 */
public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) {
return translogOpToEngineOpConverter.convertToEngineOp(operation, origin);
}

// package-private for testing
int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException {
recoveryState.getTranslog().totalOperations(snapshot.totalOperations());
recoveryState.getTranslog().totalOperationsOnStart(snapshot.totalOperations());
int opsRecovered = 0;
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
try {
logger.trace("[translog] recover op {}", operation);
Engine.Operation engineOp = convertToEngineOp(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY);
applyOperation(engine, engineOp);
opsRecovered++;
recoveryState.getTranslog().incrementRecoveredOperations();
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.BAD_REQUEST) {
// mainly for MapperParsingException and Failure to detect xcontent
logger.info("ignoring recovery of a corrupt translog entry", e);
} else {
throw e;
}
}
}
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
// we still invoke any onShardInactive listeners ... we won't sync'd flush in this case because we only do that on primary and this
// is a replica
active.set(true);
Engine engine = getEngine();
return engine.config().getTranslogRecoveryPerformer().performBatchRecovery(engine, operations);
return opsRecovered;
}

/**
Expand Down Expand Up @@ -1841,13 +1886,14 @@ private DocumentMapperForType docMapper(String type) {
}

private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) {
final IndexShardRecoveryPerformer translogRecoveryPerformer = new IndexShardRecoveryPerformer(shardId, mapperService, logger);
Sort indexSort = indexSortSupplier.get();
return new EngineConfig(openMode, shardId,
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener,
indexCache.query(), cachingPolicy, translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Arrays.asList(refreshListeners, new RefreshMetricUpdater(refreshMetric)), indexSort);
Arrays.asList(refreshListeners, new RefreshMetricUpdater(refreshMetric)), indexSort,
this::runTranslogRecovery);
}

/**
Expand Down Expand Up @@ -1960,6 +2006,11 @@ public final void sync(Translog.Location location, Consumer<Exception> syncListe
translogSyncProcessor.put(location, syncListener);
}

/**
 * Calls {@code verifyNotClosed()} and then syncs the translog of the current engine.
 *
 * @throws IOException if syncing the translog fails with an I/O error
 */
public final void sync() throws IOException {
    verifyNotClosed();
    final Translog engineTranslog = getEngine().getTranslog();
    engineTranslog.sync();
}

/**
* Returns the current translog durability mode
*/
Expand Down Expand Up @@ -2091,38 +2142,6 @@ public void addRefreshListener(Translog.Location location, Consumer<Boolean> lis
refreshListeners.addOrNotify(location, listener);
}

/**
 * {@link TranslogRecoveryPerformer} bound to the enclosing {@code IndexShard}: it records progress in the shard's
 * {@code recoveryState} and routes index / delete operations through the shard's own methods.
 */
private class IndexShardRecoveryPerformer extends TranslogRecoveryPerformer {

protected IndexShardRecoveryPerformer(ShardId shardId, MapperService mapperService, Logger logger) {
super(shardId, mapperService, logger);
}

/** Bumps the recovered-operations counter of the shard's translog recovery stats. */
@Override
protected void operationProcessed() {
assert recoveryState != null;
recoveryState.getTranslog().incrementRecoveredOperations();
}

/**
 * Records the snapshot's total operation count in the translog recovery stats (both the running total and the
 * total-on-start) before delegating the actual replay to the superclass.
 */
@Override
public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throws IOException {
assert recoveryState != null;
RecoveryState.Translog translogStats = recoveryState.getTranslog();
translogStats.totalOperations(snapshot.totalOperations());
translogStats.totalOperationsOnStart(snapshot.totalOperations());
return super.recoveryFromSnapshot(engine, snapshot);
}

/** Routes the index operation through the enclosing shard's {@code index(Engine, Engine.Index)}. */
@Override
protected void index(Engine engine, Engine.Index engineIndex) throws IOException {
IndexShard.this.index(engine, engineIndex);
}

/** Routes the delete operation through the enclosing shard's {@code delete(Engine, Engine.Delete)}. */
@Override
protected void delete(Engine engine, Engine.Delete engineDelete) throws IOException {
IndexShard.this.delete(engine, engineDelete);
}
}

private static class RefreshMetricUpdater implements ReferenceManager.RefreshListener {

private final MeanMetric refreshMetric;
Expand Down
Loading

0 comments on commit 26ec891

Please sign in to comment.