Add a shard filter search phase to pre-filter shards based on query rewriting #25658

Merged
merged 13 commits into from
Jul 12, 2017

Conversation

s1monw
Contributor

@s1monw s1monw commented Jul 11, 2017

Today if we search across a large number of shards we hit every shard. Yet, it's quite
common to search across an index pattern for time-based indices where a filter will exclude
all results outside a certain time range, e.g. now-3d. While the search can potentially hit
hundreds of shards, the majority of those shards might yield 0 results since there is no document
within the date range. Kibana, for instance, does this regularly but used _field_stats
to narrow down the indices it needs to query. Now, with the deprecation of _field_stats and its upcoming
removal, a single dashboard in Kibana can potentially turn into searches hitting hundreds or thousands
of shards, and that can easily cause search rejections even though most of the requests are
very likely cheap and only need query rewriting to terminate early with 0 results.

This change adds a pre-filter phase for searches that can, if the number of shards is higher than
the pre_filter_shard_size threshold (defaults to 128 shards), fan out to the shards
and check if the query can potentially match any documents at all. While false positives are possible,
a negative response means that no matches are possible. These requests are not subject to rejection
and can greatly reduce the number of shards a request needs to hit. The approach here is preferable
to the Kibana approach with field stats since it correctly handles aliases and uses the correct
thread pools to execute these requests. Further, it's completely transparent to the user and improves
the scalability of Elasticsearch in general on large clusters.
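
A rough, stand-alone sketch of the decision described above (the class and method names are invented for illustration and are not the code in this PR): pre-filtering only kicks in once the number of target shards exceeds the threshold; below it, the request fans out directly.

```java
// Minimal, self-contained sketch of the fan-out decision described in the text.
// DEFAULT_PRE_FILTER_SHARD_SIZE mirrors the default mentioned above (128 shards);
// everything else here is illustrative, not the actual Elasticsearch classes.
public final class PreFilterDecision {

    static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 128;

    /** True when a cheap "can this shard match at all?" round trip is worth doing first. */
    static boolean shouldPreFilter(int targetShardCount, Integer explicitThreshold) {
        int threshold = explicitThreshold != null ? explicitThreshold : DEFAULT_PRE_FILTER_SHARD_SIZE;
        return targetShardCount > threshold;
    }

    public static void main(String[] args) {
        // A Kibana-style dashboard query over hundreds of time-based shards: pre-filter first.
        System.out.println(shouldPreFilter(900, null)); // true
        // A small search over a handful of shards: just query them directly.
        System.out.println(shouldPreFilter(5, null));   // false
    }
}
```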

…ewriting

Today if we search across a large number of shards we hit every shard. Yet, it's quite
common to search across an index pattern for time-based indices where a filter will exclude
all results outside a certain time range, e.g. `now-3d`. While the search can potentially hit
hundreds of shards, the majority of those shards might yield 0 results since there is no document
within the date range. Kibana, for instance, does this regularly but used `_field_stats`
to narrow down the indices it needs to query. Now, with the deprecation of `_field_stats` and its upcoming
removal, a single dashboard in Kibana can potentially turn into searches hitting hundreds or thousands
of shards, and that can easily cause search rejections even though most of the requests are
very likely cheap and only need query rewriting to terminate early with 0 results.

This change adds a pre-filter phase for searches that can, if the number of shards is higher than
the `pre_filter_shards_after` threshold (defaults to 128 shards), fan out to the shards
and check if the query can potentially match any documents at all. While false positives are possible,
a negative response means that no matches are possible. These requests are not subject to rejection
and can greatly reduce the number of shards a request needs to hit. The approach here is preferable
to the Kibana approach with field stats since it correctly handles aliases and uses the correct
thread pools to execute these requests. Further, it's completely transparent to the user and improves
the scalability of Elasticsearch in general on large clusters.
@s1monw s1monw added :Search/Search Search-related issues that do not fall into other categories >enhancement review v6.0.0 labels Jul 11, 2017
@s1monw s1monw requested review from jpountz and jimczi July 11, 2017 21:13
@s1monw
Contributor Author

s1monw commented Jul 11, 2017

@jpountz @jimczi I will need to do some work on the unit-test end but I wanted to get it out here ASAP for a first round of opinions. I would also like @clintongormley to look into the naming of the parameter; I am not a huge fan of it.

@epixa
Contributor

epixa commented Jul 11, 2017

/cc @spalger

Contributor

@jimczi jimczi left a comment

I left some minor comments but I love it.
This is a nice solution not just for time-based search, so a huge +1.

} else if (results.numMatches == 0) {
// this is a special case where we have no hit but we need to get at least one search response in order
// to produce a valid search result with all the aggs etc. at least that is what I think is the case... and clint does so
// too :D
Contributor

@jimczi jimczi Jul 12, 2017

I agree too ;)
It's extra work since, for instance, global ords or fielddata could be loaded by this single search, but we can optimize this later. It's already a huge win since this avoids the loading on the other shards!

@@ -58,6 +58,8 @@

private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

public static final int DEFAULT_PRE_FILTER_SHARDS_AFTER = 1;

Contributor

What is the default? 128 like below, or 1?

Contributor Author

haha yeah, true. I wanted to trigger this constantly so I changed it but didn't revert it.

if (source != null) {
QueryBuilder queryBuilder = source.query();
AggregatorFactories.Builder aggregations = source.aggregations();
boolean hasGlobalAggs = aggregations != null && aggregations.hasGlobalAggregationBuilder();
Contributor

This could be checked on the coordinating node instead to save the round trip, since if there is a global agg all shards must match?

Contributor Author

++ will do that

pre_filter_shards_after: 1
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"} } } }

- match: { _shards.total: 2 }
Contributor

I understand why it's important for testing this feature, but shouldn't we return the total number of shards before pre-filtering? I think it should be transparent and not modify the total here, otherwise it becomes hard to understand why some shards are in and some are not.

Contributor

Agreed I would like it better if it was transparent.

Contributor Author

I can try but it might complicate things to be honest...

@jakommo
Contributor

jakommo commented Jul 12, 2017

/cc @n0othing @astefan @gingerwizard @inqueue as we talked about this a few days ago.

Contributor

@jpountz jpountz left a comment

I left some thoughts.

@@ -58,6 +58,8 @@

private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

public static final int DEFAULT_PRE_FILTER_SHARDS_AFTER = 1;
Contributor

docs claim the default is 128?

Contributor Author

haha yeah, true. I wanted to trigger this constantly so I changed it but didn't revert it.

if (source != null) {
QueryBuilder queryBuilder = source.query();
AggregatorFactories.Builder aggregations = source.aggregations();
boolean hasGlobalAggs = aggregations != null && aggregations.hasGlobalAggregationBuilder();
Contributor

uh oh oh I would have forgotten about this guy. I guess testing found it?

Contributor

I believe there is a similar case with minDocCount=0 on terms aggs which exposes all terms contained in the terms dict of doc values.

Contributor Author

I believe there is a similar case with minDocCount=0 on terms aggs which exposes all terms contained in the terms dict of doc values.

Can you elaborate on this? I am not sure how I can check that.

Contributor

Good catch. This means that we need to check all root aggregations and make sure that none of them can return buckets when the query is MatchNone.
I think we could/should make the aggregation rewriting aware of the query rewriting.
Currently we rewrite aggregations on the shards but they are not supposed to check the query. Instead we could just pass the rewritten query when we rewrite aggs, and if the query cannot match any documents the agg could be rewritten into a MatchNoneAggregationBuilder. Then we could have special cases for aggs like a root terms aggregation with minDocCount set to 0, and canMatch could check after the aggs rewriting that all root aggregations are MatchNoneAggregationBuilder?

Contributor

As a first step, I'd just do instanceof checks for TermsAggregationBuilder and (Date)HistogramAggregationBuilder, and check the value of minDocCount.
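
As a stand-alone illustration of that first step (RootAgg and the method names below are invented for the sketch and are not the real builder classes), the can-match decision would have to treat a min_doc_count=0 bucketing aggregation like a global aggregation and keep the shard:

```java
import java.util.List;

// Self-contained sketch of the "check the root aggregations" idea; the real code would
// use instanceof checks on TermsAggregationBuilder / (Date)HistogramAggregationBuilder.
public final class CanMatchSketch {

    record RootAgg(String type, long minDocCount) {}

    static boolean aggsCanProduceBuckets(List<RootAgg> rootAggs) {
        for (RootAgg agg : rootAggs) {
            boolean bucketing = agg.type().equals("terms")
                    || agg.type().equals("histogram")
                    || agg.type().equals("date_histogram");
            // min_doc_count == 0 means buckets (e.g. every term in the dictionary) are
            // emitted even when the query matches nothing, so the shard cannot be skipped.
            if (bucketing && agg.minDocCount() == 0) {
                return true;
            }
        }
        return false;
    }

    static boolean canMatch(boolean queryRewritesToMatchNone, boolean hasGlobalAggs, List<RootAgg> rootAggs) {
        if (hasGlobalAggs || aggsCanProduceBuckets(rootAggs)) {
            return true; // must visit the shard even if the query matches nothing
        }
        return queryRewritesToMatchNone == false;
    }

    public static void main(String[] args) {
        System.out.println(canMatch(true, false, List.of(new RootAgg("terms", 0)))); // true
        System.out.println(canMatch(true, false, List.of(new RootAgg("terms", 1)))); // false
    }
}
```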

@@ -105,7 +105,8 @@ private void innerRun() throws IOException {
-> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
queryResults : fetchResults);
if (queryAndFetchOptimization) {
assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null;
assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null : "phaseResults emtpy [" + phaseResults.isEmpty()
Contributor

nit: emtpy --> empty

List<SearchRequest> requests = multiRequest.requests();
preFilterShardsAfter = Math.max(1, preFilterShardsAfter / (requests.size()+1));
for (SearchRequest request : requests) {
request.setPreFilterSearchShardsAfter(preFilterShardsAfter);
Contributor

Should we check if preFilterShardsAfter has been set explicitly on the search request and set it to the min of preFilterShardsAfter and request.getPreFilterSearchShardsAfter()? Not sure if this would actually matter in practice?

Contributor Author

good catch I will do that
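
A minimal, self-contained model of that adjustment (SubRequest is a stand-in for the real SearchRequest API and the method names are invented): the msearch-level threshold is divided across the sub-requests, and a value the user set explicitly on an individual request is only ever lowered, never raised.

```java
import java.util.List;

// Stand-alone sketch of dividing the msearch pre-filter threshold across sub-requests
// while respecting an explicitly set per-request value, as discussed in the comments above.
public final class MsearchPreFilterSplit {

    static final class SubRequest {
        Integer preFilterShardSize; // null means "not set explicitly by the user"
    }

    static void applyThreshold(List<SubRequest> requests, int topLevelThreshold) {
        int perRequest = Math.max(1, topLevelThreshold / (requests.size() + 1));
        for (SubRequest request : requests) {
            request.preFilterShardSize = request.preFilterShardSize == null
                    ? perRequest
                    : Math.min(request.preFilterShardSize, perRequest);
        }
    }

    public static void main(String[] args) {
        SubRequest defaults = new SubRequest();          // threshold not set explicitly
        SubRequest explicit = new SubRequest();
        explicit.preFilterShardSize = 16;                // user asked for aggressive pre-filtering
        applyThreshold(List.of(defaults, explicit), 128);
        System.out.println(defaults.preFilterShardSize); // 42 = 128 / (2 + 1)
        System.out.println(explicit.preFilterShardSize); // 16, the explicit value is kept
    }
}
```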

@s1monw
Contributor Author

s1monw commented Jul 12, 2017

I pushed a new commit addressing all comments except the min_doc_count one @jpountz @jimczi

AggregatorFactories.Builder aggregations = source.aggregations();
boolean hasGlobalAggs = aggregations != null && aggregations.hasGlobalAggregationBuilder();
if (queryBuilder != null && hasGlobalAggs == false) { // we need to executed hasGlobalAggs is equivalent to match all
return queryBuilder instanceof MatchNoneQueryBuilder == false;
Contributor

As far as I can see the only time this will be hit is if the query is a simple range query which does not overlap with the data on the shard as we only check the root query type. This means that if you have a boolean query with a must/filter range clause and other clauses this won't be rewritten to a match none query and therefore will still cause the search request to hit that shard. To me this seems like a fairly common case for search. Maybe we should change the rewrite of the BoolQueryBuilder to rewrite to a match none query if any of the must/filter clauses are match_none to catch these cases too? (I can add this in a separate PR after this is merged)

Contributor

this is irrelevant as I hadn't seen #25650
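
For readers following along, the rule described in that comment (and, per the reply, apparently already covered by #25650) can be modeled stand-alone as follows; Clause and Occur are invented stand-ins for the real QueryBuilder clause types:

```java
import java.util.List;

// Stand-alone model of the rewrite rule discussed above: if any must/filter clause of a
// bool query rewrites to match_none, the whole bool query can rewrite to match_none.
public final class BoolRewriteSketch {

    enum Occur { MUST, FILTER, SHOULD, MUST_NOT }

    record Clause(Occur occur, boolean rewritesToMatchNone) {}

    static boolean boolRewritesToMatchNone(List<Clause> clauses) {
        for (Clause clause : clauses) {
            boolean required = clause.occur() == Occur.MUST || clause.occur() == Occur.FILTER;
            // A required clause that can match nothing makes the whole query match nothing.
            if (required && clause.rewritesToMatchNone()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // e.g. a filter { range: { "@timestamp": { gte: "now-3d" } } } that misses the shard entirely
        System.out.println(boolRewritesToMatchNone(List.of(
                new Clause(Occur.FILTER, true),
                new Clause(Occur.MUST, false))));  // true -> the shard can be skipped
        System.out.println(boolRewritesToMatchNone(List.of(
                new Clause(Occur.SHOULD, true)))); // false -> the shard must still be queried
    }
}
```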

@clintongormley

I would also like @clintongormley to look into the naming of the parameter; I am not a huge fan of it.

I don't have much in the way of suggestions, but we have batched_reduce_size, so perhaps prefilter_shards_size?

Contributor

@jimczi jimczi left a comment

Thanks for keeping _shards.total the "total".
For the min_doc_count issue I agree with Adrien; just checking the root aggregation builders should be enough.

@s1monw
Contributor Author

s1monw commented Jul 12, 2017

@jpountz @jimczi it's ready for another round

@s1monw s1monw merged commit e81804c into elastic:master Jul 12, 2017
jasontedor added a commit to fred84/elasticsearch that referenced this pull request Jul 12, 2017
* master: (181 commits)
  Use a non default port range in MockTransportService
  Add a shard filter search phase to pre-filter shards based on query rewriting (elastic#25658)
  Prevent excessive disk consumption by log files
  Migrate RestHttpResponseHeadersIT to ESRestTestCase (elastic#25675)
  Use config directory to find jvm.options
  Fix inadvertent rename of systemd tests
  Adding basic search request documentation for high level client (elastic#25651)
  Disallow lang to be used with Stored Scripts (elastic#25610)
  Fix typo in ScriptDocValues deprecation warnings (elastic#25672)
  Changes DocValueFieldsFetchSubPhase to reuse doc values iterators for multiple hits (elastic#25644)
  Query range fields by doc values when they are expected to be more efficient than points.
  Remove SearchHit#internalHits (elastic#25653)
  [DOCS] Reorganized the highlighting topic so it's less confusing.
  Add an underscore to flood stage setting
  Avoid failing install if system-sysctl is masked
  Add another parent value option to join documentation (elastic#25609)
  Ensure we rewrite common queries to `match_none` if possible (elastic#25650)
  Remove reference to field-stats docs.
  Optimize the order of bytes in uuids for better compression. (elastic#24615)
  Fix BytesReferenceStreamInput#skip with offset (elastic#25634)
  ...
s1monw added a commit that referenced this pull request Jul 13, 2017
@s1monw s1monw mentioned this pull request Jul 13, 2017
s1monw added a commit to s1monw/elasticsearch that referenced this pull request Jul 13, 2017
…tion in mixed version

6.0 applies some optimization to query rewriting if the number of shards
is large. In order to make use of this optimization, this commit adds the internal endpoint
to 5.6 such that a 6.0 coordinating node can make use of the feature even in a mixed cluster
or via cross-cluster search.

Relates to elastic#25658
s1monw added a commit that referenced this pull request Jul 15, 2017
s1monw added a commit that referenced this pull request Jul 15, 2017
…ewriting (#25658)

Today if we search across a large number of shards we hit every shard. Yet, it's quite
common to search across an index pattern for time-based indices where a filter will exclude
all results outside a certain time range, e.g. `now-3d`. While the search can potentially hit
hundreds of shards, the majority of those shards might yield 0 results since there is no document
within the date range. Kibana, for instance, does this regularly but used `_field_stats`
to narrow down the indices it needs to query. Now, with the deprecation of `_field_stats` and its upcoming removal, a single dashboard in Kibana can potentially turn into searches hitting hundreds or thousands of shards, and that can easily cause search rejections even though most of the requests are very likely cheap and only need query rewriting to terminate early with 0 results.

This change adds a pre-filter phase for searches that can, if the number of shards is higher than the `pre_filter_shard_size` threshold (defaults to 128 shards), fan out to the shards
and check if the query can potentially match any documents at all. While false positives are possible, a negative response means that no matches are possible. These requests are not subject to rejection and can greatly reduce the number of shards a request needs to hit. The approach here is preferable to the Kibana approach with field stats since it correctly handles aliases and uses the correct thread pools to execute these requests. Further, it's completely transparent to the user and improves the scalability of Elasticsearch in general on large clusters.
@s1monw s1monw added the v5.6.0 label Jul 15, 2017
s1monw added a commit to s1monw/elasticsearch that referenced this pull request Jul 15, 2017
Even if the query part can rewrite to match none we can't skip the
suggest execution since it might yield results.

Relates to elastic#25658
s1monw added a commit that referenced this pull request Jul 16, 2017
Even if the query part can rewrite to match none we can't skip the
suggest execution since it might yield results.

Relates to #25658
s1monw added a commit that referenced this pull request Jul 16, 2017
Even if the query part can rewrite to match none we can't skip the
suggest execution since it might yield results.

Relates to #25658
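
The follow-up rule is small enough to show as a stand-alone sketch (illustrative names only, not the actual search service code): a shard may only be skipped when the rewritten query matches nothing and the request carries no suggest section.

```java
// Stand-alone sketch of the follow-up fix: a suggest section can still yield results
// against a match_none query, so its presence forces the shard to be visited.
public final class CanMatchWithSuggest {

    static boolean canMatch(boolean queryRewritesToMatchNone, boolean hasSuggest) {
        if (hasSuggest) {
            return true; // the suggest phase may produce results regardless of the query
        }
        return queryRewritesToMatchNone == false;
    }

    public static void main(String[] args) {
        System.out.println(canMatch(true, true));  // true: cannot skip, suggest present
        System.out.println(canMatch(true, false)); // false: safe to skip this shard
    }
}
```
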
@@ -282,10 +286,22 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

public Builder addAggregators(AggregatorFactories factories) {
throw new UnsupportedOperationException("This needs to be removed");
public boolean mustVisiteAllDocs() {
Contributor

extra 'e'

@therealnb

This is a great idea, but it looks like failIfOverShardCountLimit is called before the shards are skipped. Is there any reason it has to be like this?

Obviously, if I query index-* with a small time range query the pre-filter would bring the shard count to < 1000, but it will still fail failIfOverShardCountLimit. So, naively, it looks like it would be better to run failIfOverShardCountLimit afterwards.


@egalpin

egalpin commented Nov 26, 2020

@s1monw Would you be able to expand on the origin of the default shard threshold of 128? Is there a reasonable rule of thumb in terms of the amount of overhead per shard incurred by pre-filtering? Thanks!
