Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Writing a pickle fails with simplecache:: on xrootd paths (possibly with other remote paths as well). #1671

Open
ikrommyd opened this issue Sep 5, 2024 · 1 comment

Comments

@ikrommyd
Copy link

ikrommyd commented Sep 5, 2024

Code like the following reproduces the problem:

import pickle
import fsspec

with fsspec.open("simplecache::root://cmseos.fnal.gov//store/user/ikrommyd/dummy/dummy.pkl", "wb") as f:
    d = {"1": 1, "2": 2}
    pickle.dump(d, f)

fails with:

---------------------------------------------------------------------------
NotImplementedError                       Traceback (most recent call last)
----> 1 with fsspec.open("simplecache::root://cmseos.fnal.gov//store/user/ikrommyd/dummy/dummy.pkl", "wb") as f:
      2     d = {"1": 1, "2": 2}
      3     pickle.dump(d, f)

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/core.py:134, in OpenFile.__exit__(self, *args)
    133 def __exit__(self, *args):
--> 134     self.close()

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/core.py:154, in OpenFile.close(self)
    152     if "r" not in self.mode and not f.closed:
    153         f.flush()
--> 154     f.close()
    155 self.fobjects.clear()

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/implementations/cached.py:911, in LocalTempFile.close(self)
    909 self.closed = True
    910 if self.autocommit:
--> 911     self.commit()

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/implementations/cached.py:918, in LocalTempFile.commit(self)
    917 def commit(self):
--> 918     self.fs.put(self.fn, self.path, **self.kwargs)

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/asyn.py:118, in sync_wrapper.<locals>.wrapper(*args, **kwargs)
    115 @functools.wraps(func)
    116 def wrapper(*args, **kwargs):
    117     self = obj or args[0]
--> 118     return sync(self.loop, func, *args, **kwargs)

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/asyn.py:103, in sync(loop, func, timeout, *args, **kwargs)
    101     raise FSTimeoutError from return_result
    102 elif isinstance(return_result, BaseException):
--> 103     raise return_result
    104 else:
    105     return return_result

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/asyn.py:56, in _runner(event, coro, result, timeout)
     54     coro = asyncio.wait_for(coro, timeout=timeout)
     55 try:
---> 56     result[0] = await coro
     57 except Exception as ex:
     58     result[0] = ex

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/asyn.py:589, in AsyncFileSystem._put(self, lpath, rpath, recursive, callback, batch_size, maxdepth, **kwargs)
    586     put_file = callback.branch_coro(self._put_file)
    587     coros.append(put_file(lfile, rfile, **kwargs))
--> 589 return await _run_coros_in_chunks(
    590     coros, batch_size=batch_size, callback=callback
    591 )

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/asyn.py:268, in _run_coros_in_chunks(coros, batch_size, callback, timeout, return_exceptions, nofiles)
    266     done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
    267     while done:
--> 268         result, k = await done.pop()
    269         results[k] = result
    271 return results

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/asyn.py:245, in _run_coros_in_chunks.<locals>._run_coro(coro, i)
    243 async def _run_coro(coro, i):
    244     try:
--> 245         return await asyncio.wait_for(coro, timeout=timeout), i
    246     except Exception as e:
    247         if not return_exceptions:

File ~/miniforge3/envs/egamma_dev/lib/python3.10/asyncio/tasks.py:408, in wait_for(fut, timeout)
    405 loop = events.get_running_loop()
    407 if timeout is None:
--> 408     return await fut
    410 if timeout <= 0:
    411     fut = ensure_future(fut, loop=loop)

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/callbacks.py:81, in Callback.branch_coro.<locals>.func(path1, path2, **kwargs)
     78 @wraps(fn)
     79 async def func(path1, path2: str, **kwargs):
     80     with self.branched(path1, path2, **kwargs) as child:
---> 81         return await fn(path1, path2, callback=child, **kwargs)

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/asyn.py:517, in AsyncFileSystem._put_file(self, lpath, rpath, **kwargs)
    516 async def _put_file(self, lpath, rpath, **kwargs):
--> 517     raise NotImplementedError

NotImplementedError:

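I haven't tried other types of remote paths. The problem may be more general than xrootd + simplecache: the traceback ends in the base AsyncFileSystem._put_file, which simply raises NotImplementedError, so any async backend that does not override _put_file would presumably fail the same way.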

@martindurant
Copy link
Member

I think I commented on this elsewhere, but let me copy it here for the record.

It seems that xrootd doesn't implement _put_file. It might be reasonable to add a simple default implementation upstream for backends that don't provide their own. However, put_file in AbstractFileSystem shows that such a default ought to do more than just copy the bytes, e.g., handle callbacks.

--- a/fsspec/asyn.py
+++ b/fsspec/asyn.py
@@ -514,7 +514,8 @@ class AsyncFileSystem(AbstractFileSystem):
         )

     async def _put_file(self, lpath, rpath, **kwargs):
-        raise NotImplementedError
+        data = open(lpath, "rb").read()
+        await self._pipe_file(rpath, data, **kwargs)

     async def _put(
         self,
@@ -591,7 +592,10 @@ class AsyncFileSystem(AbstractFileSystem):
         )

     async def _get_file(self, rpath, lpath, **kwargs):
-        raise NotImplementedError
+        data = await self._cat_file(rpath, **kwargs)
+        with open(lpath, "wb") as f:
+            f.write(data)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants