Skip to content

Commit

Permalink
fix: partition parallel IO to be under the 2GB HDF5 limit
Browse files Browse the repository at this point in the history
This includes a large refactoring of the mpiarray IO code to break it up
and add the ability to split IO into a chunks to ensure that they fit
within HDF5's limits. Additionally collective IO is turned off for
writes along axis=0, and parallel IO entirely for reads along axis=0 to
improve IO performance.

fix: fixed up logic
  • Loading branch information
jrs65 committed Jun 20, 2019
1 parent 2462a2f commit bee591e
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 28 deletions.
7 changes: 5 additions & 2 deletions caput/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def __call__(self, *args, **kwargs):
return arr

def __get__(self, obj, type=None):

# As a descriptor, this gets called whenever this is used to wrap a
# function, and simply binds it to the instance

Expand All @@ -73,7 +74,7 @@ def __get__(self, obj, type=None):
return _vectorize_desc


def open_h5py_mpi(f, mode, comm=None):
def open_h5py_mpi(f, mode, use_mpi=True, comm=None):
"""Ensure that we have an h5py File object.
Opens with MPI-IO if possible.
Expand All @@ -89,6 +90,8 @@ def open_h5py_mpi(f, mode, comm=None):
is just returned as is.
mode : string
Mode to open file in.
use_mpi : bool, optional
Whether to use MPI-IO or not (default True)
comm : mpi4py.Comm, optional
MPI communicator to use. Uses `COMM_WORLD` if not set.
Expand All @@ -103,7 +106,7 @@ def open_h5py_mpi(f, mode, comm=None):

if isinstance(f, basestring):
# Open using MPI-IO if we can
if has_mpi:
if has_mpi and use_mpi:
from mpi4py import MPI
comm = comm if comm is not None else MPI.COMM_WORLD
fh = h5py.File(f, mode, driver='mpio', comm=comm)
Expand Down
193 changes: 167 additions & 26 deletions caput/mpiarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,16 @@
from past.builtins import basestring
import os
import time
import logging

import numpy as np

from caput import mpiutil, misc


logger = logging.getLogger(__name__)


class _global_resolver(object):
# Private class implementing the global sampling for MPIArray

Expand Down Expand Up @@ -568,7 +573,14 @@ def from_hdf5(cls, f, dataset, comm=None, axis=0, sel=None):
-------
array : MPIArray
"""
fh = misc.open_h5py_mpi(f, 'r', comm)
from mpi4py import MPI

# Don't both using MPI where the axis is not zero. It's probably just slower.
# TODO: with tuning this might not be true. Keep an eye on this.
use_mpi = (axis > 0)

# Read the file. Opening with MPI if requested, and we can
fh = misc.open_h5py_mpi(f, 'r', use_mpi=use_mpi, comm=comm)

dset = fh[dataset]
dshape = dset.shape # Shape of the underlying dataset
Expand All @@ -577,43 +589,50 @@ def from_hdf5(cls, f, dataset, comm=None, axis=0, sel=None):

# Check that the axis is valid and wrap to an actual position
if axis < -naxis or axis >= naxis:
raise ValueError("Distributed axis %i not in range (%i, %i)" % (axis, -naxis, naxis-1))
raise ValueError("Distributed axis %i not in range (%i, %i)" %
(axis, -naxis, naxis-1))
axis = naxis + axis if axis < 0 else axis

# Ensure sel is defined to cover all axes
if sel is None:
sel = [slice(None)] * naxis
if len(sel) < naxis:
sel = list(sel) + [slice(None)] * (naxis - len(sel))
sel = list(sel)
sel = _expand_sel(sel, naxis)

# Figure out the final array size and create it
gshape = []
for l, sl in zip(dshape, sel):
start, end, stride = sl.indices(l)
n = 1 + (end - start - 1) // stride
gshape.append(n)
gshape.append(_len_slice(sl, l))
dist_arr = cls(gshape, axis=axis, comm=comm, dtype=dtype)

# Get the local start and end indices
lstart = dist_arr.local_offset[axis]
lend = lstart + dist_arr.local_shape[axis]

# Create the slice object into the dataset by resolving the rank's slice on the sel
dstart, dend, dstride = sel[axis].indices(dshape[axis])
sel[axis] = slice(dstart + lstart * dstride, dstart + lend * dstride, dstride)
sel[axis] = _reslice(sel[axis], dshape[axis], slice(lstart, lend))
sel = tuple(sel)

# Split the axis to get the IO size under ~2GB (only if MPI-IO)
split_axis, partitions = dist_arr._partition_io(skip=(not fh.is_mpi))

# Check that there are no null slices, otherwise we need to turn off
# collective IO to work around an h5py issue (#965)
no_null_slices = dist_arr.global_shape[axis] >= dist_arr.comm.size

# Read using MPI-IO if possible
if fh.is_mpi and no_null_slices:
with dset.collective:
dist_arr[:] = dset[sel]
else:
dist_arr[:] = dset[sel]
# Only use collective IO if:
# - there are no null slices (h5py bug)
# - we are not distributed over axis=0 as there is no advantage for
# collective IO which is usually slow
# TODO: change if h5py bug fixed
# TODO: better would be a test on contiguous IO size
use_collective = fh.is_mpi and no_null_slices and axis > 0

