Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
luoxiaojian committed Dec 6, 2023
1 parent df54e26 commit c3a5c5c
Show file tree
Hide file tree
Showing 17 changed files with 171 additions and 144 deletions.
8 changes: 8 additions & 0 deletions examples/analytical_apps/bfs/bfs_opt.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,14 @@ class BFSOpt : public ParallelAppBase<FRAG_T, BFSOptContext<FRAG_T>,

ctx.next_inner_updated.Swap(ctx.curr_inner_updated);
}

void EstimateMessageSize(const fragment_t& frag, size_t& send_size,
size_t& recv_size) {
send_size = frag.GetOuterVerticesNum();
send_size *= sizeof(vertex_t);
recv_size = frag.GetInnerVerticesNum();
recv_size *= (sizeof(vertex_t) * (frag.fnum() - 1));
}
};

} // namespace grape
Expand Down
7 changes: 7 additions & 0 deletions examples/analytical_apps/cdlp/cdlp_opt.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ class CDLPOpt : public ParallelAppBase<FRAG_T, CDLPOptContext<FRAG_T, LABEL_T>,
PropagateLabelSparse(frag, ctx, messages);
}
}

void EstimateMessageSize(const fragment_t& frag, size_t& send_size,
size_t& recv_size) {
send_size = frag.OEDestsSize() * (sizeof(vertex_t) + sizeof(label_t));
recv_size = frag.GetOuterVerticesNum();
recv_size *= (sizeof(vertex_t) + sizeof(label_t));
}
};
} // namespace grape

Expand Down
7 changes: 7 additions & 0 deletions examples/analytical_apps/cdlp/cdlp_opt_ud.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,13 @@ class CDLPOptUD
}
}
}

void EstimateMessageSize(const fragment_t& frag, size_t& send_size,
size_t& recv_size) {
send_size = frag.OEDestsSize() * (sizeof(vertex_t) + sizeof(label_t));
recv_size = frag.GetOuterVerticesNum();
recv_size *= (sizeof(vertex_t) + sizeof(label_t));
}
};
} // namespace grape

Expand Down
7 changes: 7 additions & 0 deletions examples/analytical_apps/cdlp/cdlp_opt_ud_dense.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,13 @@ class CDLPOptUDDense
}
}
}

void EstimateMessageSize(const fragment_t& frag, size_t& send_size,
size_t& recv_size) {
send_size = frag.OEDestsSize() * (sizeof(vertex_t) + sizeof(label_t));
recv_size = frag.GetOuterVerticesNum();
recv_size *= (sizeof(vertex_t) + sizeof(label_t));
}
};

} // namespace grape
Expand Down
28 changes: 28 additions & 0 deletions examples/analytical_apps/lcc/lcc_directed.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,20 @@ class LCCDirected
});
}
}

void EstimateMessageSize(const fragment_t& frag, size_t& send_size,
size_t& recv_size) {
size_t avg_degree =
(frag.GetOutgoingEdgeNum() + frag.GetIncomingEdgeNum()) /
frag.GetInnerVerticesNum() +
1;
send_size =
(avg_degree * (sizeof(vid_t) + sizeof(uint8_t)) + sizeof(vertex_t)) *
frag.IOEDestsSize();
recv_size =
(avg_degree * (sizeof(vid_t) + sizeof(uint8_t)) + sizeof(vertex_t)) *
frag.GetOuterVerticesNum();
}
};

#ifdef USE_BMISS_STTNI_INTERSECT
Expand Down Expand Up @@ -702,6 +716,20 @@ class LCCDirected<FRAG_T, COUNT_T,
});
}
}

void EstimateMessageSize(const fragment_t& frag, size_t& send_size,
size_t& recv_size) {
size_t avg_degree =
(frag.GetOutgoingEdgeNum() + frag.GetIncomingEdgeNum()) /
frag.GetInnerVerticesNum() +
1;
send_size =
(avg_degree * (sizeof(vid_t) + sizeof(uint8_t)) + sizeof(vertex_t)) *
frag.IOEDestsSize();
recv_size =
(avg_degree * (sizeof(vid_t) + sizeof(uint8_t)) + sizeof(vertex_t)) *
frag.GetOuterVerticesNum();
}
};

