diff --git a/caput/misc.py b/caput/misc.py
index 89a4b728..6ff5ac47 100644
--- a/caput/misc.py
+++ b/caput/misc.py
@@ -61,6 +61,7 @@ def __call__(self, *args, **kwargs):
             return arr
 
         def __get__(self, obj, type=None):
+
            # As a descriptor, this gets called whenever this is used to wrap a
            # function, and simply binds it to the instance
 
@@ -73,7 +74,7 @@ def __get__(self, obj, type=None):
     return _vectorize_desc
 
 
-def open_h5py_mpi(f, mode, comm=None):
+def open_h5py_mpi(f, mode, use_mpi=True, comm=None):
     """Ensure that we have an h5py File object.
 
     Opens with MPI-IO if possible.
@@ -89,6 +90,8 @@ def open_h5py_mpi(f, mode, comm=None):
         is just returned as is.
     mode : string
        Mode to open file in.
+    use_mpi : bool, optional
+        Whether to use MPI-IO or not (default True).
     comm : mpi4py.Comm, optional
         MPI communicator to use. Uses `COMM_WORLD` if not set.
 
@@ -103,7 +106,7 @@ def open_h5py_mpi(f, mode, comm=None):
     if isinstance(f, basestring):
 
         # Open using MPI-IO if we can
-        if has_mpi:
+        if has_mpi and use_mpi:
             from mpi4py import MPI
             comm = comm if comm is not None else MPI.COMM_WORLD
             fh = h5py.File(f, mode, driver='mpio', comm=comm)
diff --git a/caput/mpiarray.py b/caput/mpiarray.py
index 29be299d..ea9720d3 100644
--- a/caput/mpiarray.py
+++ b/caput/mpiarray.py
@@ -110,11 +110,16 @@ from past.builtins import basestring
 
 import os
 import time
+import logging
+
 import numpy as np
 
 from caput import mpiutil, misc
 
 
+logger = logging.getLogger(__name__)
+
+
 class _global_resolver(object):
     # Private class implementing the global sampling for MPIArray
 
@@ -568,7 +573,14 @@ def from_hdf5(cls, f, dataset, comm=None, axis=0, sel=None):
         -------
         array : MPIArray
         """
-        fh = misc.open_h5py_mpi(f, 'r', comm)
+        from mpi4py import MPI
+
+        # Don't bother using MPI-IO when the distributed axis is zero; it's probably just slower.
+        # TODO: with tuning this might not be true. Keep an eye on this.
+        use_mpi = (axis > 0)
+
+        # Read the file, opening with MPI-IO if requested and available
+        fh = misc.open_h5py_mpi(f, 'r', use_mpi=use_mpi, comm=comm)
         dset = fh[dataset]
         dshape = dset.shape  # Shape of the underlying dataset
 
@@ -577,22 +589,17 @@ def from_hdf5(cls, f, dataset, comm=None, axis=0, sel=None):
 
         # Check that the axis is valid and wrap to an actual position
         if axis < -naxis or axis >= naxis:
-            raise ValueError("Distributed axis %i not in range (%i, %i)" % (axis, -naxis, naxis-1))
+            raise ValueError("Distributed axis %i not in range (%i, %i)" %
+                             (axis, -naxis, naxis-1))
         axis = naxis + axis if axis < 0 else axis
 
         # Ensure sel is defined to cover all axes
-        if sel is None:
-            sel = [slice(None)] * naxis
-        if len(sel) < naxis:
-            sel = list(sel) + [slice(None)] * (naxis - len(sel))
-        sel = list(sel)
+        sel = _expand_sel(sel, naxis)
 
         # Figure out the final array size and create it
         gshape = []
         for l, sl in zip(dshape, sel):
-            start, end, stride = sl.indices(l)
-            n = 1 + (end - start - 1) // stride
-            gshape.append(n)
+            gshape.append(_len_slice(sl, l))
         dist_arr = cls(gshape, axis=axis, comm=comm, dtype=dtype)
 
         # Get the local start and end indices
@@ -600,20 +607,32 @@ def from_hdf5(cls, f, dataset, comm=None, axis=0, sel=None):
         lend = lstart + dist_arr.local_shape[axis]
 
         # Create the slice object into the dataset by resolving the rank's slice on the sel
-        dstart, dend, dstride = sel[axis].indices(dshape[axis])
-        sel[axis] = slice(dstart + lstart * dstride, dstart + lend * dstride, dstride)
+        sel[axis] = _reslice(sel[axis], dshape[axis], slice(lstart, lend))
         sel = tuple(sel)
 
+        # Split the axis to get the IO size under ~2GB (only if MPI-IO)
+        split_axis, partitions = dist_arr._partition_io(skip=(not fh.is_mpi))
+
         # Check that there are no null slices, otherwise we need to turn off
         # collective IO to work around an h5py issue (#965)
         no_null_slices = dist_arr.global_shape[axis] >= dist_arr.comm.size
 
-        # Read using MPI-IO if possible
-        if fh.is_mpi and no_null_slices:
-            with dset.collective:
-                dist_arr[:] = dset[sel]
-        else:
-            dist_arr[:] = dset[sel]
+        # Only use collective IO if:
+        # - there are no null slices (h5py bug)
+        # - we are not distributed over axis=0, where collective IO has no
+        #   advantage and is usually slower
+        # TODO: change if h5py bug fixed
+        # TODO: better would be a test on contiguous IO size
+        use_collective = fh.is_mpi and no_null_slices and axis > 0
+
+        # Read using collective MPI-IO if specified
+        with dset.collective if use_collective else DummyContext():
+
+            # Loop over partitions of the IO and perform them
+            for part in partitions:
+                islice, fslice = _partition_sel(sel, split_axis,
+                                                gshape[split_axis], part)
+                dist_arr[fslice] = dset[islice]
 
         if fh.opened:
             fh.close()
