Skip to content

Commit

Permalink
Merge pull request #215 from matt-frey/attribute-internal-buffer
Browse files Browse the repository at this point in the history
Particle attribute internal buffer for MPI communication
  • Loading branch information
matt-frey committed Sep 5, 2023
2 parents 14d59ed + cfe31ab commit d5aa807
Show file tree
Hide file tree
Showing 21 changed files with 90 additions and 171 deletions.
8 changes: 3 additions & 5 deletions alpine/BumponTailInstability.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,6 @@ int main(int argc, char* argv[]) {

P->initializeFields(mesh, FL);

bunch_type bunchBuffer(PL);

P->initSolver();
P->time_m = 0.0;
P->loadbalancethreshold_m = std::atof(argv[arg++]);
Expand Down Expand Up @@ -358,7 +356,7 @@ int main(int argc, char* argv[]) {
Kokkos::fence();

P->initializeORB(FL, mesh);
P->repartition(FL, mesh, bunchBuffer, isFirstRepartition);
P->repartition(FL, mesh, isFirstRepartition);
IpplTimings::stopTimer(domainDecomposition);
}

Expand Down Expand Up @@ -465,14 +463,14 @@ int main(int argc, char* argv[]) {

// Since the particles have moved spatially, update them to the correct processors
IpplTimings::startTimer(updateTimer);
PL.update(*P, bunchBuffer);
P->update();
IpplTimings::stopTimer(updateTimer);

// Domain Decomposition
if (P->balance(totalP, it + 1)) {
msg << "Starting repartition" << endl;
IpplTimings::startTimer(domainDecomposition);
P->repartition(FL, mesh, bunchBuffer, isFirstRepartition);
P->repartition(FL, mesh, isFirstRepartition);
IpplTimings::stopTimer(domainDecomposition);
// IpplTimings::startTimer(dumpDataTimer);
// P->dumpLocalDomains(FL, it+1);
Expand Down
20 changes: 4 additions & 16 deletions alpine/ChargedParticles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,16 +232,6 @@ class ChargedParticles : public ippl::ParticleBase<PLayout> {
typename Base::particle_position_type P; // particle velocity
typename Base::particle_position_type E; // electric field at particle position

/*
This constructor is mandatory for all classes derived from
ParticleBase, as the bunch buffer uses it
*/
ChargedParticles(PLayout& pl)
: Base(pl) {
registerAttributes();
setPotentialBCs();
}

ChargedParticles(PLayout& pl, Vector_t<double, Dim> hr, Vector_t<double, Dim> rmin,
Vector_t<double, Dim> rmax, ippl::e_dim_tag decomp[Dim], double Q,
std::string solver)
Expand Down Expand Up @@ -280,8 +270,7 @@ class ChargedParticles : public ippl::ParticleBase<PLayout> {

void setupBCs() { setBCAllPeriodic(); }

void updateLayout(FieldLayout_t<Dim>& fl, Mesh_t<Dim>& mesh,
ChargedParticles<PLayout, T, Dim>& buffer, bool& isFirstRepartition) {
void updateLayout(FieldLayout_t<Dim>& fl, Mesh_t<Dim>& mesh, bool& isFirstRepartition) {
// Update local fields
static IpplTimings::TimerRef tupdateLayout = IpplTimings::getTimer("updateLayout");
IpplTimings::startTimer(tupdateLayout);
Expand All @@ -299,7 +288,7 @@ class ChargedParticles : public ippl::ParticleBase<PLayout> {
static IpplTimings::TimerRef tupdatePLayout = IpplTimings::getTimer("updatePB");
IpplTimings::startTimer(tupdatePLayout);
if (!isFirstRepartition) {
layout.update(*this, buffer);
this->update();
}
IpplTimings::stopTimer(tupdatePLayout);
}
Expand All @@ -317,8 +306,7 @@ class ChargedParticles : public ippl::ParticleBase<PLayout> {
orb.initialize(fl, mesh, rho_m);
}

void repartition(FieldLayout_t<Dim>& fl, Mesh_t<Dim>& mesh,
ChargedParticles<PLayout, T, Dim>& buffer, bool& isFirstRepartition) {
void repartition(FieldLayout_t<Dim>& fl, Mesh_t<Dim>& mesh, bool& isFirstRepartition) {
// Repartition the domains
bool res = orb.binaryRepartition(this->R, fl, isFirstRepartition);

Expand All @@ -327,7 +315,7 @@ class ChargedParticles : public ippl::ParticleBase<PLayout> {
return;
}
// Update
this->updateLayout(fl, mesh, buffer, isFirstRepartition);
this->updateLayout(fl, mesh, isFirstRepartition);
if constexpr (Dim == 2 || Dim == 3) {
if (stype_m == "FFT") {
std::get<FFTSolver_t<T, Dim>>(solver_m).setRhs(rho_m);
Expand Down
8 changes: 3 additions & 5 deletions alpine/LandauDamping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,6 @@ int main(int argc, char* argv[]) {

P->initializeFields(mesh, FL);

bunch_type bunchBuffer(PL);

P->initSolver();
P->time_m = 0.0;
P->loadbalancethreshold_m = std::atof(argv[arg++]);
Expand Down Expand Up @@ -244,7 +242,7 @@ int main(int argc, char* argv[]) {
Kokkos::fence();

P->initializeORB(FL, mesh);
P->repartition(FL, mesh, bunchBuffer, isFirstRepartition);
P->repartition(FL, mesh, isFirstRepartition);
IpplTimings::stopTimer(domainDecomposition);
}

Expand Down Expand Up @@ -334,14 +332,14 @@ int main(int argc, char* argv[]) {

// Since the particles have moved spatially, update them to the correct processors
IpplTimings::startTimer(updateTimer);
PL.update(*P, bunchBuffer);
P->update();
IpplTimings::stopTimer(updateTimer);

// Domain Decomposition
if (P->balance(totalP, it + 1)) {
msg << "Starting repartition" << endl;
IpplTimings::startTimer(domainDecomposition);
P->repartition(FL, mesh, bunchBuffer, isFirstRepartition);
P->repartition(FL, mesh, isFirstRepartition);
IpplTimings::stopTimer(domainDecomposition);
// IpplTimings::startTimer(dumpDataTimer);
// P->dumpLocalDomains(FL, it+1);
Expand Down
8 changes: 3 additions & 5 deletions alpine/LandauDampingMixedExec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,6 @@ int main(int argc, char* argv[]) {

P->initializeFields(mesh, FL);

bunch_type bunchBuffer(PL);

P->initSolver();
P->time_m = 0.0;
P->loadbalancethreshold_m = std::atof(argv[arg++]);
Expand Down Expand Up @@ -249,7 +247,7 @@ int main(int argc, char* argv[]) {
Kokkos::fence();

P->initializeORB(FL, mesh);
P->repartition(FL, mesh, bunchBuffer, isFirstRepartition);
P->repartition(FL, mesh, isFirstRepartition);
IpplTimings::stopTimer(domainDecomposition);
}

Expand Down Expand Up @@ -343,14 +341,14 @@ int main(int argc, char* argv[]) {

// Since the particles have moved spatially, update them to the correct processors
IpplTimings::startTimer(updateTimer);
PL.update(*P, bunchBuffer);
P->update();
IpplTimings::stopTimer(updateTimer);

// Domain Decomposition
if (P->balance(totalP, it + 1)) {
msg << "Starting repartition" << endl;
IpplTimings::startTimer(domainDecomposition);
P->repartition(FL, mesh, bunchBuffer, isFirstRepartition);
P->repartition(FL, mesh, isFirstRepartition);
IpplTimings::stopTimer(domainDecomposition);
// IpplTimings::startTimer(dumpDataTimer);
// P->dumpLocalDomains(FL, it+1);
Expand Down
8 changes: 3 additions & 5 deletions alpine/LandauDampingMixedPrecision.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,6 @@ int main(int argc, char* argv[]) {

P->initializeFields(mesh, FL);

bunch_type bunchBuffer(PL);

P->initSolver();
P->time_m = 0.0;
P->loadbalancethreshold_m = std::atof(argv[arg++]);
Expand Down Expand Up @@ -240,7 +238,7 @@ int main(int argc, char* argv[]) {
Kokkos::fence();

P->initializeORB(FL, mesh);
P->repartition(FL, mesh, bunchBuffer, isFirstRepartition);
P->repartition(FL, mesh, isFirstRepartition);
IpplTimings::stopTimer(domainDecomposition);
}

Expand Down Expand Up @@ -332,14 +330,14 @@ int main(int argc, char* argv[]) {

// Since the particles have moved spatially, update them to the correct processors
IpplTimings::startTimer(updateTimer);
PL.update(*P, bunchBuffer);
P->update();
IpplTimings::stopTimer(updateTimer);

// Domain Decomposition
if (P->balance(totalP, it + 1)) {
msg << "Starting repartition" << endl;
IpplTimings::startTimer(domainDecomposition);
P->repartition(FL, mesh, bunchBuffer, isFirstRepartition);
P->repartition(FL, mesh, isFirstRepartition);
IpplTimings::stopTimer(domainDecomposition);
// IpplTimings::startTimer(dumpDataTimer);
// P->dumpLocalDomains(FL, it+1);
Expand Down
8 changes: 3 additions & 5 deletions alpine/PenningTrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,6 @@ int main(int argc, char* argv[]) {

P->initializeFields(mesh, FL);

bunch_type bunchBuffer(PL);

P->initSolver();
P->time_m = 0.0;
P->loadbalancethreshold_m = std::atof(argv[7]);
Expand Down Expand Up @@ -251,7 +249,7 @@ int main(int argc, char* argv[]) {
Kokkos::fence();

P->initializeORB(FL, mesh);
P->repartition(FL, mesh, bunchBuffer, isFirstRepartition);
P->repartition(FL, mesh, isFirstRepartition);
IpplTimings::stopTimer(domainDecomposition);
}

Expand Down Expand Up @@ -362,14 +360,14 @@ int main(int argc, char* argv[]) {

// Since the particles have moved spatially, update them to the correct processors
IpplTimings::startTimer(updateTimer);
PL.update(*P, bunchBuffer);
P->update();
IpplTimings::stopTimer(updateTimer);

// Domain Decomposition
if (P->balance(totalP, it + 1)) {
msg << "Starting repartition" << endl;
IpplTimings::startTimer(domainDecomposition);
P->repartition(FL, mesh, bunchBuffer, isFirstRepartition);
P->repartition(FL, mesh, isFirstRepartition);
IpplTimings::stopTimer(domainDecomposition);
// IpplTimings::startTimer(dumpDataTimer);
// P->dumpLocalDomains(FL, it+1);
Expand Down
8 changes: 3 additions & 5 deletions alpine/UniformPlasmaTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,8 @@ int main(int argc, char* argv[]) {

P->initializeFields(mesh, FL);

bunch_type bunchBuffer(PL);

IpplTimings::startTimer(updateTimer);
PL.update(*P, bunchBuffer);
P->update();
IpplTimings::stopTimer(updateTimer);

msg << "particles created and initial conditions assigned " << endl;
Expand Down Expand Up @@ -218,14 +216,14 @@ int main(int argc, char* argv[]) {

// Since the particles have moved spatially, update them to the correct processors
IpplTimings::startTimer(updateTimer);
PL.update(*P, bunchBuffer);
P->update();
IpplTimings::stopTimer(updateTimer);

// Domain Decomposition
if (P->balance(totalP, it + 1)) {
msg << "Starting repartition" << endl;
IpplTimings::startTimer(domainDecomposition);
P->repartition(FL, mesh, bunchBuffer, fromAnalyticDensity);
P->repartition(FL, mesh, fromAnalyticDensity);
IpplTimings::stopTimer(domainDecomposition);
}

Expand Down
10 changes: 6 additions & 4 deletions src/Particle/ParticleAttrib.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ namespace ippl {
using hash_type = typename Base::hash_type;

using view_type = typename detail::ViewType<T, 1, Properties...>::view_type;

using HostMirror = typename view_type::host_mirror_type;

using memory_space = typename view_type::memory_space;
Expand All @@ -59,16 +60,16 @@ namespace ippl {
void destroy(const hash_type& deleteIndex, const hash_type& keepIndex,
size_type invalidCount) override;

void pack(void*, const hash_type&) const override;
void pack(const hash_type&) override;

void unpack(void*, size_type) override;
void unpack(size_type) override;

void serialize(detail::Archive<memory_space>& ar, size_type nsends) override {
ar.serialize(dview_m, nsends);
ar.serialize(buf_m, nsends);
}

void deserialize(detail::Archive<memory_space>& ar, size_type nrecvs) override {
ar.deserialize(dview_m, nrecvs);
ar.deserialize(buf_m, nrecvs);
}

virtual ~ParticleAttrib() = default;
Expand Down Expand Up @@ -132,6 +133,7 @@ namespace ippl {

private:
view_type dview_m;
view_type buf_m;
};
} // namespace ippl

Expand Down
18 changes: 6 additions & 12 deletions src/Particle/ParticleAttrib.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,28 +45,22 @@ namespace ippl {
}

template <typename T, class... Properties>
void ParticleAttrib<T, Properties...>::pack(void* buffer, const hash_type& hash) const {
using this_type = ParticleAttrib<T, Properties...>;
this_type* buffer_p = static_cast<this_type*>(buffer);
auto& view = buffer_p->dview_m;
void ParticleAttrib<T, Properties...>::pack(const hash_type& hash) {
auto size = hash.extent(0);
if (view.extent(0) < size) {
if (buf_m.extent(0) < size) {
int overalloc = Comm->getDefaultOverallocation();
Kokkos::realloc(view, size * overalloc);
Kokkos::realloc(buf_m, size * overalloc);
}

using policy_type = Kokkos::RangePolicy<execution_space>;
Kokkos::parallel_for(
"ParticleAttrib::pack()", policy_type(0, size),
KOKKOS_CLASS_LAMBDA(const size_t i) { view(i) = dview_m(hash(i)); });
KOKKOS_CLASS_LAMBDA(const size_t i) { buf_m(i) = dview_m(hash(i)); });
Kokkos::fence();
}

template <typename T, class... Properties>
void ParticleAttrib<T, Properties...>::unpack(void* buffer, size_type nrecvs) {
using this_type = ParticleAttrib<T, Properties...>;
this_type* buffer_p = static_cast<this_type*>(buffer);
auto& view = buffer_p->dview_m;
void ParticleAttrib<T, Properties...>::unpack(size_type nrecvs) {
auto size = dview_m.extent(0);
size_type required = *(this->localNum_mp) + nrecvs;
if (size < required) {
Expand All @@ -78,7 +72,7 @@ namespace ippl {
using policy_type = Kokkos::RangePolicy<execution_space>;
Kokkos::parallel_for(
"ParticleAttrib::unpack()", policy_type(0, nrecvs),
KOKKOS_CLASS_LAMBDA(const size_t i) { dview_m(count + i) = view(i); });
KOKKOS_CLASS_LAMBDA(const size_t i) { dview_m(count + i) = buf_m(i); });
Kokkos::fence();
}

Expand Down
4 changes: 2 additions & 2 deletions src/Particle/ParticleAttribBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ namespace ippl {
virtual void destroy(const hash_type&, const hash_type&, size_type) = 0;
virtual size_type packedSize(const size_type) const = 0;

virtual void pack(void*, const hash_type&) const = 0;
virtual void pack(const hash_type&) = 0;

virtual void unpack(void*, size_type) = 0;
virtual void unpack(size_type) = 0;

virtual void serialize(Archive<memory_space>& ar, size_type nsends) = 0;

Expand Down
17 changes: 7 additions & 10 deletions src/Particle/ParticleBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,11 @@ namespace ippl {
template <typename... Properties>
void destroy(const Kokkos::View<bool*, Properties...>& invalid, const size_type destroyNum);

template <typename HashType, typename BufferType>
template <typename HashType>
void sendToRank(int rank, int tag, int sendNum, std::vector<MPI_Request>& requests,
const HashType& hash, BufferType& buffer);
const HashType& hash);

template <typename BufferType>
void recvFromRank(int rank, int tag, int recvNum, size_type nRecvs, BufferType& buffer);
void recvFromRank(int rank, int tag, int recvNum, size_type nRecvs);

/*!
* Serialize to do MPI calls.
Expand All @@ -278,23 +277,21 @@ namespace ippl {
template <typename MemorySpace>
size_type packedSize(const size_type count) const;

//! Convenience entry point: passes this particle container to its layout by calling layout_m->update(*this).
void update() { layout_m->update(*this); }

protected:
/*!
* Fill attributes of buffer.
* @tparam Buffer is a bunch type
* @param buffer to send
* @param hash function to access index.
*/
template <class Buffer>
void pack(Buffer& buffer, const hash_container_type& hash);
void pack(const hash_container_type& hash);

/*!
* Fill my attributes.
* @tparam Buffer is a bunch type
* @param buffer received
*/
template <class Buffer>
void unpack(Buffer& buffer, size_type nrecvs);
void unpack(size_type nrecvs);

private:
//! particle layout
Expand Down
Loading

0 comments on commit d5aa807

Please sign in to comment.