#endif
Expand Down
16 changes: 16 additions & 0 deletions examples/analytical_apps/lcc/lcc_opt.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,14 @@ class LCCOpt : public ParallelAppBase<FRAG_T, LCCOptContext<FRAG_T, COUNT_T>,
});
}
}

void EstimateMessageSize(const fragment_t& frag, size_t& send_size,
size_t& recv_size) {
size_t avg_degree =
frag.GetOutgoingEdgeNum() / frag.GetInnerVerticesNum() + 1;
send_size = (avg_degree + 1) * sizeof(vid_t) * frag.OEDestsSize();
recv_size = (avg_degree + 1) * frag.GetOuterVerticesNum() * sizeof(vid_t);
}
};

#ifdef USE_BMISS_STTNI_INTERSECT
Expand Down Expand Up @@ -609,6 +617,14 @@ class LCCOpt<FRAG_T, COUNT_T,
});
}
}

void EstimateMessageSize(const fragment_t& frag, size_t& send_size,
size_t& recv_size) {
size_t avg_degree =
frag.GetOutgoingEdgeNum() / frag.GetInnerVerticesNum() + 1;
send_size = (avg_degree + 1) * sizeof(vid_t) * frag.OEDestsSize();
recv_size = (avg_degree + 1) * frag.GetOuterVerticesNum() * sizeof(vid_t);
}
};

#endif
Expand Down
8 changes: 8 additions & 0 deletions examples/analytical_apps/pagerank/pagerank_push_opt.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ class PageRankPushOpt
});
}
}

void EstimateMessageSize(const fragment_t& frag, size_t& send_size,
size_t& recv_size) {
send_size = frag.GetOuterVerticesNum();
send_size *= (sizeof(vertex_t) + sizeof(double));
recv_size = frag.GetInnerVerticesNum();
recv_size *= ((sizeof(vertex_t) + sizeof(double)) * (frag.fnum() - 1));
}
};

} // namespace grape
Expand Down
123 changes: 0 additions & 123 deletions examples/analytical_apps/run_app_opt.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,6 @@ using LCCBeta32 = LCCBeta<FRAG_T, uint32_t>;
template <typename FRAG_T>
using LCCDirected32 = LCCDirected<FRAG_T, uint32_t>;

const size_t kDefaultPoolBatchSize = 16ull * 1024 * 1024;
void init_buffer_pool(size_t pool_size, size_t batch_size) {
MessageBufferPool::Default().init(pool_size, batch_size);
}

