
Serialization issue with distributed, h5netcdf, and fsspec (ImplicitToExplicitIndexingAdapter) #4591

Closed
rabernat opened this issue Nov 18, 2020 · 12 comments

@rabernat (Contributor)

This was originally reported by @jkingslake at pangeo-data/pangeo-datastore#116.

What happened:

I tried to open a netCDF file over HTTP using fsspec and the h5netcdf engine, and to compute data using dask.distributed. It appears that our ImplicitToExplicitIndexingAdapter is [no longer?] serializable.

What you expected to happen:

Things would work. Indeed, I could swear this used to work with previous versions.

Minimal Complete Verifiable Example:

import xarray as xr
import fsspec
from dask.distributed import Client

# example needs to use distributed to reproduce the bug
client = Client()

url = 'https://storage.googleapis.com/ldeo-glaciology/bedmachine/BedMachineAntarctica_2019-11-05_v01.nc'
with fsspec.open(url, mode='rb') as openfile:
    dsc = xr.open_dataset(openfile, chunks=3000)
dsc.surface.mean().compute()

This raises the following error:

Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/protocol/core.py", line 50, in dumps
    data = {
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/protocol/core.py", line 51, in <dictcomp>
    key: serialize(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 277, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type ImplicitToExplicitIndexingAdapter.', 'ImplicitToExplicitIndexingAdapter(array=CopyOnWriteArray(array=LazilyOuterIndexedArray(array=<xarray.backends.h5netcdf_.H5NetCDFArrayWrapper object at 0x7ff8e3988540>, key=BasicIndexer((slice(None, None, None), slice(None, None, None))))))')
distributed.comm.utils - ERROR - ('Could not serialize object of type ImplicitToExplicitIndexingAdapter.', 'ImplicitToExplicitIndexingAdapter(array=CopyOnWriteArray(array=LazilyOuterIndexedArray(array=<xarray.backends.h5netcdf_.H5NetCDFArrayWrapper object at 0x7ff8e3988540>, key=BasicIndexer((slice(None, None, None), slice(None, None, None))))))')

Anything else we need to know?:

One can work around this by using the netCDF4 library's new and undocumented ability to open files over HTTP (the #mode=bytes URL suffix):

url = 'https://storage.googleapis.com/ldeo-glaciology/bedmachine/BedMachineAntarctica_2019-11-05_v01.nc#mode=bytes'
ds = xr.open_dataset(url, engine='netcdf4', chunks=3000)
ds

However, the fsspec + h5netcdf path should work!

Environment:

Output of xr.show_versions()
INSTALLED VERSIONS
------------------
commit: None
python: 3.8.6 | packaged by conda-forge | (default, Oct  7 2020, 19:08:05) 
[GCC 7.5.0]
python-bits: 64
OS: Linux
OS-release: 4.19.112+
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: C.UTF-8
LANG: C.UTF-8
LOCALE: en_US.UTF-8
libhdf5: 1.10.6
libnetcdf: 4.7.4

xarray: 0.16.1
pandas: 1.1.3
numpy: 1.19.2
scipy: 1.5.2
netCDF4: 1.5.4
pydap: installed
h5netcdf: 0.8.1
h5py: 2.10.0
Nio: None
zarr: 2.4.0
cftime: 1.2.1
nc_time_axis: 1.2.0
PseudoNetCDF: None
rasterio: 1.1.7
cfgrib: 0.9.8.4
iris: None
bottleneck: 1.3.2
dask: 2.30.0
distributed: 2.30.0
matplotlib: 3.3.2
cartopy: 0.18.0
seaborn: None
numbagg: None
pint: 0.16.1
setuptools: 49.6.0.post20201009
pip: 20.2.4
conda: None
pytest: 6.1.1
IPython: 7.18.1
sphinx: 3.2.1

Also fsspec 0.8.4

cc @martindurant for fsspec integration.

@rabernat (Contributor, Author)

I finally found a permutation that works, which makes me think this is an fsspec error.

import gcsfs

gcs = gcsfs.GCSFileSystem()
url = 'gs://ldeo-glaciology/bedmachine/BedMachineAntarctica_2019-11-05_v01.nc'
openfile = gcs.open(url, mode='rb')
dsgcs = xr.open_dataset(openfile, chunks=3000)
dsgcs.surface.mean().compute()

@martindurant (Contributor)

I don't think it's fsspec; the HTTPFileSystem and its file objects are known to serialise.

However

>>> distributed.protocol.serialize(dsc.surface.mean().data.dask['open_dataset-27832a1f850736a8d9a11a882ad06230surface-3b6f5b6a90c2cfa65379d3bfae22126f'])
({'serializer': 'error'}, ...)

(That's one of the keys I picked from the graph at random; your keys may differ.)
I can't say why this object is in the graph where perhaps it wasn't before, but it has a reference to a "CopyOnWriteArray", which sounds like a buffer owned by something else and is probably the non-serializable part. Digging further turns up a contained "<xarray.backends.h5netcdf_.H5NetCDFArrayWrapper at 0x17e669ad0>", which is not serializable -- so maybe xarray can do something about this.
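
A quick way to narrow this down (a hypothetical probe, not something run in the thread) is to pickle each layer of the stack directly and see where serialization first fails, assuming the same url as the original example:

import pickle
import fsspec

url = 'https://storage.googleapis.com/ldeo-glaciology/bedmachine/BedMachineAntarctica_2019-11-05_v01.nc'

# The OpenFile "recipe" stores only the filesystem, path, and mode, so it
# pickles regardless of backend.
of = fsspec.open(url, mode='rb')
pickle.dumps(of)

# Entering the context yields the live HTTP file object; at the time of
# this report (fsspec 0.8.4), this is the layer that failed to pickle.
with of as f:
    pickle.dumps(f)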

@rabernat (Contributor, Author)

Can you figure out how the http version differs from the gcs version? That might hold a clue.

@martindurant (Contributor)

OK, I can see a thing after all... please stand by

martindurant pushed a commit to martindurant/filesystem_spec that referenced this issue Nov 18, 2020
@shoyer (Member) commented Nov 18, 2020

H5NetCDFArrayWrapper is definitely supposed to be serializable with dask -- that's one of the main reasons these array wrapper classes exist in the first place.

@martindurant (Contributor)

The xarray.backends.h5netcdf_.H5NetCDFArrayWrapper seems to keep a reference to the open file, which in the HTTP case contains the open session. The linked PR fixes serialization of those files for the HTTP case.
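
The general shape of such a fix (a minimal sketch of the reopen-on-unpickle pattern, with hypothetical names; this is not the code from the linked PR) is to drop the live handle when pickling and rebuild it on unpickle:

class ReopenableFile:
    """File-like wrapper that records how to reopen itself."""

    def __init__(self, fs, path, mode="rb"):
        self.fs, self.path, self.mode = fs, path, mode
        self._f = fs.open(path, mode)

    def read(self, size=-1):
        return self._f.read(size)

    def __getstate__(self):
        # Drop the live handle (and any session it holds); keep only the
        # ingredients needed to reopen. fsspec filesystems pickle by design.
        state = self.__dict__.copy()
        del state["_f"]
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._f = self.fs.open(self.path, self.mode)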

@rabernat (Contributor, Author)

Thanks for your quick response to this Martin!

@shoyer (Member) commented Nov 18, 2020

OK, I think I understand what's going on. Xarray serializes the arguments that should suffice to recreate/open a backend-specific file object (e.g., h5netcdf.File). So if you pass a file name to open_dataset(), that works fine. But if you pass a file-like object (as is done here with fsspec), the file-like object itself needs to be serializable.
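
An illustrative contrast (using the same url as above; not code from the thread):

import xarray as xr
import fsspec

url = 'https://storage.googleapis.com/ldeo-glaciology/bedmachine/BedMachineAntarctica_2019-11-05_v01.nc'

# Passing a string: xarray keeps only the reopening arguments in the graph,
# so each worker can recreate the file object itself.
ds = xr.open_dataset(url + '#mode=bytes', engine='netcdf4', chunks=3000)

# Passing a file-like object: the object is captured in the graph and must
# therefore survive a pickle round-trip when distributed ships the tasks.
with fsspec.open(url, mode='rb') as openfile:
    ds = xr.open_dataset(openfile, engine='h5netcdf', chunks=3000)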

@rabernat (Contributor, Author)

This is fixed by fsspec/filesystem_spec#477.

However, the existence of this issue points to the need for more ecosystem-wide integration testing of xarray / dask / zarr / fsspec. I know we discussed this on some other issue, but I can't find it...

@amatsukawa (Contributor) commented Jun 29, 2021

This issue appears to be back in some form, with engine=zarr.

The code looks like this, using fsspec's mapper API to access Azure Blob Storage:

fs = fsspec.filesystem("az://...")
with xr.open_dataset(fs.get_mapper(path), engine="zarr", chunks="auto") as ds:
    ...

I have not yet tracked down a self-contained reproducer, as it fails for only one call and not for others of a similar form. I'm reporting it while I dig further, in case you have any suggestions.

[2021-06-29 00:44:47] [2021-06-29 00:44:47 core.py:74 CRITICAL] Failed to Serialize
[2021-06-29 00:44:47] Traceback (most recent call last):
[2021-06-29 00:44:47]   File "/deps/envs/deps/lib/python3.7/site-packages/distributed/protocol/core.py", line 70, in dumps
[2021-06-29 00:44:47]     frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
[2021-06-29 00:44:47]   File "/deps/envs/deps/lib/python3.7/site-packages/msgpack/__init__.py", line 35, in packb
[2021-06-29 00:44:47]     return Packer(**kwargs).pack(o)
[2021-06-29 00:44:47]   File "msgpack/_packer.pyx", line 286, in msgpack._cmsgpack.Packer.pack
[2021-06-29 00:44:47]   File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
[2021-06-29 00:44:47]   File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer.pack
[2021-06-29 00:44:47]   File "msgpack/_packer.pyx", line 258, in msgpack._cmsgpack.Packer._pack
[2021-06-29 00:44:47]   File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
[2021-06-29 00:44:47]   File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
[2021-06-29 00:44:47]   File "msgpack/_packer.pyx", line 258, in msgpack._cmsgpack.Packer._pack
[2021-06-29 00:44:47]   File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
[2021-06-29 00:44:47]   File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
[2021-06-29 00:44:47]   File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
[2021-06-29 00:44:47]   File "msgpack/_packer.pyx", line 279, in msgpack._cmsgpack.Packer._pack
[2021-06-29 00:44:47]   File "/deps/envs/deps/lib/python3.7/site-packages/distributed/protocol/core.py", line 56, in _encode_default
[2021-06-29 00:44:47]     obj, serializers=serializers, on_error=on_error, context=context
[2021-06-29 00:44:47]   File "/deps/envs/deps/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 422, in serialize_and_split
[2021-06-29 00:44:47]     header, frames = serialize(x, serializers, on_error, context)
[2021-06-29 00:44:47]   File "/deps/envs/deps/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 256, in serialize
[2021-06-29 00:44:47]     iterate_collection=True,
[2021-06-29 00:44:47]   File "/deps/envs/deps/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 348, in serialize
[2021-06-29 00:44:47]     raise TypeError(msg, str(x)[:10000])
[2021-06-29 00:44:47] TypeError: ('Could not serialize object of type ImplicitToExplicitIndexingAdapter.', 'ImplicitToExplicitIndexingAdapter(array=CopyOnWriteArray(array=LazilyIndexedArray(array=<xarray.backends.zarr.ZarrArrayWrapper object at 0x7f52dedbb690>, key=BasicIndexer((slice(None, None, None), slice(None, None, None))))))')
[2021-06-29 00:44:47] [2021-06-29 00:44:47 utils.py:37 ERROR] ('Could not serialize object of type ImplicitToExplicitIndexingAdapter.', 'ImplicitToExplicitIndexingAdapter(array=CopyOnWriteArray(array=LazilyIndexedArray(array=<xarray.backends.zarr.ZarrArrayWrapper object at 0x7f52dedbb690>, key=BasicIndexer((slice(None, None, None), slice(None, None, None))))))')
pip list | grep 'dask\|distributed\|xarray\|zarr\|msgpack\|adlfs'
adlfs                    0.7.7
dask                     2021.6.2
distributed              2021.6.2
msgpack                  1.0.0
xarray                   0.18.2
zarr                     2.8.3

@martindurant (Contributor)

I only have vague thoughts.

To be sure: you can pickle the file-system, any mapper (.get_mapper()), and any open file (.open()), right? A minimal check along those lines is sketched below.

The question here is why msgpack is being invoked. Those items, as well as any internal xarray stuff, should appear only in tasks, and so be pickled. Is there a high-level-graph layer encapsulating things that were previously pickled? The only things that should appear in any HLG layer are the paths and storage options needed to open a file-system, not the file-system itself.
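
A minimal version of that check (a hypothetical helper, not from the thread; pass it the same "az" filesystem and store path used above):

import pickle

def check_picklable(fs, path):
    # Round-trip the filesystem, the mapper, and an open file through
    # pickle; any layer that cannot serialize will raise here.
    for obj in (fs, fs.get_mapper(path), fs.open(path, mode="rb")):
        pickle.loads(pickle.dumps(obj))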

@amatsukawa (Contributor)

I am using worker_client in tasks that open xarray datasets, submit further compute, and then save the results. Perhaps this is somehow related?
