First step towards incremental reduction of query responses #23253
Conversation
Today all query results are buffered until we have received responses from all shards. This can hold on to a significant amount of memory if the number of shards is large. This commit adds a first step towards incrementally reducing aggregation results once a configurable (per search request) number of responses has been received. When enough query results have been received and buffered, all aggregation responses received so far are reduced and released to be GCed.
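To make the mechanism concrete, here is a minimal, self-contained sketch — not the actual Elasticsearch classes; `IncrementalReducer` and the `Map`-based "aggregation" are illustrative stand-ins. Shard responses are buffered and, once the buffer reaches the configured batch size, reduced into a single partial result so the buffered originals become eligible for garbage collection:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative stand-in (not Elasticsearch code): buffer per-shard results and
// perform an intermediate reduce whenever the buffer reaches the batch size.
class IncrementalReducer {
    private final int batchSize; // configurable per search request
    private final List<Map<String, Long>> buffer = new ArrayList<>();

    IncrementalReducer(int batchSize) {
        this.batchSize = batchSize;
    }

    void consume(Map<String, Long> shardAggs) {
        buffer.add(shardAggs);
        if (buffer.size() >= batchSize) {
            Map<String, Long> partial = reduce(buffer); // intermediate reduce
            buffer.clear();                             // release buffered responses
            buffer.add(partial);                        // keep only the partial result
        }
    }

    Map<String, Long> finalReduce() {
        return reduce(buffer);
    }

    private static Map<String, Long> reduce(List<Map<String, Long>> aggs) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> a : aggs) {
            a.forEach((k, v) -> merged.merge(k, v, Long::sum));
        }
        return merged;
    }
}
```

With a batch size of 2, the buffer never holds more than two results at once, which is the memory bound this change is after.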
@elasticmachine test this please
I like this change, I expected it to be more complex than that so this is a good surprise to me! I left some picky comments about naming and comments to make this change a bit easier to read. I think the interesting question is about how many buckets intermediate reduces for terms (or geo-hash) aggregations should produce.
     */
    public Stream<Result> stream() {
        return results.asList().stream().map(e -> e.value);
    }
Not sure how to address it but when I see both a size() and a stream() method on a class, I tend to expect that the stream wraps size elements. I wonder whether we should make naming a bit more explicit to avoid this potential confusion.
if (buffer != null) {
    InternalAggregations aggregations = (InternalAggregations) querySearchResult.consumeAggs();
    // once the size is incremented to the length of the buffer we know all elements are added
    // we also have happens before guarantees due to the memory barrier of the size write
is this comment outdated?
yeah I had a complex solution first with non-blocking concurrency etc. I didn't go with it apparently
}

/**
 * Returns <code>true</code> iff the current reduce phase is the final reduce phase. This indicated if operations like
s/indicated/indicates/
}
logger.info("test failed. trying to see if it recovers after 1m.", ae);
try {
    Thread.sleep(60000);
!!!
dude!
@@ -228,7 +228,7 @@ public InternalAggregation doReduce(List<InternalAggregation> aggregations, Redu
        }
    }

-   final int size = Math.min(requiredSize, buckets.size());
+   final int size = reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size());
Since one of the goals of this change is to limit memory usage, I wonder whether this should use getShardSize() rather than buckets.size()? This should be a good trade-off between accuracy and memory usage. cc @colings86
In theory I think this would be a good change to make, however I think we should do it in a separate PR as it may require the error calculations to be tweaked a bit to be correct.
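For illustration, the behavior of the changed line can be sketched in isolation (a simplified stand-in, not the actual InternalTerms code): intermediate, non-final reduces keep every bucket, and only the final reduce truncates to the requested size.

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for the size selection in doReduce: non-final reduces
// keep all buckets; only the final reduce truncates to the requested size.
class ReducePhaseSize {
    static int sizeFor(boolean isFinalReduce, int requiredSize, List<String> buckets) {
        return isFinalReduce == false ? buckets.size() : Math.min(requiredSize, buckets.size());
    }
}
```

The getShardSize() idea discussed above would replace buckets.size() in the non-final branch, trading some accuracy for a tighter memory bound.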
    final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
    return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, firstResult, suggest, aggregations,
        shardResults);
}

private InternalAggregations reduceAggsOnly(List<InternalAggregations> aggregationsList) {
maybe update the name or add a comment to say that this method is about performing an intermediate reduce? (as opposed to final)
 /**
  * Reduces the given query results and consumes all aggregations and profile results.
  * @see QuerySearchResult#consumeAggs()
  * @see QuerySearchResult#consumeProfileResult()
  */
-public final ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults) {
+public final ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults,
+                                                 List<InternalAggregations> reducedAggs) {
Can you add a comment to explain that reducedAggs is the result from intermediate reduce operations?
    aggregationsList = new ArrayList<>(queryResults.size());
} else {
    aggregationsList = reducedAggs == null ? Collections.emptyList() : reducedAggs;
}
I think hasAggs is a confusing name given how this method was refactored. Maybe we should restructure the logic a bit with comments to explain what each case maps to? eg. something like
if (reducedAggs != null) {
// we already have results from intermediate reduces and just need to perform the final reduce
assert firstResult.hasAggs();
} else if (firstResult.hasAggs()) {
// the number of shards was less than the buffer size so we reduce agg results directly
} else {
// no aggregations
}
@elasticmachine would you bother to test this
@jpountz I pushed some changes
Looks great. Do you have any opinions about the size to use for intermediate reduces of terms aggs? I'm good with pulling this change in and making it a follow-up; this change is already a net improvement as-is.
LGTM, I left a couple of minor comments
@@ -46,6 +46,8 @@
  */
 public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, SearchResponse, SearchRequestBuilder> {

+    private int reduceUpTo;
Is this used? It looks below like we set this directly on the request?
@jpountz I will open a follow-up for the following things:
I think we should just pull this one in without adding more stuff to it
yes
In elastic#23253 we added the ability to incrementally reduce search results. This change exposes the parameter to control the batch size and therefore the memory consumption of a large search request.
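If memory serves, this parameter ended up exposed on the search request as batched_reduce_size with a default of 512 and a minimum of 2; treat the field name, default, and validation below as assumptions rather than a quote of the real SearchRequest code. A self-contained sketch of the request-level setting:

```java
// Hypothetical stand-in for the request-level setting (the real field lives on
// SearchRequest); the name batchedReduceSize, the default of 512, and the
// minimum of 2 are assumptions based on this PR's follow-ups.
class SearchRequestSketch {
    private int batchedReduceSize = 512; // assumed default

    SearchRequestSketch setBatchedReduceSize(int batchedReduceSize) {
        if (batchedReduceSize < 2) {
            // fewer than 2 buffered results cannot be reduced incrementally
            throw new IllegalArgumentException("batchedReduceSize must be >= 2");
        }
        this.batchedReduceSize = batchedReduceSize;
        return this;
    }

    int getBatchedReduceSize() {
        return batchedReduceSize;
    }
}
```

A smaller value bounds how many per-shard responses sit in memory on the coordinating node at any time, at the cost of more intermediate reduce rounds.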
We can and should randomly reduce down to a single result before passing the aggs to the final reduce. This commit changes the logic to do that and ensures we don't trip the assertions the previous implementation tripped. Relates to #23253
Some randomization caused reduction of the same agg multiple times which causes issues on some aggregations. Relates to #23253
InternalTopHits uses "==" to compare hit scores and fails when the score is NaN. This commit changes the comparison to always use Double.compare. Relates #23253
Both PRs below have been backported to 5.4 such that we can enable BWC tests of this feature as well as remove version-dependent serialization for search requests / responses. Relates to elastic#23288 Relates to elastic#23253
If I understand it right, the motivation here is to make several small top-10 calculations on the coordinating node, instead of making a single large calculation at the end when all the responses are available? Does this change affect the accuracy of the terms aggregation, as opposed to the previous approach?
Yes, it potentially reduces the accuracy of the terms aggregation. Note that combined with #25658, this change only starts reducing the accuracy of the terms aggregation under certain conditions.
Okay, got it. Sounds cool. It means that this behavior is activated only in specific conditions? Or that this is the new behavior, and it will reduce accuracy only in some conditions? By the way, will the user see this inaccuracy in the error bounds? I understand that there's nothing to do when making a request to so many shards at once, but I don't like the approach of "scaling out, performance and latency are always much more important factors than accuracy". No one mentioned that less accurate results will come here, in the Elasticsearch 5.4.0 release blog post.
Yes.
I would agree it would be debatable if this was only about performance and latency, but to me this is mostly about cluster stability, which I consider more important than the accuracy of the terms aggregation. In my opinion the number of users that are affected by decreased accuracy of the terms aggregation is small.
@jpountz @IdanWo Actually I am pretty sure that we don't lose any accuracy on the terms aggregation because of incremental reduce, because we do not truncate the list of terms until we are doing the 'final' incremental reduction (see https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java#L284), so the accuracy should not be affected. We do have #23285 open to explore truncating the list during the other incremental reductions though, and this would indeed affect the accuracy of the terms aggregation if implemented.
@colings86 Oh right, I remember we discussed it but had forgotten which approach we took. Thanks for clarifying!
this PR really needs some reviews and potential discussions but it's a start and outlines what it takes to make this feature work