Skip to content

Commit

Permalink
optimize for k-hop
Browse files Browse the repository at this point in the history
  • Loading branch information
jievince committed Aug 18, 2022
1 parent af1e1e8 commit a693a31
Show file tree
Hide file tree
Showing 19 changed files with 542 additions and 40 deletions.
34 changes: 34 additions & 0 deletions src/clients/storage/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

using nebula::cpp2::PropertyType;
using nebula::storage::cpp2::ExecResponse;
using nebula::storage::cpp2::GetDstBySrcResponse;
using nebula::storage::cpp2::GetNeighborsResponse;
using nebula::storage::cpp2::GetPropResponse;

Expand Down Expand Up @@ -107,6 +108,39 @@ StorageRpcRespFuture<cpp2::GetNeighborsResponse> StorageClient::getNeighbors(
});
}

StorageRpcRespFuture<cpp2::GetDstBySrcResponse> StorageClient::getDstBySrc(
const CommonRequestParam& param, const List& vertices, const std::vector<EdgeType>& edgeTypes) {
auto cbStatus = getIdFromValue(param.space);
if (!cbStatus.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::GetDstBySrcResponse>>(
std::runtime_error(cbStatus.status().toString()));
}

auto status = clusterIdsToHosts(param.space, vertices.values, std::move(cbStatus).value());
if (!status.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::GetDstBySrcResponse>>(
std::runtime_error(status.status().toString()));
}

auto& clusters = status.value();
auto common = param.toReqCommon();
std::unordered_map<HostAddr, cpp2::GetDstBySrcRequest> requests;
for (auto& c : clusters) {
auto& host = c.first;
auto& req = requests[host];
req.space_id_ref() = param.space;
req.parts_ref() = std::move(c.second);
req.edge_types_ref() = edgeTypes;
req.common_ref() = common;
}

return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::GetDstBySrcRequest& r) {
return client->future_getDstBySrc(r);
});
}

StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::addVertices(
const CommonRequestParam& param,
std::vector<cpp2::NewVertex> vertices,
Expand Down
5 changes: 5 additions & 0 deletions src/clients/storage/StorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ class StorageClient
int64_t limit = std::numeric_limits<int64_t>::max(),
const Expression* filter = nullptr);

StorageRpcRespFuture<cpp2::GetDstBySrcResponse> getDstBySrc(
const CommonRequestParam& param,
const List& vertices,
const std::vector<EdgeType>& edgeTypes);

