-
Notifications
You must be signed in to change notification settings - Fork 24.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ESQL: Compute engine support for stateful grouping functions #112757
base: main
Are you sure you want to change the base?
Conversation
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
public record GroupingKey(AggregatorMode mode, Thing thing) implements EvalOperator.ExpressionEvaluator { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've somewhat mirrored the way we do grouping aggs with this and it seems to have worked out fairly well. It's not perfect, but it's a lot less confusing than I thought it would be.
|
||
for (int i = 0; i < prepared.length; i++) { | ||
prepared[i] = aggregators.get(i).prepareProcessPage(blockHash, page); | ||
} | ||
|
||
blockHash.add(wrapPage(page), add); | ||
blockHash.add(new Page(keys), add); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to modify BlockHash
to take a Block[]
with the blocks in the right position. But that seems like something for another time.
|
||
for (int i = 0; i < prepared.length; i++) { | ||
prepared[i] = aggregators.get(i).prepareProcessPage(blockHash, page); | ||
} | ||
|
||
blockHash.add(wrapPage(page), add); | ||
blockHash.add(new Page(keys), add); | ||
hashNanos += System.nanoTime() - add.hashStart; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's probably worth timing the evaluation here.
int[] aggBlockCounts = new int[aggregators.size()]; | ||
for (int a = 0; a < aggregators.size(); a++) { | ||
aggBlockCounts[a] = aggregators.get(a).evaluateBlockCount(); | ||
blockCount += aggBlockCounts[a]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found it a lot easier to read if I encoded the resultOffset
s into the GroupKeys. It'd be even easier to read if the offsets were encoded into the aggregators too. Or if we returns Block[]
.
@jan-elastic have a look at this one. It's closer, I think. Once we can figure out how this is supposed to work in the unit test I think we can iterate some more on the language side to figure out how to make it build that. |
...in/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/TokenListCategorizer.java
Outdated
Show resolved
Hide resolved
|
||
@Override | ||
public void replaceIntermediateKeys(BlockFactory blockFactory, Block[] blocks) { | ||
// NOCOMMIT this offset can't be the same in the result array and intermediate array |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I bumped into this a few days ago and I think I need to dig some more - in this brave new world there's two "shapes" of data coming out of these grouping functions - the intermediate shape and the final shape. This is pretty similar to how aggs work - which is something I never fully understood to be honest. Anyway, I'm using resultOffset
here - but that's the result offset of the intermediate data. not the final offset. So it can't be right.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @nik9000. I might be missing some context, but it seems we're trying to include SerializableTokenListCategory alongside the aggregated results for each driver. Could we resolve this by adding infrastructure to support SerializableTokenListCategory (or a variant) as the new block hash key?
CategorizationPartOfSpeechDictionary.getInstance(), | ||
0.70f | ||
); | ||
evaluator = new CategorizeEvaluator( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems the CategorizeEvaluator will be executed twice: here and in toEvaluator
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was wondering the same.
However, when running the CategorizeOperatorTests
it seems that Categorize::toEvaluator
is never executed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Partial review with some questions
int offset = 0; | ||
for (int g = 0; g < groups.size(); g++) { | ||
blocks[offset] = keys[g]; | ||
groups.get(g).finish(blocks, selected, driverContext); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No offset passed to the finish()
here? How does it know where to place the blocks?
return mode.isInputPartial() ? thing.evalIntermediateInput(blockFactory, page) : thing.evalRawInput(page); | ||
} | ||
|
||
public int finishBlockCount() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: We're calling this "finish", while in the aggregator it's "evaluate". Some reason to keep those names separated? From what I understand, the operation is nearly the same (?)
maxPageSize, | ||
false | ||
List.of( | ||
// NOCOMMIT double check the mode |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Commenting just in case this was forgotten
import java.util.List; | ||
|
||
public record GroupingKey(AggregatorMode mode, Thing thing, BlockFactory blockFactory) implements EvalOperator.ExpressionEvaluator { | ||
public interface Thing extends Releasable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is "Thing" the final name here?
This adds support to the compute engine for "stateful grouping functions". Think of these like
ExpressionEvaluator
s but they can: