Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
costin committed May 30, 2024
1 parent 63c8c80 commit 49a55d2
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ search employees [
]
| keep emp_no
| sort emp_no
| limit 5
;

emp_no:integer | _score:double
10013 | Eberhardt
emp_no:integer
10013
;

scoreOnNumeric
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.TopN;
import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
import org.elasticsearch.xpack.esql.plan.logical.search.Rank;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.DissectExec;
import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
Expand Down Expand Up @@ -300,6 +301,7 @@ public static List<PlanNameRegistry.Entry> namedTypeEntries() {
of(LogicalPlan.class, MvExpand.class, PlanNamedTypes::writeMvExpand, PlanNamedTypes::readMvExpand),
of(LogicalPlan.class, OrderBy.class, PlanNamedTypes::writeOrderBy, PlanNamedTypes::readOrderBy),
of(LogicalPlan.class, Project.class, PlanNamedTypes::writeProject, PlanNamedTypes::readProject),
of(LogicalPlan.class, Rank.class, PlanNamedTypes::writeRank, PlanNamedTypes::readRank),
of(LogicalPlan.class, TopN.class, PlanNamedTypes::writeTopN, PlanNamedTypes::readTopN),
// Attributes
of(Attribute.class, FieldAttribute.class, PlanNamedTypes::writeFieldAttribute, PlanNamedTypes::readFieldAttribute),
Expand Down Expand Up @@ -1013,6 +1015,16 @@ static void writeProject(PlanStreamOutput out, Project project) throws IOExcepti
writeNamedExpressions(out, project.projections());
}

static Rank readRank(PlanStreamInput in) throws IOException {
return new Rank(in.readSource(), in.readLogicalPlanNode(), in.readExpression());
}

static void writeRank(PlanStreamOutput out, Rank rank) throws IOException {
out.writeNoSource();
out.writeLogicalPlanNode(rank.child());
out.writeExpression(rank.query());
}

