Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Particle attribute internal buffer for MPI communication #215

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions alpine/BumponTailInstability.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,6 @@ int main(int argc, char* argv[]) {

P->initializeFields(mesh, FL);

bunch_type bunchBuffer(PL);

P->initSolver();
P->time_m = 0.0;
P->loadbalancethreshold_m = std::atof(argv[arg++]);
Expand Down Expand Up @@ -358,7 +356,7 @@ int main(int argc, char* argv[]) {
Kokkos::fence();

P->initializeORB(FL, mesh);
P->repartition(FL, mesh, bunchBuffer, isFirstRepartition);
P->repartition(FL, mesh, isFirstRepartition);
IpplTimings::stopTimer(domainDecomposition);
}

Expand Down Expand Up @@ -465,14 +463,14 @@ int main(int argc, char* argv[]) {

// Since the particles have moved spatially update them to correct processors
IpplTimings::startTimer(updateTimer);
PL.update(*P, bunchBuffer);
P->update();
IpplTimings::stopTimer(updateTimer);

// Domain Decomposition
if (P->balance(totalP, it + 1)) {
msg << "Starting repartition" << endl;
IpplTimings::startTimer(domainDecomposition);
P->repartition(FL, mesh, bunchBuffer, isFirstRepartition);
P->repartition(FL, mesh, isFirstRepartition);
IpplTimings::stopTimer(domainDecomposition);
// IpplTimings::startTimer(dumpDataTimer);
// P->dumpLocalDomains(FL, it+1);
Expand Down
20 changes: 4 additions & 16 deletions alpine/ChargedParticles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,16 +232,6 @@ class ChargedParticles : public ippl::ParticleBase<PLayout> {
typename Base::particle_position_type P; // particle velocity
typename Base::particle_position_type E; // electric field at particle position

/*
This constructor is mandatory for all derived classes from
ParticleBase as the bunch buffer uses this
*/
ChargedParticles(PLayout& pl)
: Base(pl) {
registerAttributes();
setPotentialBCs();
}

ChargedParticles(PLayout& pl, Vector_t<double, Dim> hr, Vector_t<double, Dim> rmin,
Vector_t<double, Dim> rmax, ippl::e_dim_tag decomp[Dim], double Q,
std::string solver)
Expand Down Expand Up @@ -280,8 +270,7 @@ class ChargedParticles : public ippl::ParticleBase<PLayout> {

void setupBCs() { setBCAllPeriodic(); }

void updateLayout(FieldLayout_t<Dim>& fl, Mesh_t<Dim>& mesh,
ChargedParticles<PLayout, T, Dim>& buffer, bool& isFirstRepartition) {
void updateLayout(FieldLayout_t<Dim>& fl, Mesh_t<Dim>& mesh, bool& isFirstRepartition) {
// Update local fields
static IpplTimings::TimerRef tupdateLayout = IpplTimings::getTimer("updateLayout");
IpplTimings::startTimer(tupdateLayout);
Expand All @@ -299,7 +288,7 @@ class ChargedParticles : public ippl::ParticleBase<PLayout> {
static IpplTimings::TimerRef tupdatePLayout = IpplTimings::getTimer("updatePB");
IpplTimings::startTimer(tupdatePLayout);
if (!isFirstRepartition) {
layout.update(*this, buffer);
this->update();
}
IpplTimings::stopTimer(tupdatePLayout);
}
Expand All @@ -317,8 +306,7 @@ class ChargedParticles : public ippl::ParticleBase<PLayout> {
orb.initialize(fl, mesh, rho_m);
}

void repartition(FieldLayout_t<Dim>& fl, Mesh_t<Dim>& mesh,
ChargedParticles<PLayout, T, Dim>& buffer, bool& isFirstRepartition) {
void repartition(FieldLayout_t<Dim>& fl, Mesh_t<Dim>& mesh, bool& isFirstRepartition) {
// Repartition the domains
bool res = orb.binaryRepartition(this->R, fl, isFirstRepartition);

Expand All @@ -327,7 +315,7 @@ class ChargedParticles : public ippl::ParticleBase<PLayout> {
return;
}
// Update
this->updateLayout(fl, mesh, buffer, isFirstRepartition);
this->updateLayout(fl, mesh, isFirstRepartition);
if constexpr (Dim == 2 || Dim == 3) {
if (stype_m == "FFT") {
std::get<FFTSolver_t<T, Dim>>(solver_m).setRhs(rho_m);
Expand Down
8 changes: 3 additions & 5 deletions alpine/LandauDamping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,6 @@ int main(int argc, char* argv[]) {

P->initializeFields(mesh, FL);

bunch_type bunchBuffer(PL);

P->initSolver();
P->time_m = 0.0;
P->loadbalancethreshold_m = std::atof(argv[arg++]);
Expand Down Expand Up @@ -244,7 +242,7 @@ int main(int argc, char* argv[]) {
Kokkos::fence();

P->initializeORB(FL, mesh);
P->repartition(FL, mesh, bunchBuffer, isFirstRepartition);
P->repartition(FL, mesh, isFirstRepartition);
IpplTimings::stopTimer(domainDecomposition);
}

Expand Down Expand Up @@ -334,14 +332,14 @@ int main(int argc, char* argv[]) {

// Since the particles have moved spatially update them to correct processors
IpplTimings::startTimer(updateTimer);
PL.update(*P, bunchBuffer);
P->update();
IpplTimings::stopTimer(updateTimer);

// Domain Decomposition
if (P->balance(totalP, it + 1)) {
msg << "Starting repartition" << endl;
IpplTimings::startTimer(domainDecomposition);
P->repartition(FL, mesh, bunchBuffer, isFirstRepartition);
P->repartition(FL, mesh, isFirstRepartition);
IpplTimings::stopTimer(domainDecomposition);
// IpplTimings::startTimer(dumpDataTimer);
// P->dumpLocalDomains(FL, it+1);
Expand Down
8 changes: 3 additions & 5 deletions alpine/LandauDampingMixedExec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,6 @@ int main(int argc, char* argv[]) {

P->initializeFields(mesh, FL);

bunch_type bunchBuffer(PL);

P->initSolver();
P->time_m = 0.0;
P->loadbalancethreshold_m = std::atof(argv[arg++]);
Expand Down Expand Up @@ -249,7 +247,7 @@ int main(int argc, char* argv[]) {
Kokkos::fence();

P->initializeORB(FL, mesh);
P->repartition(FL, mesh, bunchBuffer, isFirstRepartition);
P->repartition(FL, mesh, isFirstRepartition);
IpplTimings::stopTimer(domainDecomposition);
}

Expand Down Expand Up @@ -343,14 +341,14 @@ int main(int argc, char* argv[]) {

// Since the particles have moved spatially update them to correct processors
IpplTimings::startTimer(updateTimer);
PL.update(*P, bunchBuffer);
P->update();
IpplTimings::stopTimer(updateTimer);

// Domain Decomposition
if (P->balance(totalP, it + 1)) {
msg << "Starting repartition" << endl;
IpplTimings::startTimer(domainDecomposition);
P->repartition(FL, mesh, bunchBuffer, isFirstRepartition);
P->repartition(FL, mesh, isFirstRepartition);
IpplTimings::stopTimer(domainDecomposition);
// IpplTimings::startTimer(dumpDataTimer);
// P->dumpLocalDomains(FL, it+1);
Expand Down
8 changes: 3 additions & 5 deletions alpine/LandauDampingMixedPrecision.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,6 @@ int main(int argc, char* argv[]) {

P->initializeFields(mesh, FL);

bunch_type bunchBuffer(PL);

P->initSolver();
P->time_m = 0.0;
P->loadbalancethreshold_m = std::atof(argv[arg++]);
Expand Down Expand Up @@ -240,7 +238,7 @@ int main(int argc, char* argv[]) {
Kokkos::fence();

P->initializeORB(FL, mesh);
P->repartition(FL, mesh, bunchBuffer, isFirstRepartition);
P->repartition(FL, mesh, isFirstRepartition);
IpplTimings::stopTimer(domainDecomposition);
}

Expand Down Expand Up @@ -332,14 +330,14 @@ int main(int argc, char* argv[]) {

// Since the particles have moved spatially update them to correct processors
IpplTimings::startTimer(updateTimer);
PL.update(*P, bunchBuffer);
P->update();
IpplTimings::stopTimer(updateTimer);

// Domain Decomposition
if (P->balance(totalP, it + 1)) {
msg << "Starting repartition" << endl;
IpplTimings::startTimer(domainDecomposition);
P->repartition(FL, mesh, bunchBuffer, isFirstRepartition);
P->repartition(FL, mesh, isFirstRepartition);
IpplTimings::stopTimer(domainDecomposition);
// IpplTimings::startTimer(dumpDataTimer);
// P->dumpLocalDomains(FL, it+1);
Expand Down
8 changes: 3 additions & 5 deletions alpine/PenningTrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,6 @@ int main(int argc, char* argv[]) {

P->initializeFields(mesh, FL);

bunch_type bunchBuffer(PL);

P->initSolver();
P->time_m = 0.0;
P->loadbalancethreshold_m = std::atof(argv[7]);
Expand Down Expand Up @@ -251,7 +249,7 @@ int main(int argc, char* argv[]) {
Kokkos::fence();

P->initializeORB(FL, mesh);
P->repartition(FL, mesh, bunchBuffer, isFirstRepartition);
P->repartition(FL, mesh, isFirstRepartition);
IpplTimings::stopTimer(domainDecomposition);
}

Expand Down Expand Up @@ -362,14 +360,14 @@ int main(int argc, char* argv[]) {

// Since the particles have moved spatially update them to correct processors
IpplTimings::startTimer(updateTimer);
PL.update(*P, bunchBuffer);
P->update();
IpplTimings::stopTimer(updateTimer);

// Domain Decomposition
if (P->balance(totalP, it + 1)) {
msg << "Starting repartition" << endl;
IpplTimings::startTimer(domainDecomposition);
P->repartition(FL, mesh, bunchBuffer, isFirstRepartition);
P->repartition(FL, mesh, isFirstRepartition);
IpplTimings::stopTimer(domainDecomposition);
// IpplTimings::startTimer(dumpDataTimer);
// P->dumpLocalDomains(FL, it+1);
Expand Down
8 changes: 3 additions & 5 deletions alpine/UniformPlasmaTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,8 @@ int main(int argc, char* argv[]) {

P->initializeFields(mesh, FL);

bunch_type bunchBuffer(PL);

IpplTimings::startTimer(updateTimer);
PL.update(*P, bunchBuffer);
P->update();
IpplTimings::stopTimer(updateTimer);

msg << "particles created and initial conditions assigned " << endl;
Expand Down Expand Up @@ -218,14 +216,14 @@ int main(int argc, char* argv[]) {

// Since the particles have moved spatially update them to correct processors
IpplTimings::startTimer(updateTimer);
PL.update(*P, bunchBuffer);
P->update();
IpplTimings::stopTimer(updateTimer);

// Domain Decomposition
if (P->balance(totalP, it + 1)) {
msg << "Starting repartition" << endl;
IpplTimings::startTimer(domainDecomposition);
P->repartition(FL, mesh, bunchBuffer, fromAnalyticDensity);
P->repartition(FL, mesh, fromAnalyticDensity);
IpplTimings::stopTimer(domainDecomposition);
}

Expand Down
10 changes: 6 additions & 4 deletions src/Particle/ParticleAttrib.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ namespace ippl {
using hash_type = typename Base::hash_type;

using view_type = typename detail::ViewType<T, 1, Properties...>::view_type;

using HostMirror = typename view_type::host_mirror_type;

using memory_space = typename view_type::memory_space;
Expand All @@ -59,16 +60,16 @@ namespace ippl {
void destroy(const hash_type& deleteIndex, const hash_type& keepIndex,
size_type invalidCount) override;

void pack(void*, const hash_type&) const override;
void pack(const hash_type&) override;

void unpack(void*, size_type) override;
void unpack(size_type) override;

void serialize(detail::Archive<memory_space>& ar, size_type nsends) override {
ar.serialize(dview_m, nsends);
ar.serialize(buf_m, nsends);
}

void deserialize(detail::Archive<memory_space>& ar, size_type nrecvs) override {
ar.deserialize(dview_m, nrecvs);
ar.deserialize(buf_m, nrecvs);
}

virtual ~ParticleAttrib() = default;
Expand Down Expand Up @@ -132,6 +133,7 @@ namespace ippl {

private:
view_type dview_m;
view_type buf_m;
};
} // namespace ippl

Expand Down
18 changes: 6 additions & 12 deletions src/Particle/ParticleAttrib.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,28 +45,22 @@ namespace ippl {
}

template <typename T, class... Properties>
void ParticleAttrib<T, Properties...>::pack(void* buffer, const hash_type& hash) const {
using this_type = ParticleAttrib<T, Properties...>;
this_type* buffer_p = static_cast<this_type*>(buffer);
auto& view = buffer_p->dview_m;
void ParticleAttrib<T, Properties...>::pack(const hash_type& hash) {
auto size = hash.extent(0);
if (view.extent(0) < size) {
if (buf_m.extent(0) < size) {
int overalloc = Comm->getDefaultOverallocation();
Kokkos::realloc(view, size * overalloc);
Kokkos::realloc(buf_m, size * overalloc);
}

using policy_type = Kokkos::RangePolicy<execution_space>;
Kokkos::parallel_for(
"ParticleAttrib::pack()", policy_type(0, size),
KOKKOS_CLASS_LAMBDA(const size_t i) { view(i) = dview_m(hash(i)); });
KOKKOS_CLASS_LAMBDA(const size_t i) { buf_m(i) = dview_m(hash(i)); });
Kokkos::fence();
}

template <typename T, class... Properties>
void ParticleAttrib<T, Properties...>::unpack(void* buffer, size_type nrecvs) {
using this_type = ParticleAttrib<T, Properties...>;
this_type* buffer_p = static_cast<this_type*>(buffer);
auto& view = buffer_p->dview_m;
void ParticleAttrib<T, Properties...>::unpack(size_type nrecvs) {
auto size = dview_m.extent(0);
size_type required = *(this->localNum_mp) + nrecvs;
if (size < required) {
Expand All @@ -78,7 +72,7 @@ namespace ippl {
using policy_type = Kokkos::RangePolicy<execution_space>;
Kokkos::parallel_for(
"ParticleAttrib::unpack()", policy_type(0, nrecvs),
KOKKOS_CLASS_LAMBDA(const size_t i) { dview_m(count + i) = view(i); });
KOKKOS_CLASS_LAMBDA(const size_t i) { dview_m(count + i) = buf_m(i); });
Kokkos::fence();
}

Expand Down
4 changes: 2 additions & 2 deletions src/Particle/ParticleAttribBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ namespace ippl {
virtual void destroy(const hash_type&, const hash_type&, size_type) = 0;
virtual size_type packedSize(const size_type) const = 0;

virtual void pack(void*, const hash_type&) const = 0;
virtual void pack(const hash_type&) = 0;

virtual void unpack(void*, size_type) = 0;
virtual void unpack(size_type) = 0;

virtual void serialize(Archive<memory_space>& ar, size_type nsends) = 0;

Expand Down
17 changes: 7 additions & 10 deletions src/Particle/ParticleBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,11 @@ namespace ippl {
template <typename... Properties>
void destroy(const Kokkos::View<bool*, Properties...>& invalid, const size_type destroyNum);

template <typename HashType, typename BufferType>
template <typename HashType>
void sendToRank(int rank, int tag, int sendNum, std::vector<MPI_Request>& requests,
const HashType& hash, BufferType& buffer);
const HashType& hash);

template <typename BufferType>
void recvFromRank(int rank, int tag, int recvNum, size_type nRecvs, BufferType& buffer);
void recvFromRank(int rank, int tag, int recvNum, size_type nRecvs);

/*!
* Serialize to do MPI calls.
Expand All @@ -278,23 +277,21 @@ namespace ippl {
template <typename MemorySpace>
size_type packedSize(const size_type count) const;

void update() { layout_m->update(*this); }

protected:
/*!
* Fill attributes of buffer.
* @tparam Buffer is a bunch type
* @param buffer to send
* @param hash function to access index.
*/
template <class Buffer>
void pack(Buffer& buffer, const hash_container_type& hash);
void pack(const hash_container_type& hash);

/*!
* Fill my attributes.
* @tparam Buffer is a bunch type
* @param buffer received
*/
template <class Buffer>
void unpack(Buffer& buffer, size_type nrecvs);
void unpack(size_type nrecvs);

private:
//! particle layout
Expand Down
Loading