ESQL: Compute engine support for stateful grouping functions #112757

Draft: wants to merge 12 commits into base: main

nik9000 (Member) commented Sep 11, 2024

This adds support to the compute engine for "stateful grouping functions". Think of these like ExpressionEvaluators but they can:

  • Encode extra state to be passed to the coordinating node as part of the agg
  • Use that extra state to transform the group keys on the coordinating node
  • Apply a transformation using the output after the aggregation is complete
  • Use an intermediate representation that differs from the final representation (think, "I group on an integer, but when finished I transform into a string")
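
To make the last bullet concrete, here is a minimal sketch that uses plain Java collections instead of the compute engine's Block and Page types. Every name in it (StatefulGroupingSketch, keyFor, finish) is hypothetical and is not taken from this PR. It only illustrates the idea of grouping on an integer id while keeping enough extra state to turn the ids back into strings once aggregation is done.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration: group on an int id, emit a String when finished. */
final class StatefulGroupingSketch {
    // Extra state: maps each raw value to a compact integer id and back.
    private final Map<String, Integer> ids = new HashMap<>();
    private final List<String> values = new ArrayList<>();

    /** Intermediate representation: the integer id used as the group key. */
    int keyFor(String raw) {
        Integer existing = ids.get(raw);
        if (existing != null) {
            return existing;
        }
        int id = values.size();
        ids.put(raw, id);
        values.add(raw);
        return id;
    }

    /** Final representation: translate group keys back to strings after aggregation. */
    List<String> finish(int[] groupKeys) {
        List<String> out = new ArrayList<>(groupKeys.length);
        for (int key : groupKeys) {
            out.add(values.get(key));
        }
        return out;
    }

    public static void main(String[] args) {
        StatefulGroupingSketch grouping = new StatefulGroupingSketch();
        int error = grouping.keyFor("error");
        int warn = grouping.keyFor("warn");
        System.out.println(grouping.finish(new int[] { warn, error }));
    }
}
```

In the real change the extra state would also have to be serialized and sent to the coordinating node, which this sketch leaves out.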

import java.util.ArrayList;
import java.util.List;

public record GroupingKey(AggregatorMode mode, Thing thing) implements EvalOperator.ExpressionEvaluator {
Member Author

I've somewhat mirrored the way we do grouping aggs with this and it seems to have worked out fairly well. It's not perfect, but it's a lot less confusing than I thought it would be.


for (int i = 0; i < prepared.length; i++) {
    prepared[i] = aggregators.get(i).prepareProcessPage(blockHash, page);
}

blockHash.add(wrapPage(page), add);
blockHash.add(new Page(keys), add);
Member Author

I'd like to modify BlockHash to take a Block[] with the blocks in the right position. But that seems like something for another time.


for (int i = 0; i < prepared.length; i++) {
    prepared[i] = aggregators.get(i).prepareProcessPage(blockHash, page);
}

blockHash.add(wrapPage(page), add);
blockHash.add(new Page(keys), add);
hashNanos += System.nanoTime() - add.hashStart;
Member Author

It's probably worth timing the evaluation here.
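
One way the suggested timing could look, as a rough sketch only: wrap the key evaluation in a helper that accumulates elapsed nanoseconds, in the same spirit as the hashNanos counter in the snippet above. The class name, the evalNanos counter, and the timed helper are all assumptions made for illustration and do not appear in the PR.

```java
import java.util.function.Supplier;

/** Hypothetical sketch: track time spent evaluating group keys separately from hashing. */
final class KeyEvalTimerSketch {
    // Hypothetical counter, analogous to hashNanos in the operator.
    private long evalNanos;

    /** Runs the evaluation and adds its elapsed time to evalNanos, even if it throws. */
    <T> T timed(Supplier<T> evaluation) {
        long start = System.nanoTime();
        try {
            return evaluation.get();
        } finally {
            evalNanos += System.nanoTime() - start;
        }
    }

    long evalNanos() {
        return evalNanos;
    }

    public static void main(String[] args) {
        KeyEvalTimerSketch timer = new KeyEvalTimerSketch();
        int[] keys = timer.timed(() -> new int[] { 1, 2, 3 });
        System.out.println(keys.length + " keys evaluated in " + timer.evalNanos() + "ns");
    }
}
```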

int[] aggBlockCounts = new int[aggregators.size()];
for (int a = 0; a < aggregators.size(); a++) {
    aggBlockCounts[a] = aggregators.get(a).evaluateBlockCount();
    blockCount += aggBlockCounts[a];
Member Author

I found it a lot easier to read if I encoded the resultOffsets into the GroupKeys. It'd be even easier to read if the offsets were encoded into the aggregators too, or if we returned Block[].
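
As an illustration of what "encoding the offsets" could mean, the sketch below turns per-component block counts into starting offsets in a shared result array with a running sum. ResultOffsetSketch and its offsets method are hypothetical names, and the PR may compute these positions differently.

```java
import java.util.Arrays;

/** Hypothetical sketch: derive each component's start position in a shared Block[]-like array. */
final class ResultOffsetSketch {
    /**
     * Returns the first slot owned by each component, given how many slots each one fills.
     * The start parameter is the first free slot, for example the slots already taken by keys.
     */
    static int[] offsets(int start, int[] blockCounts) {
        int[] offsets = new int[blockCounts.length];
        int next = start;
        for (int i = 0; i < blockCounts.length; i++) {
            offsets[i] = next;
            next += blockCounts[i];
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Two key blocks come first, then three aggregators filling 1, 3 and 2 blocks.
        System.out.println(Arrays.toString(offsets(2, new int[] { 1, 3, 2 })));
    }
}
```

Computing the offsets once and storing them with each key or aggregator means the loop that fills the result array no longer has to carry a running position.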

nik9000 (Member, Author) commented Sep 17, 2024

@jan-elastic have a look at this one. It's closer, I think.

Once we can figure out how this is supposed to work in the unit test I think we can iterate some more on the language side to figure out how to make it build that.


@Override
public void replaceIntermediateKeys(BlockFactory blockFactory, Block[] blocks) {
// NOCOMMIT this offset can't be the same in the result array and intermediate array
Member Author

I bumped into this a few days ago and I think I need to dig some more. In this brave new world there are two "shapes" of data coming out of these grouping functions: the intermediate shape and the final shape. This is pretty similar to how aggs work, which is something I never fully understood, to be honest. Anyway, I'm using resultOffset here, but that's the result offset of the intermediate data, not the final offset. So it can't be right.
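
The mismatch described here can be shown with a small sketch. Assume, purely for illustration, that each grouping key emits a different number of blocks in its intermediate shape than in its final shape. The KeyShape record and both offset methods are hypothetical and are not part of the PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.ToIntFunction;

/** Hypothetical sketch: the same key lands at different offsets in the two layouts. */
final class ShapeOffsetsSketch {
    /** Assumed per-key widths: blocks emitted in the intermediate and in the final shape. */
    record KeyShape(int intermediateBlocks, int finalBlocks) {}

    static int[] intermediateOffsets(List<KeyShape> keys) {
        return offsets(keys, KeyShape::intermediateBlocks);
    }

    static int[] finalOffsets(List<KeyShape> keys) {
        return offsets(keys, KeyShape::finalBlocks);
    }

    // Running sum of widths: each key starts where the previous one ended.
    private static int[] offsets(List<KeyShape> keys, ToIntFunction<KeyShape> width) {
        int[] result = new int[keys.size()];
        int next = 0;
        for (int i = 0; i < keys.size(); i++) {
            result[i] = next;
            next += width.applyAsInt(keys.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // First key carries one extra state block in its intermediate shape only.
        List<KeyShape> keys = List.of(new KeyShape(2, 1), new KeyShape(1, 1));
        System.out.println("intermediate: " + Arrays.toString(intermediateOffsets(keys)));
        System.out.println("final:        " + Arrays.toString(finalOffsets(keys)));
    }
}
```

With these assumed widths the second key starts at a different position in each layout, which is why a single resultOffset cannot serve both arrays.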

dnhatn (Member) left a comment

Thanks @nik9000. I might be missing some context, but it seems we're trying to include SerializableTokenListCategory alongside the aggregated results for each driver. Could we resolve this by adding infrastructure to support SerializableTokenListCategory (or a variant) as the new block hash key?

CategorizationPartOfSpeechDictionary.getInstance(),
0.70f
);
evaluator = new CategorizeEvaluator(
Member

It seems the CategorizeEvaluator will be executed twice: here and in toEvaluator?

Contributor

I was wondering the same.

However, when running the CategorizeOperatorTests it seems that Categorize::toEvaluator is never executed.

ivancea self-requested a review September 30, 2024 15:27
alex-spies self-requested a review September 30, 2024 15:28
costin self-requested a review September 30, 2024 15:28
iverase self-requested a review September 30, 2024 15:29
ivancea (Contributor) left a comment

Partial review with some questions

int offset = 0;
for (int g = 0; g < groups.size(); g++) {
    blocks[offset] = keys[g];
    groups.get(g).finish(blocks, selected, driverContext);
Contributor

No offset passed to the finish() here? How does it know where to place the blocks?

return mode.isInputPartial() ? thing.evalIntermediateInput(blockFactory, page) : thing.evalRawInput(page);
}

public int finishBlockCount() {
Contributor

Nit: We're calling this "finish", while in the aggregator it's "evaluate". Is there a reason to keep those names different? From what I understand, the operation is nearly the same (?)

maxPageSize,
false
List.of(
// NOCOMMIT double check the mode
Contributor

Commenting just in case this was forgotten

import java.util.List;

public record GroupingKey(AggregatorMode mode, Thing thing, BlockFactory blockFactory) implements EvalOperator.ExpressionEvaluator {
public interface Thing extends Releasable {
Contributor

Is "Thing" the final name here?
