From 49a55d29c1413072d3b2aae152994d1d8e63b49c Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Wed, 29 May 2024 20:50:23 -0700 Subject: [PATCH] wip --- .../src/main/resources/search.csv-spec | 5 +- .../xpack/esql/io/stream/PlanNamedTypes.java | 12 +++ .../optimizer/LocalPhysicalPlanOptimizer.java | 34 +++++++++ .../xpack/esql/plan/logical/search/Rank.java | 7 +- .../xpack/esql/plan/logical/search/Score.java | 15 ++++ .../esql/plan/logical/search/SearchUtils.java | 22 ++++++ .../physical/search/EsScoreQueryExec.java | 74 +++++++++++++++++++ .../esql/plan/physical/search/RankExec.java | 66 +++++++++++++++++ .../xpack/esql/planner/Mapper.java | 6 ++ 9 files changed, 237 insertions(+), 4 deletions(-) create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/search/SearchUtils.java create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/search/EsScoreQueryExec.java create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/search/RankExec.java diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/search.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/search.csv-spec index 142ffb2918289..9923e8f82cfc8 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/search.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/search.csv-spec @@ -4,10 +4,11 @@ search employees [ ] | keep emp_no | sort emp_no +| limit 5 ; -emp_no:integer | _score:double -10013 | Eberhardt +emp_no:integer +10013 ; scoreOnNumeric diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java index d64d70470c605..d699ebfd08ab3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java +++ 
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java
@@ -186,6 +186,7 @@
 import org.elasticsearch.xpack.esql.plan.logical.Project;
 import org.elasticsearch.xpack.esql.plan.logical.TopN;
 import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
+import org.elasticsearch.xpack.esql.plan.logical.search.Rank;
 import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.DissectExec;
 import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
@@ -300,6 +301,7 @@ public static List namedTypeEntries() {
             of(LogicalPlan.class, MvExpand.class, PlanNamedTypes::writeMvExpand, PlanNamedTypes::readMvExpand),
             of(LogicalPlan.class, OrderBy.class, PlanNamedTypes::writeOrderBy, PlanNamedTypes::readOrderBy),
             of(LogicalPlan.class, Project.class, PlanNamedTypes::writeProject, PlanNamedTypes::readProject),
+            of(LogicalPlan.class, Rank.class, PlanNamedTypes::writeRank, PlanNamedTypes::readRank),
             of(LogicalPlan.class, TopN.class, PlanNamedTypes::writeTopN, PlanNamedTypes::readTopN),
             // Attributes
             of(Attribute.class, FieldAttribute.class, PlanNamedTypes::writeFieldAttribute, PlanNamedTypes::readFieldAttribute),
@@ -1013,6 +1015,25 @@ static void writeProject(PlanStreamOutput out, Project project) throws IOExcepti
         writeNamedExpressions(out, project.projections());
     }
 
