Skip to content

Commit

Permalink
A basic implementation of vertex-cut fragment to scale-out on large g…
Browse files Browse the repository at this point in the history
…raphs. (#172)
  • Loading branch information
luoxiaojian committed Sep 19, 2024
1 parent 27ea3d2 commit 627416c
Show file tree
Hide file tree
Showing 43 changed files with 2,774 additions and 667 deletions.
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ option(USE_JEMALLOC "Whether to use jemalloc." OFF)
option(USE_HUGEPAGES "Whether to use hugepages." OFF)
option(BUILD_SHARED_LIBS "Whether to build libgrape-lite as shared library" ON)
option(PROFILING "Whether to enable profiling" OFF)
option(TRACKING_MEMORY "Whether to enable memory tracking" OFF)
option(WITH_ASAN "Build with Address Sanitizer" OFF)
option(BUILD_LIBGRAPELITE_DOCS "Build libgrape-lite documentation" ON)
option(BUILD_LIBGRAPELITE_TESTS "Build libgrape-lite test cases" ON)
Expand All @@ -38,6 +39,11 @@ if (PROFILING)
add_definitions(-DPROFILING)
endif ()

# When the TRACKING_MEMORY option is ON, report it at configure time and define
# the TRACKING_MEMORY preprocessor macro for the C++ sources (it gates the
# `#ifdef TRACKING_MEMORY` sections, e.g. the MemoryTracker calls in the
# vertex-cut PageRank example).
if (TRACKING_MEMORY)
message(STATUS "Enable memory tracking")
add_definitions(-DTRACKING_MEMORY)
endif ()

if (WCC_USE_GID)
add_definitions(-DWCC_USE_GID)
endif ()
Expand Down
5 changes: 0 additions & 5 deletions examples/analytical_apps/bc/bc_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,6 @@ class BCContext : public VertexDataContext<FRAG_T, float> {
LOG(INFO) << "[frag-" << frag.fid()
<< "] BC(0) = " << centrality_value[s];
}
#ifdef PROFILING
VLOG(2) << "preprocess_time: " << preprocess_time << "s.";
VLOG(2) << "exec_time: " << exec_time << "s.";
VLOG(2) << "postprocess_time: " << postprocess_time << "s.";
#endif
}

oid_t source_id;
Expand Down
5 changes: 0 additions & 5 deletions examples/analytical_apps/bfs/bfs_opt_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@ class BFSOptContext : public VertexDataContext<FRAG_T, int64_t> {
for (auto v : inner_vertices) {
os << frag.GetId(v) << " " << partial_result[v] << std::endl;
}
#ifdef PROFILING
VLOG(2) << "preprocess_time: " << preprocess_time << "s.";
VLOG(2) << "exec_time: " << exec_time << "s.";
VLOG(2) << "postprocess_time: " << postprocess_time << "s.";
#endif
}