template <LoadStrategy load_strategy>
void RunUndirectedPageRankOpt(const CommSpec& comm_spec,
const std::string& out_prefix,
Expand Down Expand Up @@ -113,12 +108,6 @@ void RunUndirectedPageRankOpt(const CommSpec& comm_spec,
} else {
using AppType = PageRankPushOpt<FRAG_T>;
auto app = std::make_shared<AppType>();
size_t pool_size = estimate_pool_size(
fragment->GetOuterVerticesNum() * (sizeof(uint32_t) + sizeof(double)),
fragment->GetInnerVerticesNum() *
(sizeof(uint32_t) + sizeof(double)) * (fragment->fnum() - 1),
kDefaultPoolBatchSize, fragment->fnum(), spec.thread_num);
init_buffer_pool(pool_size, kDefaultPoolBatchSize);
DoQuery<FRAG_T, AppType, double, int>(fragment, app, comm_spec, spec,
out_prefix, delta, mr);
}
Expand Down Expand Up @@ -146,12 +135,6 @@ void RunUndirectedPageRankOpt(const CommSpec& comm_spec,
} else {
using AppType = PageRankPushOpt<FRAG_T>;
auto app = std::make_shared<AppType>();
size_t pool_size = estimate_pool_size(
fragment->GetOuterVerticesNum() * (sizeof(uint32_t) + sizeof(double)),
fragment->GetInnerVerticesNum() *
(sizeof(uint32_t) + sizeof(double)) * (fragment->fnum() - 1),
kDefaultPoolBatchSize, fragment->fnum(), spec.thread_num);
init_buffer_pool(pool_size, kDefaultPoolBatchSize);
DoQuery<FRAG_T, AppType, double, int>(fragment, app, comm_spec, spec,
out_prefix, delta, mr);
}
Expand Down Expand Up @@ -227,23 +210,11 @@ void RunDirectedCDLP(const CommSpec& comm_spec, const std::string& out_prefix,
if (is_int32(min_max_id.first) && is_int32(min_max_id.second)) {
using AppType = CDLPOpt<FRAG_T, int32_t>;
auto app = std::make_shared<AppType>();
size_t pool_size = estimate_pool_size(
fragment->GetInnerVerticesNum() * (sizeof(uint32_t) + sizeof(int)) *
(fragment->fnum() - 1),
fragment->GetOuterVerticesNum() * (sizeof(uint32_t) + sizeof(int)),
kDefaultPoolBatchSize, fragment->fnum(), spec.thread_num);
init_buffer_pool(pool_size, kDefaultPoolBatchSize);
DoQuery<FRAG_T, AppType, int>(fragment, app, comm_spec, spec, out_prefix,
FLAGS_cdlp_mr);
} else {
using AppType = CDLPOpt<FRAG_T, int64_t>;
auto app = std::make_shared<AppType>();
size_t pool_size = estimate_pool_size(
fragment->GetInnerVerticesNum() * (sizeof(uint32_t) + sizeof(int64_t)) *
(fragment->fnum() - 1),
fragment->GetOuterVerticesNum() * (sizeof(uint32_t) + sizeof(int64_t)),
kDefaultPoolBatchSize, fragment->fnum(), spec.thread_num);
init_buffer_pool(pool_size, kDefaultPoolBatchSize);
DoQuery<FRAG_T, AppType, int>(fragment, app, comm_spec, spec, out_prefix,
FLAGS_cdlp_mr);
}
Expand Down Expand Up @@ -275,12 +246,6 @@ void RunUndirectedCDLP(const CommSpec& comm_spec, const std::string& out_prefix,
std::pair<int64_t, int64_t> min_max_id =
get_min_max_id(*fragment->GetVertexMap());
if (is_int32(min_max_id.first) && is_int32(min_max_id.second)) {
size_t pool_size = estimate_pool_size(
fragment->GetInnerVerticesNum() * (sizeof(uint32_t) + sizeof(int32_t)) *
(fragment->fnum() - 1),
fragment->GetOuterVerticesNum() * (sizeof(uint32_t) + sizeof(int32_t)),
kDefaultPoolBatchSize, fragment->fnum(), spec.thread_num);
init_buffer_pool(pool_size, kDefaultPoolBatchSize);
if (avg_degree > 256) {
using AppType = CDLPOptUDDense<FRAG_T, int32_t>;
auto app = std::make_shared<AppType>();
Expand All @@ -293,12 +258,6 @@ void RunUndirectedCDLP(const CommSpec& comm_spec, const std::string& out_prefix,
FLAGS_cdlp_mr);
}
} else {
size_t pool_size = estimate_pool_size(
fragment->GetInnerVerticesNum() * (sizeof(uint32_t) + sizeof(int64_t)) *
(fragment->fnum() - 1),
fragment->GetOuterVerticesNum() * (sizeof(uint32_t) + sizeof(int64_t)),
kDefaultPoolBatchSize, fragment->fnum(), spec.thread_num);
init_buffer_pool(pool_size, kDefaultPoolBatchSize);
if (avg_degree > 256) {
using AppType = CDLPOptUDDense<FRAG_T, int64_t>;
auto app = std::make_shared<AppType>();
Expand Down Expand Up @@ -335,65 +294,6 @@ void CreateAndQueryOpt(const CommSpec& comm_spec, const std::string& out_prefix,
std::shared_ptr<FRAG_T> fragment =
LoadGraph<FRAG_T>(FLAGS_efile, FLAGS_vfile, comm_spec, graph_spec);
using AppType = APP_T<FRAG_T>;
if (std::is_same<AppType, SSSPOpt<FRAG_T>>::value) {
size_t pool_size = estimate_pool_size(
fragment->GetOuterVerticesNum() * (sizeof(uint32_t) + sizeof(double)),
fragment->GetInnerVerticesNum() *
(sizeof(uint32_t) + sizeof(double)) * (fragment->fnum() - 1),
kDefaultPoolBatchSize, fragment->fnum(), spec.thread_num);
init_buffer_pool(pool_size, kDefaultPoolBatchSize);
} else if (std::is_same<AppType, BFSOpt<FRAG_T>>::value) {
size_t pool_size = estimate_pool_size(
fragment->GetOuterVerticesNum() * (sizeof(uint32_t)),
fragment->GetInnerVerticesNum() * (sizeof(uint32_t)) *
(fragment->fnum() - 1),
kDefaultPoolBatchSize, fragment->fnum(), spec.thread_num);
init_buffer_pool(pool_size, kDefaultPoolBatchSize);
} else if (std::is_same<AppType, WCCOpt<FRAG_T>>::value) {
size_t pool_size = estimate_pool_size(
fragment->GetOuterVerticesNum() *
(sizeof(uint32_t) + sizeof(int64_t)),
fragment->GetInnerVerticesNum() *
(sizeof(uint32_t) + sizeof(int64_t)) * (fragment->fnum() - 1),
kDefaultPoolBatchSize, fragment->fnum(), spec.thread_num);
init_buffer_pool(pool_size, kDefaultPoolBatchSize);
} else if (std::is_same<AppType, LCCDirected32<FRAG_T>>::value) {
size_t avg_degree =
((FLAGS_edge_num + FLAGS_vertex_num - 1) / FLAGS_vertex_num + 1) / 2;
size_t pool_size = estimate_pool_size(
fragment->GetInnerVerticesNum() * sizeof(uint32_t) *
(avg_degree + 1) * (fragment->fnum() - 1),
fragment->GetOuterVerticesNum() * sizeof(uint32_t) * (avg_degree + 1),
kDefaultPoolBatchSize, fragment->fnum(), spec.thread_num);
init_buffer_pool(pool_size, kDefaultPoolBatchSize);
} else if (std::is_same<AppType, LCCDirected64<FRAG_T>>::value) {
size_t avg_degree =
((FLAGS_edge_num + FLAGS_vertex_num - 1) / FLAGS_vertex_num + 1) / 2;
size_t pool_size = estimate_pool_size(
fragment->GetInnerVerticesNum() * sizeof(uint64_t) *
(avg_degree + 1) * (fragment->fnum() - 1),
fragment->GetOuterVerticesNum() * sizeof(uint64_t) * (avg_degree + 1),
kDefaultPoolBatchSize, fragment->fnum(), spec.thread_num);
init_buffer_pool(pool_size, kDefaultPoolBatchSize);
} else if (std::is_same<AppType, LCC64<FRAG_T>>::value) {
size_t avg_degree =
(FLAGS_edge_num + FLAGS_vertex_num - 1) / FLAGS_vertex_num;
size_t pool_size = estimate_pool_size(
fragment->GetInnerVerticesNum() * sizeof(uint32_t) *
(avg_degree + 1) * (fragment->fnum() - 1),
fragment->GetOuterVerticesNum() * sizeof(uint32_t) * (avg_degree + 1),
kDefaultPoolBatchSize, fragment->fnum(), spec.thread_num);
init_buffer_pool(pool_size, kDefaultPoolBatchSize);
} else if (std::is_same<AppType, LCC32<FRAG_T>>::value) {
size_t avg_degree =
(FLAGS_edge_num + FLAGS_vertex_num - 1) / FLAGS_vertex_num;
size_t pool_size = estimate_pool_size(
fragment->GetInnerVerticesNum() * sizeof(uint64_t) *
(avg_degree + 1) * (fragment->fnum() - 1),
fragment->GetOuterVerticesNum() * sizeof(uint64_t) * (avg_degree + 1),
kDefaultPoolBatchSize, fragment->fnum(), spec.thread_num);
init_buffer_pool(pool_size, kDefaultPoolBatchSize);
}
auto app = std::make_shared<AppType>();
DoQuery<FRAG_T, AppType, Args...>(fragment, app, comm_spec, spec,
out_prefix, args...);
Expand All @@ -404,29 +304,6 @@ void CreateAndQueryOpt(const CommSpec& comm_spec, const std::string& out_prefix,
std::shared_ptr<FRAG_T> fragment =
LoadGraph<FRAG_T>(FLAGS_efile, FLAGS_vfile, comm_spec, graph_spec);
using AppType = APP_T<FRAG_T>;
if (std::is_same<AppType, SSSPOpt<FRAG_T>>::value) {
size_t pool_size = estimate_pool_size(
fragment->GetOuterVerticesNum() * (sizeof(uint32_t) + sizeof(double)),
fragment->GetInnerVerticesNum() *
(sizeof(uint32_t) + sizeof(double)) * (fragment->fnum() - 1),
kDefaultPoolBatchSize, fragment->fnum(), spec.thread_num);
init_buffer_pool(pool_size, kDefaultPoolBatchSize);
} else if (std::is_same<AppType, BFSOpt<FRAG_T>>::value) {
size_t pool_size = estimate_pool_size(
fragment->GetOuterVerticesNum() * (sizeof(uint32_t)),
fragment->GetInnerVerticesNum() * (sizeof(uint32_t)) *
(fragment->fnum() - 1),
kDefaultPoolBatchSize, fragment->fnum(), spec.thread_num);
init_buffer_pool(pool_size, kDefaultPoolBatchSize);
} else if (std::is_same<AppType, WCCOpt<FRAG_T>>::value) {
size_t pool_size = estimate_pool_size(
fragment->GetOuterVerticesNum() *
(sizeof(uint32_t) + sizeof(int64_t)),
fragment->GetInnerVerticesNum() *
(sizeof(uint32_t) + sizeof(int64_t)) * (fragment->fnum() - 1),
kDefaultPoolBatchSize, fragment->fnum(), spec.thread_num);
init_buffer_pool(pool_size, kDefaultPoolBatchSize);
}
auto app = std::make_shared<AppType>();
DoQuery<FRAG_T, AppType, Args...>(fragment, app, comm_spec, spec,
out_prefix, args...);
Expand Down
8 changes: 8 additions & 0 deletions examples/analytical_apps/sssp/sssp_opt.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,14 @@ class SSSPOpt : public ParallelAppBase<FRAG_T, SSSPOptContext<FRAG_T>,
ctx.postprocess_time += GetCurrentTime();
#endif
}

void EstimateMessageSize(const fragment_t& frag, size_t& send_size,
size_t& recv_size) {
send_size = frag.GetOuterVerticesNum();
send_size *= (sizeof(vertex_t) + sizeof(double));
recv_size = frag.GetInnerVerticesNum();
recv_size *= ((sizeof(vertex_t) + sizeof(double)) * (frag.fnum() - 1));
}
};

} // namespace grape
Expand Down
9 changes: 9 additions & 0 deletions examples/analytical_apps/wcc/wcc_opt.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,15 @@ class WCCOpt : public ParallelAppBase<FRAG_T, WCCOptContext<FRAG_T>,

ctx.curr_modified.Swap(ctx.next_modified);
}

void EstimateMessageSize(const fragment_t& frag, size_t& send_size,
size_t& recv_size) {
LOG(INFO) << "EstimateMessageSize";
send_size = frag.GetOuterVerticesNum();
send_size *= (sizeof(vertex_t) + sizeof(oid_t));
recv_size = frag.GetInnerVerticesNum();
recv_size *= ((sizeof(vertex_t) + sizeof(oid_t)) * (frag.fnum() - 1));
}
};

#undef MIN_COMP_ID
Expand Down
6 changes: 6 additions & 0 deletions grape/app/parallel_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ class ParallelAppBase {
*/
virtual void IncEval(const FRAG_T& graph, CONTEXT_T& context,
message_manager_t& messages) = 0;

virtual void EstimateMessageSize(const FRAG_T& graph, size_t& send_size,
size_t& recv_size) {
send_size = 0;
recv_size = 0;
}
};

#define INSTALL_PARALLEL_WORKER(APP_T, CONTEXT_T, FRAG_T) \
Expand Down
Loading

0 comments on commit c3a5c5c

Please sign in to comment.