@@ -654,19 +673,33 @@ def to_hdf5(self, f, dataset, create=False):
         end = start + self.local_shape[self.axis]
 
         # Construct slices for axis
-        sl = [slice(None, None)] * self.axis
-        sl += [slice(start, end)]
-        sl = tuple(sl)
+        sel = ([slice(None, None)] * self.axis) + [slice(start, end)]
+        sel = _expand_sel(sel, self.ndim)
 
         # Check that there are no null slices, otherwise we need to turn off
         # collective IO to work around an h5py issue (#965)
         no_null_slices = self.global_shape[self.axis] >= self.comm.size
 
-        if fh.is_mpi and no_null_slices:
-            with dset.collective:
-                dset[sl] = self[:]
-        else:
-            dset[sl] = self[:]
+        # Split the axis to get the IO size under ~2GB (only if MPI-IO)
+        split_axis, partitions = self._partition_io(skip=(not fh.is_mpi))
+
+        # Only use collective IO if:
+        # - there are no null slices (h5py bug)
+        # - we are not distributed over axis=0, where collective IO has no
+        #   advantage and is usually slower
+        # TODO: change if h5py bug fixed
+        # TODO: better would be a test on contiguous IO size
+        use_collective = fh.is_mpi and no_null_slices and self.axis > 0
+
+        # Write using collective MPI-IO if specified
+        with dset.collective if use_collective else DummyContext():
+
+            # Loop over partitions of the IO and perform them
+            for part in partitions:
+                islice, fslice = _partition_sel(
+                    sel, split_axis, self.global_shape[split_axis], part
+                )
+                dset[islice] = self[fslice]
 
         if fh.opened:
             fh.close()
@@ -807,3 +840,111 @@ def _to_hdf5_serial(self, filename, dataset, create=False):
             fh[dataset][start:end] = dist_arr
 
         dist_arr.comm.Barrier()
+
+    def _partition_io(self, skip=False, threshold=1.99):
+        """Split the IO of this array into local sections under `threshold`.
+
+        Parameters
+        ----------
+        skip : bool, optional
+            Don't partition, just find and return a full axis.
+        threshold : float, optional
+            Maximum size of a single IO operation (in GB).
+
+        Returns
+        -------
+        split_axis : int
+            The axis to split the IO along.
+        partitions : list of slice objects
+            Slices selecting each partition along `split_axis`.
+        """
+        from mpi4py import MPI
+
+        threshold_bytes = threshold * 2**30
+        largest_size = self.comm.allreduce(self.nbytes, op=MPI.MAX)
+        num_split = int(np.ceil(largest_size / threshold_bytes))
+
+        # Return early if we can
+        if skip or num_split == 1:
+            return 0, [slice(0, self.local_shape[0])]
+
+        if self.ndim == 1:
+            raise RuntimeError("To partition an array we must have multiple axes.")
+
+        # Try to find an axis to split over
+        for split_axis in range(self.ndim):
+            if split_axis != self.axis and self.global_shape[split_axis] >= num_split:
+                break
+        else:
+            raise RuntimeError(
+                "Can't identify an IO partition less than %.2f GB in size: "
+                "shape=%s, distributed axis=%i" % (threshold, self.global_shape, self.axis)
+            )
+
+        logger.debug("Splitting along axis %i, %i ways", split_axis, num_split)
+
+        # Figure out the start and end of the splits and return
+        nums, starts, ends = mpiutil.split_m(self.global_shape[split_axis], num_split)
+
+        slices = [slice(start, end) for start, end in zip(starts, ends)]
+        return split_axis, slices
+
+
+def _partition_sel(sel, split_axis, n, slice_):
+    # Take a selection (a tuple of slices) and re-slice along the split_axis
+    # (which has length n)
+    #
+    # Returns the new selections for the initial (pre-selection) space and the final
+    # (post-selection) space.
+
+    l = _len_slice(sel[split_axis], n)
+
+    # Reconstruct the slice for the split axis
+    slice_init = _reslice(sel[split_axis], n, slice_)
+
+    # Construct the final selection
+    sel_final = [slice(None)] * len(sel)
+    sel_final[split_axis] = slice_
+
+    # Construct the initial selection
+    sel_initial = list(sel)
+    sel_initial[split_axis] = slice_init
+
+    return tuple(sel_initial), tuple(sel_final)
+
+
+def _len_slice(slice_, n):
+    # Calculate the output length of a slice applied to an axis of length n
+    start, stop, step = slice_.indices(n)
+    return 1 + (stop - start - 1) // step
+
+
+def _reslice(slice_, n, subslice):
+    # For a slice along an axis of length n, return the slice that would select the
+    # start:end elements of the final array
+    dstart, dstop, dstep = slice_.indices(n)
+
+    if subslice.step is not None and subslice.step > 1:
+        raise ValueError("stride > 1 not supported. subslice: %s" % subslice)
subslice: %s" % subslice) + + return slice(dstart + subslice.start * dstep, + dstart + subslice.stop * dstep, dstep) + +def _expand_sel(sel, naxis): + # Expand the selection to the full dimensions + if sel is None: + sel = [slice(None)] * naxis + if len(sel) < naxis: + sel = list(sel) + [slice(None)] * (naxis - len(sel)) + return list(sel) + + +class DummyContext(object): + """A completely dummy context manager.""" + + def __enter__(self): + return self + + def __exit__(self, *exc): + return False