oid_t source_id;
Expand Down
3 changes: 3 additions & 0 deletions examples/analytical_apps/flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ DEFINE_string(serialization_prefix, "",
DEFINE_int32(app_concurrency, -1, "concurrency of application");
DEFINE_int32(load_concurrency, 1, "concurrency of loading graph");

// --vc (default false): use vertex-cut storage. run_app.cc's main() dispatches
// to grape::RunVC() when this is set and --opt is not.
DEFINE_bool(vc, false, "whether to use vertex-cut storage.");
// --single_scan_load (default true): load the graph in a single scan.
// NOTE(review): the consumer of this flag is not visible in this hunk.
DEFINE_bool(single_scan_load, true, "whether to load graph in single scan.");

DEFINE_string(lb, "cta",
"Load balancing policy, these options can be used: "
" none, cta, cm, wm, strict");
3 changes: 3 additions & 0 deletions examples/analytical_apps/flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,8 @@ DECLARE_string(serialization_prefix);
DECLARE_int32(app_concurrency);
DECLARE_int32(load_concurrency);

// Declarations for the vertex-cut related flags defined in flags.cc.
DECLARE_bool(vc);                // whether to use vertex-cut storage
DECLARE_bool(single_scan_load);  // whether to load graph in single scan

DECLARE_string(lb);
#endif // EXAMPLES_ANALYTICAL_APPS_FLAGS_H_
219 changes: 219 additions & 0 deletions examples/analytical_apps/pagerank/pagerank_vc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/** Copyright 2020 Alibaba Group Holding Limited.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#ifndef EXAMPLES_ANALYTICAL_APPS_PAGERANK_PAGERANK_VC_H_
#define EXAMPLES_ANALYTICAL_APPS_PAGERANK_PAGERANK_VC_H_

#include <grape/grape.h>

#include "pagerank/pagerank_vc_context.h"

namespace grape {

/**
 * @brief Summation policy for numeric types.
 *
 * Supplies the two static hooks used as an aggregator template argument
 * (see the GatherMasterVertices calls in PageRankVC): a starting value and
 * an in-place combine step.
 *
 * @tparam T Value type; must be constructible from the literal 0 and
 *           support operator+=.
 */
template <typename T>
struct NumericSum {
  /// @return The value an accumulation starts from (zero).
  static T init() {
    return 0;
  }

  /// Adds @p b onto @p a in place.
  static void aggregate(T& a, const T& b) {
    a += b;
  }
};

/**
 * @brief PageRank for vertex-cut fragments, written against the
 * gather-scatter worker installed by INSTALL_GATHER_SCATTER_WORKER.
 *
 * Every edge is applied in both directions: it adds one to the degree of both
 * endpoints, and in each round it pushes the current value of each endpoint
 * onto the other. Per-fragment partial values are combined onto the master
 * vertices with GatherMasterVertices and the updated master values are pushed
 * back with ScatterMasterVertices.
 *
 * Two traversal strategies are used for the edge passes:
 *  - when the fragment has fewer buckets than half the worker threads, all
 *    edges are processed in parallel and updates use atomic_add;
 *  - otherwise one task is run per bucket id and updates are plain `+=`.
 *    NOTE(review): this presumably relies on GetEdgesOfBucket(i, j) holding
 *    only edges whose src lies in partition i and dst in partition j, so a
 *    task only ever writes entries of its own partition -- confirm against
 *    the fragment implementation.
 *
 * @tparam FRAG_T Fragment type.
 */
template <typename FRAG_T>
class PageRankVC
    : public GatherScatterAppBase<FRAG_T, PageRankVCContext<FRAG_T>>,
      public ParallelEngine,
      public Communicator {
  using vertex_t = Vertex<typename FRAG_T::oid_t>;

 public:
  INSTALL_GATHER_SCATTER_WORKER(PageRankVC<FRAG_T>, PageRankVCContext<FRAG_T>,
                                FRAG_T)

  /**
   * @brief Initial round: compute degrees and seed the rank values.
   *
   * Does nothing when ctx.max_round is not positive. Otherwise it
   *  1. counts, per local vertex, how many edge endpoints reference it and
   *     gathers those counts into ctx.master_degree;
   *  2. sets each master vertex to p (degree 0) or p / degree, where
   *     p = 1 / graph_vnum, counting degree-0 vertices along the way;
   *  3. sums the degree-0 count across workers and derives ctx.dangling_sum;
   *  4. scatters ctx.master_result into ctx.curr_result and forces another
   *     round.
   *
   * @param frag     Fragment being processed.
   * @param ctx      Application context (results, degrees, counters).
   * @param messages Gather-scatter message manager.
   */
  void PEval(const fragment_t& frag, context_t& ctx,
             message_manager_t& messages) {
    if (ctx.max_round <= 0) {
      return;
    }

    ctx.graph_vnum = frag.GetTotalVerticesNum();
    ctx.step = 0;

    {
      // The degree array only lives inside this scope.
#ifdef PROFILING
      ctx.t0 -= GetCurrentTime();
#endif
      typename fragment_t::template both_vertex_array_t<int> degree(
          frag.Vertices(), 0);
#ifdef TRACKING_MEMORY
      // allocate degree array for both src and dst vertices
      MemoryTracker::GetInstance().allocate(frag.Vertices().size() *
                                            sizeof(int));
#endif
      const int num_buckets = frag.GetBucketNum();
      const int num_threads = thread_num();
      if (num_buckets >= (num_threads / 2)) {
        // One task per bucket id; plain increments (see class comment).
        ForEach(0, num_buckets,
                [&frag, &degree, num_buckets](int /* tid */, int own) {
                  for (int other = 0; other < num_buckets; ++other) {
                    for (auto& edge : frag.GetEdgesOfBucket(other, own)) {
                      degree[vertex_t(edge.dst)] += 1;
                    }
                  }
                  for (int other = 0; other < num_buckets; ++other) {
                    for (auto& edge : frag.GetEdgesOfBucket(own, other)) {
                      degree[vertex_t(edge.src)] += 1;
                    }
                  }
                });
      } else {
        // Edge-parallel pass; endpoints may be shared between threads, so
        // the increments are atomic.
        ForEach(
            frag.GetEdges().begin(), frag.GetEdges().end(),
            [&](int /* tid */, const typename fragment_t::edge_t& edge) {
              atomic_add(degree[vertex_t(edge.src)], 1);
              atomic_add(degree[vertex_t(edge.dst)], 1);
            },
            4096);
      }
#ifdef PROFILING
      ctx.t0 += GetCurrentTime();
#endif

      messages.GatherMasterVertices<fragment_t, int, NumericSum<int>>(
          frag, degree, ctx.master_degree);
#ifdef TRACKING_MEMORY
      // deallocate degree array for both src and dst vertices
      MemoryTracker::GetInstance().deallocate(frag.Vertices().size() *
                                              sizeof(int));
#endif
    }

    const double init_rank = 1.0 / ctx.graph_vnum;
#ifdef PROFILING
    ctx.t1 -= GetCurrentTime();
#endif
    // One counter slot per worker thread, indexed by the tid ForEach passes.
    std::vector<int64_t> dangling_per_thread(thread_num(), 0);
    ForEach(frag.MasterVertices(), [&](int tid, vertex_t v) {
      if (ctx.master_degree[v] != 0) {
        ctx.master_result[v] = init_rank / ctx.master_degree[v];
      } else {
        ++dangling_per_thread[tid];
        ctx.master_result[v] = init_rank;
      }
    });
    int64_t dangling_local = 0;
    for (auto count : dangling_per_thread) {
      dangling_local += count;
    }
#ifdef PROFILING
    ctx.t1 += GetCurrentTime();
#endif

    Sum(dangling_local, ctx.total_dangling_vnum);
    ctx.dangling_sum = init_rank * ctx.total_dangling_vnum;

    messages.ScatterMasterVertices<fragment_t, double>(frag, ctx.master_result,
                                                       ctx.curr_result);
    messages.ForceContinue();
  }

  /**
   * @brief One PageRank iteration.
   *
   * Allocates the gather buffers on the first call (ctx.step == 0), bumps the
   * step counter, zeroes ctx.next_result, accumulates the neighbours' current
   * values over all edges (both directions), and gathers the partial sums
   * into ctx.master_result. On every round but the last, master values are
   * rescaled by their degree, scattered, and another round is forced; on the
   * round where ctx.step reaches ctx.max_round the final value
   * `master_result * delta + base` is stored and no further round is
   * requested.
   *
   * @param frag     Fragment being processed.
   * @param ctx      Application context (results, degrees, counters).
   * @param messages Gather-scatter message manager.
   */
  void IncEval(const fragment_t& frag, context_t& ctx,
               message_manager_t& messages) {
    if (ctx.step == 0) {
      messages.AllocateGatherBuffers<fragment_t, double>(frag);
    }
    ++ctx.step;

    const double base = (1.0 - ctx.delta) / ctx.graph_vnum +
                        ctx.delta * ctx.dangling_sum / ctx.graph_vnum;
    ctx.dangling_sum = base * ctx.total_dangling_vnum;

    ForEach(frag.Vertices(),
            [&ctx](int /* tid */, vertex_t v) { ctx.next_result[v] = 0; });

    const int num_buckets = frag.GetBucketNum();
    const int num_threads = thread_num();

#ifdef PROFILING
    ctx.t2 -= GetCurrentTime();
#endif
    if (num_buckets >= (num_threads / 2)) {
      // One task per bucket id; plain accumulation (see class comment).
      ForEach(0, num_buckets,
              [&frag, &ctx, num_buckets](int /* tid */, int own) {
                for (int other = 0; other < num_buckets; ++other) {
                  for (auto& edge : frag.GetEdgesOfBucket(other, own)) {
                    ctx.next_result[vertex_t(edge.dst)] +=
                        ctx.curr_result[vertex_t(edge.src)];
                  }
                }
                for (int other = 0; other < num_buckets; ++other) {
                  for (auto& edge : frag.GetEdgesOfBucket(own, other)) {
                    ctx.next_result[vertex_t(edge.src)] +=
                        ctx.curr_result[vertex_t(edge.dst)];
                  }
                }
              });
    } else {
      // Edge-parallel pass with atomic accumulation.
      ForEach(
          frag.GetEdges().begin(), frag.GetEdges().end(),
          [&ctx](int /* tid */, const typename fragment_t::edge_t& edge) {
            atomic_add(ctx.next_result[vertex_t(edge.dst)],
                       ctx.curr_result[vertex_t(edge.src)]);
            atomic_add(ctx.next_result[vertex_t(edge.src)],
                       ctx.curr_result[vertex_t(edge.dst)]);
          },
          4096);
    }

#ifdef PROFILING
    ctx.t2 += GetCurrentTime();
#endif

    messages.GatherMasterVertices<fragment_t, double, NumericSum<double>>(
        frag, ctx.next_result, ctx.master_result);

    const bool is_last_round = (ctx.step == ctx.max_round);

#ifdef PROFILING
    ctx.t1 -= GetCurrentTime();
#endif
    if (is_last_round) {
      // Final values: no division by degree, nothing is scattered afterwards.
      ForEach(frag.MasterVertices(), [&ctx, base](int /* tid */, vertex_t v) {
        ctx.master_result[v] = ctx.master_result[v] * ctx.delta + base;
      });
    } else {
      // Pre-divide by degree so the next round can add values directly.
      ForEach(frag.MasterVertices(), [&ctx, base](int /* tid */, vertex_t v) {
        if (ctx.master_degree[v] > 0) {
          ctx.master_result[v] =
              (base + ctx.delta * ctx.master_result[v]) / ctx.master_degree[v];
        } else {
          ctx.master_result[v] = base;
        }
      });
    }
#ifdef PROFILING
    ctx.t1 += GetCurrentTime();
#endif

    if (!is_last_round) {
      messages.ScatterMasterVertices<fragment_t, double>(
          frag, ctx.master_result, ctx.curr_result);
      messages.ForceContinue();
    }
  }
};

} // namespace grape

