Skip to content

Commit

Permalink
Refactor vertex map, imported pthash. (#168)
Browse files Browse the repository at this point in the history
  • Loading branch information
luoxiaojian committed Aug 23, 2024
1 parent 95dd1c0 commit c20df19
Show file tree
Hide file tree
Showing 83 changed files with 10,968 additions and 2,343 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall")
if (APPLE)
set(CMAKE_MACOSX_RPATH ON)
else ()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fopenmp -Werror -Wl,-rpath,$ORIGIN")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fopenmp -Werror -Wl,-rpath,$ORIGIN -march=native")
endif ()
if (USE_SIMD)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx2 -march=native")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx2")
endif ()
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -g -fprofile-arcs -ftest-coverage")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3 -g")
Expand Down
3 changes: 3 additions & 0 deletions examples/analytical_apps/cuda/pagerank/pagerank.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ limitations under the License.
#ifdef __CUDACC__
#include "cuda/app_config.h"
#include "grape/grape.h"
#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>
#include <thrust/transform_reduce.h>
namespace grape {
namespace cuda {

Expand Down
8 changes: 6 additions & 2 deletions examples/analytical_apps/flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,13 @@ DEFINE_int32(kcore_k, 0, "k value of kcore.");
DEFINE_int32(kclique_k, 0, "k value of kclique.");

DEFINE_bool(opt, false, "whether to use optimization.");
DEFINE_string(partitioner_type, "map",
"partitioner type, these options can be used: "
"hash, map, segment");
DEFINE_string(idxer_type, "hashmap",
"idxer type, these options can be used: "
"sorted_array, hashmap, pthash, local");

DEFINE_bool(segmented_partition, true,
"whether to use segmented partitioning.");
DEFINE_bool(rebalance, false, "whether to rebalance graph after loading.");
DEFINE_int32(rebalance_vertex_factor, 0, "vertex factor of rebalancing.");

Expand Down
3 changes: 2 additions & 1 deletion examples/analytical_apps/flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ DECLARE_int32(kclique_k);
DECLARE_int32(degree_threshold);

DECLARE_bool(opt);
DECLARE_string(partitioner_type);
DECLARE_string(idxer_type);

DECLARE_bool(segmented_partition);
DECLARE_bool(rebalance);
DECLARE_int32(rebalance_vertex_factor);

Expand Down
20 changes: 11 additions & 9 deletions examples/analytical_apps/lcc/lcc_opt.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,14 +351,15 @@ class LCCOpt<FRAG_T, COUNT_T,
auto inner_vertices = frag.InnerVertices();

messages.InitChannels(thread_num());
auto& channels = messages.Channels();

ctx.stage = 0;

// Each vertex scatter its own out degree.
ForEach(inner_vertices, [&messages, &frag, &ctx](int tid, vertex_t v) {
ForEach(inner_vertices, [&channels, &frag, &ctx](int tid, vertex_t v) {
ctx.global_degree[v] = frag.GetLocalOutDegree(v);
messages.SendMsgThroughOEdges<fragment_t, int>(frag, v,
ctx.global_degree[v], tid);
channels[tid].SendMsgThroughOEdges<fragment_t, int>(frag, v,
ctx.global_degree[v]);
});

// Just in case we are running on single process and no messages will
Expand Down Expand Up @@ -504,14 +505,15 @@ class LCCOpt<FRAG_T, COUNT_T,
message_manager_t& messages) {
auto inner_vertices = frag.InnerVertices();
auto outer_vertices = frag.OuterVertices();
auto& channels = messages.Channels();

if (ctx.stage == 0) {
ctx.stage = 1;
messages.ParallelProcess<fragment_t, int>(
thread_num(), frag,
[&ctx](int tid, vertex_t u, int msg) { ctx.global_degree[u] = msg; });
ctx.memory_pools.resize(thread_num());
ForEach(inner_vertices, [&frag, &ctx, &messages](int tid, vertex_t v) {
ForEach(inner_vertices, [&frag, &ctx, &channels](int tid, vertex_t v) {
vid_t v_gid_hash = IdHasher<vid_t>::hash(frag.GetInnerVertexGid(v));
auto& pool = ctx.memory_pools[tid];
auto& nbr_vec = ctx.complete_neighbor[v];
Expand Down Expand Up @@ -543,8 +545,8 @@ class LCCOpt<FRAG_T, COUNT_T,
#else
std::sort(nbr_ptr, nbr_ptr + nbr_vec.size());
#endif
messages.SendMsgThroughOEdges<fragment_t, VecOutType>(frag, v, msg_vec,
tid);
channels[tid].SendMsgThroughOEdges<fragment_t, VecOutType>(frag, v,
msg_vec);
});
messages.ForceContinue();
} else if (ctx.stage == 1) {
Expand Down Expand Up @@ -586,10 +588,10 @@ class LCCOpt<FRAG_T, COUNT_T,
atomic_add(ctx.tricnt[v], v_count);
}
});
ForEach(outer_vertices, [&messages, &frag, &ctx](int tid, vertex_t v) {
ForEach(outer_vertices, [&channels, &frag, &ctx](int tid, vertex_t v) {
if (ctx.tricnt[v] != 0) {
messages.SyncStateOnOuterVertex<fragment_t, count_t>(
frag, v, ctx.tricnt[v], tid);
channels[tid].SyncStateOnOuterVertex<fragment_t, count_t>(
frag, v, ctx.tricnt[v]);
}
});
messages.ForceContinue();
Expand Down
83 changes: 30 additions & 53 deletions examples/analytical_apps/run_app.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ limitations under the License.
#include <grape/fragment/loader.h>
#include <grape/grape.h>
#include <grape/util.h>
#include <grape/vertex_map/global_vertex_map.h>

#ifdef GRANULA
#include "thirdparty/atlarge-research-granula/granula.hpp"
Expand Down Expand Up @@ -73,10 +72,7 @@ void Init() {
if (FLAGS_deserialize && FLAGS_serialization_prefix.empty()) {
LOG(FATAL) << "Please assign a serialization prefix.";
} else if (FLAGS_efile.empty()) {
LOG(FATAL) << "Please assign input edge files.";
} else if (FLAGS_vfile.empty() && FLAGS_segmented_partition) {
LOG(FATAL) << "EFragmentLoader dosen't support Segmented Partitioner. "
"Please assign vertex files or use Hash Partitioner";
LOG(FATAL) << "Please assign input edge file.";
}

if (!FLAGS_out_prefix.empty() && access(FLAGS_out_prefix.c_str(), 0) != 0) {
Expand Down Expand Up @@ -173,28 +169,19 @@ void CreateAndQuery(const CommSpec& comm_spec, const std::string& out_prefix,
} else if (FLAGS_serialize) {
graph_spec.set_serialize(true, FLAGS_serialization_prefix);
}
if (FLAGS_segmented_partition) {
using VertexMapType =
GlobalVertexMap<OID_T, VID_T, SegmentedPartitioner<OID_T>>;
using FRAG_T = ImmutableEdgecutFragment<OID_T, VID_T, VDATA_T, EDATA_T,
load_strategy, VertexMapType>;
std::shared_ptr<FRAG_T> fragment =
LoadGraph<FRAG_T>(FLAGS_efile, FLAGS_vfile, comm_spec, graph_spec);
using AppType = APP_T<FRAG_T>;
auto app = std::make_shared<AppType>();
DoQuery<FRAG_T, AppType, Args...>(fragment, app, comm_spec, spec,
out_prefix, args...);
} else {
graph_spec.set_rebalance(false, 0);
using FRAG_T =
ImmutableEdgecutFragment<OID_T, VID_T, VDATA_T, EDATA_T, load_strategy>;
std::shared_ptr<FRAG_T> fragment =
LoadGraph<FRAG_T>(FLAGS_efile, FLAGS_vfile, comm_spec, graph_spec);
using AppType = APP_T<FRAG_T>;
auto app = std::make_shared<AppType>();
DoQuery<FRAG_T, AppType, Args...>(fragment, app, comm_spec, spec,
out_prefix, args...);
}

graph_spec.partitioner_type =
grape::parse_partitioner_type_name(FLAGS_partitioner_type);
graph_spec.idxer_type = grape::parse_idxer_type_name(FLAGS_idxer_type);

using FRAG_T =
ImmutableEdgecutFragment<OID_T, VID_T, VDATA_T, EDATA_T, load_strategy>;
std::shared_ptr<FRAG_T> fragment =
LoadGraph<FRAG_T>(FLAGS_efile, FLAGS_vfile, comm_spec, graph_spec);
using AppType = APP_T<FRAG_T>;
auto app = std::make_shared<AppType>();
DoQuery<FRAG_T, AppType, Args...>(fragment, app, comm_spec, spec, out_prefix,
args...);
}

template <typename OID_T, typename VID_T, typename VDATA_T, typename EDATA_T,
Expand All @@ -212,32 +199,22 @@ void CreateAndQueryStagedApp(const CommSpec& comm_spec,
} else if (FLAGS_serialize) {
graph_spec.set_serialize(true, FLAGS_serialization_prefix);
}
if (FLAGS_segmented_partition) {
using VertexMapType =
GlobalVertexMap<OID_T, VID_T, SegmentedPartitioner<OID_T>>;
using FRAG_T = ImmutableEdgecutFragment<OID_T, VID_T, VDATA_T, EDATA_T,
load_strategy, VertexMapType>;
std::shared_ptr<FRAG_T> fragment =
LoadGraph<FRAG_T>(FLAGS_efile, FLAGS_vfile, comm_spec, graph_spec);
using App1Type = APP1_T<FRAG_T>;
auto app1 = std::make_shared<App1Type>();
using App2Type = APP2_T<FRAG_T>;
auto app2 = std::make_shared<App2Type>();
DoDualQuery<FRAG_T, App1Type, App2Type, Args...>(
fragment, app1, app2, comm_spec, spec, out_prefix, args...);
} else {
graph_spec.set_rebalance(false, 0);
using FRAG_T =
ImmutableEdgecutFragment<OID_T, VID_T, VDATA_T, EDATA_T, load_strategy>;
std::shared_ptr<FRAG_T> fragment =
LoadGraph<FRAG_T>(FLAGS_efile, FLAGS_vfile, comm_spec, graph_spec);
using App1Type = APP1_T<FRAG_T>;
auto app1 = std::make_shared<App1Type>();
using App2Type = APP2_T<FRAG_T>;
auto app2 = std::make_shared<App2Type>();
DoDualQuery<FRAG_T, App1Type, App2Type, Args...>(
fragment, app1, app2, comm_spec, spec, out_prefix, args...);
}

graph_spec.partitioner_type =
grape::parse_partitioner_type_name(FLAGS_partitioner_type);
graph_spec.idxer_type = grape::parse_idxer_type_name(FLAGS_idxer_type);

using FRAG_T =
ImmutableEdgecutFragment<OID_T, VID_T, VDATA_T, EDATA_T, load_strategy>;
std::shared_ptr<FRAG_T> fragment =
LoadGraph<FRAG_T>(FLAGS_efile, FLAGS_vfile, comm_spec, graph_spec);

using App1Type = APP1_T<FRAG_T>;
auto app1 = std::make_shared<App1Type>();
using App2Type = APP2_T<FRAG_T>;
auto app2 = std::make_shared<App2Type>();
DoDualQuery<FRAG_T, App1Type, App2Type, Args...>(
fragment, app1, app2, comm_spec, spec, out_prefix, args...);
}

template <typename OID_T, typename VID_T, typename VDATA_T, typename EDATA_T>
Expand Down
Loading

0 comments on commit c20df19

Please sign in to comment.