
First step towards incremental reduction of query responses #23253

Merged
s1monw merged 19 commits into elastic:master from s1monw:incremental_reduce on Feb 21, 2017

Conversation

@s1monw (Contributor) commented Feb 19, 2017

Today all query results are buffered up until we have received responses from
all shards. This can hold on to a significant amount of memory if the number of
shards is large. This commit adds a first step towards incrementally reducing
aggregation results once a configurable (per search request) number of responses
has been received. If enough query results have been received and buffered, all so-far
received aggregation responses will be reduced and released to be GCed.

this PR really needs some reviews and potential discussions but it's a start and outlines what it takes to make this feature work
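
To make the buffering idea concrete, here is a minimal, self-contained Java sketch of the pattern described above. All names (IncrementalReducer, batchSize, consume) are hypothetical illustrations, not the actual Elasticsearch classes; the real change operates on per-shard aggregation results rather than a generic type.

import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

final class IncrementalReducer<T> {
    private final int batchSize;             // configurable per search request
    private final BinaryOperator<T> reduce;  // combines two partial results into one
    private final List<T> buffer = new ArrayList<>();

    IncrementalReducer(int batchSize, BinaryOperator<T> reduce) {
        this.batchSize = batchSize;
        this.reduce = reduce;
    }

    // Called once per shard response; reduces eagerly when the buffer is full so the
    // buffered per-shard results can be released and GCed.
    synchronized void consume(T shardResult) {
        buffer.add(shardResult);
        if (buffer.size() >= batchSize) {
            T partial = buffer.stream().reduce(reduce).orElseThrow(IllegalStateException::new);
            buffer.clear();
            buffer.add(partial); // keep only the intermediate reduction
        }
    }

    // Final reduce once responses from all shards have been consumed
    // (assumes at least one shard result was consumed).
    synchronized T reduceFinal() {
        return buffer.stream().reduce(reduce).orElseThrow(IllegalStateException::new);
    }
}

With a batch size of 512, for example, the coordinating node holds at most roughly 512 un-reduced shard results at any time instead of one per shard.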

@s1monw added the :Analytics/Aggregations, :Search/Search, >enhancement, review, v5.4.0, and v6.0.0-alpha1 labels on Feb 19, 2017
@s1monw (Author) commented Feb 20, 2017

@elasticmachine test this please

@jpountz (Contributor) left a comment

I like this change; I expected it to be more complex than that, so this is a good surprise to me! I left some picky comments about naming and comments to make this change a bit easier to read. I think the interesting question is how many buckets intermediate reduces for terms (or geo-hash) aggregations should produce.

     */
    public Stream<Result> stream() {
        return results.asList().stream().map(e -> e.value);
    }
jpountz:

Not sure how to address it but when I see both a size() and stream() method on a class, I tend to expect that the stream wraps size elements. I wonder whether we should make naming a bit more explicit to avoid this potential confusion.

        if (buffer != null) {
            InternalAggregations aggregations = (InternalAggregations) querySearchResult.consumeAggs();
            // once the size is incremented to the length of the buffer we know all elements are added
            // we also have happens before guarantees due to the memory barrier of the size write
jpountz:

is this comment outdated?

s1monw (Author):

yeah I had a complex solution first with non-blocking concurrency etc. I didn't go with it apparently

    }

    /**
     * Returns <code>true</code> iff the current reduce phase is the final reduce phase. This indicated if operations like
jpountz:

s/indicated/indicates/

            }
            logger.info("test failed. trying to see if it recovers after 1m.", ae);
            try {
                Thread.sleep(60000);
jpountz:

!!!

s1monw (Author):

dude!

@@ -228,7 +228,7 @@ public InternalAggregation doReduce(List<InternalAggregation> aggregations, Redu
             }
         }

-        final int size = Math.min(requiredSize, buckets.size());
+        final int size = reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size());
jpountz:

Since one of the goals of this change is to limit memory usage, I wonder whether this should use getShardSize() rather than buckets.size(), this should be a good trade-off between accuracy and memory usage? cc @colings86

colings86:

In theory I think this would be a good change to make, however I think we should do it in a separate PR as it may require the error calculations to be tweaked a bit to be correct.
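
To make the trade-off under discussion concrete, here is a small, purely illustrative Java sketch (hypothetical names, not the actual InternalTerms code): the PR keeps every bucket during non-final reduces, while the getShardSize() alternative suggested above would cap intermediate reduces to save memory.

import java.util.List;

final class BucketTruncationSketch {
    // Approach in this PR: only the final reduce trims to the requested size.
    static <B> List<B> keepAllUntilFinal(List<B> buckets, int requiredSize, boolean isFinalReduce) {
        int size = isFinalReduce ? Math.min(requiredSize, buckets.size()) : buckets.size();
        return buckets.subList(0, size);
    }

    // Alternative discussed above: cap intermediate reduces at the shard size,
    // trading some accuracy bookkeeping for a lower memory footprint.
    static <B> List<B> capAtShardSize(List<B> buckets, int requiredSize, int shardSize, boolean isFinalReduce) {
        int size = isFinalReduce ? Math.min(requiredSize, buckets.size()) : Math.min(shardSize, buckets.size());
        return buckets.subList(0, size);
    }
}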

        final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
        return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, firstResult, suggest, aggregations,
            shardResults);
    }

    private InternalAggregations reduceAggsOnly(List<InternalAggregations> aggregationsList) {
jpountz:

maybe update the name or add a comment to say that this method is about performing an intermediate reduce? (as opposed to final)

    /**
     * Reduces the given query results and consumes all aggregations and profile results.
     * @see QuerySearchResult#consumeAggs()
     * @see QuerySearchResult#consumeProfileResult()
     */
-    public final ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults) {
+    public final ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults,
+                                                     List<InternalAggregations> reducedAggs) {
jpountz:

Can you add a comment to explain that reducedAggs is the result from intermediate reduce operations?

            aggregationsList = new ArrayList<>(queryResults.size());
        } else {
            aggregationsList = reducedAggs == null ? Collections.emptyList() : reducedAggs;
        }
jpountz:

I think hasAggs is a confusing name given how this method was refactored. Maybe we should restructure the logic a bit with comments to explain what each case maps to? eg. something like

if (reducedAggs != null) {
  // we already have results from intermediate reduces and just need to perform the final reduce
  assert firstResult.hasAggs();
} else if (firstResult.hasAggs()) {
  // the number of shards was less than the buffer size so we reduce agg results directly
} else {
  // no aggregations
}

@s1monw (Author) commented Feb 20, 2017

@elasticmachine would you bother to test this

@s1monw (Author) commented Feb 20, 2017

@jpountz I pushed some changes

@jpountz (Contributor) commented Feb 20, 2017

Looks great. Do you have any opinions about the size to use for intermediate reduces of terms aggs? I'm good with pulling this change in and making it a follow-up, this change is already a net improvement as-is.

@colings86 (Contributor) left a comment

LGTM, I left a couple of minor comments

@@ -46,6 +46,8 @@
*/
public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, SearchResponse, SearchRequestBuilder> {

private int reduceUpTo;
colings86:

Is this used? It looks below like we set this directly on the request?

@s1monw (Author) commented Feb 20, 2017

@jpountz I will open a follow-up for the following things:

I think we should just pull this one in without adding more stuff to it

@jpountz (Contributor) commented Feb 20, 2017

I think we should just pull this one in without adding more stuff to it

yes

@s1monw s1monw merged commit f933f80 into elastic:master Feb 21, 2017
@s1monw s1monw deleted the incremental_reduce branch February 21, 2017 12:02
s1monw added a commit to s1monw/elasticsearch that referenced this pull request Feb 21, 2017
In elastic#23253 we added the ability to incrementally reduce search results.
This change exposes the parameter to control the batch size and therefore
the memory consumption of a large search request.
s1monw added a commit that referenced this pull request Feb 21, 2017
We can and should randomly reduce down to a single result before
we pass the aggs to the final reduce. This commit changes the logic
to do that and ensures we don't trip the assertions the previous implementation tripped.

Relates to #23253
s1monw added a commit that referenced this pull request Feb 21, 2017
Some randomization caused the same agg to be reduced multiple times,
which causes issues for some aggregations.

Relates to #23253
jimczi added a commit that referenced this pull request Feb 21, 2017
InternalTopHits uses "==" to compare hit scores and fails when score is NaN.
This commit changes the comparison to always use Double.compare.

Relates #23253
s1monw added a commit that referenced this pull request Feb 21, 2017
In #23253 we added the ability to incrementally reduce search results.
This change exposes the parameter to control the batch size and therefore
the memory consumption of a large search request.
s1monw added a commit to s1monw/elasticsearch that referenced this pull request Feb 22, 2017
Both PRs below have been backported to 5.4 such that we can enable
BWC tests of this feature as well as remove version dependent serialization
for search request / responses.

Relates to elastic#23288
Relates to elastic#23253
s1monw added a commit that referenced this pull request Feb 22, 2017
Today all query results are buffered up until we have received responses from
all shards. This can hold on to a significant amount of memory if the number of
shards is large. This commit adds a first step towards incrementally reducing
aggregation results once a configurable (per search request) number of responses
has been received. If enough query results have been received and buffered, all so-far
received aggregation responses will be reduced and released to be GCed.
s1monw added a commit that referenced this pull request Feb 22, 2017
We can and should randomly reduce down to a single result before
we pass the aggs to the final reduce. This commit changes the logic
to do that and ensures we don't trip the assertions the previous implementation tripped.

Relates to #23253
s1monw added a commit that referenced this pull request Feb 22, 2017
In #23253 we added the ability to incrementally reduce search results.
This change exposes the parameter to control the batch size and therefore
the memory consumption of a large search request.
s1monw added a commit that referenced this pull request Feb 22, 2017
Both PRs below have been backported to 5.4 such that we can enable
BWC tests of this feature as well as remove version dependent serialization
for search request / responses.

Relates to #23288
Relates to #23253
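
As a usage note on the follow-up commits above that expose this setting: the sketch below assumes the parameter ended up being exposed as batched_reduce_size on the _search endpoint and as SearchRequest#setBatchedReduceSize in the Java API (its naming in later releases); treat the exact names as assumptions rather than as part of this PR.

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class BatchedReduceSizeExample {
    public static void main(String[] args) {
        // Assumed API: SearchRequest#setBatchedReduceSize, the setter exposed by the follow-up.
        SearchRequest request = new SearchRequest("my-index");
        request.source(new SearchSourceBuilder().size(0));
        // Reduce buffered shard responses once 64 of them have arrived, bounding
        // the memory the coordinating node holds for un-reduced aggregations.
        request.setBatchedReduceSize(64);
        // REST equivalent (assumed): GET /my-index/_search?batched_reduce_size=64
    }
}
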
@IdanWo commented Jul 21, 2017

If I understand it right, the motivation here is to make several small top-10 calculations on the coordinating node, instead of making a single large calculation at the end when all the responses are available? Does this change affect the accuracy of the terms aggregation, as opposed to the previous approach?

@jpountz (Contributor) commented Jul 21, 2017

Yes, it potentially reduces the accuracy of terms aggregations.

Note that combined with #25658, this change only starts reducing accuracy of the terms aggregations if more than 512 shards have matches. So say that you query 1000 shards but only 300 of them have matches, the accuracy will be the same as today.

@IdanWo commented Jul 21, 2017

Okay, got it. Sounds cool. Does that mean this behavior is activated only under specific conditions? Or that this is the new behavior, and it will reduce accuracy only under some conditions?

By the way, will the user see this inaccuracy in the error bounds (sum_other_doc_count, doc_count_error_upper_bound)? And if not, will this configuration be documented? I believe there should be a page for "tune for search accuracy", just as there are pages for "tune for search speed" and "tune for disk usage".

I understand that there's nothing to do when making a request to so many shards at once, but I don't like the approach of "scaling out, performance and latency are always much more important factors than accuracy". No one mentioned that results would become less accurate in the Elasticsearch 5.4.0 release blog post:

That said, it is quite easy to reach the 1,000 shard limit, especially with the recent release of Cross Cluster Search. As of 5.4.0, Top-N search results and aggregations are reduced in batches of 512, which puts an upper limit on the amount of memory used on the coordinating node, which has allowed us to set the shard soft limit to unlimited by default.

@jpountz (Contributor) commented Jul 21, 2017

Will the user see this inaccuracy in the error bounds?

Yes.

I understand that there's nothing to do when making a request to so many shards at once, but I don't like the approach of "scaling out, performance and latency are much more important factors than accuracy".

I would agree it would be discussable if that was only about performance and latency, but to me this is mostly about cluster stability, which I consider more important than the accuracy of terms aggs.

In my opinion the number of users that are affected by decreased accuracy of terms aggs is low enough that mentioning it in the release notes would be more confusing than helpful.

@colings86 (Contributor) commented

@jpountz @IdanWo Actually I am pretty sure that we don't lose any accuracy on the terms aggregation with incremental reduce, because we do not truncate the list of terms until we are doing the 'final' reduction (see https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java#L284), so the accuracy should not be affected. We do have #23285 open to explore truncating the list during the other incremental reductions though, and this would indeed affect the accuracy of the terms aggregation if implemented.

@jpountz (Contributor) commented Jul 21, 2017

@colings86 Oh right, I remember we discussed it but had forgotten which approach we took. Thanks for clarifying!

Labels: :Analytics/Aggregations, >enhancement, release highlight, :Search/Search, v5.4.0, v6.0.0-alpha1

5 participants