Skip to content

Commit

Permalink
enhancement/pattern-expression
Browse files Browse the repository at this point in the history
fix pattern apply executor

add pattern predicate executor

add pattern predicate flag

fix edge cases

fix unused rollup apply

remove unnecessary project before PatternApply

small rename

fix anti-predicate

fix anti-predicate

fix where planner

fix not flattened expression

fix tck

fix tck

small delete
  • Loading branch information
czpmango committed Dec 5, 2022
1 parent ad34bcd commit e51fe53
Show file tree
Hide file tree
Showing 19 changed files with 531 additions and 22 deletions.
4 changes: 4 additions & 0 deletions src/graph/context/ast/CypherAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ struct Path final {
// "(v)-[:like]->()" in (v)-[:like]->()
std::string collectVariable;

// Flag for pattern predicate
bool isPred{false};
bool isAntiPred{false};

enum PathType : int8_t { kDefault, kAllShortest, kSingleShortest };
PathType pathType{PathType::kDefault};
};
Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ nebula_add_library(
query/TraverseExecutor.cpp
query/AppendVerticesExecutor.cpp
query/RollUpApplyExecutor.cpp
query/PatternApplyExecutor.cpp
query/GetDstBySrcExecutor.cpp
algo/BFSShortestPathExecutor.cpp
algo/MultiShortestPathExecutor.cpp
Expand Down
4 changes: 4 additions & 0 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
#include "graph/executor/query/LeftJoinExecutor.h"
#include "graph/executor/query/LimitExecutor.h"
#include "graph/executor/query/MinusExecutor.h"
#include "graph/executor/query/PatternApplyExecutor.h"
#include "graph/executor/query/ProjectExecutor.h"
#include "graph/executor/query/RollUpApplyExecutor.h"
#include "graph/executor/query/SampleExecutor.h"
Expand Down Expand Up @@ -542,6 +543,9 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kRollUpApply: {
return pool->makeAndAdd<RollUpApplyExecutor>(node, qctx);
}
case PlanNode::Kind::kPatternApply: {
return pool->makeAndAdd<PatternApplyExecutor>(node, qctx);
}
case PlanNode::Kind::kArgument: {
return pool->makeAndAdd<ArgumentExecutor>(node, qctx);
}
Expand Down
154 changes: 154 additions & 0 deletions src/graph/executor/query/PatternApplyExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/* Copyright (c) 2022 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "graph/executor/query/PatternApplyExecutor.h"

#include "graph/context/Iterator.h"
#include "graph/context/QueryExpressionContext.h"
#include "graph/planner/plan/Query.h"

namespace nebula {
namespace graph {

folly::Future<Status> PatternApplyExecutor::execute() {
SCOPED_TIMER(&execTime_);
return patternApply();
}

Status PatternApplyExecutor::checkBiInputDataSets() {
auto* patternApply = asNode<PatternApply>(node());
lhsIter_ = ectx_->getResult(patternApply->leftInputVar()).iter();
DCHECK(!!lhsIter_);
if (lhsIter_->isGetNeighborsIter() || lhsIter_->isDefaultIter()) {
std::stringstream ss;
ss << "PatternApply executor does not support " << lhsIter_->kind();
return Status::Error(ss.str());
}
rhsIter_ = ectx_->getResult(patternApply->rightInputVar()).iter();
DCHECK(!!rhsIter_);
if (rhsIter_->isGetNeighborsIter() || rhsIter_->isDefaultIter()) {
std::stringstream ss;
ss << "PatternApply executor does not support " << rhsIter_->kind();
return Status::Error(ss.str());
}
isAntiPred_ = patternApply->isAntiPredicate();

return Status::OK();
}

void PatternApplyExecutor::collectValidKeys(const std::vector<Expression*>& keyCols,
Iterator* iter,
std::unordered_set<List>& validKeys) const {
QueryExpressionContext ctx(ectx_);
for (; iter->valid(); iter->next()) {
List list;
list.values.reserve(keyCols.size());
for (auto& col : keyCols) {
Value val = col->eval(ctx(iter));
list.values.emplace_back(std::move(val));
}
validKeys.emplace(std::move(list));
}
}

void PatternApplyExecutor::collectValidKey(Expression* keyCol,
Iterator* iter,
std::unordered_set<Value>& validKey) const {
QueryExpressionContext ctx(ectx_);
for (; iter->valid(); iter->next()) {
auto& val = keyCol->eval(ctx(iter));
validKey.emplace(val);
}
}

DataSet PatternApplyExecutor::applyZeroKey(Iterator* appliedIter, const bool allValid) {
DataSet ds;
ds.rows.reserve(appliedIter->size());
QueryExpressionContext ctx(ectx_);
for (; appliedIter->valid(); appliedIter->next()) {
Row row = mv_ ? appliedIter->moveRow() : *appliedIter->row();
if (allValid) {
ds.rows.emplace_back(std::move(row));
}
}
return ds;
}

DataSet PatternApplyExecutor::applySingleKey(Expression* appliedKey,
Iterator* appliedIter,
const std::unordered_set<Value>& validKey) {
DataSet ds;
ds.rows.reserve(appliedIter->size());
QueryExpressionContext ctx(ectx_);
for (; appliedIter->valid(); appliedIter->next()) {
auto& val = appliedKey->eval(ctx(appliedIter));
bool applyFlag = (validKey.find(val) != validKey.end()) ^ isAntiPred_;
if (applyFlag) {
Row row = mv_ ? appliedIter->moveRow() : *appliedIter->row();
ds.rows.emplace_back(std::move(row));
}
}
return ds;
}

DataSet PatternApplyExecutor::applyMultiKey(std::vector<Expression*> appliedKeys,
Iterator* appliedIter,
const std::unordered_set<List>& validKeys) {
DataSet ds;
ds.rows.reserve(appliedIter->size());
QueryExpressionContext ctx(ectx_);
for (; appliedIter->valid(); appliedIter->next()) {
List list;
list.values.reserve(appliedKeys.size());
for (auto& col : appliedKeys) {
Value val = col->eval(ctx(appliedIter));
list.values.emplace_back(std::move(val));
}

bool applyFlag = (validKeys.find(list) != validKeys.end()) ^ isAntiPred_;
if (applyFlag) {
Row row = mv_ ? appliedIter->moveRow() : *appliedIter->row();
ds.rows.emplace_back(std::move(row));
}
}
return ds;
}

folly::Future<Status> PatternApplyExecutor::patternApply() {
auto* patternApplyNode = asNode<PatternApply>(node());
NG_RETURN_IF_ERROR(checkBiInputDataSets());

DataSet result;
mv_ = movable(node()->inputVars()[0]);
auto keyCols = patternApplyNode->keyCols();
if (keyCols.size() == 0) {
// Reverse the valid flag if the pattern predicate is an anti-predicate
applyZeroKey(lhsIter_.get(), (rhsIter_->size() > 0) ^ isAntiPred_);
} else if (keyCols.size() == 1) {
std::unordered_set<Value> validKey;
collectValidKey(keyCols[0]->clone(), rhsIter_.get(), validKey);
result = applySingleKey(keyCols[0]->clone(), lhsIter_.get(), validKey);
} else {
// Copy the keyCols to refresh the inside propIndex_ cache
auto cloneExpr = [](std::vector<Expression*> exprs) {
std::vector<Expression*> applyColsCopy;
applyColsCopy.reserve(exprs.size());
for (auto& expr : exprs) {
applyColsCopy.emplace_back(expr->clone());
}
return applyColsCopy;
};

std::unordered_set<List> validKeys;
collectValidKeys(cloneExpr(keyCols), rhsIter_.get(), validKeys);
result = applyMultiKey(cloneExpr(keyCols), lhsIter_.get(), validKeys);
}

result.colNames = patternApplyNode->colNames();
return finish(ResultBuilder().value(Value(std::move(result))).build());
}

} // namespace graph
} // namespace nebula
52 changes: 52 additions & 0 deletions src/graph/executor/query/PatternApplyExecutor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/* Copyright (c) 2022 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#pragma once

#include "graph/executor/Executor.h"

namespace nebula {
namespace graph {

class PatternApplyExecutor : public Executor {
public:
PatternApplyExecutor(const PlanNode* node, QueryContext* qctx)
: Executor("PatternApplyExecutor", node, qctx) {}

folly::Future<Status> execute() override;

protected:
Status checkBiInputDataSets();

void collectValidKeys(const std::vector<Expression*>& keyCols,
Iterator* iter,
std::unordered_set<List>& validKeys) const;

void collectValidKey(Expression* keyCol,
Iterator* iter,
std::unordered_set<Value>& validKey) const;

DataSet applyZeroKey(Iterator* appliedIter, const bool allValid);

DataSet applySingleKey(Expression* appliedCol,
Iterator* appliedIter,
const std::unordered_set<Value>& validKey);

DataSet applyMultiKey(std::vector<Expression*> appliedKeys,
Iterator* appliedIter,
const std::unordered_set<List>& validKeys);

folly::Future<Status> patternApply();
std::unique_ptr<Iterator> lhsIter_;
std::unique_ptr<Iterator> rhsIter_;

// Should apply the reverse when the pattern is an anti-predicate
bool isAntiPred_{false};
// Check if the apply side dataset movable
bool mv_{false};
};

} // namespace graph
} // namespace nebula
1 change: 1 addition & 0 deletions src/graph/optimizer/rule/RemoveNoopProjectRule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ namespace opt {
PlanNode::Kind::kHashInnerJoin,
PlanNode::Kind::kCrossJoin,
PlanNode::Kind::kRollUpApply,
PlanNode::Kind::kPatternApply,
PlanNode::Kind::kArgument};

std::unique_ptr<OptRule> RemoveNoopProjectRule::kInstance =
Expand Down
6 changes: 5 additions & 1 deletion src/graph/planner/match/MatchPathPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ StatusOr<SubPlan> MatchPathPlanner::transform(WhereClauseContext* bindWhere,
NG_RETURN_IF_ERROR(findStarts(bindWhere, nodeAliasesSeen, startFromEdge, startIndex, subplan));
NG_RETURN_IF_ERROR(expand(startFromEdge, startIndex, subplan));

MatchSolver::buildProjectColumns(ctx_->qctx, path_, subplan);
// No need to actually build path if the path is just a predicate
if (!path_.isPred) {
MatchSolver::buildProjectColumns(ctx_->qctx, path_, subplan);
}

return subplan;
}

Expand Down
24 changes: 23 additions & 1 deletion src/graph/planner/match/SegmentsConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,34 @@ SubPlan SegmentsConnector::rollUpApply(CypherClauseContextBase* ctx,
return newPlan;
}

/*static*/ SubPlan SegmentsConnector::patternApply(CypherClauseContextBase* ctx,
const SubPlan& left,
const SubPlan& right,
const graph::Path& path) {
SubPlan newPlan = left;
auto qctx = ctx->qctx;
std::vector<Expression*> keyProps;
for (const auto& col : path.compareVariables) {
keyProps.emplace_back(FunctionCallExpression::make(
qctx->objPool(), "id", {InputPropertyExpression::make(qctx->objPool(), col)}));
}
auto* patternApply = PatternApply::make(
qctx, left.root, DCHECK_NOTNULL(right.root), std::move(keyProps), path.isAntiPred);
// Left side input may be nullptr, which will be filled later
std::vector<std::string> colNames =
left.root != nullptr ? left.root->colNames() : ctx->inputColNames;
patternApply->setColNames(std::move(colNames));
newPlan.root = patternApply;
newPlan.tail = (newPlan.tail == nullptr ? patternApply : newPlan.tail);
return newPlan;
}

SubPlan SegmentsConnector::addInput(const SubPlan& left, const SubPlan& right, bool copyColNames) {
if (left.root == nullptr) {
return right;
}
SubPlan newPlan = left;
DCHECK(left.root->isSingleInput());

if (left.tail->isSingleInput()) {
auto* mutableLeft = const_cast<PlanNode*>(left.tail);
auto* siLeft = static_cast<SingleInputNode*>(mutableLeft);
Expand Down
5 changes: 5 additions & 0 deletions src/graph/planner/match/SegmentsConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ class SegmentsConnector final {
const SubPlan& right,
const graph::Path& path);

static SubPlan patternApply(CypherClauseContextBase* ctx,
const SubPlan& left,
const SubPlan& right,
const graph::Path& path);

/*
* left->right
*/
Expand Down
38 changes: 24 additions & 14 deletions src/graph/planner/match/WhereClausePlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,37 @@ StatusOr<SubPlan> WhereClausePlanner::transform(CypherClauseContextBase* ctx) {
}

auto* wctx = static_cast<WhereClauseContext*>(ctx);
SubPlan wherePlan;
if (wctx->filter) {
auto* newFilter = MatchSolver::doRewrite(wctx->qctx, wctx->aliasesAvailable, wctx->filter);
wherePlan.root = Filter::make(wctx->qctx, nullptr, newFilter, needStableFilter_);
wherePlan.tail = wherePlan.root;

SubPlan subPlan;
// Build plan for pattern from expression
SubPlan plan;
if (!wctx->paths.empty()) {
SubPlan pathsPlan;
// Build plan for pattern expression
for (auto& path : wctx->paths) {
auto status = MatchPathPlanner(wctx, path).transform(nullptr, {});
NG_RETURN_IF_ERROR(status);
subPlan = SegmentsConnector::rollUpApply(wctx, subPlan, std::move(status).value(), path);
}
if (subPlan.root != nullptr) {
wherePlan = SegmentsConnector::addInput(wherePlan, subPlan, true);
auto pathPlan = std::move(status).value();

if (path.isPred) {
// Build plan for pattern predicates
pathsPlan = SegmentsConnector::patternApply(wctx, pathsPlan, pathPlan, path);
} else {
pathsPlan = SegmentsConnector::rollUpApply(wctx, pathsPlan, pathPlan, path);
}
}
plan = pathsPlan;
}

return wherePlan;
if (wctx->filter) {
SubPlan wherePlan;
auto* newFilter = MatchSolver::doRewrite(wctx->qctx, wctx->aliasesAvailable, wctx->filter);
wherePlan.root = Filter::make(wctx->qctx, nullptr, newFilter, needStableFilter_);
wherePlan.tail = wherePlan.root;
if (plan.root == nullptr) {
return wherePlan;
}
plan = SegmentsConnector::addInput(wherePlan, plan, true);
}

return wherePlan;
return plan;
}
} // namespace graph
} // namespace nebula
2 changes: 2 additions & 0 deletions src/graph/planner/plan/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ const char* PlanNode::toString(PlanNode::Kind kind) {
return "Argument";
case Kind::kRollUpApply:
return "RollUpApply";
case Kind::kPatternApply:
return "PatternApply";
case Kind::kGetDstBySrc:
return "GetDstBySrc";
// no default so the compiler will warning when lack
Expand Down
1 change: 1 addition & 0 deletions src/graph/planner/plan/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class PlanNode {
kHashInnerJoin,
kCrossJoin,
kRollUpApply,
kPatternApply,
kArgument,

// Logic
Expand Down
Loading

0 comments on commit e51fe53

Please sign in to comment.