Skip to content

Commit

Permalink
Remove TranslogRecoveryPerformer
Browse files Browse the repository at this point in the history
  • Loading branch information
ywelsch committed May 24, 2017
1 parent b5adb3c commit ca5b9f9
Show file tree
Hide file tree
Showing 16 changed files with 270 additions and 362 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.TcpTransport;

Expand Down Expand Up @@ -765,8 +766,7 @@ private enum ElasticsearchExceptionHandle {
org.elasticsearch.search.SearchContextMissingException::new, 24, UNKNOWN_VERSION_ADDED),
GENERAL_SCRIPT_EXCEPTION(org.elasticsearch.script.GeneralScriptException.class,
org.elasticsearch.script.GeneralScriptException::new, 25, UNKNOWN_VERSION_ADDED),
BATCH_OPERATION_EXCEPTION(org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException.class,
org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException::new, 26, UNKNOWN_VERSION_ADDED),
// 26 was BatchOperationException
SNAPSHOT_CREATION_EXCEPTION(org.elasticsearch.snapshots.SnapshotCreationException.class,
org.elasticsearch.snapshots.SnapshotCreationException::new, 27, UNKNOWN_VERSION_ADDED),
DELETE_FAILED_ENGINE_EXCEPTION(org.elasticsearch.index.engine.DeleteFailedEngineException.class, // deprecated in 6.0, remove in 7.0
Expand Down
9 changes: 4 additions & 5 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -1015,7 +1014,7 @@ public long startTime() {

abstract String id();

abstract TYPE operationType();
public abstract TYPE operationType();
}

public static class Index extends Operation {
Expand Down Expand Up @@ -1057,7 +1056,7 @@ public String id() {
}

@Override
TYPE operationType() {
public TYPE operationType() {
return TYPE.INDEX;
}

Expand Down Expand Up @@ -1133,7 +1132,7 @@ public String id() {
}

@Override
TYPE operationType() {
public TYPE operationType() {
return TYPE.DELETE;
}

Expand Down Expand Up @@ -1183,7 +1182,7 @@ String id() {
}

@Override
TYPE operationType() {
public TYPE operationType() {
return TYPE.NO_OP;
}

Expand Down
51 changes: 37 additions & 14 deletions core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.similarities.Similarity;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
Expand All @@ -36,20 +37,22 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.shard.TranslogOpToEngineOpConverter;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;

/*
* Holds all the configuration that is used to create an {@link Engine}.
* Once {@link Engine} has been created with this object, changes to this
* object will affect the {@link Engine} instance.
*/
public final class EngineConfig {
private final ShardId shardId;
private final TranslogRecoveryPerformer translogRecoveryPerformer;
private final IndexSettings indexSettings;
private final ByteSizeValue indexingBufferSize;
private volatile boolean enableGcDeletes = true;
Expand All @@ -70,6 +73,9 @@ public final class EngineConfig {
private final ReferenceManager.RefreshListener refreshListeners;
@Nullable
private final Sort indexSort;
private final TranslogOpToEngineOpConverter translogOpToEngineOpConverter;
private final CheckedBiConsumer<Engine, Engine.Operation, IOException> operationApplier;
private final RecoveryState.Translog translogStats;

/**
* Index setting to change the low level lucene codec used for writing new segments.
Expand Down Expand Up @@ -112,9 +118,11 @@ public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
IndexSettings indexSettings, Engine.Warmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
MergePolicy mergePolicy, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
TranslogConfig translogConfig, TimeValue flushMergesAfter, ReferenceManager.RefreshListener refreshListeners,
Sort indexSort) {
Sort indexSort, TranslogOpToEngineOpConverter translogOpToEngineOpConverter,
CheckedBiConsumer<Engine, Engine.Operation, IOException> operationApplier,
RecoveryState.Translog translogStats) {
if (openMode == null) {
throw new IllegalArgumentException("openMode must not be null");
}
Expand All @@ -134,14 +142,16 @@ public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
// there are not too many shards allocated to this node. Instead, IndexingMemoryController periodically checks
// and refreshes the most heap-consuming shards when total indexing heap usage across all shards is too high:
indexingBufferSize = new ByteSizeValue(256, ByteSizeUnit.MB);
this.translogRecoveryPerformer = translogRecoveryPerformer;
this.queryCache = queryCache;
this.queryCachingPolicy = queryCachingPolicy;
this.translogConfig = translogConfig;
this.flushMergesAfter = flushMergesAfter;
this.openMode = openMode;
this.refreshListeners = refreshListeners;
this.indexSort = indexSort;
this.translogOpToEngineOpConverter = translogOpToEngineOpConverter;
this.operationApplier = operationApplier;
this.translogStats = translogStats;
}

/**
Expand Down Expand Up @@ -262,15 +272,6 @@ public Similarity getSimilarity() {
return similarity;
}

/**
* Returns the {@link org.elasticsearch.index.shard.TranslogRecoveryPerformer} for this engine. This class is used
* to apply transaction log operations to the engine. It encapsulates all the logic to transfer the translog entry into
* an indexing operation.
*/
public TranslogRecoveryPerformer getTranslogRecoveryPerformer() {
return translogRecoveryPerformer;
}

/**
* Return the cache to use for queries.
*/
Expand Down Expand Up @@ -340,4 +341,26 @@ public boolean isAutoGeneratedIDsOptimizationEnabled() {
public Sort getIndexSort() {
return indexSort;
}

/**
* Return a converter to turn Translog operations into Engine operations.
* Used during translog recovery, see also {@link Engine#recoverFromTranslog()}
*/
public TranslogOpToEngineOpConverter getTranslogOpToEngineOpConverter() {
return translogOpToEngineOpConverter;
}

/**
* Returns applier that applies operation to the engine. Used during translog recovery, see also {@link Engine#recoverFromTranslog()}
*/
public CheckedBiConsumer<Engine, Engine.Operation, IOException> getOperationApplier() {
return operationApplier;
}

/**
* Returns statistics object for the translog. Used during translog recovery, see also {@link Engine#recoverFromTranslog()}
*/
public RecoveryState.Translog getTranslogStats() {
return translogStats;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
Expand Down Expand Up @@ -70,10 +71,10 @@
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -277,7 +278,7 @@ public InternalEngine recoverFromTranslog() throws IOException {
throw new IllegalStateException("Engine has already been recovered");
}
try {
recoverFromTranslog(engineConfig.getTranslogRecoveryPerformer());
recoverFromTranslogInternal();
} catch (Exception e) {
try {
pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush
Expand All @@ -293,12 +294,31 @@ public InternalEngine recoverFromTranslog() throws IOException {
return this;
}

private void recoverFromTranslog(TranslogRecoveryPerformer handler) throws IOException {
private void recoverFromTranslogInternal() throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
int opsRecovered = 0;
try {
Translog.Snapshot snapshot = translog.newSnapshot();
opsRecovered = handler.recoveryFromSnapshot(this, snapshot);
Translog.Operation operation;
config().getTranslogStats().totalOperations(snapshot.totalOperations());
config().getTranslogStats().totalOperationsOnStart(snapshot.totalOperations());
while ((operation = snapshot.next()) != null) {
try {
logger.trace("[translog] recover op {}", operation);
Operation engineOp = config().getTranslogOpToEngineOpConverter()
.convertToEngineOp(operation, Operation.Origin.LOCAL_TRANSLOG_RECOVERY);
config().getOperationApplier().accept(this, engineOp);
opsRecovered++;
config().getTranslogStats().incrementRecoveredOperations();
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.BAD_REQUEST) {
// mainly for MapperParsingException and Failure to detect xcontent
logger.info("ignoring recovery of a corrupt translog entry", e);
} else {
throw e;
}
}
}
} catch (Exception e) {
throw new EngineException(shardId, "failed to recover from translog", e);
}
Expand Down
91 changes: 40 additions & 51 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.index.shard;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexOptions;
Expand Down Expand Up @@ -167,6 +166,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final IndexEventListener indexEventListener;
private final QueryCachingPolicy cachingPolicy;
private final Supplier<Sort> indexSortSupplier;
private final TranslogOpToEngineOpConverter translogOpToEngineOpConverter;

/**
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
Expand Down Expand Up @@ -261,6 +261,7 @@ public IndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardP
this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings,
bigArrays);
this.translogOpToEngineOpConverter = new TranslogOpToEngineOpConverter(shardId, mapperService, logger);
// the query cache is a node-level thing, however we want the most popular filters
// to be computed on a per-shard basis
if (IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.get(settings)) {
Expand Down Expand Up @@ -552,6 +553,34 @@ static Engine.Index prepareIndex(DocumentMapperForType docMapper, SourceToParse
return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry);
}

public Engine.Result applyOperation(Engine.Operation operation) throws IOException {
return applyOperation(getEngine(), operation);
}

private Engine.Result applyOperation(Engine engine, Engine.Operation operation) throws IOException {
switch (operation.operationType()) {
case INDEX:
Engine.Index engineIndex = (Engine.Index) operation;
return index(engine, engineIndex);
case DELETE:
final Engine.Delete engineDelete = (Engine.Delete) operation;
return delete(engine, engineDelete);
case NO_OP:
final Engine.NoOp engineNoOp = (Engine.NoOp) operation;
return noOp(engine, engineNoOp);
default:
throw new IllegalStateException("No operation defined for [" + operation + "]");
}
}

private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) {
active.set(true);
if (logger.isTraceEnabled()) {
logger.trace("noop (seq# [{}])", noOp.seqNo());
}
return engine.noOp(noOp);
}

public Engine.IndexResult index(Engine.Index index) throws IOException {
ensureWriteAllowed(index);
Engine engine = getEngine();
Expand Down Expand Up @@ -1013,21 +1042,8 @@ public void prepareForIndexRecovery() {
assert currentEngineReference.get() == null;
}

/**
* Applies all operations in the iterable to the current engine and returns the number of operations applied.
* This operation will stop applying operations once an operation failed to apply.
* Note: This method is typically used in peer recovery to replay remote transaction log entries.
*/
public int performBatchRecovery(Iterable<Translog.Operation> operations) {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
// we still invoke any onShardInactive listeners ... we won't sync'd flush in this case because we only do that on primary and this
// is a replica
active.set(true);
Engine engine = getEngine();
return engine.config().getTranslogRecoveryPerformer().performBatchRecovery(engine, operations);
public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) {
return translogOpToEngineOpConverter.convertToEngineOp(operation, origin);
}

/**
Expand Down Expand Up @@ -1835,12 +1851,12 @@ private DocumentMapperForType docMapper(String type) {
}

private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) {
final IndexShardRecoveryPerformer translogRecoveryPerformer = new IndexShardRecoveryPerformer(shardId, mapperService, logger);
Sort indexSort = indexSortSupplier.get();
return new EngineConfig(openMode, shardId,
threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners, indexSort);
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, indexCache.query(), cachingPolicy, translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners, indexSort,
translogOpToEngineOpConverter, this::applyOperation, recoveryState.getTranslog());
}

/**
Expand Down Expand Up @@ -1953,6 +1969,11 @@ public final void sync(Translog.Location location, Consumer<Exception> syncListe
translogSyncProcessor.put(location, syncListener);
}

public final void sync() throws IOException {
verifyNotClosed();
getEngine().getTranslog().sync();
}

/**
* Returns the current translog durability mode
*/
Expand Down Expand Up @@ -2084,36 +2105,4 @@ public void addRefreshListener(Translog.Location location, Consumer<Boolean> lis
refreshListeners.addOrNotify(location, listener);
}

private class IndexShardRecoveryPerformer extends TranslogRecoveryPerformer {

protected IndexShardRecoveryPerformer(ShardId shardId, MapperService mapperService, Logger logger) {
super(shardId, mapperService, logger);
}

@Override
protected void operationProcessed() {
assert recoveryState != null;
recoveryState.getTranslog().incrementRecoveredOperations();
}

@Override
public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throws IOException {
assert recoveryState != null;
RecoveryState.Translog translogStats = recoveryState.getTranslog();
translogStats.totalOperations(snapshot.totalOperations());
translogStats.totalOperationsOnStart(snapshot.totalOperations());
return super.recoveryFromSnapshot(engine, snapshot);
}

@Override
protected void index(Engine engine, Engine.Index engineIndex) throws IOException {
IndexShard.this.index(engine, engineIndex);
}

@Override
protected void delete(Engine engine, Engine.Delete engineDelete) throws IOException {
IndexShard.this.delete(engine, engineDelete);
}
}

}
Loading

0 comments on commit ca5b9f9

Please sign in to comment.