From 38596860f69ed41c395aa41babc9a0a6949315a5 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 12 Dec 2020 10:30:18 -0500 Subject: [PATCH] Add index commit id to searcher (#63963) This change assigns the id of an index commit to a searcher, so we can retry search requests on another shard copy if they have the same index commit. --- .../indices/state/CloseIndexIT.java | 30 +++++++++++++++++++ .../elasticsearch/common/lucene/Lucene.java | 8 +++++ .../index/engine/CommitStats.java | 2 +- .../elasticsearch/index/engine/Engine.java | 9 ++++++ .../index/engine/ReadOnlyEngine.java | 23 ++++++++++++++ .../index/engine/FrozenEngine.java | 5 ++++ 6 files changed, 76 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseIndexIT.java index 5ae4145e2958..676ce4ce7f4d 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -34,9 +34,11 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.indices.IndicesService; @@ -478,6 +480,34 @@ public void testResyncPropagatePrimaryTerm() throws Exception { } } + public void testCommitIdInSearcher() throws Exception { + final String indexName = "test_commit_id"; + createIndex(indexName, Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build()); + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(0, 50)) + .mapToObj(n -> client().prepareIndex(indexName).setSource("num", n)).collect(toList())); + ensureGreen(indexName); + assertAcked(client().admin().indices().prepareClose(indexName)); + assertIndexIsClosed(indexName); + ensureGreen(indexName); + final String nodeWithPrimary = Iterables.get(internalCluster().nodesInclude(indexName), 0); + IndexShard shard = internalCluster().getInstance(IndicesService.class, nodeWithPrimary) + .indexService(resolveIndex(indexName)).getShard(0); + final String commitId; + try (Engine.SearcherSupplier searcherSupplier = shard.acquireSearcherSupplier(randomFrom(Engine.SearcherScope.values()))) { + assertNotNull(searcherSupplier.getCommitId()); + commitId = searcherSupplier.getCommitId(); + } + internalCluster().restartNode(nodeWithPrimary); + ensureGreen(indexName); + shard = internalCluster().getInstance(IndicesService.class, nodeWithPrimary).indexService(resolveIndex(indexName)).getShard(0); + try (Engine.SearcherSupplier searcherSupplier = shard.acquireSearcherSupplier(randomFrom(Engine.SearcherScope.values()))) { + assertThat(searcherSupplier.getCommitId(), equalTo(commitId)); + } + } + private static void closeIndices(final String... indices) { closeIndices(client().admin().indices().prepareClose(indices)); } diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index 3ca6582cc0b2..6ffbd8aa7ee4 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -99,6 +99,7 @@ import java.text.ParseException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -823,6 +824,13 @@ public void delete() { } } + /** + * Returns a base64 encoded string of the commit id of the given {@link SegmentInfos} + */ + public static String getCommitId(SegmentInfos segmentInfos) { + return Base64.getEncoder().encodeToString(segmentInfos.getId()); + } + /** * Return a {@link Bits} view of the provided scorer. * NOTE: that the returned {@link Bits} instance MUST be consumed in order. diff --git a/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java b/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java index 7686c19bd12d..5f936658090a 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java @@ -44,7 +44,7 @@ public CommitStats(SegmentInfos segmentInfos) { userData = MapBuilder.newMapBuilder().putAll(segmentInfos.getUserData()).immutableMap(); // lucene calls the current generation, last generation. generation = segmentInfos.getLastGeneration(); - id = Base64.getEncoder().encodeToString(segmentInfos.getId()); + id = Lucene.getCommitId(segmentInfos); numDocs = Lucene.getNumDocs(segmentInfos); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index b767313eb69a..ac4eb6a992eb 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1242,6 +1242,15 @@ public final void close() { protected abstract void doClose(); protected abstract Searcher acquireSearcherInternal(String source); + + /** + * Returns a commit id associated with this searcher if it's opened from an index commit; otherwise, return null. Two searchers + * with the same commit id must have identical Lucene level indices (i.e., identical segments with same docs using same doc-ids). + */ + @Nullable + public String getCommitId() { + return null; + } } public static final class Searcher extends IndexSearcher implements Releasable { diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 48ce733b6904..c625d6e50d82 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -79,6 +79,7 @@ public class ReadOnlyEngine extends Engine { private final boolean requireCompleteHistory; protected volatile TranslogStats translogStats; + protected final String commitId; /** * Creates a new ReadOnlyEngine. This ctor can also be used to open a read-only engine on top of an already opened @@ -110,6 +111,7 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats // yet this makes sure nobody else does. including some testing tools that try to be messy indexWriterLock = obtainLock ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : null; this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory); + this.commitId = Lucene.getCommitId(lastCommittedSegmentInfos); if (seqNoStats == null) { seqNoStats = buildSeqNoStats(config, lastCommittedSegmentInfos); ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats); @@ -556,4 +558,25 @@ public ShardLongFieldRange getRawFieldRange(String field) throws IOException { } } + + @Override + public SearcherSupplier acquireSearcherSupplier(Function wrapper, SearcherScope scope) throws EngineException { + final SearcherSupplier delegate = super.acquireSearcherSupplier(wrapper, scope); + return new SearcherSupplier(Function.identity()) { + @Override + protected void doClose() { + delegate.close(); + } + + @Override + protected Searcher acquireSearcherInternal(String source) { + return delegate.acquireSearcherInternal(source); + } + + @Override + public String getCommitId() { + return commitId; + } + }; + } } diff --git a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index d5968589ff5e..9f578fc89195 100644 --- a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -193,6 +193,11 @@ public Searcher acquireSearcherInternal(String source) { protected void doClose() { store.decRef(); } + + @Override + public String getCommitId() { + return commitId; + } }; }