Problem running ds.to_zarr on a dask cluster #116

Closed
jkingslake opened this issue Nov 17, 2020 · 25 comments

@jkingslake

@porterdf and I are trying to set up a simple netcdf-to-zarr workflow, similar to the many useful examples in #8

We are successfully loading data from a netcdf and writing it to zarr in a google bucket using this notebook in us-central1-b.gcp.pangeo.io.

But we need to get .to_zarr running in a dask cluster to avoid memory issues we are running into. It appears from the examples in #8 that starting a cluster should simply lead to the .to_zarr operation running in the cluster, but I must be missing something, as this doesn't seem to be how it behaves.

Any help on what I assume is an embarrassingly simple problem for this community would be much appreciated!

@charlesbluca
Member

Thanks for bringing this up!

I don't have much experience with @rabernat's workflow for authentication with a Dask cluster, but since the main error in the notebook occurs when trying to get tokens using gcfs_auth.tokens(...), have you tried bypassing this step and creating a GCSMap directly from the first GCSFileSystem you create?

gcfs_auth = gcsfs.GCSFileSystem(project='ldeo-glaciology', token='../secrets/ldeo-glaciology-bc97b12df06b.json')
gcsmap = gcfs_auth.get_mapper('gs://ldeo-glaciology/temp/bm_small4.zarr')
ds.to_zarr(gcsmap)

If that doesn't help, this issue might get more exposure if posted on Pangeo's Discourse forum or Gitter chat.

@jkingslake
Author

Thanks for this!!
Really appreciate you going in and looking at the code.

Your suggestion works - in the same way as the following from a cell further down the notebook works:

bm_mapper = fsspec.get_mapper('gs://ldeo-glaciology/bedmachine/bm.zarr', mode='ab',
                            token='../secrets/ldeo-glaciology-bc97b12df06b.json')
bm.to_zarr(bm_mapper, mode='w'); 

but unfortunately this doesn't lead to the cluster doing the .to_zarr step.

My thought was that @rabernat's suggestion here was to get a mapper that gives the cluster control, but I don't understand gcsfs well enough to know how this works (and I have not found the right documentation yet).

Thanks again for your help!

@rabernat
Member

Hi Jonny! 😊

The problem is that the dask workers don't share a filesystem with the notebook server, so this path - '../secrets/ldeo-glaciology-bc97b12df06b.json' - isn't visible from the workers. They could be failing because of this.

The solution is to use the syntax documented on the Pangeo Cloud site.

import json
import gcsfs

with open('<your_token_file>.json') as token_file:
    token = json.load(token_file)
gcs = gcsfs.GCSFileSystem(token=token)
mapper = gcs.get_mapper(...)

That way you are loading the token data into memory as a dict and passing it around to the workers directly.
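
For reference, a minimal sketch of the full pattern (the token path below is a placeholder; the project and bucket path are reused from the earlier snippet, and ds is the dataset from the notebook):

import json
import gcsfs

# Load the service-account key into a dict so it can be shipped to the workers
with open('path/to/your_token.json') as token_file:  # placeholder path
    token = json.load(token_file)

gcs = gcsfs.GCSFileSystem(project='ldeo-glaciology', token=token)
gcsmap = gcs.get_mapper('gs://ldeo-glaciology/temp/bm_small4.zarr')
ds.to_zarr(gcsmap, mode='w')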

Does this fix it?

@jkingslake
Author

Hi Ryan,
Thanks!
This make sense - the workers not having that token file.
Unfortunately, this version still doesn't seem to send anything to the Dask scheduler (i.e. nothing happens on the Dask status page and the graph page says the scheduler is empty).
Below is the code I am starting the cluster with, in case that's relevant.
Thanks for your help, as always! :-)

import dask
import dask_gateway
from dask.distributed import Client

dask_gateway.__version__
dask.config.config['gateway']
gateway = dask_gateway.Gateway()
cluster = gateway.new_cluster()
gateway.list_clusters()
cluster.dashboard_link
cluster.scale(8)
client = Client(cluster)
cluster

@rabernat
Member

Can you do anything at all with the cluster? Just compute something random?

import dask.array as dsa
arr = dsa.ones((100,), chunks=(10,))
arr.compute()

@rabernat
Member

Unfortunately, this version still doesn't seem to send anything to the Dask scheduler

Do you see any data show up in your bucket?

@jkingslake
Author

Do you see any data show up in your bucket?

Yes, the zarrs show up when I avoid the memory issues by only sending a small subset of the data, or by using the largest server on https://us-central1-b.gcp.pangeo.io/

Can you do anything at all with the cluster? Just compute something random?

Generally, yes, using this setup I am successfully doing things on the cluster, but I am going to check this with this notebook once the server is up and running. I'll get back to you to confirm.

@rabernat
Member

So for some reason, writing this data is not going through the cluster, but other operations are...very weird. Doesn't make sense.

How are you creating ds? Does it actually use dask array? Can you share your code?
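
For example, a dask-backed dataset reports chunk sizes, while a plain lazily-loaded one does not (a quick sketch; the file path and variable name here are placeholders):

import xarray as xr

# Opened with a chunks argument -> variables are dask arrays
ds_dask = xr.open_dataset('bedmachine.nc', chunks=3000)
print(ds_dask['bed'].chunks)   # tuple of chunk sizes

# Opened without chunks -> still lazy, but backed by plain numpy, so nothing goes to the cluster
ds_plain = xr.open_dataset('bedmachine.nc')
print(ds_plain['bed'].chunks)  # None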

@jkingslake
Author

OK, apologies, I was forgetting the difference between a dask array and an xarray Dataset that was just loaded lazily. Now I am loading the netcdf as a dask array by choosing a chunk size.

A couple of new errors come up now. The notebook is here. It's getting a bit messy with non-dask- and dask-versions of the data, but hopefully it makes sense.

First, a worker is killed when trying to load into memory a ~10MB dask array from a zarr that was created from a non-dask array (i.e. the creation of this zarr does not use the cluster). This error does not occur if you don't start a cluster.

Second, when trying to write a ~10MB dask array to zarr, it fails with a "Failed to Serialize" error.

@rabernat
Member

rabernat commented Nov 18, 2020

If you bypass dask and have enough memory to handle things locally (in your notebook server), then things work.

The problem appears to be serializing the reference to the netcdf file and passing it to the workers:

url = 'https://storage.googleapis.com/ldeo-glaciology/bedmachine/BedMachineAntarctica_2019-11-05_v01.nc'
with fsspec.open(url, mode='rb') as openfile:
    bm_dask = xr.open_dataset(openfile, chunks=3000)
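
One way to confirm this is to try serializing the dataset yourself, since distributed has to pickle it to ship it to the workers (a diagnostic sketch):

import cloudpickle

# If the open file handle held by bm_dask can't be pickled, this raises,
# which matches the "Failed to Serialize" error from the cluster.
cloudpickle.dumps(bm_dask)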

Forget about writing for a moment. Can you do anything with bm_dask on the distributed cluster? Does bm_dask.mean().compute() work?

@jkingslake
Author

Forget about writing for a moment. Can you do anything with bm_dask on the distributed cluster? Does bm_dask.mean().compute() work?

This is just what I was trying. No, the same error comes up.

@rabernat
Member

I'm going to spend 15 minutes digging into this. I'll update you. It definitely SHOULD work.

@jkingslake
Author

I really appreciate it! I have meetings for the rest of the day, so I may not be able to respond after 11.

@tjcrone

tjcrone commented Nov 18, 2020

I'll add that before the Dask Gateway transition, Dask workers on the OOI Pangeo had the same home directory as the notebook server, which is why there may be examples out there of @jkingslake's original workflow. I have not had time to get NFS working with the new Gateway configs but I'm guessing it would be possible if it is something that is needed.

@rabernat
Member

rabernat commented Nov 18, 2020

Shared home space is not the core issue here. That is resolved by my suggestion in #116 (comment).

The problem is that the dataset, as currently loaded, is not serializable. This is either an xarray or fsspec bug.

@rabernat
Member

So here is a workaround, which bypasses fsspec and uses the netCDF4 library's new and undocumented ability to open files over HTTP.

url = 'https://storage.googleapis.com/ldeo-glaciology/bedmachine/BedMachineAntarctica_2019-11-05_v01.nc#mode=bytes'  
ds = xr.open_dataset(url, engine='netcdf4', chunks=3000)
ds

This works with distributed. I would play around with the chunk size and do some quick benchmarking before committing to 3000.
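
For example, a crude way to compare chunk sizes (a sketch; the mean() reduction and the candidate sizes are just illustrative):

import time
import xarray as xr

url = 'https://storage.googleapis.com/ldeo-glaciology/bedmachine/BedMachineAntarctica_2019-11-05_v01.nc#mode=bytes'

for chunks in (1000, 3000, 6000):
    ds = xr.open_dataset(url, engine='netcdf4', chunks=chunks)
    t0 = time.time()
    ds.mean().compute()                      # any representative reduction will do
    print(chunks, round(time.time() - t0, 1), 's')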

@porterdf

url = 'https://storage.googleapis.com/ldeo-glaciology/bedmachine/BedMachineAntarctica_2019-11-05_v01.nc#mode=bytes'  
ds = xr.open_dataset(url, engine='netcdf4', chunks=3000)
ds

Thanks @rabernat - however this does not work for me on OOI Pangeo (nor similar attempts with other public netCDF files on GCS)

ds = xr.open_dataset(url, engine='netcdf4', chunks=3000)

FileNotFoundError: [Errno 2] No such file or directory: b'https://storage.googleapis.com/ldeo-glaciology/bedmachine/BedMachineAntarctica_2019-11-05_v01.nc#mode=bytes'

Is it a parsing issue? That is certainly the link to the file (tested in browser).

@rabernat
Member

It's probably a netcdf-python version issue. I'm on netCDF4 1.5.4.
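
A quick way to check which version is installed:

import netCDF4
print(netCDF4.__version__)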

@tjcrone

tjcrone commented Nov 18, 2020

The current OOI image has 1.5.3. @porterdf you can install 1.5.4 temporarily to test this using pip install netCDF4==1.5.4 in a Jupyterlab terminal. If this fixes your issue we will build this into the next image.

@rabernat
Member

This also works:

import gcsfs
import xarray as xr

gcs = gcsfs.GCSFileSystem()
url = 'gs://ldeo-glaciology/bedmachine/BedMachineAntarctica_2019-11-05_v01.nc'  
openfile = gcs.open(url, mode='rb') 
ds = xr.open_dataset(openfile, chunks=3000)

@tjcrone

tjcrone commented Nov 18, 2020

@porterdf, I bumped the OOI image to the same one that @rabernat is using. You can try it out on staging at the moment to test against his suggestions here. If it works for you I will merge it into prod so that it is available for you on prod. It probably has the new cartopy version you were asking about as well.

@jkingslake
Author

This works great on https://us-central1-b.gcp.pangeo.io/

Thanks all!

@porterdf

@porterdf, I bumped the OOI image to the same one that @rabernat is using. You can try it out on staging at the moment to test against his suggestions here. If it works for you I will merge it into prod so that it is available for you on prod. It probably has the new cartopy version you were asking about as well.

Thanks @rabernat and @tjcrone - now working as expected on OOI staging, with current netCDF4 (and cartopy too)!

Let me know when the changes are merged into production. Fine to close this from my end.

@github-actions

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions bot added the Stale label Jan 18, 2021
@github-actions

This issue has been automatically closed because it had not seen recent activity. The issue can always be reopened at a later date.