StorageRpcRespFuture<cpp2::GetPropResponse> getProps(
const CommonRequestParam& param,
const DataSet& input,
Expand Down
3 changes: 3 additions & 0 deletions src/graph/context/ast/QueryAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ struct GoContext final : AstContext {
bool joinInput{false};
// true when $$.tag.prop exist
bool joinDst{false};
// Optimize for some simple go sentence which only need dst id.
// eg. GO 1 TO N STEPS FROM "A" OVER like YIELD DISTINCT like._dst
bool isSimple{false};

ExpressionProps exprProps;

Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ nebula_add_library(
query/TraverseExecutor.cpp
query/AppendVerticesExecutor.cpp
query/RollUpApplyExecutor.cpp
query/GetDstBySrcExecutor.cpp
algo/BFSShortestPathExecutor.cpp
algo/MultiShortestPathExecutor.cpp
algo/ProduceAllPathsExecutor.cpp
Expand Down
4 changes: 4 additions & 0 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
#include "graph/executor/query/DataCollectExecutor.h"
#include "graph/executor/query/DedupExecutor.h"
#include "graph/executor/query/FilterExecutor.h"
#include "graph/executor/query/GetDstBySrcExecutor.h"
#include "graph/executor/query/GetEdgesExecutor.h"
#include "graph/executor/query/GetNeighborsExecutor.h"
#include "graph/executor/query/GetVerticesExecutor.h"
Expand Down Expand Up @@ -550,6 +551,9 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kShortestPath: {
return pool->makeAndAdd<ShortestPathExecutor>(node, qctx);
}
case PlanNode::Kind::kGetDstBySrc: {
return pool->makeAndAdd<GetDstBySrcExecutor>(node, qctx);
}
case PlanNode::Kind::kUnknown: {
LOG(FATAL) << "Unknown plan node kind " << static_cast<int32_t>(node->kind());
break;
Expand Down
48 changes: 48 additions & 0 deletions src/graph/executor/StorageAccessExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,42 @@ StatusOr<DataSet> buildRequestDataSet(const SpaceInfo &space,
return vertices;
}

template <typename VidType>
StatusOr<List> buildRequestList(const SpaceInfo &space,
QueryExpressionContext &exprCtx,
Iterator *iter,
Expression *expr,
bool dedup) {
DCHECK(iter && expr) << "iter=" << iter << ", expr=" << expr;
nebula::List vertices;
auto s = iter->size();
vertices.reserve(s);

std::unordered_set<VidType> uniqueSet;
uniqueSet.reserve(s);

const auto &vidType = *(space.spaceDesc.vid_type_ref());

for (; iter->valid(); iter->next()) {
auto vid = expr->eval(exprCtx(iter));
if (vid.empty()) {
continue;
}
if (!SchemaUtil::isValidVid(vid, vidType)) {
std::stringstream ss;
ss << "`" << vid.toString() << "', the srcs should be type of "
<< apache::thrift::util::enumNameSafe(vidType.get_type()) << ", but was`" << vid.type()
<< "'";
return Status::Error(ss.str());
}
if (dedup && !uniqueSet.emplace(Vid<VidType>::value(vid)).second) {
continue;
}
vertices.emplace_back(std::move(vid));
}
return vertices;
}

} // namespace internal

bool StorageAccessExecutor::isIntVidType(const SpaceInfo &space) const {
Expand All @@ -95,6 +131,18 @@ StatusOr<DataSet> StorageAccessExecutor::buildRequestDataSetByVidType(Iterator *
return internal::buildRequestDataSet<std::string>(space, exprCtx, iter, expr, dedup, isCypher);
}

StatusOr<List> StorageAccessExecutor::buildRequestListByVidType(Iterator *iter,
Expression *expr,
bool dedup) {
const auto &space = qctx()->rctx()->session()->space();
QueryExpressionContext exprCtx(qctx()->ectx());

if (isIntVidType(space)) {
return internal::buildRequestList<int64_t>(space, exprCtx, iter, expr, dedup);
}
return internal::buildRequestList<std::string>(space, exprCtx, iter, expr, dedup);
}

std::string StorageAccessExecutor::getStorageDetail(
optional_field_ref<const std::map<std::string, int32_t> &> ref) const {
if (ref.has_value()) {
Expand Down
2 changes: 2 additions & 0 deletions src/graph/executor/StorageAccessExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ class StorageAccessExecutor : public Executor {
Expression *expr,
bool dedup,
bool isCypher = false);

StatusOr<List> buildRequestListByVidType(Iterator *iter, Expression *expr, bool dedup);
};

} // namespace graph
Expand Down
92 changes: 92 additions & 0 deletions src/graph/executor/query/GetDstBySrcExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) 2020 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.

#include "graph/executor/query/GetDstBySrcExecutor.h"

#include "graph/service/GraphFlags.h"

using nebula::storage::StorageClient;
using nebula::storage::StorageRpcResponse;
using nebula::storage::cpp2::GetDstBySrcResponse;

namespace nebula {
namespace graph {

StatusOr<List> GetDstBySrcExecutor::buildRequestList() {
SCOPED_TIMER(&execTime_);
auto inputVar = gd_->inputVar();
auto iter = ectx_->getResult(inputVar).iter();
return buildRequestListByVidType(iter.get(), gd_->src(), gd_->dedup());
}

folly::Future<Status> GetDstBySrcExecutor::execute() {
auto res = buildRequestList();
NG_RETURN_IF_ERROR(res);
auto reqList = std::move(res).value();
if (reqList.empty()) {
DataSet emptyResult;
return finish(ResultBuilder()
.value(Value(std::move(emptyResult)))
.iter(Iterator::Kind::kSequential)
.build());
}

time::Duration getDstTime;
StorageClient* storageClient = qctx_->getStorageClient();
QueryExpressionContext qec(qctx()->ectx());
StorageClient::CommonRequestParam param(gd_->space(),
qctx()->rctx()->session()->id(),
qctx()->plan()->id(),
qctx()->plan()->isProfileEnabled());
return storageClient->getDstBySrc(param, std::move(reqList), gd_->edgeTypes())
.via(runner())
.ensure([this, getDstTime]() {
SCOPED_TIMER(&execTime_);
otherStats_.emplace("total_rpc_time", folly::sformat("{}(us)", getDstTime.elapsedInUSec()));
})
.thenValue([this](StorageRpcResponse<GetDstBySrcResponse>&& resp) {
SCOPED_TIMER(&execTime_);
auto& hostLatency = resp.hostLatency();
for (size_t i = 0; i < hostLatency.size(); ++i) {
size_t size = 0u;
auto& result = resp.responses()[i];
if (result.dsts_ref().has_value()) {
size = (*result.dsts_ref()).size();
}
auto& info = hostLatency[i];
otherStats_.emplace(
folly::sformat("{} exec/total/vertices", std::get<0>(info).toString()),
folly::sformat("{}(us)/{}(us)/{},", std::get<1>(info), std::get<2>(info), size));
auto detail = getStorageDetail(result.result.latency_detail_us_ref());
if (!detail.empty()) {
otherStats_.emplace("storage_detail", detail);
}
}
return handleResponse(resp, this->gd_->colNames());
});
}

Status GetDstBySrcExecutor::handleResponse(RpcResponse& resps,
const std::vector<std::string>& colNames) {
auto result = handleCompleteness(resps, FLAGS_accept_partial_success);
NG_RETURN_IF_ERROR(result);
ResultBuilder builder;
builder.state(result.value());

auto& responses = resps.responses();
DataSet ds;
for (auto& resp : responses) {
auto* dataset = resp.get_dsts();
if (dataset == nullptr) {
continue;
}
dataset->colNames = colNames;
ds.append(std::move(*dataset));
}
builder.value(Value(std::move(ds))).iter(Iterator::Kind::kSequential);
return finish(builder.build());
}

} // namespace graph
} // namespace nebula
36 changes: 36 additions & 0 deletions src/graph/executor/query/GetDstBySrcExecutor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) 2020 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.

#ifndef GRAPH_EXECUTOR_QUERY_GETDSTBYSRCEXECUTOR_H_
#define GRAPH_EXECUTOR_QUERY_GETDSTBYSRCEXECUTOR_H_

#include "graph/executor/StorageAccessExecutor.h"
#include "graph/planner/plan/Query.h"

// get the dst id of the src id
namespace nebula {
namespace graph {
class GetDstBySrcExecutor final : public StorageAccessExecutor {
public:
GetDstBySrcExecutor(const PlanNode* node, QueryContext* qctx)
: StorageAccessExecutor("GetDstBySrcExecutor", node, qctx) {
gd_ = asNode<GetDstBySrc>(node);
}

folly::Future<Status> execute() override;

StatusOr<List> buildRequestList();

private:
using RpcResponse = storage::StorageRpcResponse<storage::cpp2::GetDstBySrcResponse>;
Status handleResponse(RpcResponse& resps, const std::vector<std::string>& colNames);

private:
const GetDstBySrc* gd_;
};

} // namespace graph
} // namespace nebula

#endif // GRAPH_EXECUTOR_QUERY_GETDSTBYSRCEXECUTOR_H_
5 changes: 5 additions & 0 deletions src/graph/executor/test/StorageServerStub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ folly::Future<cpp2::GetNeighborsResponse> GraphStorageLocalServer::future_getNei
LOCAL_RETURN_FUTURE(threadManager_, cpp2::GetNeighborsResponse, future_getNeighbors);
}

folly::Future<cpp2::GetDstBySrcResponse> GraphStorageLocalServer::future_getDstBySrc(
const cpp2::GetDstBySrcRequest& request) {
LOCAL_RETURN_FUTURE(threadManager_, cpp2::GetDstBySrcResponse, future_getDstBySrc);
}

folly::Future<cpp2::ExecResponse> GraphStorageLocalServer::future_addVertices(
const cpp2::AddVerticesRequest& request) {
LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_addVertices);
Expand Down
Loading

0 comments on commit a693a31

Please sign in to comment.