#endif // EXAMPLES_ANALYTICAL_APPS_PAGERANK_PAGERANK_VC_H_
93 changes: 93 additions & 0 deletions examples/analytical_apps/pagerank/pagerank_vc_context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/** Copyright 2020 Alibaba Group Holding Limited.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#ifndef EXAMPLES_ANALYTICAL_APPS_PAGERANK_PAGERANK_VC_CONTEXT_H_
#define EXAMPLES_ANALYTICAL_APPS_PAGERANK_PAGERANK_VC_CONTEXT_H_

#include "grape/utils/memory_tracker.h"

#include <iomanip>

namespace grape {

/**
 * @brief Context (per-fragment state) of the vertex-cut PageRank application.
 *
 * Holds the per-vertex working arrays, the per-master-vertex degree, the
 * iteration parameters and, when PROFILING is defined, three phase timers.
 * The final result lives in the data array of the VertexDataContext base,
 * which `master_result` aliases.
 *
 * @tparam FRAG_T Fragment type.
 */
template <typename FRAG_T>
class PageRankVCContext : public VertexDataContext<FRAG_T, double> {
  using oid_t = typename FRAG_T::oid_t;

 public:
  /**
   * @brief Binds master_result to the base-class data array and sizes the
   * working arrays for @p fragment.
   *
   * @param fragment Fragment this context belongs to.
   */
  explicit PageRankVCContext(const FRAG_T& fragment)
      : VertexDataContext<FRAG_T, double>(fragment),
        master_result(this->data()) {
    curr_result.Init(fragment.Vertices());
    next_result.Init(fragment.Vertices());
    master_degree.Init(fragment.MasterVertices());

#ifdef TRACKING_MEMORY
    // Report the footprint of the arrays to the memory tracker.
    // NOTE(review): by size these presumably correspond to curr_result,
    // next_result, master_result and master_degree, in that order.
    MemoryTracker::GetInstance().allocate(fragment.Vertices().size() *
                                          sizeof(double));
    MemoryTracker::GetInstance().allocate(fragment.Vertices().size() *
                                          sizeof(double));
    MemoryTracker::GetInstance().allocate(fragment.MasterVertices().size() *
                                          sizeof(double));
    MemoryTracker::GetInstance().allocate(fragment.MasterVertices().size() *
                                          sizeof(int));
#endif
  }

