diff --git a/src/graph/executor/algo/AllPathsExecutor.cpp b/src/graph/executor/algo/AllPathsExecutor.cpp index eca75e09c97..0cf34444d80 100644 --- a/src/graph/executor/algo/AllPathsExecutor.cpp +++ b/src/graph/executor/algo/AllPathsExecutor.cpp @@ -224,18 +224,37 @@ folly::Future AllPathsExecutor::buildResult() { folly::Future AllPathsExecutor::buildPathMultiJobs() { std::vector>> futures; - futures.emplace_back( - folly::via(runner(), [this]() { return doBuildPath(1, 0, 0, nullptr, false); })); - futures.emplace_back( - folly::via(runner(), [this]() { return doBuildPath(1, 0, 0, nullptr, true); })); + auto leftFuture = folly::via(runner(), [this]() { return doBuildPath(1, 0, 0, nullptr, false); }) + .thenError(folly::tag_t{}, + [this](const std::bad_alloc&) { + memoryExceeded_ = true; + return std::vector(); + }) + .thenError(folly::tag_t{}, + [](const std::exception&) { return std::vector(); }); + auto rightFuture = folly::via(runner(), [this]() { return doBuildPath(1, 0, 0, nullptr, true); }) + .thenError(folly::tag_t{}, + [this](const std::bad_alloc&) { + memoryExceeded_ = true; + return std::vector(); + }) + .thenError(folly::tag_t{}, + [](const std::exception&) { return std::vector(); }); + futures.emplace_back(std::move(leftFuture)); + futures.emplace_back(std::move(rightFuture)); time::Duration conjunctPathTime; - return folly::collect(futures) + return folly::collectAll(futures) .via(runner()) - .thenValue([this](std::vector>&& paths) { + .thenValue([this](std::vector>>&& paths) { + if (memoryExceeded_.load(std::memory_order_acquire) == true) { + return folly::makeFuture(Executor::memoryExceededStatus()); + } memory::MemoryCheckGuard guard; - auto& leftPaths = paths.front(); - auto& rightPaths = paths.back(); + auto& leftPathsValues = paths.front(); + auto& rightPathsValues = paths.back(); + auto leftPaths = std::move(leftPathsValues).value(); + auto rightPaths = std::move(rightPathsValues).value(); if (leftSteps_ == 0) { buildOneWayPath(rightPaths, false); @@ -261,6 +280,7 @@ folly::Future> AllPathsExecutor::doBuildPa size_t end, std::shared_ptr> pathsPtr, bool reverse) { + memory::MemoryCheckGuard guard; auto maxStep = reverse ? rightSteps_ : leftSteps_; if (step > maxStep) { return folly::makeFuture>(std::vector()); @@ -274,6 +294,9 @@ folly::Future> AllPathsExecutor::doBuildPa if (step == 1) { auto& initVids = reverse ? rightInitVids_ : leftInitVids_; for (auto& vid : initVids) { + if (memoryExceeded_.load(std::memory_order_acquire) == true) { + return folly::makeFuture>(std::vector()); + } auto vidIter = adjList.find(vid); if (vidIter == adjList.end()) { continue; @@ -290,6 +313,9 @@ folly::Future> AllPathsExecutor::doBuildPa } } else { for (auto i = start; i < end; ++i) { + if (memoryExceeded_.load(std::memory_order_acquire) == true) { + return folly::makeFuture>(std::vector()); + } auto path = (*pathsPtr)[i]; auto& edgeValue = path->edge; DCHECK(edgeValue.isEdge()); @@ -316,28 +342,46 @@ folly::Future> AllPathsExecutor::doBuildPa } auto newPathsSize = newPathsPtr->size(); - if (newPathsSize == 0) { + if (newPathsSize == 0 || memoryExceeded_.load(std::memory_order_acquire) == true) { return folly::makeFuture>(std::vector()); } std::vector>> futures; if (newPathsSize < FLAGS_path_batch_size) { - futures.emplace_back(folly::via(runner(), [this, step, newPathsSize, newPathsPtr, reverse]() { - return doBuildPath(step + 1, 0, newPathsSize, newPathsPtr, reverse); - })); + auto future = folly::via(runner(), + [this, step, newPathsSize, newPathsPtr, reverse]() { + return doBuildPath(step + 1, 0, newPathsSize, newPathsPtr, reverse); + }) + .thenError(folly::tag_t{}, + [this](const std::bad_alloc&) { + memoryExceeded_ = true; + return std::vector(); + }) + .thenError(folly::tag_t{}, + [](const std::exception&) { return std::vector(); }); + futures.emplace_back(std::move(future)); } else { for (size_t _start = 0; _start < newPathsSize; _start += FLAGS_path_batch_size) { auto tmp = _start + FLAGS_path_batch_size; auto _end = tmp > newPathsSize ? newPathsSize : tmp; - futures.emplace_back(folly::via(runner(), [this, step, _start, _end, newPathsPtr, reverse]() { - return doBuildPath(step + 1, _start, _end, newPathsPtr, reverse); - })); + auto future = folly::via(runner(), + [this, step, _start, _end, newPathsPtr, reverse]() { + return doBuildPath(step + 1, _start, _end, newPathsPtr, reverse); + }) + .thenError(folly::tag_t{}, + [this](const std::bad_alloc&) { + memoryExceeded_ = true; + return std::vector(); + }) + .thenError(folly::tag_t{}, + [](const std::exception&) { return std::vector(); }); + futures.emplace_back(std::move(future)); } } - return folly::collect(futures).via(runner()).thenValue( - [currentStepResult = newPathsPtr](std::vector>&& paths) { - memory::MemoryCheckGuard guard; + return folly::collectAll(futures).via(runner()).thenValue( + [currentStepResult = newPathsPtr](std::vector>>&& paths) { std::vector result = std::move(*currentStepResult); - for (auto& path : paths) { + for (auto& pathValue : paths) { + auto path = std::move(pathValue).value(); if (path.empty()) { continue; }