Skip to content

Commit

Permalink
add zarr support: parallel write
Browse files Browse the repository at this point in the history
  • Loading branch information
nritsche committed Apr 13, 2021
1 parent 49d0720 commit f070c2c
Showing 1 changed file with 99 additions and 0 deletions.
99 changes: 99 additions & 0 deletions caput/memh5.py
Original file line number Diff line number Diff line change
Expand Up @@ -2670,6 +2670,105 @@ def _copy_to_file(memgroup, h5group):
group.comm.Barrier()


def _distributed_group_to_zarr(
    group,
    fname,
    mode,
    hints=True,
    convert_dataset_strings=False,
    convert_attribute_strings=True,
    **kwargs
):
    """Private routine to copy a full data tree from a distributed memh5 object into a Zarr file.

    This parallelizes all IO: every rank opens the Zarr group and distributed
    datasets are written collectively via `MPIArray`; common (non-distributed)
    datasets are created on all ranks but their data is written from rank 0 only.

    Parameters
    ----------
    group : memh5 group
        Distributed memh5 tree to write. Must carry an MPI communicator
        (``group.comm``), used for the final synchronisation barrier.
    fname : str
        Path of the Zarr group (directory store) to write to.
    mode : str
        Mode passed through to `zarr.open_group`.
    hints : bool, optional
        If True, tag each dataset with ``__memh5_distributed_dset`` and the
        root with ``__memh5_distributed_file`` so a reader can reconstruct
        the distribution.
    convert_dataset_strings : bool, optional
        If True, convert unicode datasets to bytestrings before writing.
    convert_attribute_strings : bool, optional
        If True, convert unicode attributes when copying them.
    **kwargs
        Accepted for signature compatibility with the HDF5 writer;
        currently unused.
    """

    # == Create some internal functions for doing the write ==
    # Recursively clone the memh5 tree structure into the (zarr) group.
    def _copy_to_file(memgroup, group):

        # Copy over attributes
        copyattrs(
            memgroup.attrs, group.attrs, convert_strings=convert_attribute_strings
        )

        # Sort the items to ensure we insert in a consistent order across ranks
        for key in sorted(memgroup):

            item = memgroup[key]

            # If group, create the entry and then recurse into it
            if is_group(item):
                new_group = group.create_group(key)
                _copy_to_file(item, new_group)
                continue

            # Resolve the Zarr compressor once; it is the same for both the
            # distributed and the common dataset paths below.
            compressor = fileformats.Zarr.compression_kwargs(
                item.compression, item.compression_opts
            )["compressor"]

            # Check if we are in a distributed dataset
            if isinstance(item, MemDatasetDistributed):

                data = check_unicode(item)

                # Write to file from the MPIArray (collective across ranks).
                # NOTE: despite its name, `to_hdf5` takes a `file_format`
                # argument selecting the Zarr backend here.
                data.to_hdf5(
                    group,
                    key,
                    chunks=item.chunks,
                    compressor=compressor,
                    file_format=fileformats.Zarr,
                )
                dset = group[key]

                if hints:
                    dset.attrs["__memh5_distributed_dset"] = True

            # Create common dataset (on every rank, so the metadata agrees)
            else:

                # Convert from unicode to bytestring
                if convert_dataset_strings:
                    data = ensure_bytestring(item.data)
                else:
                    data = check_unicode(item)

                # NOTE: this is zarr's own `create_dataset`; it must not be
                # passed a `file_format` kwarg (zarr would raise TypeError).
                dset = group.create_dataset(
                    key,
                    shape=data.shape,
                    dtype=data.dtype,
                    chunks=item.chunks,
                    compressor=compressor,
                )

                # Write common data from rank 0 only, to avoid every rank
                # writing the same chunks
                if memgroup.comm.rank == 0:
                    dset[:] = data

                if hints:
                    dset.attrs["__memh5_distributed_dset"] = False

            # Copy attributes over into dataset
            copyattrs(
                item.attrs, dset.attrs, convert_strings=convert_attribute_strings
            )

    # Open file on all ranks
    with zarr.open_group(fname, mode) as f:
        # Start recursive file write
        _copy_to_file(group, f)

        if hints:
            f.attrs["__memh5_distributed_file"] = True

    # Final synchronisation
    group.comm.Barrier()


def _distributed_group_from_file(
fname,
comm=None,
Expand Down

0 comments on commit f070c2c

Please sign in to comment.