Skip to content

Commit

Permalink
fix allpath memory tracker (#5640)
Browse files Browse the repository at this point in the history
fix allpath memory traker
  • Loading branch information
nevermore3 committed Jul 17, 2023
1 parent 5ffd73e commit 8765f55
Showing 1 changed file with 63 additions and 19 deletions.
82 changes: 63 additions & 19 deletions src/graph/executor/algo/AllPathsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,18 +224,37 @@ folly::Future<Status> AllPathsExecutor::buildResult() {

folly::Future<Status> AllPathsExecutor::buildPathMultiJobs() {
std::vector<folly::Future<std::vector<NPath*>>> futures;
futures.emplace_back(
folly::via(runner(), [this]() { return doBuildPath(1, 0, 0, nullptr, false); }));
futures.emplace_back(
folly::via(runner(), [this]() { return doBuildPath(1, 0, 0, nullptr, true); }));
auto leftFuture = folly::via(runner(), [this]() { return doBuildPath(1, 0, 0, nullptr, false); })
.thenError(folly::tag_t<std::bad_alloc>{},
[this](const std::bad_alloc&) {
memoryExceeded_ = true;
return std::vector<NPath*>();
})
.thenError(folly::tag_t<std::exception>{},
[](const std::exception&) { return std::vector<NPath*>(); });
auto rightFuture = folly::via(runner(), [this]() { return doBuildPath(1, 0, 0, nullptr, true); })
.thenError(folly::tag_t<std::bad_alloc>{},
[this](const std::bad_alloc&) {
memoryExceeded_ = true;
return std::vector<NPath*>();
})
.thenError(folly::tag_t<std::exception>{},
[](const std::exception&) { return std::vector<NPath*>(); });
futures.emplace_back(std::move(leftFuture));
futures.emplace_back(std::move(rightFuture));

time::Duration conjunctPathTime;
return folly::collect(futures)
return folly::collectAll(futures)
.via(runner())
.thenValue([this](std::vector<std::vector<NPath*>>&& paths) {
.thenValue([this](std::vector<folly::Try<std::vector<NPath*>>>&& paths) {
if (memoryExceeded_.load(std::memory_order_acquire) == true) {
return folly::makeFuture<Status>(Executor::memoryExceededStatus());
}
memory::MemoryCheckGuard guard;
auto& leftPaths = paths.front();
auto& rightPaths = paths.back();
auto& leftPathsValues = paths.front();
auto& rightPathsValues = paths.back();
auto leftPaths = std::move(leftPathsValues).value();
auto rightPaths = std::move(rightPathsValues).value();

if (leftSteps_ == 0) {
buildOneWayPath(rightPaths, false);
Expand All @@ -261,6 +280,7 @@ folly::Future<std::vector<AllPathsExecutor::NPath*>> AllPathsExecutor::doBuildPa
size_t end,
std::shared_ptr<std::vector<NPath*>> pathsPtr,
bool reverse) {
memory::MemoryCheckGuard guard;
auto maxStep = reverse ? rightSteps_ : leftSteps_;
if (step > maxStep) {
return folly::makeFuture<std::vector<NPath*>>(std::vector<NPath*>());
Expand All @@ -274,6 +294,9 @@ folly::Future<std::vector<AllPathsExecutor::NPath*>> AllPathsExecutor::doBuildPa
if (step == 1) {
auto& initVids = reverse ? rightInitVids_ : leftInitVids_;
for (auto& vid : initVids) {
if (memoryExceeded_.load(std::memory_order_acquire) == true) {
return folly::makeFuture<std::vector<NPath*>>(std::vector<NPath*>());
}
auto vidIter = adjList.find(vid);
if (vidIter == adjList.end()) {
continue;
Expand All @@ -290,6 +313,9 @@ folly::Future<std::vector<AllPathsExecutor::NPath*>> AllPathsExecutor::doBuildPa
}
} else {
for (auto i = start; i < end; ++i) {
if (memoryExceeded_.load(std::memory_order_acquire) == true) {
return folly::makeFuture<std::vector<NPath*>>(std::vector<NPath*>());
}
auto path = (*pathsPtr)[i];
auto& edgeValue = path->edge;
DCHECK(edgeValue.isEdge());
Expand All @@ -316,28 +342,46 @@ folly::Future<std::vector<AllPathsExecutor::NPath*>> AllPathsExecutor::doBuildPa
}

auto newPathsSize = newPathsPtr->size();
if (newPathsSize == 0) {
if (newPathsSize == 0 || memoryExceeded_.load(std::memory_order_acquire) == true) {
return folly::makeFuture<std::vector<NPath*>>(std::vector<NPath*>());
}
std::vector<folly::Future<std::vector<NPath*>>> futures;
if (newPathsSize < FLAGS_path_batch_size) {
futures.emplace_back(folly::via(runner(), [this, step, newPathsSize, newPathsPtr, reverse]() {
return doBuildPath(step + 1, 0, newPathsSize, newPathsPtr, reverse);
}));
auto future = folly::via(runner(),
[this, step, newPathsSize, newPathsPtr, reverse]() {
return doBuildPath(step + 1, 0, newPathsSize, newPathsPtr, reverse);
})
.thenError(folly::tag_t<std::bad_alloc>{},
[this](const std::bad_alloc&) {
memoryExceeded_ = true;
return std::vector<NPath*>();
})
.thenError(folly::tag_t<std::exception>{},
[](const std::exception&) { return std::vector<NPath*>(); });
futures.emplace_back(std::move(future));
} else {
for (size_t _start = 0; _start < newPathsSize; _start += FLAGS_path_batch_size) {
auto tmp = _start + FLAGS_path_batch_size;
auto _end = tmp > newPathsSize ? newPathsSize : tmp;
futures.emplace_back(folly::via(runner(), [this, step, _start, _end, newPathsPtr, reverse]() {
return doBuildPath(step + 1, _start, _end, newPathsPtr, reverse);
}));
auto future = folly::via(runner(),
[this, step, _start, _end, newPathsPtr, reverse]() {
return doBuildPath(step + 1, _start, _end, newPathsPtr, reverse);
})
.thenError(folly::tag_t<std::bad_alloc>{},
[this](const std::bad_alloc&) {
memoryExceeded_ = true;
return std::vector<NPath*>();
})
.thenError(folly::tag_t<std::exception>{},
[](const std::exception&) { return std::vector<NPath*>(); });
futures.emplace_back(std::move(future));
}
}
return folly::collect(futures).via(runner()).thenValue(
[currentStepResult = newPathsPtr](std::vector<std::vector<NPath*>>&& paths) {
memory::MemoryCheckGuard guard;
return folly::collectAll(futures).via(runner()).thenValue(
[currentStepResult = newPathsPtr](std::vector<folly::Try<std::vector<NPath*>>>&& paths) {
std::vector<NPath*> result = std::move(*currentStepResult);
for (auto& path : paths) {
for (auto& pathValue : paths) {
auto path = std::move(pathValue).value();
if (path.empty()) {
continue;
}
Expand Down

0 comments on commit 8765f55

Please sign in to comment.