Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move probe rows of Join. #4283

Merged
merged 2 commits into from
Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/graph/executor/Executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ class Executor : private boost::noncopyable, private cpp::NonMovable {
// Check whether the variable is movable, it's movable when reach end of lifetime
// This method shouldn't be called after the `finish` method!
bool movable(const Variable *var);
// Name-based convenience overload: resolves `var` through the query context's symbol
// table and forwards the result to the pointer-based `movable` overload.
bool movable(const std::string &var) {
  const auto *variable = qctx_->symTable()->getVar(var);
  return movable(variable);
}

// Store the result of this executor to execution context
Status finish(Result &&result);
Expand Down
74 changes: 57 additions & 17 deletions src/graph/executor/query/InnerJoinExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,25 @@ folly::Future<Status> InnerJoinExecutor::join(const std::vector<Expression*>& ha
hashTable.reserve(bucketSize);
if (lhsIter_->size() < rhsIter_->size()) {
buildSingleKeyHashTable(hashKeys.front(), lhsIter_.get(), hashTable);
mv_ = movable(rightVar());
result = singleKeyProbe(probeKeys.front(), rhsIter_.get(), hashTable);
} else {
exchange_ = true;
buildSingleKeyHashTable(probeKeys.front(), rhsIter_.get(), hashTable);
mv_ = movable(leftVar());
result = singleKeyProbe(hashKeys.front(), lhsIter_.get(), hashTable);
}
} else {
std::unordered_map<List, std::vector<const Row*>> hashTable;
hashTable.reserve(bucketSize);
if (lhsIter_->size() < rhsIter_->size()) {
buildHashTable(hashKeys, lhsIter_.get(), hashTable);
mv_ = movable(rightVar());
result = probe(probeKeys, rhsIter_.get(), hashTable);
} else {
exchange_ = true;
buildHashTable(probeKeys, rhsIter_.get(), hashTable);
mv_ = movable(leftVar());
result = probe(hashKeys, lhsIter_.get(), hashTable);
}
}
Expand All @@ -74,7 +78,13 @@ DataSet InnerJoinExecutor::probe(
Value val = col->eval(ctx(probeIter));
list.values.emplace_back(std::move(val));
}
buildNewRow<List>(hashTable, list, *probeIter->row(), ds);
if (mv_) {
// Each probe row is matched against the hash table only once, so it can be moved directly;
// the key/value entries in the hash table may be matched multiple times, so they can't be moved.
buildNewRow<List>(hashTable, list, probeIter->moveRow(), ds);
} else {
buildNewRow<List>(hashTable, list, *probeIter->row(), ds);
}
}
return ds;
}
Expand All @@ -87,37 +97,66 @@ DataSet InnerJoinExecutor::singleKeyProbe(
QueryExpressionContext ctx(ectx_);
for (; probeIter->valid(); probeIter->next()) {
auto& val = probeKey->eval(ctx(probeIter));
buildNewRow<Value>(hashTable, val, *probeIter->row(), ds);
if (mv_) {
// Each probe row is matched against the hash table only once, so it can be moved directly;
// the key/value entries in the hash table may be matched multiple times, so they can't be moved.
buildNewRow<Value>(hashTable, val, probeIter->moveRow(), ds);
} else {
buildNewRow<Value>(hashTable, val, *probeIter->row(), ds);
}
}
return ds;
}

