-
Notifications
You must be signed in to change notification settings - Fork 24.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ES|QL categorize v1 #112860
ES|QL categorize v1 #112860
Changes from all commits
b49703b
82d75ac
2f01aa0
3e18f56
22ff782
fd2b13c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
categorize | ||
required_capability: categorize | ||
|
||
FROM sample_data | ||
| STATS count=COUNT(), values=VALUES(message) BY category=CATEGORIZE(message) | ||
| SORT count DESC, category ASC | ||
; | ||
|
||
count:long | values:keyword | category:integer | ||
3 | [Connected to 10.1.0.1, Connected to 10.1.0.2, Connected to 10.1.0.3] | 0 | ||
3 | [Connection error] | 1 | ||
1 | [Disconnected] | 2 | ||
; |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.esql.expression.function.grouping; | ||
|
||
import org.apache.lucene.analysis.TokenStream; | ||
import org.apache.lucene.analysis.core.WhitespaceTokenizer; | ||
import org.apache.lucene.util.BytesRef; | ||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.common.util.BytesRefHash; | ||
import org.elasticsearch.compute.ann.Evaluator; | ||
import org.elasticsearch.compute.ann.Fixed; | ||
import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator; | ||
import org.elasticsearch.index.analysis.CharFilterFactory; | ||
import org.elasticsearch.index.analysis.CustomAnalyzer; | ||
import org.elasticsearch.index.analysis.TokenFilterFactory; | ||
import org.elasticsearch.index.analysis.TokenizerFactory; | ||
import org.elasticsearch.xpack.esql.capabilities.Validatable; | ||
import org.elasticsearch.xpack.esql.core.expression.Expression; | ||
import org.elasticsearch.xpack.esql.core.tree.NodeInfo; | ||
import org.elasticsearch.xpack.esql.core.tree.Source; | ||
import org.elasticsearch.xpack.esql.core.type.DataType; | ||
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; | ||
import org.elasticsearch.xpack.esql.expression.function.Param; | ||
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; | ||
import org.elasticsearch.xpack.ml.aggs.categorization.CategorizationBytesRefHash; | ||
import org.elasticsearch.xpack.ml.aggs.categorization.CategorizationPartOfSpeechDictionary; | ||
import org.elasticsearch.xpack.ml.aggs.categorization.TokenListCategorizer; | ||
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer; | ||
|
||
import java.io.IOException; | ||
import java.util.List; | ||
import java.util.function.Function; | ||
|
||
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT; | ||
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isString; | ||
|
||
/** | ||
* Categorizes text messages. | ||
* | ||
* This implementation is incomplete and comes with the following caveats: | ||
* - it only works correctly on a single node. | ||
* - when running on multiple nodes, category IDs of the different nodes are | ||
* aggregated, even though the same ID can correspond to a totally different | ||
* category | ||
* - the output consists of category IDs, which should be replaced by category | ||
* regexes or keys | ||
* | ||
* TODO(jan, nik): fix this | ||
*/ | ||
public class Categorize extends GroupingFunction implements Validatable { | ||
// Registry entry for this function: registered against Expression.class under the name | ||
// "Categorize", with the stream-reading constructor (Categorize::new) as its reader. | ||
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( | ||
Expression.class, | ||
"Categorize", | ||
Categorize::new | ||
); | ||
|
||
private final Expression field; | ||
|
||
// Builds a Categorize node over a single input expression. | ||
// The annotations declare an "integer" return type and accept "text" or "keyword" input. | ||
// The field is handed to the superclass constructor as a single-element list and also kept | ||
// locally so it can be serialized and evaluated. | ||
@FunctionInfo(returnType = { "integer" }, description = "Categorizes text messages") | ||
public Categorize( | ||
Source source, | ||
@Param(name = "field", type = { "text", "keyword" }, description = "Expression to categorize") Expression field | ||
) { | ||
super(source, List.of(field)); | ||
this.field = field; | ||
} | ||
|
||
// Stream-reading constructor referenced by ENTRY. | ||
// Reads the Source first and then the field expression as a named writeable, which is the same | ||
// order writeTo uses. The stream is cast to PlanStreamInput, so any other StreamInput | ||
// implementation fails with a ClassCastException. | ||
private Categorize(StreamInput in) throws IOException { | ||
this(Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(Expression.class)); | ||
} | ||
|
||
// Serializes this node: the source first, then the field as a named writeable. | ||
// The stream-reading constructor reads the two values back in this same order. | ||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
source().writeTo(out); | ||
out.writeNamedWriteable(field); | ||
} | ||
|
||
// Returns the name stored in ENTRY ("Categorize"), keeping the written name and the | ||
// registered name in one place. | ||
@Override | ||
public String getWriteableName() { | ||
return ENTRY.name; | ||
} | ||
|
||
// Delegates to the input: this function reports itself foldable exactly when its field does. | ||
@Override | ||
public boolean foldable() { | ||
return field.foldable(); | ||
} | ||
|
||
// Per-value evaluation routine: decodes the bytes as UTF-8, tokenizes the resulting text with | ||
// the supplied analyzer and returns the id of the category the categorizer computes for it. | ||
// NOTE(review): the @Evaluator/@Fixed annotations presumably drive generation of the | ||
// CategorizeEvaluator class used by toEvaluator - confirm against the annotation processor. | ||
@Evaluator | ||
static int process( | ||
BytesRef v, | ||
@Fixed(includeInToString = false, build = true) CategorizationAnalyzer analyzer, | ||
@Fixed(includeInToString = false, build = true) TokenListCategorizer.CloseableTokenListCategorizer categorizer | ||
) { | ||
// A String is materialized for every value; the token stream is created from it. | ||
String s = v.utf8ToString(); | ||
// try-with-resources closes the token stream once the category has been computed. | ||
try (TokenStream ts = analyzer.tokenStream("text", s)) { | ||
// The second argument is the decoded string's length in chars (not the byte length of v); | ||
// the third is the constant 1. | ||
// NOTE(review): presumably "unfiltered length" and "document count" - confirm against | ||
// TokenListCategorizer.computeCategory. | ||
return categorizer.computeCategory(ts, s.length(), 1).getId(); | ||
} catch (IOException e) { | ||
// Rethrown unchecked with the original exception preserved as the cause. | ||
throw new RuntimeException(e); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We'd talked about allows IOException from a lot of things in ESQL - we do read from files and such. But we never go around to it and I'm not sure we'd do it for |
||
} | ||
|
||
// Builds the evaluator factory for this function from three parts: the evaluator of the input | ||
// field, a per-context factory for the CategorizationAnalyzer, and a per-context factory for the | ||
// categorizer. Both factories are lambdas, so a new analyzer and a new categorizer are created | ||
// each time they are invoked with a context. | ||
@Override | ||
public ExpressionEvaluator.Factory toEvaluator(Function<Expression, ExpressionEvaluator.Factory> toEvaluator) { | ||
return new CategorizeEvaluator.Factory( | ||
source(), | ||
toEvaluator.apply(field), | ||
context -> new CategorizationAnalyzer( | ||
// TODO(jan): get the correct analyzer in here, see CategorizationAnalyzerConfig::buildStandardCategorizationAnalyzer | ||
// Placeholder analyzer: whitespace tokenizer with no char filters and no token filters. | ||
new CustomAnalyzer( | ||
TokenizerFactory.newFactory("whitespace", WhitespaceTokenizer::new), | ||
new CharFilterFactory[0], | ||
new TokenFilterFactory[0] | ||
), | ||
// NOTE(review): presumably tells CategorizationAnalyzer to close the wrapped analyzer - confirm. | ||
true | ||
), | ||
context -> new TokenListCategorizer.CloseableTokenListCategorizer( | ||
// Hash storage is allocated through the context's BigArrays. | ||
// NOTE(review): 2048 is presumably the initial capacity - confirm. | ||
new CategorizationBytesRefHash(new BytesRefHash(2048, context.bigArrays())), | ||
CategorizationPartOfSpeechDictionary.getInstance(), | ||
// NOTE(review): 0.70f is presumably the similarity threshold for joining a category - confirm. | ||
0.70f | ||
) | ||
); | ||
} | ||
|
||
// Type resolution is the result of the isString check on the field, called with this node's | ||
// source text and the DEFAULT parameter ordinal. | ||
@Override | ||
protected TypeResolution resolveType() { | ||
return isString(field(), sourceText(), DEFAULT); | ||
} | ||
|
||
// Always INTEGER, matching the int category id returned by process. | ||
@Override | ||
public DataType dataType() { | ||
return DataType.INTEGER; | ||
} | ||
|
||
// Returns a new Categorize with the same source whose field is the first element of | ||
// newChildren; any further elements are ignored. | ||
@Override | ||
public Expression replaceChildren(List<Expression> newChildren) { | ||
return new Categorize(source(), newChildren.get(0)); | ||
} | ||
|
||
// Describes this node through NodeInfo.create, passing a Categorize constructor reference | ||
// and the field as its single property. | ||
@Override | ||
protected NodeInfo<? extends Expression> info() { | ||
return NodeInfo.create(this, Categorize::new, field); | ||
} | ||
|
||
// Accessor for the expression whose values are categorized. | ||
public Expression field() { | ||
return field; | ||
} | ||
|
||
// Text form "Categorize{field=<field>}", using the field's own string conversion. | ||
@Override | ||
public String toString() { | ||
return "Categorize{field=" + field + "}"; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's super expensive to do all this. But such is life.
I spent a little while looking and am pretty sure there's a nice way to make a
`Reader` that works on the `BytesRef` directly, so you don't need to make a String here.
I couldn't find anything easy to just plug in, so I think it can wait.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for pointing that out. I'll add it to the to-do list.