Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a shard filter search phase to pre-filter shards based on query rewriting #25658

Merged
merged 13 commits into from
Jul 12, 2017
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ protected void doExecute(SearchRequest request, ActionListener<SearchResponse> l
new SearchHit[0], 0L, 0.0f),
new InternalAggregations(Collections.emptyList()),
new Suggest(Collections.emptyList()),
new SearchProfileShardResults(Collections.emptyMap()), false, false, 1), "", 1, 1, 0, new ShardSearchFailure[0]));
new SearchProfileShardResults(Collections.emptyMap()), false, false, 1), "", 1, 1, 0, 0, new ShardSearchFailure[0]));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void testInfo() throws IOException {
public void testSearchScroll() throws IOException {
Header[] headers = randomHeaders(random(), "Header");
SearchResponse mockSearchResponse = new SearchResponse(new SearchResponseSections(SearchHits.empty(), InternalAggregations.EMPTY,
null, false, false, null, 1), randomAlphaOfLengthBetween(5, 10), 5, 5, 100, new ShardSearchFailure[0]);
null, false, false, null, 1), randomAlphaOfLengthBetween(5, 10), 5, 5, 0, 100, new ShardSearchFailure[0]);
mockResponse(mockSearchResponse);
SearchResponse searchResponse = restHighLevelClient.searchScroll(new SearchScrollRequest(randomAlphaOfLengthBetween(5, 10)),
headers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.AliasFilter;
Expand Down Expand Up @@ -67,6 +66,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
private final Object shardFailuresMutex = new Object();
private final AtomicInteger successfulOps = new AtomicInteger();
private final AtomicInteger skippedOps = new AtomicInteger();
private final TransportSearchAction.SearchTimeProvider timeProvider;


Expand Down Expand Up @@ -107,7 +107,7 @@ public final void start() {
if (getNumShards() == 0) {
//no search shards to search on, bail with empty response
//(it happens with search across _all with no indices around and consistent with broadcast operations)
listener.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, buildTookInMillis(),
listener.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, 0, buildTookInMillis(),
ShardSearchFailure.EMPTY_ARRAY));
return;
}
Expand Down Expand Up @@ -169,35 +169,35 @@ private ShardSearchFailure[] buildShardFailures() {

public final void onShardFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Exception e) {
// we don't aggregate shard failures on non active shards (but do keep the header counts right)
if (TransportActions.isShardNotAvailableException(e)) {
return;
}
AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
// lazily create shard failures, so we can early build the empty shard failure list in most cases (no failures)
if (shardFailures == null) { // this is double checked locking but it's fine since SetOnce uses a volatile read internally
synchronized (shardFailuresMutex) {
shardFailures = this.shardFailures.get(); // read again otherwise somebody else has created it?
if (shardFailures == null) { // still null so we are the first and create a new instance
shardFailures = new AtomicArray<>(getNumShards());
this.shardFailures.set(shardFailures);
if (TransportActions.isShardNotAvailableException(e) == false) {
AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
// lazily create shard failures, so we can early build the empty shard failure list in most cases (no failures)
if (shardFailures == null) { // this is double checked locking but it's fine since SetOnce uses a volatile read internally
synchronized (shardFailuresMutex) {
shardFailures = this.shardFailures.get(); // read again otherwise somebody else has created it?
if (shardFailures == null) { // still null so we are the first and create a new instance
shardFailures = new AtomicArray<>(getNumShards());
this.shardFailures.set(shardFailures);
}
}
}
}
ShardSearchFailure failure = shardFailures.get(shardIndex);
if (failure == null) {
shardFailures.set(shardIndex, new ShardSearchFailure(e, shardTarget));
} else {
// the failure is already present, try and not override it with an exception that is less meaningless
// for example, getting illegal shard state
if (TransportActions.isReadOverrideException(e)) {
ShardSearchFailure failure = shardFailures.get(shardIndex);
if (failure == null) {
shardFailures.set(shardIndex, new ShardSearchFailure(e, shardTarget));
} else {
// the failure is already present, try and not override it with an exception that is less meaningless
// for example, getting illegal shard state
if (TransportActions.isReadOverrideException(e)) {
shardFailures.set(shardIndex, new ShardSearchFailure(e, shardTarget));
}
}
}

if (results.hasResult(shardIndex)) {
assert failure == null : "shard failed before but shouldn't: " + failure;
successfulOps.decrementAndGet(); // if this shard was successful before (initial phase) we have to adjust the counter
if (results.hasResult(shardIndex)) {
assert failure == null : "shard failed before but shouldn't: " + failure;
successfulOps.decrementAndGet(); // if this shard was successful before (initial phase) we have to adjust the counter
}
}
results.consumeShardFailure(shardIndex);
}

/**
Expand Down Expand Up @@ -264,7 +264,7 @@ public final SearchRequest getRequest() {
@Override
public final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
buildTookInMillis(), buildShardFailures());
skippedOps.get(), buildTookInMillis(), buildShardFailures());
}

@Override
Expand Down Expand Up @@ -313,4 +313,11 @@ public final ShardSearchTransportRequest buildShardSearchRequest(SearchShardIter
* @param context the search context for the next phase
*/
protected abstract SearchPhase getNextPhase(SearchPhaseResults<Result> results, SearchPhaseContext context);

@Override
protected void skipShard(SearchShardIterator iterator) {
super.skipShard(iterator);
successfulOps.incrementAndGet();
skippedOps.incrementAndGet();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.transport.Transport;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;

/**
* This search phrase can be used as an initial search phase to pre-filter search shards based on query rewriting.
* The queries are rewritten against the shards and based on the rewrite result shards might be able to be excluded
* from the search. The extra round trip to the search shards is very cheap and is not subject to rejections
* which allows to fan out to more shards at the same time without running into rejections even if we are hitting a
* large portion of the clusters indices.
*/
final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<SearchTransportService.CanMatchResponse> {

private final Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory;
private final GroupShardsIterator<SearchShardIterator> shardsIts;

CanMatchPreFilterSearchPhase(Logger logger, SearchTransportService searchTransportService,
BiFunction<String, String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
SearchTask task, Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory) {
super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request,
listener,
shardsIts, timeProvider, clusterStateVersion, task, new BitSetSearchPhaseResults(shardsIts.size()));
this.phaseFactory = phaseFactory;
this.shardsIts = shardsIts;
}

@Override
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
SearchActionListener<SearchTransportService.CanMatchResponse> listener) {
getSearchTransport().sendCanMatch(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
buildShardSearchRequest(shardIt), getTask(), listener);
}

@Override
protected SearchPhase getNextPhase(SearchPhaseResults<SearchTransportService.CanMatchResponse> results,
SearchPhaseContext context) {

return phaseFactory.apply(getIterator((BitSetSearchPhaseResults) results, shardsIts));
}

private GroupShardsIterator<SearchShardIterator> getIterator(BitSetSearchPhaseResults results,
GroupShardsIterator<SearchShardIterator> shardsIts) {
int cardinality = results.getNumPossibleMatches();
FixedBitSet possibleMatches = results.getPossibleMatches();
if (cardinality == 0) {
// this is a special case where we have no hit but we need to get at least one search response in order
// to produce a valid search result with all the aggs etc.
possibleMatches.set(0);
}
int i = 0;
for (SearchShardIterator iter : shardsIts) {
if (possibleMatches.get(i++)) {
iter.reset();
} else {
iter.resetAndSkip();
}
}
return shardsIts;
}

private static final class BitSetSearchPhaseResults extends InitialSearchPhase.
SearchPhaseResults<SearchTransportService.CanMatchResponse> {

private final FixedBitSet possibleMatches;
private int numPossibleMatches;

BitSetSearchPhaseResults(int size) {
super(size);
possibleMatches = new FixedBitSet(size);
}

@Override
void consumeResult(SearchTransportService.CanMatchResponse result) {
if (result.canMatch()) {
consumeShardFailure(result.getShardIndex());
}
}

@Override
boolean hasResult(int shardIndex) {
return false; // unneeded
}

@Override
synchronized void consumeShardFailure(int shardIndex) {
// we have to carry over shard failures in order to account for them in the response.
possibleMatches.set(shardIndex);
numPossibleMatches++;
}


synchronized int getNumPossibleMatches() {
return numPossibleMatches;
}

synchronized FixedBitSet getPossibleMatches() {
return possibleMatches;
}

@Override
Stream<SearchTransportService.CanMatchResponse> getSuccessfulResults() {
return Stream.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@
* @see CountedCollector#onFailure(int, SearchShardTarget, Exception)
*/
final class DfsQueryPhase extends SearchPhase {
private final InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> queryResult;
private final InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> queryResult;
private final SearchPhaseController searchPhaseController;
private final AtomicArray<DfsSearchResult> dfsSearchResults;
private final Function<InitialSearchPhase.SearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory;
private final Function<InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory;
private final SearchPhaseContext context;
private final SearchTransportService searchTransportService;

DfsQueryPhase(AtomicArray<DfsSearchResult> dfsSearchResults,
SearchPhaseController searchPhaseController,
Function<InitialSearchPhase.SearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory,
Function<InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory,
SearchPhaseContext context) {
super("dfs_query");
this.queryResult = searchPhaseController.newSearchPhaseResults(context.getRequest(), context.getNumShards());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ final class FetchSearchPhase extends SearchPhase {
}
this.fetchResults = new AtomicArray<>(resultConsumer.getNumShards());
this.searchPhaseController = searchPhaseController;
this.queryResults = resultConsumer.results;
this.queryResults = resultConsumer.getAtomicArray();
this.nextPhaseFactory = nextPhaseFactory;
this.context = context;
this.logger = context.getLogger();
Expand Down Expand Up @@ -105,7 +105,8 @@ private void innerRun() throws IOException {
-> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
queryResults : fetchResults);
if (queryAndFetchOptimization) {
assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null;
assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null : "phaseResults empty [" + phaseResults.isEmpty()
+ "], single result: " + phaseResults.get(0).fetchResult();
// query AND fetch optimization
finishPhase.run();
} else {
Expand Down
Loading