Add index commit id to searcher #63963

Merged: 18 commits, Dec 12, 2020
@@ -22,10 +22,17 @@
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
import java.security.MessageDigest;
import java.util.function.BooleanSupplier;

/**
* A {@link org.apache.lucene.index.FilterDirectoryReader} that exposes
@@ -35,12 +42,48 @@ public final class ElasticsearchDirectoryReader extends FilterDirectoryReader {

private final ShardId shardId;
private final FilterDirectoryReader.SubReaderWrapper wrapper;
private final CheckedFunction<DirectoryReader, String, IOException> readerIdGenerator;
private final String readerId;

private ElasticsearchDirectoryReader(DirectoryReader in, FilterDirectoryReader.SubReaderWrapper wrapper,
ShardId shardId) throws IOException {
private ElasticsearchDirectoryReader(DirectoryReader in, FilterDirectoryReader.SubReaderWrapper wrapper, ShardId shardId,
CheckedFunction<DirectoryReader, String, IOException> readerIdGenerator) throws IOException {
super(in, wrapper);
this.wrapper = wrapper;
this.shardId = shardId;
this.readerIdGenerator = readerIdGenerator;
this.readerId = readerIdGenerator.apply(in);
}

/**
* If two readers have the same reader id, then their underlying readers must consist of the same list of segments.
*/
@Nullable
public String getReaderId() {
return readerId;
}

public static String getReaderIdFromSegmentInfos(SegmentInfos segmentInfos, BooleanSupplier fullyCommitted) {
// Here we prefer an id composed of the ids of the underlying segments over the id of the commit because
// the commit id changes whenever IndexWriter#commit is called, even though the segment files stay unchanged. A file-based
// recovery creates a new commit to associate the new translog uuid; hence the commit ids of the primary and replicas
// are always different even though they can have identical segment files.
final MessageDigest md = MessageDigests.sha256();
for (SegmentCommitInfo sci : segmentInfos) {
final byte[] segmentId = sci.getId();
Reviewer comment (Contributor):
It is not clear to me that this is valid across different shard copies. The id generation starts somewhere random and then increments. I acknowledge the risk is small and I did not dig deeply into whether this increases the risk of collisions over using more standard uuid generation.

if (segmentId != null) {
md.update(segmentId);
} else {
// an old segment without an id; fall back to the id of the whole commit when fully committed
if (fullyCommitted.getAsBoolean()) {
md.reset();
md.update(segmentInfos.getId());
break;
} else {
return null;
}
}
}
return MessageDigests.toHexString(md.digest());
}
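
For reference, the derivation above amounts to hashing the per-segment ids in order with SHA-256 and hex-encoding the digest; the fallback branch for old segments is omitted here. A minimal standalone sketch of the same idea, using plain java.security.MessageDigest in place of the internal MessageDigests helper (ReaderIdSketch and the hard-coded byte arrays are illustrative stand-ins for SegmentCommitInfo#getId values, not part of this patch):

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

public class ReaderIdSketch {
    // Hash the segment ids in order; the result changes only when the list of segments changes.
    static String readerId(List<byte[]> segmentIds) throws NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("SHA-256");
        for (byte[] id : segmentIds) {
            md.update(id);
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : md.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        List<byte[]> segments = List.of(
            "seg-0".getBytes(StandardCharsets.UTF_8), // stand-in for a SegmentCommitInfo id
            "seg-1".getBytes(StandardCharsets.UTF_8));
        System.out.println(readerId(segments)); // same segment ids => same reader id
    }
}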

/**
@@ -58,19 +101,27 @@ public CacheHelper getReaderCacheHelper() {

@Override
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
return new ElasticsearchDirectoryReader(in, wrapper, shardId);
return new ElasticsearchDirectoryReader(in, wrapper, shardId, readerIdGenerator);
}

// Remove this helper
public static ElasticsearchDirectoryReader wrap(DirectoryReader reader, ShardId shardId) throws IOException {
return new ElasticsearchDirectoryReader(reader, new SubReaderWrapper(shardId), shardId, r -> null);
}

/**
* Wraps the given reader in a {@link ElasticsearchDirectoryReader} as
* well as all its sub-readers in {@link ElasticsearchLeafReader} to
* expose the given shard ID.
*
* @param reader the reader to wrap
* @param shardId the shard ID to expose via the elasticsearch internal reader wrappers.
* @param reader the reader to wrap
* @param shardId the shard ID to expose via the elasticsearch internal reader wrappers
* @param readerIdGenerator a function that returns the id of the reader to wrap
*/
public static ElasticsearchDirectoryReader wrap(DirectoryReader reader, ShardId shardId) throws IOException {
return new ElasticsearchDirectoryReader(reader, new SubReaderWrapper(shardId), shardId);
public static ElasticsearchDirectoryReader wrap(DirectoryReader reader, ShardId shardId,
CheckedFunction<DirectoryReader, String, IOException> readerIdGenerator)
throws IOException {
return new ElasticsearchDirectoryReader(reader, new SubReaderWrapper(shardId), shardId, readerIdGenerator);
}

private static final class SubReaderWrapper extends FilterDirectoryReader.SubReaderWrapper {
server/src/main/java/org/elasticsearch/index/engine/Engine.java (25 changes: 23 additions & 2 deletions)
@@ -617,7 +617,7 @@ public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wrapper
try {
ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
ElasticsearchDirectoryReader acquire = referenceManager.acquire();
SearcherSupplier reader = new SearcherSupplier(wrapper) {
SearcherSupplier reader = new SearcherSupplier(acquire.getReaderId(), wrapper) {
@Override
public Searcher acquireSearcherInternal(String source) {
assert assertSearcherIsWarmedUp(source, scope);
@@ -1178,21 +1178,42 @@ default void onFailedEngine(String reason, @Nullable Exception e) {
}

public abstract static class SearcherSupplier implements Releasable {
private final String searcherId;
private final Function<Searcher, Searcher> wrapper;
private final AtomicBoolean released = new AtomicBoolean(false);

public SearcherSupplier(Function<Searcher, Searcher> wrapper) {
public SearcherSupplier(String searcherId, Function<Searcher, Searcher> wrapper) {
this.searcherId = searcherId;
this.wrapper = wrapper;
}

/**
* If two searcher suppliers have the same id, then they provide searchers over the same underlying segment files.
* The search layer can use this searcher id to retry the query or fetch phase on another copy if the "snapshot" of that copy
* has the same underlying segment files.
*/
@Nullable
public final String getSearcherId() {
return searcherId;
}
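
To illustrate the intended use described in the javadoc above, a retry check in the search layer might look like this (a hypothetical sketch; canRetryOnCopy is an assumed name, not part of this change):

// Only retry the query or fetch phase on another copy whose snapshot is identical.
static boolean canRetryOnCopy(String originalSearcherId, Engine.SearcherSupplier otherCopy) {
    // A null id means the snapshot cannot be identified, so an identical retry is not guaranteed.
    return originalSearcherId != null && originalSearcherId.equals(otherCopy.getSearcherId());
}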

public final Searcher acquireSearcher(String source) {
if (released.get()) {
throw new AlreadyClosedException("SearcherSupplier was closed");
}
final Searcher searcher = acquireSearcherInternal(source);
assert assertSameSearcherId(searcher);
return CAN_MATCH_SEARCH_SOURCE.equals(source) ? searcher : wrapper.apply(searcher);
}

private boolean assertSameSearcherId(Searcher searcher) {
final ElasticsearchDirectoryReader reader =
ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher.getDirectoryReader());
assert Objects.equals(reader.getReaderId(), getSearcherId()) :
reader.getReaderId() + " != " + getSearcherId();
return true;
}

@Override
public final void close() {
if (released.compareAndSet(false, true)) {
@@ -24,6 +24,7 @@
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
@@ -35,6 +36,7 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.ShuffleForcedMergePolicy;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
@@ -97,6 +99,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -589,13 +592,43 @@ private String loadHistoryUUID(Map<String, String> commitData) {
return uuid;
}

// pkg level for testing
final boolean isFullyCommitted(DirectoryReader reader) {
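// tryLock rather than lock: if a flush is running concurrently we cannot tell whether its commit is
// complete, so we conservatively treat the reader as not fully committed instead of blocking.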
final boolean hasUncommittedChanges;
if (flushLock.tryLock()) {
try {
hasUncommittedChanges = indexWriter.hasUncommittedChanges();
} finally {
flushLock.unlock();
}
} else {
hasUncommittedChanges = true; // a flush is in progress, so conservatively assume there are uncommitted changes
}
try {
return hasUncommittedChanges == false && reader.isCurrent();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private ExternalReaderManager createReaderManager(RefreshWarmerListener externalRefreshListener) throws EngineException {
boolean success = false;
ElasticsearchReaderManager internalReaderManager = null;
try {
try {
final ElasticsearchDirectoryReader directoryReader =
ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId);
ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId, reader -> {
// With soft-deletes enabled, all changes are always written to disk, so we can use the ids of the reader's segments
// to compute its signature.
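// unwrap the filter wrappers to reach the StandardDirectoryReader, which exposes the SegmentInfos
// needed to derive the reader id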
reader = FilterDirectoryReader.unwrap(reader);
if (reader instanceof StandardDirectoryReader) {
final StandardDirectoryReader unwrapped = (StandardDirectoryReader) reader;
return ElasticsearchDirectoryReader.getReaderIdFromSegmentInfos(unwrapped.getSegmentInfos(),
() -> isFullyCommitted(unwrapped));
} else {
return null;
}
});
internalReaderManager = new ElasticsearchReaderManager(directoryReader,
new RamAccountingRefreshListener(engineConfig.getCircuitBreakerService()));
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
@@ -76,6 +76,7 @@ public class ReadOnlyEngine extends Engine {
private final boolean requireCompleteHistory;

protected volatile TranslogStats translogStats;
private final String searcherId;

/**
* Creates a new ReadOnlyEngine. This ctor can also be used to open a read-only engine on top of an already opened
@@ -107,6 +108,7 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
// yet this makes sure nobody else does, including some testing tools that try to be messy
indexWriterLock = obtainLock ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : null;
this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
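// a read-only engine has no pending changes, so the last commit is always fully committed
// and the fullyCommitted supplier can be a constant () -> true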
this.searcherId = ElasticsearchDirectoryReader.getReaderIdFromSegmentInfos(lastCommittedSegmentInfos, () -> true);
if (seqNoStats == null) {
seqNoStats = buildSeqNoStats(config, lastCommittedSegmentInfos);
ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats);
@@ -170,10 +172,14 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
// reopened as an internal engine, which would be the path to fix the issue.
}

protected String getSearcherId() {
return searcherId;
}

protected final ElasticsearchDirectoryReader wrapReader(DirectoryReader reader,
Function<DirectoryReader, DirectoryReader> readerWrapperFunction) throws IOException {
reader = readerWrapperFunction.apply(reader);
return ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId());
return ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId(), r -> searcherId);
}

protected DirectoryReader open(IndexCommit commit) throws IOException {