# Read using collective MPI-IO if specified
with dset.collective if use_collective else DummyContext():

# Loop over partitions of the IO and perform them
for part in partitions:
islice, fslice = _partition_sel(sel, split_axis,
gshape[split_axis], part)
dist_arr[fslice] = dset[islice]

if fh.opened:
fh.close()
Expand Down Expand Up @@ -654,19 +673,33 @@ def to_hdf5(self, f, dataset, create=False):
end = start + self.local_shape[self.axis]

# Construct slices for axis
sl = [slice(None, None)] * self.axis
sl += [slice(start, end)]
sl = tuple(sl)
sel = ([slice(None, None)] * self.axis) + [slice(start, end)]
sel = _expand_sel(sel, self.ndim)

# Check that there are no null slices, otherwise we need to turn off
# collective IO to work around an h5py issue (#965)
no_null_slices = self.global_shape[self.axis] >= self.comm.size

if fh.is_mpi and no_null_slices:
with dset.collective:
dset[sl] = self[:]
else:
dset[sl] = self[:]
# Split the axis to get the IO size under ~2GB (only if MPI-IO)
split_axis, partitions = self._partition_io(skip=(not fh.is_mpi))

# Only use collective IO if:
# - there are no null slices (h5py bug)
# - we are not distributed over axis=0 as there is no advantage for
# collective IO which is usually slow
# TODO: change if h5py bug fixed
# TODO: better would be a test on contiguous IO size
use_collective = fh.is_mpi and no_null_slices and self.axis > 0

# Read using collective MPI-IO if specified
with dset.collective if use_collective else DummyContext():

# Loop over partitions of the IO and perform them
for part in partitions:
islice, fslice = _partition_sel(
sel, split_axis, self.global_shape[split_axis], part
)
dset[islice] = self[fslice]

if fh.opened:
fh.close()
Expand Down Expand Up @@ -807,3 +840,111 @@ def _to_hdf5_serial(self, filename, dataset, create=False):
fh[dataset][start:end] = dist_arr

dist_arr.comm.Barrier()

def _partition_io(self, skip=False, threshold=1.99):
"""Split IO of this array into local sections under `threshold`.
Parameters
----------
skip : bool, optional
Don't partition, just find and return a full axis.
threshold : float, optional
Maximum size of IO (in GB).
Returns
-------
split_axis : int
Which axis are we going to split along.
partitions : list of slice objects
List of slices.
"""
from mpi4py import MPI

threshold_bytes = threshold * 2**30
largest_size = self.comm.allreduce(self.nbytes, op=MPI.MAX)
num_split = int(np.ceil(largest_size / threshold_bytes))

# Return early if we can
if skip or num_split == 1:
return 0, [slice(0, self.local_shape[0])]

if self.ndim == 1:
raise RuntimeError("To parition an array we must have multiple axes.")

# Try and find the axis to split over
for split_axis in range(self.ndim):
if split_axis != self.axis and self.global_shape[split_axis] >= num_split:
break
else:
raise RuntimeError(
"Can't identify an IO partition less than %.2f GB in size: "
"shape=%s, distributed axis=%i" % (threshold, self.global_shape, self.axis)
)

logger.debug("Splitting along axis %i, %i ways", split_axis, num_split)

# Figure out the start and end of the splits and return
nums, starts, ends = mpiutil.split_m(self.global_shape[split_axis], num_split)

slices = [slice(start, end) for start, end in zip(starts, ends)]
return split_axis, slices



def _partition_sel(sel, split_axis, n, slice_):
# Take a selection (a tuple of slices) and re-slice along the split_axis
# (which has length n)
#
# Returns the new selections for the initial (pre-selection) space and the final
# (post-selection) space.

l = _len_slice(sel[split_axis], n)

# Reconstruct the slice for the split axis
slice_init = _reslice(sel[split_axis], n, slice_)

# Construct the final selection
sel_final = [slice(None)] * len(sel)
sel_final[split_axis] = slice_

# Construct the initial selection
sel_initial = list(sel)
sel_initial[split_axis] = slice_init

return tuple(sel_initial), tuple(sel_final)


def _len_slice(slice_, n):
# Calculate the output length of a slice applied to an axis of length n
start, stop, step = slice_.indices(n)
return 1 + (stop - start - 1) // step


def _reslice(slice_, n, subslice):
# For a slice along an axis of length n, return the slice that would select the
# start:end elements of the final array
dstart, dstop, dstep = slice_.indices(n)

if subslice.step is not None and subslice.step > 1:
raise ValueError("stride > 1 not supported. subslice: %s" % subslice)

return slice(dstart + subslice.start * dstep,
dstart + subslice.stop * dstep, dstep)

def _expand_sel(sel, naxis):
# Expand the selection to the full dimensions
if sel is None:
sel = [slice(None)] * naxis
if len(sel) < naxis:
sel = list(sel) + [slice(None)] * (naxis - len(sel))
return list(sel)


class DummyContext(object):
"""A completely dummy context manager."""

def __enter__(self):
return self

def __exit__(self, *exc):
return False

0 comments on commit bee591e

Please sign in to comment.