Skip to content

Commit

Permalink
Fix the promise/future. (vesoft-inc#3814)
Browse files Browse the repository at this point in the history
Co-authored-by: jimingquan <mingquan.ji@vesoft.com>
  • Loading branch information
2 people authored and liwenhui-soul committed Jan 27, 2022
1 parent 218ad2b commit 2f535f8
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 16 deletions.
24 changes: 11 additions & 13 deletions src/graph/executor/query/TraverseExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,10 @@ folly::Future<Status> TraverseExecutor::traverse() {
DataSet emptyResult;
return finish(ResultBuilder().value(Value(std::move(emptyResult))).build());
}
getNeighbors();
return promise_.getFuture();
return getNeighbors();
}

void TraverseExecutor::getNeighbors() {
folly::Future<Status> TraverseExecutor::getNeighbors() {
currentStep_++;
time::Duration getNbrTime;
StorageClient* storageClient = qctx_->getStorageClient();
Expand All @@ -93,7 +92,7 @@ void TraverseExecutor::getNeighbors() {
qctx()->rctx()->session()->id(),
qctx()->plan()->id(),
qctx()->plan()->isProfileEnabled());
storageClient
return storageClient
->getNeighbors(param,
reqDs_.colNames,
std::move(reqDs_.rows),
Expand All @@ -112,7 +111,7 @@ void TraverseExecutor::getNeighbors() {
.thenValue([this, getNbrTime](StorageRpcResponse<GetNeighborsResponse>&& resp) mutable {
SCOPED_TIMER(&execTime_);
addStats(resp, getNbrTime.elapsedInUSec());
handleResponse(resp);
return handleResponse(std::move(resp));
});
}

Expand Down Expand Up @@ -140,11 +139,11 @@ void TraverseExecutor::addStats(RpcResponse& resp, int64_t getNbrTimeInUSec) {
otherStats_.emplace(folly::sformat("step {}", currentStep_), ss.str());
}

void TraverseExecutor::handleResponse(RpcResponse& resps) {
folly::Future<Status> TraverseExecutor::handleResponse(RpcResponse&& resps) {
SCOPED_TIMER(&execTime_);
auto result = handleCompleteness(resps, FLAGS_accept_partial_success);
if (!result.ok()) {
promise_.setValue(std::move(result).status());
return folly::makeFuture<Status>(std::move(result).status());
}

auto& responses = resps.responses();
Expand All @@ -162,21 +161,20 @@ void TraverseExecutor::handleResponse(RpcResponse& resps) {

auto status = buildInterimPath(iter.get());
if (!status.ok()) {
promise_.setValue(status);
return;
return folly::makeFuture<Status>(std::move(status));
}
if (!isFinalStep()) {
if (reqDs_.rows.empty()) {
if (range_ != nullptr) {
promise_.setValue(buildResult());
return folly::makeFuture<Status>(buildResult());
} else {
promise_.setValue(Status::OK());
return folly::makeFuture<Status>(Status::OK());
}
} else {
getNeighbors();
return getNeighbors();
}
} else {
promise_.setValue(buildResult());
return folly::makeFuture<Status>(buildResult());
}
}

Expand Down
5 changes: 2 additions & 3 deletions src/graph/executor/query/TraverseExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ class TraverseExecutor final : public StorageAccessExecutor {

void addStats(RpcResponse& resps, int64_t getNbrTimeInUSec);

void getNeighbors();
folly::Future<Status> getNeighbors();

void handleResponse(RpcResponse& resps);
folly::Future<Status> handleResponse(RpcResponse&& resps);

Status buildInterimPath(GetNeighborsIter* iter);

Expand Down Expand Up @@ -74,7 +74,6 @@ class TraverseExecutor final : public StorageAccessExecutor {
private:
DataSet reqDs_;
const Traverse* traverse_{nullptr};
folly::Promise<Status> promise_;
MatchStepRange* range_{nullptr};
size_t currentStep_{0};
std::list<std::unordered_map<Value, Paths>> paths_;
Expand Down

0 comments on commit 2f535f8

Please sign in to comment.