  /**
   * @brief Stores the run parameters and resets the step counter.
   *
   * @param messages  Message manager; not used by this context.
   * @param delta     Damping factor used by the application.
   * @param max_round Number of IncEval rounds to run; PEval is a no-op when
   *                  this is not positive.
   */
  void Init(GatherScatterMessageManager& messages, double delta,
            int max_round) {
    this->delta = delta;
    this->max_round = max_round;
    step = 0;
  }

  /**
   * @brief Writes one "<vertex value> <rank>" line per master vertex, the
   * rank in scientific notation with 15 digits of precision.
   *
   * Lines end with '\n' rather than std::endl so the stream is not flushed
   * once per vertex; the stream's owner flushes it.
   *
   * @param os Destination stream. Its float formatting flags are left set to
   *           scientific / precision 15 after the first written line.
   */
  void Output(std::ostream& os) {
    auto& frag = this->fragment();
    auto master_vertices = frag.MasterVertices();
    for (auto v : master_vertices) {
      os << v.GetValue() << " " << std::scientific << std::setprecision(15)
         << master_result[v] << '\n';
    }

#ifdef PROFILING
    VLOG(2) << "[frag-" << frag.fid() << "]: init degree: " << t0 << " s, "
            << "calc master result: " << t1 << " s, "
            << "propogate: " << t2 << " s";
#endif
  }

