zarr support: parallel write
nritsche committed Apr 13, 2021
1 parent bbeca61 commit 8b6656c
Showing 3 changed files with 92 additions and 43 deletions.
7 changes: 3 additions & 4 deletions caput/memh5.py
@@ -2711,13 +2711,12 @@ def _copy_to_file(memgroup, group):
data = check_unicode(item)

# Write to file from MPIArray
data.to_hdf5(
data.to_file(
group,
key,
chunks=item.chunks,
compressor=fileformats.Zarr.compression_kwargs(
item.compression, item.compression_opts
)["compressor"],
compression=item.compression,
compression_opts=item.compression_opts,
file_format=fileformats.Zarr,
)
dset = group[key]
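The old call built a Zarr compressor by hand via fileformats.Zarr.compression_kwargs(...)["compressor"]; the new call simply forwards compression and compression_opts and lets to_file translate them for whichever backend file_format names. A minimal sketch of that translation, written from how the diff uses it rather than from caput's real fileformats module (the helper names and the numcodecs mapping are assumptions):

def hdf5_compression_kwargs(compression=None, compression_opts=None):
    # h5py's create_dataset takes these two keywords directly.
    return {"compression": compression, "compression_opts": compression_opts}

def zarr_compression_kwargs(compression=None, compression_opts=None):
    # zarr's create_dataset instead wants a single numcodecs compressor object,
    # which is why the old memh5 code pulled out ...["compressor"] by hand.
    import numcodecs

    if compression is None:
        return {"compressor": None}
    # Assumes compression names a numcodecs codec id and compression_opts is a
    # dict of codec parameters (e.g. {"level": 3} for "zstd").
    return {
        "compressor": numcodecs.get_codec({"id": compression, **(compression_opts or {})})
    }

Either dict can then be splatted straight into create_dataset, which is exactly what the rewritten to_file in mpiarray.py does below.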
4 changes: 3 additions & 1 deletion caput/misc.py
@@ -232,7 +232,9 @@ def open_h5py_mpi(f, mode, use_mpi=True, comm=None):
fh = f
fh.opened = False
else:
raise ValueError("Did not receive a h5py.File or filename")
raise ValueError(
f"Can't write to {f} (Expected a h5py.File, h5py.Group or str filename)."
)

fh.is_mpi = fh.file.driver == "mpio"

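The helper's signature is visible in the hunk header above. A usage sketch of how callers such as MPIArray.to_file treat the handle it returns (assumes mpi4py and an MPI-enabled h5py build; the filename is illustrative):

from mpi4py import MPI

from caput import misc

fh = misc.open_h5py_mpi("output.h5", "a", comm=MPI.COMM_WORLD)
try:
    if fh.is_mpi:
        # The file was opened with the mpio driver, so collective IO is available.
        pass
finally:
    # Only close handles the helper opened itself; if the caller passed in an
    # existing h5py.File or h5py.Group, fh.opened is False and it stays open.
    if fh.opened:
        fh.close()

Passing anything other than a filename, h5py.File or h5py.Group now raises the more descriptive ValueError added in this commit.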
124 changes: 86 additions & 38 deletions caput/mpiarray.py
@@ -96,7 +96,7 @@

import numpy as np

from caput import mpiutil, misc
from caput import fileformats, mpiutil, misc


logger = logging.getLogger(__name__)
@@ -708,82 +708,130 @@ def to_hdf5(
compression=None,
compression_opts=None,
):
"""Parallel write into a contiguous HDF5 dataset.
self.to_file(
f,
dataset,
create,
chunks,
compression,
compression_opts,
file_format=fileformats.HDF5,
)

def to_file(
self,
f,
dataset,
create=False,
chunks=None,
compression=None,
compression_opts=None,
file_format=fileformats.HDF5,
):
"""Parallel write into a contiguous HDF5/Zarr dataset.
Parameters
----------
filename : str, h5py.File or h5py.Group
f : str, h5py.File, h5py.Group or zarr.Group
File to write dataset into.
dataset : string
Name of dataset to write into. Should not exist.
"""
if file_format == fileformats.HDF5:
import h5py

if not h5py.get_config().mpi:
if isinstance(f, str):
self._to_hdf5_serial(f, dataset, create)
return
else:
raise ValueError(
"Argument must be a filename if h5py does not have MPI support"
)

import h5py
mode = "a" if create else "r+"

if file_format == fileformats.HDF5:
fh = misc.open_h5py_mpi(f, mode, self.comm)

# Check that there are no null slices, otherwise we need to turn off
# collective IO to work around an h5py issue (#965)
no_null_slices = self.global_shape[self.axis] >= self.comm.size

# Only use collective IO if:
# - there are no null slices (h5py bug)
# - we are not distributed over axis=0 as there is no advantage for
# collective IO which is usually slow
# - unless we want to use compression/chunking
# TODO: change if h5py bug fixed
# https://github.com/h5py/h5py/issues/965
# TODO: better would be a test on contiguous IO size
use_collective = (
fh.is_mpi
and no_null_slices
and (self.axis > 0 or compression is not None)
)

if fh.is_mpi and not use_collective:
# Need to disable compression if we can't use collective IO
chunks, compression, compression_opts = None, None, None
else:
import zarr

if not h5py.get_config().mpi:
if isinstance(f, str):
self._to_hdf5_serial(f, dataset, create)
return
fh = file_format.open(f, mode, path=dataset)
elif isinstance(f, zarr.Group):
fh = f
else:
raise ValueError(
"Argument must be a filename if h5py does not have MPI support"
f"Can't write to {f} (Expected a zarr.Group or str filename)."
)

mode = "a" if create else "r+"

fh = misc.open_h5py_mpi(f, mode, self.comm)

start = self.local_offset[self.axis]
end = start + self.local_shape[self.axis]

# Construct slices for axis
sel = ([slice(None, None)] * self.axis) + [slice(start, end)]
sel = _expand_sel(sel, self.ndim)

# Check that there are no null slices, otherwise we need to turn off
# collective IO to work around an h5py issue (#965)
no_null_slices = self.global_shape[self.axis] >= self.comm.size

# Split the axis to get the IO size under ~2GB (only if MPI-IO)
split_axis, partitions = self._partition_io(skip=(not fh.is_mpi))

# Only use collective IO if:
# - there are no null slices (h5py bug)
# - we are not distributed over axis=0 as there is no advantage for
# collective IO which is usually slow
# - unless we want to use compression/chunking
# TODO: change if h5py bug fixed
# TODO: better would be a test on contiguous IO size
use_collective = (
fh.is_mpi and no_null_slices and (self.axis > 0 or compression is not None)
split_axis, partitions = self._partition_io(
skip=(file_format == fileformats.HDF5 and not fh.is_mpi)
)

if fh.is_mpi and not use_collective:
# Need to disable compression if we can't use collective IO
chunks, compression, compression_opts = None, None, None
compression_kwargs = file_format.compression_kwargs(
compression=compression,
compression_opts=compression_opts,
)

dset = fh.create_dataset(
dataset,
shape=self.global_shape,
dtype=self.dtype,
chunks=chunks,
compression=compression,
compression_opts=compression_opts,
**compression_kwargs,
)

# Read using collective MPI-IO if specified
with dset.collective if use_collective else DummyContext():
if file_format == fileformats.HDF5:
# Read using collective MPI-IO if specified
with dset.collective if use_collective else DummyContext():

# Loop over partitions of the IO and perform them
# Loop over partitions of the IO and perform them
for part in partitions:
islice, fslice = _partition_sel(
sel, split_axis, self.global_shape[split_axis], part
)
dset[islice] = self[fslice]

if fh.opened:
fh.close()
else:
for part in partitions:
islice, fslice = _partition_sel(
sel, split_axis, self.global_shape[split_axis], part
)
dset[islice] = self[fslice]

if fh.opened:
fh.close()

def transpose(self, *axes):
"""Transpose the array axes.
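Taken together, the change gives a single entry point for parallel writes to either backend. A usage sketch (runnable only under mpirun with mpi4py, and with zarr installed; the shapes, filenames and dataset names are illustrative):

import numpy as np

from caput import fileformats, mpiarray, mpiutil

# Each rank owns a slice of axis 0 of a (16, 32) distributed array.
arr = mpiarray.MPIArray((16, 32), axis=0, dtype=np.float64)
arr[:] = mpiutil.rank

# Parallel write into a contiguous HDF5 dataset (MPI-IO when h5py supports it).
arr.to_file("data.h5", "dset", create=True, file_format=fileformats.HDF5)

# The same array written into a zarr group, each rank writing its own region.
arr.to_file("data.zarr", "dset", create=True, file_format=fileformats.Zarr)

With file_format omitted, to_file defaults to fileformats.HDF5, and the old to_hdf5 is now a thin wrapper around it.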
