Skip to content

Commit

Permalink
drop index.shard.check_on_startup: fix
Browse files Browse the repository at this point in the history
Relates #31389
  • Loading branch information
Vladimir Dolzhenko committed Jul 27, 2018
1 parent 7aa5365 commit 843f977
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 47 deletions.
6 changes: 0 additions & 6 deletions docs/reference/index-modules.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,6 @@ corruption is detected, it will prevent the shard from being opened. Accepts:
Check for both physical and logical corruption. This is much more
expensive in terms of CPU and memory usage.

`fix`::

Check for both physical and logical corruption. Segments that were reported
as corrupted will be automatically removed. This option *may result in data loss*.
Use with extreme caution!

WARNING: Expert only. Checking shards may take a lot of time on large indices.
--

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,10 @@ public final class IndexSettings {
switch(s) {
case "false":
case "true":
case "fix":
case "checksum":
return s;
default:
throw new IllegalArgumentException("unknown value for [index.shard.check_on_startup] must be one of [true, false, fix, checksum] but was: " + s);
throw new IllegalArgumentException("unknown value for [index.shard.check_on_startup] must be one of [true, false, checksum] but was: " + s);
}
}, Property.IndexScope);

Expand Down
18 changes: 5 additions & 13 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1298,7 +1298,7 @@ private void innerOpenEngineAndTranslog() throws IOException {
}
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
// also check here, before we apply the translog
if (Booleans.isTrue(checkIndexOnStartup)) {
if (Booleans.isTrue(checkIndexOnStartup) || "checksum".equals(checkIndexOnStartup)) {
try {
checkIndex();
} catch (IOException ex) {
Expand Down Expand Up @@ -1890,6 +1890,9 @@ void checkIndex() throws IOException {
if (store.tryIncRef()) {
try {
doCheckIndex();
} catch (IOException e) {
store.markStoreCorrupted(e);
throw e;
} finally {
store.decRef();
}
Expand Down Expand Up @@ -1933,18 +1936,7 @@ private void doCheckIndex() throws IOException {
return;
}
logger.warn("check index [failure]\n{}", os.bytes().utf8ToString());
if ("fix".equals(checkIndexOnStartup)) {
if (logger.isDebugEnabled()) {
logger.debug("fixing index, writing new segments file ...");
}
store.exorciseIndex(status);
if (logger.isDebugEnabled()) {
logger.debug("index fixed, wrote new segments file \"{}\"", status.segmentsFileName);
}
} else {
// only throw a failure if we are not going to fix the index
throw new IllegalStateException("index check failure but can't fix it");
}
throw new IllegalStateException("index check failure");
}
}

Expand Down
15 changes: 2 additions & 13 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
static final int VERSION_STACK_TRACE = 1; // we write the stack trace too since 1.4.0
static final int VERSION_START = 0;
static final int VERSION = VERSION_WRITE_THROWABLE;
static final String CORRUPTED = "corrupted_";
// public is for test purposes
public static final String CORRUPTED = "corrupted_";
public static final Setting<TimeValue> INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING =
Setting.timeSetting("index.store.stats_refresh_interval", TimeValue.timeValueSeconds(10), Property.IndexScope);

Expand Down Expand Up @@ -360,18 +361,6 @@ public CheckIndex.Status checkIndex(PrintStream out) throws IOException {
}
}

/**
* Repairs the index using the previous returned status from {@link #checkIndex(PrintStream)}.
*/
public void exorciseIndex(CheckIndex.Status status) throws IOException {
metadataLock.writeLock().lock();
try (CheckIndex checkIndex = new CheckIndex(directory)) {
checkIndex.exorciseIndex(status);
} finally {
metadataLock.writeLock().unlock();
}
}

public StoreStats stats() throws IOException {
ensureOpen();
return new StoreStats(directory.estimateSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void testIndexTemplateInvalidNumberOfShards() {
containsString("Failed to parse value [0] for setting [index.number_of_shards] must be >= 1"));
assertThat(throwables.get(0).getMessage(),
containsString("unknown value for [index.shard.check_on_startup] " +
"must be one of [true, false, fix, checksum] but was: blargh"));
"must be one of [true, false, checksum] but was: blargh"));
}

public void testIndexTemplateValidationAccumulatesValidationErrors() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.BaseDirectoryWrapper;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
Expand All @@ -53,6 +55,7 @@
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.breaker.CircuitBreaker;
Expand Down Expand Up @@ -93,6 +96,7 @@
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.index.translog.TestTranslog;
Expand All @@ -110,6 +114,7 @@
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import org.elasticsearch.test.CorruptionUtils;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.FieldMaskingReader;
import org.elasticsearch.test.VersionUtils;
Expand All @@ -118,7 +123,11 @@

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -1216,7 +1225,7 @@ public String[] listAll() throws IOException {
};

try (Store store = createStore(shardId, new IndexSettings(metaData, Settings.EMPTY), directory)) {
IndexShard shard = newShard(shardRouting, shardPath, metaData, store,
IndexShard shard = newShard(shardRouting, shardPath, metaData, i -> store,
null, new InternalEngineFactory(), () -> {
}, EMPTY_EVENT_LISTENER);
AtomicBoolean failureCallbackTriggered = new AtomicBoolean(false);
Expand Down Expand Up @@ -2561,6 +2570,104 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept
closeShards(newShard);
}

public void testIndexCheckChecksum() throws Exception {
final boolean primary = true;

IndexShard indexShard = newStartedShard(primary);

final long numDocs = between(10, 100);
for (long i = 0; i < numDocs; i++) {
indexDoc(indexShard, "_doc", Long.toString(i), "{}");
}
indexShard.flush(new FlushRequest());
closeShards(indexShard);

// start shard with checksum - it has to pass successfully

final ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(indexShard.routingEntry(),
primary ? RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE
);
final IndexMetaData indexMetaData = IndexMetaData.builder(indexShard.indexSettings().getIndexMetaData())
.settings(Settings.builder()
.put(indexShard.indexSettings.getSettings())
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum"))
.build();

final ShardPath shardPath = indexShard.shardPath();

final IndexShard newShard = newShard(shardRouting, shardPath, indexMetaData,
null, null, indexShard.engineFactory,
indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);

closeShards(newStartedShard(p -> newShard, primary));

// corrupt files
final Path indexPath = shardPath.getDataPath().resolve("index");
final Path[] filesToCorrupt =
Files.walk(indexPath)
.filter(p -> Files.isRegularFile(p) && IndexWriter.WRITE_LOCK_NAME.equals(p.getFileName().toString()) == false)
.toArray(Path[]::new);
CorruptionUtils.corruptFile(random(), filesToCorrupt);

// check that corrupt marker is *NOT* there
final AtomicInteger corruptedMarkerCount = new AtomicInteger();
final SimpleFileVisitor<Path> corruptedVisitor = new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if (Files.isRegularFile(file) && file.getFileName().toString().startsWith(Store.CORRUPTED)) {
corruptedMarkerCount.incrementAndGet();
}
return FileVisitResult.CONTINUE;
}
};
Files.walkFileTree(indexPath, corruptedVisitor);

assertThat("store is clean",
corruptedMarkerCount.get(), equalTo(0));

// storeProvider that does not perform check index on close - it is corrupted
final CheckedFunction<IndexSettings, Store, IOException> storeProvider = indexSettings -> {
final ShardId shardId = shardPath.getShardId();
final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) {
@Override
public Directory newDirectory() throws IOException {
final BaseDirectoryWrapper baseDirectoryWrapper = newFSDirectory(shardPath.resolveIndex());
// index is corrupted - don't even try to check index on close - it fails
baseDirectoryWrapper.setCheckIndexOnClose(false);
return baseDirectoryWrapper;
}
};
return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
};

// try to start shard on corrupted files
final IndexShard corruptedShard = newShard(shardRouting, shardPath, indexMetaData,
storeProvider, null, indexShard.engineFactory,
indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);

expectThrows(IndexShardRecoveryException.class, () -> newStartedShard(p -> corruptedShard, primary));
closeShards(corruptedShard);

// check that corrupt marker is there
Files.walkFileTree(indexPath, corruptedVisitor);
assertThat("store has to be marked as corrupted",
corruptedMarkerCount.get(), equalTo(1));

// try to start another time shard on corrupted files
final IndexShard corruptedShard2 = newShard(shardRouting, shardPath, indexMetaData,
storeProvider, null, indexShard.engineFactory,
indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);

expectThrows(IndexShardRecoveryException.class, () -> newStartedShard(p -> corruptedShard2, primary));
closeShards(corruptedShard2);

// check that corrupt marker is there
corruptedMarkerCount.set(0);
Files.walkFileTree(indexPath, corruptedVisitor);
assertThat("store still has a single corrupt marker",
corruptedMarkerCount.get(), equalTo(1));
}

/**
* Simulates a scenario that happens when we are async fetching snapshot metadata from GatewayService
* and checking index concurrently. This should always be possible without any exception.
Expand All @@ -2584,7 +2691,7 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception {
final IndexMetaData indexMetaData = IndexMetaData.builder(indexShard.indexSettings().getIndexMetaData())
.settings(Settings.builder()
.put(indexShard.indexSettings.getSettings())
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum", "fix")))
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum")))
.build();
final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData,
null, null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);
Expand Down
Loading

0 comments on commit 843f977

Please sign in to comment.