  // Values read while traversing edges in the current round.
  typename FRAG_T::template both_vertex_array_t<double> curr_result;
  // Per-fragment partial sums accumulated in the current round.
  typename FRAG_T::template both_vertex_array_t<double> next_result;
  // Alias of the base-class data array; holds the values of master vertices.
  typename FRAG_T::template vertex_array_t<double>& master_result;
  // Degree of each master vertex, gathered in PEval.
  typename FRAG_T::template vertex_array_t<int> master_degree;

  // Number of degree-0 master vertices summed over all workers.
  int64_t total_dangling_vnum = 0;
  // Total vertex count of the graph; assigned in PEval. Zero-initialised so
  // it is never indeterminate when PEval returns early (max_round <= 0).
  int64_t graph_vnum = 0;
  int step = 0;
  int max_round = 0;
  double delta = 0;

  double dangling_sum = 0.0;

#ifdef PROFILING
  double t0 = 0;  // init degree
  double t1 = 0;  // calc master result
  double t2 = 0;  // propagate
#endif
};

} // namespace grape

#endif // EXAMPLES_ANALYTICAL_APPS_PAGERANK_PAGERANK_VC_CONTEXT_H_
3 changes: 3 additions & 0 deletions examples/analytical_apps/run_app.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ limitations under the License.
#include <glog/logging.h>

#include "run_app_opt.h"
#include "run_app_vc.h"

int main(int argc, char* argv[]) {
FLAGS_stderrthreshold = 0;
Expand All @@ -41,6 +42,8 @@ int main(int argc, char* argv[]) {
std::string name = FLAGS_application;
if (FLAGS_opt) {
grape::RunOpt();
} else if (FLAGS_vc) {
grape::RunVC();
} else {
if (name.find("sssp") != std::string::npos) {
grape::Run<int64_t, uint32_t, grape::EmptyType, double>();
Expand Down
Loading

0 comments on commit 627416c

Please sign in to comment.