diff --git a/src/graph/executor/Executor.h b/src/graph/executor/Executor.h index 73848e0eef6..236390d1b0a 100644 --- a/src/graph/executor/Executor.h +++ b/src/graph/executor/Executor.h @@ -195,7 +195,7 @@ auto Executor::runMultiJobs(ScatterFunc &&scatter, GatherFunc &&gather, Iterator } // Gather all results and do post works - return folly::collect(futures).via(runner()).thenValue(std::move(gather)); + return folly::collectAll(futures).via(runner()).thenValue(std::move(gather)); } } // namespace graph } // namespace nebula diff --git a/src/graph/executor/query/AppendVerticesExecutor.cpp b/src/graph/executor/query/AppendVerticesExecutor.cpp index f709c14bf37..a8b8ac81d43 100644 --- a/src/graph/executor/query/AppendVerticesExecutor.cpp +++ b/src/graph/executor/query/AppendVerticesExecutor.cpp @@ -185,11 +185,19 @@ folly::Future AppendVerticesExecutor::handleRespMultiJobs( return buildVerticesResult(begin, end, tmpIter); }; - auto gather = [this](auto &&results) -> Status { + auto gather = [this](std::vector>> &&results) -> Status { memory::MemoryCheckGuard guard; - for (auto &r : results) { - NG_RETURN_IF_ERROR(r); - auto &&rows = std::move(r).value(); + for (auto &respVal : results) { + if (respVal.hasException()) { + auto ex = respVal.exception().get_exception(); + if (ex) { + throw std::bad_alloc(); + } else { + throw std::runtime_error(respVal.exception().what().c_str()); + } + } + auto res = std::move(respVal).value(); + auto &&rows = std::move(res).value(); std::move(rows.begin(), rows.end(), std::back_inserter(result_.rows)); } return finish(ResultBuilder().value(Value(std::move(result_))).build()); @@ -203,20 +211,38 @@ folly::Future AppendVerticesExecutor::handleRespMultiJobs( }; auto gather = - [this, inputIterNew = std::move(inputIter)](auto &&prepareResult) -> folly::Future { + [this, inputIterNew = std::move(inputIter)]( + std::vector> &&prepareResult) -> folly::Future { memory::MemoryCheckGuard guard1; - UNUSED(prepareResult); + for (auto &respVal : prepareResult) { + if (respVal.hasException()) { + auto ex = respVal.exception().get_exception(); + if (ex) { + throw std::bad_alloc(); + } else { + throw std::runtime_error(respVal.exception().what().c_str()); + } + } + } auto scatterInput = [this](size_t begin, size_t end, Iterator *tmpIter) mutable -> StatusOr { return handleJob(begin, end, tmpIter); }; - auto gatherFinal = [this](auto &&results) -> Status { + auto gatherFinal = [this](std::vector>> &&results) -> Status { memory::MemoryCheckGuard guard2; - for (auto &r : results) { - NG_RETURN_IF_ERROR(r); - auto &&rows = std::move(r).value(); + for (auto &respVal : results) { + if (respVal.hasException()) { + auto ex = respVal.exception().get_exception(); + if (ex) { + throw std::bad_alloc(); + } else { + throw std::runtime_error(respVal.exception().what().c_str()); + } + } + auto res = std::move(respVal).value(); + auto &&rows = std::move(res).value(); std::move(rows.begin(), rows.end(), std::back_inserter(result_.rows)); } return finish(ResultBuilder().value(Value(std::move(result_))).build()); diff --git a/src/graph/executor/query/FilterExecutor.cpp b/src/graph/executor/query/FilterExecutor.cpp index 1b7a5f39fad..75976d9ee5a 100644 --- a/src/graph/executor/query/FilterExecutor.cpp +++ b/src/graph/executor/query/FilterExecutor.cpp @@ -34,15 +34,21 @@ folly::Future FilterExecutor::execute() { return handleJob(begin, end, tmpIter); }; - auto gather = - [this, result = std::move(ds), kind = iter->kind()](auto &&results) mutable -> Status { + auto gather = [this, result = std::move(ds), kind = iter->kind()]( + std::vector>> &&results) mutable -> Status { // MemoryTrackerVerified memory::MemoryCheckGuard guard; - for (auto &r : results) { - if (!r.ok()) { - return r.status(); + for (auto &respVal : results) { + if (respVal.hasException()) { + auto ex = respVal.exception().get_exception(); + if (ex) { + throw std::bad_alloc(); + } else { + throw std::runtime_error(respVal.exception().what().c_str()); + } } - auto &&rows = std::move(r).value(); + auto res = std::move(respVal).value(); + auto &&rows = std::move(res).value(); result.rows.insert(result.rows.end(), std::make_move_iterator(rows.begin()), std::make_move_iterator(rows.end())); diff --git a/src/graph/executor/query/InnerJoinExecutor.cpp b/src/graph/executor/query/InnerJoinExecutor.cpp index 24e8cc13e9f..50e3639b7dc 100644 --- a/src/graph/executor/query/InnerJoinExecutor.cpp +++ b/src/graph/executor/query/InnerJoinExecutor.cpp @@ -172,13 +172,22 @@ folly::Future InnerJoinExecutor::probe(const std::vector& p return ds; }; - auto gather = [this](auto&& results) mutable -> Status { + auto gather = [this](std::vector>>&& results) mutable -> Status { memory::MemoryCheckGuard guard; DataSet result; auto* joinNode = asNode(node()); result.colNames = joinNode->colNames(); - for (auto& r : results) { - auto&& rows = std::move(r).value(); + for (auto& respVal : results) { + if (respVal.hasException()) { + auto ex = respVal.exception().get_exception(); + if (ex) { + throw std::bad_alloc(); + } else { + throw std::runtime_error(respVal.exception().what().c_str()); + } + } + auto res = std::move(respVal).value(); + auto&& rows = std::move(res).value(); result.rows.insert(result.rows.end(), std::make_move_iterator(rows.begin()), std::make_move_iterator(rows.end())); @@ -204,13 +213,22 @@ folly::Future InnerJoinExecutor::singleKeyProbe(Expression* probeKey, It return ds; }; - auto gather = [this](auto&& results) mutable -> Status { + auto gather = [this](std::vector>>&& results) mutable -> Status { memory::MemoryCheckGuard guard; DataSet result; auto* joinNode = asNode(node()); result.colNames = joinNode->colNames(); - for (auto& r : results) { - auto&& rows = std::move(r).value(); + for (auto& respVal : results) { + if (respVal.hasException()) { + auto ex = respVal.exception().get_exception(); + if (ex) { + throw std::bad_alloc(); + } else { + throw std::runtime_error(respVal.exception().what().c_str()); + } + } + auto res = std::move(respVal).value(); + auto&& rows = std::move(res).value(); result.rows.insert(result.rows.end(), std::make_move_iterator(rows.begin()), std::make_move_iterator(rows.end())); diff --git a/src/graph/executor/query/LeftJoinExecutor.cpp b/src/graph/executor/query/LeftJoinExecutor.cpp index d2b12131b1c..cca1b9ea931 100644 --- a/src/graph/executor/query/LeftJoinExecutor.cpp +++ b/src/graph/executor/query/LeftJoinExecutor.cpp @@ -149,13 +149,22 @@ folly::Future LeftJoinExecutor::probe(const std::vector& pr return ds; }; - auto gather = [this](auto&& results) mutable -> Status { + auto gather = [this](std::vector>>&& results) mutable -> Status { memory::MemoryCheckGuard guard; DataSet result; auto* joinNode = asNode(node()); result.colNames = joinNode->colNames(); - for (auto& r : results) { - auto&& rows = std::move(r).value(); + for (auto& respVal : results) { + if (respVal.hasException()) { + auto ex = respVal.exception().get_exception(); + if (ex) { + throw std::bad_alloc(); + } else { + throw std::runtime_error(respVal.exception().what().c_str()); + } + } + auto res = std::move(respVal).value(); + auto&& rows = std::move(res).value(); result.rows.insert(result.rows.end(), std::make_move_iterator(rows.begin()), std::make_move_iterator(rows.end())); @@ -180,13 +189,22 @@ folly::Future LeftJoinExecutor::singleKeyProbe(Expression* probeKey, Ite return ds; }; - auto gather = [this](auto&& results) mutable -> Status { + auto gather = [this](std::vector>>&& results) mutable -> Status { memory::MemoryCheckGuard guard; DataSet result; auto* joinNode = asNode(node()); result.colNames = joinNode->colNames(); - for (auto& r : results) { - auto&& rows = std::move(r).value(); + for (auto& respVal : results) { + if (respVal.hasException()) { + auto ex = respVal.exception().get_exception(); + if (ex) { + throw std::bad_alloc(); + } else { + throw std::runtime_error(respVal.exception().what().c_str()); + } + } + auto res = std::move(respVal).value(); + auto&& rows = std::move(res).value(); result.rows.insert(result.rows.end(), std::make_move_iterator(rows.begin()), std::make_move_iterator(rows.end())); diff --git a/src/graph/executor/query/ProjectExecutor.cpp b/src/graph/executor/query/ProjectExecutor.cpp index 1fd85a08e64..464b8c4fcf4 100644 --- a/src/graph/executor/query/ProjectExecutor.cpp +++ b/src/graph/executor/query/ProjectExecutor.cpp @@ -29,10 +29,20 @@ folly::Future ProjectExecutor::execute() { return handleJob(begin, end, tmpIter); }; - auto gather = [this, result = std::move(ds)](auto &&results) mutable { + auto gather = [this, result = std::move(ds)]( + std::vector>> &&results) mutable { memory::MemoryCheckGuard guard; - for (auto &r : results) { - auto &&rows = std::move(r).value(); + for (auto &respVal : results) { + if (respVal.hasException()) { + auto ex = respVal.exception().get_exception(); + if (ex) { + throw std::bad_alloc(); + } else { + throw std::runtime_error(respVal.exception().what().c_str()); + } + } + auto res = std::move(respVal).value(); + auto &&rows = std::move(res).value(); result.rows.insert(result.rows.end(), std::make_move_iterator(rows.begin()), std::make_move_iterator(rows.end())); diff --git a/src/graph/executor/query/TraverseExecutor.cpp b/src/graph/executor/query/TraverseExecutor.cpp index 218e41e54de..0ced0963f9d 100644 --- a/src/graph/executor/query/TraverseExecutor.cpp +++ b/src/graph/executor/query/TraverseExecutor.cpp @@ -456,10 +456,19 @@ folly::Future TraverseExecutor::buildPathMultiJobs(size_t minStep, size_ return rows; }; - auto gather = [this](std::vector> resp) mutable -> Status { + auto gather = [this](std::vector>>&& resps) mutable -> Status { // MemoryTrackerVerified memory::MemoryCheckGuard guard; - for (auto& rows : resp) { + for (auto& respVal : resps) { + if (respVal.hasException()) { + auto ex = respVal.exception().get_exception(); + if (ex) { + throw std::bad_alloc(); + } else { + throw std::runtime_error(respVal.exception().what().c_str()); + } + } + auto rows = std::move(respVal).value(); if (rows.empty()) { continue; }