static TopN readTopN(PlanStreamInput in) throws IOException {
return new TopN(
in.readSource(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.core.common.Failure;
import org.elasticsearch.xpack.esql.core.expression.Alias;
Expand Down Expand Up @@ -60,6 +61,8 @@
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
import org.elasticsearch.xpack.esql.plan.physical.search.EsScoreQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.search.RankExec;
import org.elasticsearch.xpack.esql.planner.AbstractPhysicalOperationProviders;
import org.elasticsearch.xpack.esql.planner.EsqlTranslatorHandler;
import org.elasticsearch.xpack.esql.stats.SearchStats;
Expand Down Expand Up @@ -109,6 +112,7 @@ protected List<Batch<PhysicalPlan>> rules(boolean optimizeForEsSource) {
esSourceRules.add(new PushTopNToSource());
esSourceRules.add(new PushLimitToSource());
esSourceRules.add(new PushFiltersToSource());
esSourceRules.add(new PushRankToSource());
esSourceRules.add(new PushStatsToSource());
}

Expand Down Expand Up @@ -353,6 +357,36 @@ private static boolean canPushSorts(PhysicalPlan plan) {
return false;
}

private static class PushRankToSource extends PhysicalOptimizerRules.ParameterizedOptimizerRule<
RankExec,
LocalPhysicalOptimizerContext> {
@Override
protected PhysicalPlan rule(RankExec rankExec, LocalPhysicalOptimizerContext ctx) {
PhysicalPlan plan = rankExec;
PhysicalPlan child = rankExec.child();

Query queryDSL = TRANSLATOR_HANDLER.asQuery(rankExec.query());
QueryBuilder planQuery = queryDSL.asBuilder();

if (child instanceof EsQueryExec esQueryExec) {
// create a scoring source
EsScoreQueryExec scoreQueryExec = new EsScoreQueryExec(
esQueryExec.source(),
esQueryExec.index(),
esQueryExec.attrs(),
planQuery,
esQueryExec.limit(),
emptyList(),
esQueryExec.estimatedRowSize()
);

return esQueryExec;
} else {
throw new EsqlIllegalArgumentException("Unexpected branch");
}
}
}

/**
* Looks for the case where certain stats exist right before the query and thus can be pushed down.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ public Rank(Source source, LogicalPlan child, Expression query) {
this.query = query;
}

public Expression query() {
return query;
}

@Override
public List<Attribute> output() {
if (lazyOutput == null) {
MetadataAttribute score = new MetadataAttribute(source(), "_score", DataTypes.FLOAT, false);
lazyOutput = mergeOutputAttributes(asList(score), child().output());
lazyOutput = mergeOutputAttributes(asList(SearchUtils.scoreField(source())), child().output());
}

return lazyOutput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,38 @@

package org.elasticsearch.xpack.esql.plan.logical.search;

import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.core.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;

import java.util.List;
import java.util.Objects;

import static java.util.Arrays.asList;
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;

public class Score extends UnaryPlan {

private final Expression query;
private List<Attribute> lazyOutput;

public Score(Source source, LogicalPlan child, Expression query) {
super(source, child);
this.query = query;
}

@Override
public List<Attribute> output() {
if (lazyOutput == null) {
lazyOutput = mergeOutputAttributes(asList(SearchUtils.scoreField(source())), child().output());
}

return lazyOutput;
}

@Override
public boolean expressionsResolved() {
return query.resolved();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.plan.logical.search;

import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataTypes;

class SearchUtils {

private SearchUtils() {}

static Attribute scoreField(Source source) {
return new MetadataAttribute(source, "_score", DataTypes.DOUBLE, false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.plan.physical.search;

import org.elasticsearch.common.Strings;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
import org.elasticsearch.xpack.esql.core.index.EsIndex;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.NodeUtils;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataTypes;
import org.elasticsearch.xpack.esql.core.type.EsField;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;

import java.util.List;
import java.util.Map;
import java.util.Objects;

public class EsScoreQueryExec extends EsQueryExec {

static final EsField SCORE_FIELD = new EsField("_score", DataTypes.DOUBLE, Map.of(), false);

public EsScoreQueryExec(Source source,
EsIndex index,
List<Attribute> attrs,
QueryBuilder query,
Expression limit,
List<FieldSort> sorts,
Integer estimatedRowSize) {
super(source, index, IndexMode.STANDARD, attrs, query, limit, sorts, estimatedRowSize);
}

public static boolean isScoreAttribute(Attribute attr) {
return attr instanceof MetadataAttribute && SCORE_FIELD.getName().equals(attr.name());
}

@Override
protected NodeInfo<EsQueryExec> info() {
return NodeInfo.create(this, EsScoreQueryExec::new, index(), attrs(), query(), limit(), sorts(), estimatedRowSize());
}

public EsScoreQueryExec withLimit(Expression limit) {
return Objects.equals(this.limit(), limit)
? this
: new EsScoreQueryExec(source(), index(), attrs(), query(), limit, sorts(), estimatedRowSize());
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}

if (obj == null || getClass() != obj.getClass()) {
return false;
}

return super.equals(obj);
}

@Override
public int hashCode() {
return super.hashCode();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.plan.physical.search;

import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;

import java.util.List;
import java.util.Objects;

public class RankExec extends UnaryExec {

private final Expression query;

public RankExec(Source source, PhysicalPlan child, Expression query) {
super(source, child);
this.query = query;
}

@Override
protected NodeInfo<RankExec> info() {
return NodeInfo.create(this, RankExec::new, child(), query);
}

@Override
public RankExec replaceChild(PhysicalPlan newChild) {
return new RankExec(source(), newChild, query);
}

public Expression query() {
return query;
}

@Override
public List<Attribute> output() {
return child().output();
}

@Override
public int hashCode() {
return Objects.hash(query, child());
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}

if (obj == null || getClass() != obj.getClass()) {
return false;
}

RankExec other = (RankExec) obj;
return Objects.equals(query, other.query) && Objects.equals(child(), other.child());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.xpack.esql.plan.logical.TopN;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.meta.MetaFunctions;
import org.elasticsearch.xpack.esql.plan.logical.search.Rank;
import org.elasticsearch.xpack.esql.plan.logical.show.ShowInfo;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.DissectExec;
Expand All @@ -46,6 +47,7 @@
import org.elasticsearch.xpack.esql.plan.physical.RowExec;
import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
import org.elasticsearch.xpack.esql.plan.physical.search.RankExec;

import static org.elasticsearch.xpack.esql.plan.physical.AggregateExec.Mode;
import static org.elasticsearch.xpack.esql.plan.physical.AggregateExec.Mode.FINAL;
Expand Down Expand Up @@ -162,6 +164,10 @@ private PhysicalPlan map(UnaryPlan p, PhysicalPlan child) {
return new MvExpandExec(mvExpand.source(), map(mvExpand.child()), mvExpand.target(), mvExpand.expanded());
}

if (p instanceof Rank rank) {
return new RankExec(rank.source(), map(rank.child()), rank.query());
}

//
// Pipeline breakers
//
Expand Down

0 comments on commit 49a55d2

Please sign in to comment.