Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for early termination of search request #24398

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,15 @@ public SearchRequestBuilder setTerminateAfter(int terminateAfter) {
return this;
}

/**
* Indicates whether the search should early terminate based on the index sorting.
* The search sort must not be set since documents will be returned sorted by the index sorting criteria.
*/
public SearchRequestBuilder setEarlyTerminate(boolean value) {
sourceBuilder().earlyTerminate(value);
return this;
}

/**
* A comma separated list of routing values to control the shards the search will be executed on.
*/
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/org/elasticsearch/common/lucene/Lucene.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.EarlyTerminatingSortingCollector;
import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.IndexSearcher;
Expand All @@ -50,6 +51,7 @@
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.SimpleCollector;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.search.SortedSetSortField;
Expand Down Expand Up @@ -259,6 +261,14 @@ public static final TimeLimitingCollector wrapTimeLimitingCollector(final Collec
return new TimeLimitingCollector(delegate, counter, timeoutInMillis);
}

/**
* Wraps <code>delegate</code> with segment count based early termination sorting collector with a threshold of <code>maxHitsPerSegment</code>
*/
public static final EarlyTerminatingSortingCollector wrapCountBasedEarlyTerminatingSortingCollector(final Collector delegate, final Sort sort, int maxHitsPerSegment) {
return new EarlyTerminatingSortingCollector(delegate, sort, maxHitsPerSegment);
}


/**
* Check whether there is one or more documents matching the provided query.
*/
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.sort.SortAndFormats;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
Expand Down Expand Up @@ -120,7 +121,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final ScriptService scriptService;
private final ClusterService clusterService;
private final Client client;
private Supplier<Sort> indexSortSupplier;
private Supplier<SortAndFormats> indexSortSupplier;

public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv,
NamedXContentRegistry xContentRegistry,
Expand Down Expand Up @@ -255,7 +256,7 @@ public SimilarityService similarityService() {
return similarityService;
}