+    /**
+     * Reads a {@link Rank} by consuming, in this order, the source, the child plan and the query expression.
+     */
+    static Rank readRank(PlanStreamInput in) throws IOException {
+        var source = in.readSource();
+        var child = in.readLogicalPlanNode();
+        var query = in.readExpression();
+        return new Rank(source, child, query);
+    }
+
+    /**
+     * Writes a {@link Rank}: calls {@code writeNoSource()}, then writes the child plan and the query expression.
+     */
+    static void writeRank(PlanStreamOutput out, Rank rank) throws IOException {
+        out.writeNoSource();
+        out.writeLogicalPlanNode(rank.child());
+        out.writeExpression(rank.query());
+    }
+
     static TopN readTopN(PlanStreamInput in) throws IOException {
         return new TopN(
             in.readSource(),
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java
index d1eb8c32dfcb0..5392b65f3a80f 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java
@@ -10,6 +10,7 @@
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
 import org.elasticsearch.xpack.esql.VerificationException;
 import org.elasticsearch.xpack.esql.core.common.Failure;
 import org.elasticsearch.xpack.esql.core.expression.Alias;
@@ -60,6 +61,8 @@
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
 import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
+import org.elasticsearch.xpack.esql.plan.physical.search.EsScoreQueryExec;
+import org.elasticsearch.xpack.esql.plan.physical.search.RankExec;
 import org.elasticsearch.xpack.esql.planner.AbstractPhysicalOperationProviders;
 import org.elasticsearch.xpack.esql.planner.EsqlTranslatorHandler;
 import org.elasticsearch.xpack.esql.stats.SearchStats;
@@ -109,6 +112,7 @@ protected List> rules(boolean optimizeForEsSource) {
             esSourceRules.add(new PushTopNToSource());
             esSourceRules.add(new PushLimitToSource());
             esSourceRules.add(new PushFiltersToSource());
+            esSourceRules.add(new PushRankToSource());
             esSourceRules.add(new PushStatsToSource());
         }
 
@@ -353,6 +357,46 @@ private static boolean canPushSorts(PhysicalPlan plan) {
         return false;
     }
 
+    /**
+     * Pushes a {@link RankExec} that sits directly on top of an {@link EsQueryExec} down to the source, by replacing
+     * both nodes with an {@link EsScoreQueryExec} that carries the translated rank query.
+     */
+    private static class PushRankToSource extends PhysicalOptimizerRules.ParameterizedOptimizerRule<
+        RankExec,
+        LocalPhysicalOptimizerContext> {
+        /**
+         * @return the scoring source that takes the place of {@code rankExec} and its child
+         * @throws EsqlIllegalArgumentException if the child of {@code rankExec} is not an {@link EsQueryExec}
+         */
+        @Override
+        protected PhysicalPlan rule(RankExec rankExec, LocalPhysicalOptimizerContext ctx) {
+            PhysicalPlan child = rankExec.child();
+
+            // translate the rank expression into the query builder handed to the scoring source
+            Query queryDSL = TRANSLATOR_HANDLER.asQuery(rankExec.query());
+            QueryBuilder planQuery = queryDSL.asBuilder();
+
+            if (child instanceof EsQueryExec esQueryExec) {
+                // create a scoring source
+                // NOTE(review): esQueryExec.query() and sorts() are not carried over to the new node - confirm intended
+                EsScoreQueryExec scoreQueryExec = new EsScoreQueryExec(
+                    esQueryExec.source(),
+                    esQueryExec.index(),
+                    esQueryExec.attrs(),
+                    planQuery,
+                    esQueryExec.limit(),
+                    emptyList(),
+                    esQueryExec.estimatedRowSize()
+                );
+
+                // return the new scoring source; returning esQueryExec would silently discard the rank query
+                return scoreQueryExec;
+            } else {
+                throw new EsqlIllegalArgumentException("Unexpected branch");
+            }
+        }
+    }
+
     /**
      * Looks for the case where certain stats exist right before the query and thus can be pushed down.
      */
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/search/Rank.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/search/Rank.java
index e434899834a17..41ede9108f311 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/search/Rank.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/search/Rank.java
@@ -37,11 +37,14 @@ public Rank(Source source, LogicalPlan child, Expression query) {
         this.query = query;
     }
 
+    public Expression query() {
+        return query;
+    }
+
     @Override
     public List output() {
         if (lazyOutput == null) {
-            MetadataAttribute score = new MetadataAttribute(source(), "_score", DataTypes.FLOAT, false);
-            lazyOutput = mergeOutputAttributes(asList(score), child().output());
+            lazyOutput = mergeOutputAttributes(asList(SearchUtils.scoreField(source())), child().output());
         }
 
         return lazyOutput;
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/search/Score.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/search/Score.java
index 790a745ce14ba..4aa0a3b1848b7 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/search/Score.java
+++
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/search/Score.java
@@ -7,23 +7,42 @@
 
 package org.elasticsearch.xpack.esql.plan.logical.search;
 
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
 import org.elasticsearch.xpack.esql.core.expression.Expression;
 import org.elasticsearch.xpack.esql.core.plan.logical.LogicalPlan;
 import org.elasticsearch.xpack.esql.core.plan.logical.UnaryPlan;
 import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 
+import java.util.List;
 import java.util.Objects;
 
+import static java.util.Arrays.asList;
+import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
+
 public class Score extends UnaryPlan {
 
     private final Expression query;
+    private List<Attribute> lazyOutput;
 
     public Score(Source source, LogicalPlan child, Expression query) {
         super(source, child);
         this.query = query;
     }
 
+    /**
+     * Returns this node's output: the result of {@code mergeOutputAttributes} applied to the score field and the
+     * child's output. The list is built on the first call and cached in {@code lazyOutput}.
+     */
+    @Override
+    public List<Attribute> output() {
+        if (lazyOutput == null) {
+            lazyOutput = mergeOutputAttributes(asList(SearchUtils.scoreField(source())), child().output());
+        }
+
+        return lazyOutput;
+    }
+
     @Override
     public boolean expressionsResolved() {
         return query.resolved();
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/search/SearchUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/search/SearchUtils.java
new file mode 100644
index 0000000000000..0edd6ff44c356
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/search/SearchUtils.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.plan.logical.search;
+
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
+import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.core.type.DataTypes;
+
+/**
+ * Package-private static helpers for the search plan nodes; cannot be instantiated.
+ */
+class SearchUtils {
+
+    private static final String SCORE_NAME = "_score";
+
+    private SearchUtils() {}
+
+    /**
+     * Creates the {@code _score} metadata attribute, typed as {@link DataTypes#DOUBLE}.
+     *
+     * @param source source location attached to the new attribute
+     * @return a new {@link MetadataAttribute} named {@code _score}
+     */
+    static Attribute scoreField(Source source) {
+        return new MetadataAttribute(source, SCORE_NAME, DataTypes.DOUBLE, false);
+    }
+}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/search/EsScoreQueryExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/search/EsScoreQueryExec.java
new file mode 100644
index 0000000000000..b90ad50dd0536
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/search/EsScoreQueryExec.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.plan.physical.search;
+
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.index.IndexMode;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
+import org.elasticsearch.xpack.esql.core.expression.Expression;
+import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
+import org.elasticsearch.xpack.esql.core.index.EsIndex;
+import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
+import org.elasticsearch.xpack.esql.core.tree.NodeUtils;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.core.type.DataTypes;
+import org.elasticsearch.xpack.esql.core.type.EsField;
+import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/** {@link EsQueryExec} subclass that always passes {@link IndexMode#STANDARD} and declares the {@code _score} field. */
+public class EsScoreQueryExec extends EsQueryExec {
+
+    static final EsField SCORE_FIELD = new EsField("_score", DataTypes.DOUBLE, Map.of(), false);
+
+    // NOTE(review): List and NodeInfo carry no type arguments in this patch text - confirm against the original change
+    public EsScoreQueryExec(
+        Source source,
+        EsIndex index,
+        List attrs,
+        QueryBuilder query,
+        Expression limit,
+        List sorts,
+        Integer estimatedRowSize
+    ) {
+        super(source, index, IndexMode.STANDARD, attrs, query, limit, sorts, estimatedRowSize);
+    }
+
+    /** Tells whether {@code attr} is a {@link MetadataAttribute} named like {@link #SCORE_FIELD}. */
+    public static boolean isScoreAttribute(Attribute attr) {
+        boolean isMetadata = attr instanceof MetadataAttribute;
+        return isMetadata && SCORE_FIELD.getName().equals(attr.name());
+    }
+
+    @Override
+    protected NodeInfo info() {
+        return NodeInfo.create(this, EsScoreQueryExec::new, index(), attrs(), query(), limit(), sorts(), estimatedRowSize());
+    }
+
+    /** Returns this node when {@code limit} equals the current limit, otherwise a copy carrying the new limit. */
+    public EsScoreQueryExec withLimit(Expression limit) {
+        if (Objects.equals(this.limit(), limit)) {
+            return this;
+        }
+        return new EsScoreQueryExec(source(), index(), attrs(), query(), limit, sorts(), estimatedRowSize());
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return this == obj || (obj != null && getClass() == obj.getClass() && super.equals(obj));
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode();
+    }
+}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/search/RankExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/search/RankExec.java
new file mode 100644
index 0000000000000..b5e15a4ac0174
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/search/RankExec.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.plan.physical.search;
+
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
+import org.elasticsearch.xpack.esql.core.expression.Expression;
+import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
+import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Physical plan node that holds a rank query expression on top of a single child plan.
+ */
+public class RankExec extends UnaryExec {
+
+    private final Expression query;
+
+    public RankExec(Source source, PhysicalPlan child, Expression query) {
+        super(source, child);
+        this.query = query;
+    }
+
+    @Override
+    protected NodeInfo info() {
+        return NodeInfo.create(this, RankExec::new, child(), query);
+    }
+
+    @Override
+    public RankExec replaceChild(PhysicalPlan newChild) {
+        return new RankExec(source(), newChild, query);
+    }
+
+    /**
+     * @return the query expression given at construction
+     */
+    public Expression query() {
+        return query;
+    }
+
+    /**
+     * Returns the child's output unchanged.
+     * NOTE(review): the logical {@code Rank} node adds a {@code _score} attribute to its output while this node does
+     * not - confirm the two are meant to differ.
+     */
+    @Override
+    public List<Attribute> output() {
+        return child().output();
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(query, child());
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        RankExec that = (RankExec) obj;
+        return Objects.equals(query, that.query) && Objects.equals(child(), that.child());
+    }
+}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Mapper.java
index 12052b92432cf..4d1477880ee57 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Mapper.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Mapper.java
@@ -27,6 +27,7 @@
 import org.elasticsearch.xpack.esql.plan.logical.TopN;
 import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
 import
org.elasticsearch.xpack.esql.plan.logical.meta.MetaFunctions; +import org.elasticsearch.xpack.esql.plan.logical.search.Rank; import org.elasticsearch.xpack.esql.plan.logical.show.ShowInfo; import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; import org.elasticsearch.xpack.esql.plan.physical.DissectExec; @@ -46,6 +47,7 @@ import org.elasticsearch.xpack.esql.plan.physical.RowExec; import org.elasticsearch.xpack.esql.plan.physical.ShowExec; import org.elasticsearch.xpack.esql.plan.physical.TopNExec; +import org.elasticsearch.xpack.esql.plan.physical.search.RankExec; import static org.elasticsearch.xpack.esql.plan.physical.AggregateExec.Mode; import static org.elasticsearch.xpack.esql.plan.physical.AggregateExec.Mode.FINAL; @@ -162,6 +164,10 @@ private PhysicalPlan map(UnaryPlan p, PhysicalPlan child) { return new MvExpandExec(mvExpand.source(), map(mvExpand.child()), mvExpand.target(), mvExpand.expanded()); } + if (p instanceof Rank rank) { + return new RankExec(rank.source(), map(rank.child()), rank.query()); + } + // // Pipeline breakers //