template <class T>
void InnerJoinExecutor::buildNewRow(const std::unordered_map<T, std::vector<const Row*>>& hashTable,
const T& val,
const Row& rRow,
Row rRow,
DataSet& ds) const {
const auto& range = hashTable.find(val);
if (range == hashTable.end()) {
return;
}
for (auto* row : range->second) {
auto& lRow = *row;
Row newRow;
newRow.reserve(lRow.size() + rRow.size());
auto& values = newRow.values;
for (std::size_t i = 0; i < (range->second.size() - 1); ++i) {
if (exchange_) {
values.insert(values.end(),
std::make_move_iterator(rRow.values.begin()),
std::make_move_iterator(rRow.values.end()));
values.insert(values.end(), lRow.values.begin(), lRow.values.end());
ds.rows.emplace_back(newRow(rRow, *range->second[i]));
} else {
values.insert(values.end(), lRow.values.begin(), lRow.values.end());
values.insert(values.end(),
std::make_move_iterator(rRow.values.begin()),
std::make_move_iterator(rRow.values.end()));
ds.rows.emplace_back(newRow(*range->second[i], rRow));
}
ds.rows.emplace_back(std::move(newRow));
}
// Move the probe row when creating the last new row
if (exchange_) {
ds.rows.emplace_back(newRow(std::move(rRow), *range->second.back()));
} else {
ds.rows.emplace_back(newRow(*range->second.back(), std::move(rRow)));
}
}

// Concatenates two rows into one: all values of `left` followed by all values of `right`.
//
// Both arguments are taken by value, so this function owns them; callers choose per
// argument whether to copy (pass an lvalue) or hand over (pass std::move(...)).
//
// `left` is already a private copy, so `right` is appended onto it and `left` is returned,
// rather than allocating a third row and moving every value of `left` a second time.
// The returned row's contents are identical to building a fresh row.
//
// NOTE(review): JoinExecutor::newRow has the same signature and body; this definition
// looks redundant and could probably be dropped in favour of the base-class one.
Row InnerJoinExecutor::newRow(Row left, Row right) const {
  // Grow once up front so the append below never reallocates midway.
  left.values.reserve(left.size() + right.size());
  left.values.insert(left.values.end(),
                     std::make_move_iterator(right.values.begin()),
                     std::make_move_iterator(right.values.end()));
  // A by-value parameter is implicitly moved on return, so no copy is made here.
  return left;
}

// Returns the name of the left input variable of the plan node being executed.
// A kBiInnerJoin node is read as a BiJoin; any other kind is read as a Join, whose
// leftVar() yields a pair of which only the first element (the name) is returned.
const std::string& InnerJoinExecutor::leftVar() const {
  const bool isBiInnerJoin = node_->kind() == PlanNode::Kind::kBiInnerJoin;
  if (!isBiInnerJoin) {
    return node_->asNode<Join>()->leftVar().first;
  }
  return node_->asNode<BiJoin>()->leftInputVar();
}

// Returns the name of the right input variable of the plan node being executed.
// A kBiInnerJoin node is read as a BiJoin; any other kind is read as a Join, whose
// rightVar() yields a pair of which only the first element (the name) is returned.
const std::string& InnerJoinExecutor::rightVar() const {
  const bool isBiInnerJoin = node_->kind() == PlanNode::Kind::kBiInnerJoin;
  if (!isBiInnerJoin) {
    return node_->asNode<Join>()->rightVar().first;
  }
  return node_->asNode<BiJoin>()->rightInputVar();
}

Expand All @@ -132,5 +171,6 @@ folly::Future<Status> BiInnerJoinExecutor::execute() {
NG_RETURN_IF_ERROR(checkBiInputDataSets());
return join(joinNode->hashKeys(), joinNode->probeKeys(), joinNode->colNames());
}

} // namespace graph
} // namespace nebula
11 changes: 10 additions & 1 deletion src/graph/executor/query/InnerJoinExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,20 @@ class InnerJoinExecutor : public JoinExecutor {
template <class T>
void buildNewRow(const std::unordered_map<T, std::vector<const Row*>>& hashTable,
const T& val,
const Row& rRow,
Row rRow,
DataSet& ds) const;

// concat rows
Row newRow(Row left, Row right) const;

const std::string& leftVar() const;

const std::string& rightVar() const;

private:
bool exchange_{false};
// Is the probe result movable?
bool mv_{false};
};

// No difference with inner join in processing data, but the dependencies would be executed in
Expand Down
12 changes: 12 additions & 0 deletions src/graph/executor/query/JoinExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,17 @@ void JoinExecutor::buildSingleKeyHashTable(
}
}

// Concatenates two rows into one: all values of `left` followed by all values of `right`.
//
// Both arguments are taken by value, so this function owns them; callers choose per
// argument whether to copy (pass an lvalue) or hand over (pass std::move(...)).
//
// `left` is already a private copy, so `right` is appended onto it and `left` is returned,
// rather than allocating a third row and moving every value of `left` a second time.
// The returned row's contents are identical to building a fresh row.
Row JoinExecutor::newRow(Row left, Row right) const {
  // Grow once up front so the append below never reallocates midway.
  left.values.reserve(left.size() + right.size());
  left.values.insert(left.values.end(),
                     std::make_move_iterator(right.values.begin()),
                     std::make_move_iterator(right.values.end()));
  // A by-value parameter is implicitly moved on return, so no copy is made here.
  return left;
}

} // namespace graph
} // namespace nebula
3 changes: 3 additions & 0 deletions src/graph/executor/query/JoinExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ class JoinExecutor : public Executor {
Iterator* iter,
std::unordered_map<Value, std::vector<const Row*>>& hashTable) const;

// concat rows
Row newRow(Row left, Row right) const;

std::unique_ptr<Iterator> lhsIter_;
std::unique_ptr<Iterator> rhsIter_;
size_t colSize_{0};
Expand Down
34 changes: 21 additions & 13 deletions src/graph/executor/query/LeftJoinExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ folly::Future<Status> LeftJoinExecutor::join(const std::vector<Expression*>& has
hashTable.reserve(rhsIter_->empty() ? 1 : rhsIter_->size());
if (!lhsIter_->empty()) {
buildSingleKeyHashTable(probeKeys.front(), rhsIter_.get(), hashTable);
mv_ = movable(node()->inputVars()[0]);
result = singleKeyProbe(hashKeys.front(), lhsIter_.get(), hashTable);
}
} else {
std::unordered_map<List, std::vector<const Row*>> hashTable;
hashTable.reserve(rhsIter_->empty() ? 1 : rhsIter_->size());
if (!lhsIter_->empty()) {
buildHashTable(probeKeys, rhsIter_.get(), hashTable);
mv_ = movable(node()->inputVars()[0]);
result = probe(hashKeys, lhsIter_.get(), hashTable);
}
}
Expand All @@ -62,7 +64,13 @@ DataSet LeftJoinExecutor::probe(
list.values.emplace_back(std::move(val));
}