public Supplier<Sort> getIndexSortSupplier() {
public Supplier<SortAndFormats> getIndexSortSupplier() {
return indexSortSupplier;
}

Expand Down
10 changes: 7 additions & 3 deletions core/src/main/java/org/elasticsearch/index/IndexSortConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.MultiValueMode;
import org.elasticsearch.search.sort.SortAndFormats;
import org.elasticsearch.search.sort.SortOrder;

import java.util.Collections;
Expand Down Expand Up @@ -175,13 +177,14 @@ public boolean hasIndexSort() {
* Builds the {@link Sort} order from the settings for this index
* or returns null if this index has no sort.
*/
public Sort buildIndexSort(Function<String, MappedFieldType> fieldTypeLookup,
Function<MappedFieldType, IndexFieldData<?>> fieldDataLookup) {
public SortAndFormats buildIndexSort(Function<String, MappedFieldType> fieldTypeLookup,
Function<MappedFieldType, IndexFieldData<?>> fieldDataLookup) {
if (hasIndexSort() == false) {
return null;
}

final SortField[] sortFields = new SortField[sortSpecs.length];
final DocValueFormat[] docValueFormats = new DocValueFormat[sortSpecs.length];
for (int i = 0; i < sortSpecs.length; i++) {
FieldSortSpec sortSpec = sortSpecs[i];
final MappedFieldType ft = fieldTypeLookup.apply(sortSpec.field);
Expand All @@ -203,9 +206,10 @@ public Sort buildIndexSort(Function<String, MappedFieldType> fieldTypeLookup,
throw new IllegalArgumentException("docvalues not found for index sort field:[" + sortSpec.field + "]");
}
sortFields[i] = fieldData.sortField(sortSpec.missingValue, mode, null, reverse);
docValueFormats[i] = ft.docValueFormat(null, null);
validateIndexSortField(sortFields[i]);
}
return new Sort(sortFields);
return new SortAndFormats(new Sort(sortFields), docValueFormats);
}

private void validateIndexSortField(SortField sortField) {
Expand Down
12 changes: 7 additions & 5 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.search.sort.SortAndFormats;
import org.elasticsearch.search.suggest.completion.CompletionFieldStats;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -174,7 +175,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final TranslogConfig translogConfig;
private final IndexEventListener indexEventListener;
private final QueryCachingPolicy cachingPolicy;
private final Supplier<Sort> indexSortSupplier;
private final Supplier<SortAndFormats> indexSortSupplier;

/**
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
Expand Down Expand Up @@ -230,7 +231,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final RefreshListeners refreshListeners;

public IndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardPath path, Store store,
Supplier<Sort> indexSortSupplier, IndexCache indexCache, MapperService mapperService, SimilarityService similarityService,
Supplier<SortAndFormats> indexSortSupplier, IndexCache indexCache, MapperService mapperService, SimilarityService similarityService,
IndexFieldDataService indexFieldDataService, @Nullable EngineFactory engineFactory,
IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, ThreadPool threadPool, BigArrays bigArrays,
Engine.Warmer warmer, Runnable globalCheckpointSyncer, List<SearchOperationListener> searchOperationListener, List<IndexingOperationListener> listeners) throws IOException {
Expand Down Expand Up @@ -297,7 +298,7 @@ public Store store() {
/**
* Return the sort order of this index, or null if the index has no sort.
*/
public Sort getIndexSort() {
public SortAndFormats getIndexSort() {
return indexSortSupplier.get();
}
/**
Expand Down Expand Up @@ -1818,11 +1819,12 @@ private DocumentMapperForType docMapper(String type) {

private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) {
final IndexShardRecoveryPerformer translogRecoveryPerformer = new IndexShardRecoveryPerformer(shardId, mapperService, logger);
Sort indexSort = indexSortSupplier.get();
final SortAndFormats indexSort = indexSortSupplier.get();
return new EngineConfig(openMode, shardId,
threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners, indexSort);
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners,
indexSort == null ? null : indexSort.sort);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.search.sort.SortAndFormats;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -110,12 +111,13 @@ boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdate
}
indexShard.mapperService().merge(indexMetaData, MapperService.MergeReason.MAPPING_RECOVERY, true);
// now that the mapping is merged we can validate the index sort configuration.
Sort indexSort = indexShard.getIndexSort();
SortAndFormats indexSort = indexShard.getIndexSort();
return executeRecovery(indexShard, () -> {
logger.debug("starting recovery from local shards {}", shards);
try {
final Directory directory = indexShard.store().directory(); // don't close this directory!!
addIndices(indexShard.recoveryState().getIndex(), directory, indexSort,
addIndices(indexShard.recoveryState().getIndex(), directory,
indexSort == null ? null : indexSort.sort,
shards.stream().map(s -> s.getSnapshotDirectory())
.collect(Collectors.toList()).toArray(new Directory[shards.size()]));
internalRecoverFromStore(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ final class DefaultSearchContext extends SearchContext {
private TimeValue timeout;
// terminate after count
private int terminateAfter = DEFAULT_TERMINATE_AFTER;
private boolean earlyTerminate = false;
private List<String> groupStats;
private ScrollContext scrollContext;
private boolean explain;
Expand Down Expand Up @@ -514,6 +515,16 @@ public void terminateAfter(int terminateAfter) {
this.terminateAfter = terminateAfter;
}

@Override
public boolean earlyTerminate() {
return earlyTerminate;
}

@Override
public void earlyTerminate(boolean value) {
this.earlyTerminate = value;
}

@Override
public SearchContext minimumScore(float minimumScore) {
this.minimumScore = minimumScore;
Expand Down
31 changes: 31 additions & 0 deletions core/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexSortConfig;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.query.InnerHitBuilder;
import org.elasticsearch.index.query.QueryShardContext;
Expand Down Expand Up @@ -726,6 +727,36 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
final CollapseContext collapseContext = source.collapse().build(context);
context.collapse(collapseContext);
}

if (source.earlyTerminate()) {
IndexSortConfig sortConfig = context.mapperService().getIndexSettings().getIndexSortConfig();
if (sortConfig == null || sortConfig.hasIndexSort() == false) {
throw new SearchContextException(context,
"cannot use `early_terminate` when index sorting is not set");
}
if (context.scrollContext() != null) {
throw new SearchContextException(context,
"`early_terminate` cannot be used in a scroll context");
}
if (context.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) {
throw new SearchContextException(context,
"cannot use `early_terminate` in conjunction with `terminate_after`");
}
if (context.searchAfter() != null) {
throw new SearchContextException(context,
"cannot use `early_terminate` in conjunction with `search_after");
}
SortAndFormats sortAndFormats = sortConfig.buildIndexSort(context.mapperService()::fullName,
context.fieldData()::getForField);
if (context.sort() != null && context.sort().sort.equals(sortAndFormats.sort) == false) {
throw new SearchContextException(context, "cannot use `early_terminate` when the search sort [" +
context.sort().sort + "] is different than the index sorting [" + sortAndFormats.sort + "]");
} else {
// the search sort is null or equals to index sorting so it's safe to override it here
context.sort(sortAndFormats);
}
context.earlyTerminate(true);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
public static final ParseField SIZE_FIELD = new ParseField("size");
public static final ParseField TIMEOUT_FIELD = new ParseField("timeout");
public static final ParseField TERMINATE_AFTER_FIELD = new ParseField("terminate_after");
public static final ParseField EARLY_TERMINATE_FIELD = new ParseField("early_terminate");
public static final ParseField QUERY_FIELD = new ParseField("query");
public static final ParseField POST_FILTER_FIELD = new ParseField("post_filter");
public static final ParseField MIN_SCORE_FIELD = new ParseField("min_score");
Expand Down Expand Up @@ -149,6 +150,7 @@ public static HighlightBuilder highlight() {

private TimeValue timeout = null;
private int terminateAfter = SearchContext.DEFAULT_TERMINATE_AFTER;
private boolean earlyTerminate = false;

private StoredFieldsContext storedFieldsContext;
private List<String> docValueFields;
Expand Down Expand Up @@ -223,6 +225,9 @@ public SearchSourceBuilder(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_5_3_0_UNRELEASED)) {
collapse = in.readOptionalWriteable(CollapseBuilder::new);
}
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
earlyTerminate = in.readBoolean();
}
}

@Override
Expand Down Expand Up @@ -274,6 +279,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_5_3_0_UNRELEASED)) {
out.writeOptionalWriteable(collapse);
}
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeBoolean(earlyTerminate);
}
}

/**
Expand Down Expand Up @@ -426,6 +434,22 @@ public int terminateAfter() {
return terminateAfter;
}

/**
* Indicates whether the search should early terminate based on the index sorting.
* The search sort must not be set since documents will be returned sorted by the index sorting criteria.
*/
public SearchSourceBuilder earlyTerminate(boolean value) {
this.earlyTerminate = value;
return this;
}

/**
* Returns whether the search should early terminate based on the index sorting
*/
public boolean earlyTerminate() {
return earlyTerminate;
}

/**
* Adds a sort against the given field name and the sort ordering.
*
Expand Down Expand Up @@ -955,6 +979,8 @@ public void parseXContent(QueryParseContext context) throws IOException {
timeout = TimeValue.parseTimeValue(parser.text(), null, TIMEOUT_FIELD.getPreferredName());
} else if (TERMINATE_AFTER_FIELD.match(currentFieldName)) {
terminateAfter = parser.intValue();
} else if (EARLY_TERMINATE_FIELD.match(currentFieldName)) {
earlyTerminate = parser.booleanValue();
} else if (MIN_SCORE_FIELD.match(currentFieldName)) {
minScore = parser.floatValue();
} else if (VERSION_FIELD.match(currentFieldName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,16 @@ public void terminateAfter(int terminateAfter) {
in.terminateAfter(terminateAfter);
}

@Override
public boolean earlyTerminate() {
return in.earlyTerminate();
}

@Override
public void earlyTerminate(boolean value) {
in.earlyTerminate(value);
}

@Override
public boolean lowLevelCancellation() {
return in.lowLevelCancellation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ public InnerHitsContext innerHits() {

public abstract void terminateAfter(int terminateAfter);

public abstract boolean earlyTerminate();

public abstract void earlyTerminate(boolean value);

/**
* Indicates if the current index should perform frequent low level search cancellation check.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class CollectorResult implements ToXContentObject, Writeable {
public static final String REASON_SEARCH_COUNT = "search_count";
public static final String REASON_SEARCH_TOP_HITS = "search_top_hits";
public static final String REASON_SEARCH_TERMINATE_AFTER_COUNT = "search_terminate_after_count";
public static final String REASON_SEARCH_SORTING_EARLY_TERMINATION = "search_sorting_early_terminate";
public static final String REASON_SEARCH_POST_FILTER = "search_post_filter";
public static final String REASON_SEARCH_MIN_SCORE = "search_min_score";
public static final String REASON_SEARCH_MULTI = "search_multi";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.grouping.CollapsingTopDocsCollector;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.MinimumScoreCollector;
import org.elasticsearch.common.lucene.search.FilteredCollector;
Expand Down Expand Up @@ -265,6 +264,7 @@ static boolean execute(SearchContext searchContext, final IndexSearcher searcher
}

final boolean terminateAfterSet = searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER;
final boolean earlyTerminateSet = searchContext.earlyTerminate();
if (terminateAfterSet) {
final Collector child = collector;
// throws Lucene.EarlyTerminationException when given count is reached
Expand All @@ -273,6 +273,14 @@ static boolean execute(SearchContext searchContext, final IndexSearcher searcher
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_TERMINATE_AFTER_COUNT,
Collections.singletonList((InternalProfileCollector) child));
}
} else if (earlyTerminateSet) {
assert searchContext.sort() != null;
final Collector child = collector;
collector = Lucene.wrapCountBasedEarlyTerminatingSortingCollector(collector, searchContext.sort().sort, numDocs);
if (doProfile) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_SORTING_EARLY_TERMINATION,
Collections.singletonList((InternalProfileCollector) child));
}
}

if (searchContext.parsedPostFilter() != null) {
Expand Down
Loading