buildNewRow<List>(hashTable, list, *probeIter->row(), ds);
if (mv_) {
// Each probe row is matched against the hash table only once, so it can be moved directly;
// the key/value entries in the hash table may be matched multiple times, so they can't be moved.
buildNewRow<List>(hashTable, list, probeIter->moveRow(), ds);
} else {
buildNewRow<List>(hashTable, list, *probeIter->row(), ds);
}
}
return ds;
}
Expand All @@ -76,15 +84,21 @@ DataSet LeftJoinExecutor::singleKeyProbe(
QueryExpressionContext ctx(ectx_);
for (; probeIter->valid(); probeIter->next()) {
auto& val = probeKey->eval(ctx(probeIter));
buildNewRow<Value>(hashTable, val, *probeIter->row(), ds);
if (mv_) {
// Each probe row is matched against the hash table only once, so it can be moved directly;
// the key/value entries in the hash table may be matched multiple times, so they can't be moved.
buildNewRow<Value>(hashTable, val, probeIter->moveRow(), ds);
} else {
buildNewRow<Value>(hashTable, val, *probeIter->row(), ds);
}
}
return ds;
}

template <class T>
void LeftJoinExecutor::buildNewRow(const std::unordered_map<T, std::vector<const Row*>>& hashTable,
const T& val,
const Row& lRow,
Row lRow,
DataSet& ds) const {
auto range = hashTable.find(val);
if (range == hashTable.end()) {
Expand All @@ -98,17 +112,11 @@ void LeftJoinExecutor::buildNewRow(const std::unordered_map<T, std::vector<const
values.insert(values.end(), colSize_ - lRowSize, Value::kNullValue);
ds.rows.emplace_back(std::move(newRow));
} else {
for (auto* row : range->second) {
auto& rRow = *row;
Row newRow;
auto& values = newRow.values;
values.reserve(lRow.size() + rRow.size());
values.insert(values.end(),
std::make_move_iterator(lRow.values.begin()),
std::make_move_iterator(lRow.values.end()));
values.insert(values.end(), rRow.values.begin(), rRow.values.end());
ds.rows.emplace_back(std::move(newRow));
for (std::size_t i = 0; i < (range->second.size() - 1); ++i) {
ds.rows.emplace_back(newRow(lRow, *range->second[i]));
}
// Move the probe row when creating the last new row
ds.rows.emplace_back(newRow(std::move(lRow), *range->second.back()));
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/graph/executor/query/LeftJoinExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ class LeftJoinExecutor : public JoinExecutor {
template <class T>
void buildNewRow(const std::unordered_map<T, std::vector<const Row*>>& hashTable,
const T& val,
const Row& lRow,
Row lRow,
DataSet& ds) const;

// Is the probe result movable?
bool mv_{false};
size_t rightColSize_{0};
};

Expand Down
7 changes: 4 additions & 3 deletions src/graph/executor/query/RollUpApplyExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ DataSet RollUpApplyExecutor::probeZeroKey(Iterator* probeIter, const List& hashT
ds.rows.reserve(probeIter->size());
QueryExpressionContext ctx(ectx_);
for (; probeIter->valid(); probeIter->next()) {
Row row = *probeIter->row();
Row row = mv_ ? probeIter->moveRow() : *probeIter->row();
row.emplace_back(std::move(hashTable));
ds.rows.emplace_back(std::move(row));
}
Expand All @@ -104,7 +104,7 @@ DataSet RollUpApplyExecutor::probeSingleKey(Expression* probeKey,
if (found != hashTable.end()) {
vals = found->second;
}
Row row = *probeIter->row();
Row row = mv_ ? probeIter->moveRow() : *probeIter->row();
row.emplace_back(std::move(vals));
ds.rows.emplace_back(std::move(row));
}
Expand All @@ -130,7 +130,7 @@ DataSet RollUpApplyExecutor::probe(std::vector<Expression*> probeKeys,
if (found != hashTable.end()) {
vals = found->second;
}
Row row = *probeIter->row();
Row row = mv_ ? probeIter->moveRow() : *probeIter->row();
row.emplace_back(std::move(vals));
ds.rows.emplace_back(std::move(row));
}
Expand All @@ -141,6 +141,7 @@ folly::Future<Status> RollUpApplyExecutor::rollUpApply() {
auto* rollUpApplyNode = asNode<RollUpApply>(node());
NG_RETURN_IF_ERROR(checkBiInputDataSets());
DataSet result;
mv_ = movable(node()->inputVars()[0]);
if (rollUpApplyNode->compareCols().size() == 0) {
List hashTable;
buildZeroKeyHashTable(rollUpApplyNode->collectCol(), rhsIter_.get(), hashTable);
Expand Down
2 changes: 2 additions & 0 deletions src/graph/executor/query/RollUpApplyExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class RollUpApplyExecutor : public Executor {
std::unique_ptr<Iterator> lhsIter_;
std::unique_ptr<Iterator> rhsIter_;
size_t colSize_{0};
// Is the probe result movable?
bool mv_{false};
};

} // namespace graph
Expand Down