
Adding support for MergeDims and Split Variables to FilePatternToChunks transform. #39

Closed
wants to merge 5 commits

Conversation

alxmrs (Contributor) commented Sep 24, 2021

Fixes #29 and #38.

google-cla bot added the cla: yes label on Sep 24, 2021
xarray_beam/_src/pangeo_forge.py (Outdated)
Comment on lines +161 to +162
if not self._max_sizes:
self._max_sizes = dataset.sizes
shoyer (Member):
I missed this before, but why is this caching needed/useful? In general, I'd guess it's a bad idea to make stateful Beam transforms, since that breaks one of the underlying assumptions of Beam's data model.

alxmrs (Contributor Author):

Thanks, that's helpful to know that stateful transforms are discouraged; I'll keep that in mind for the future.

This caching was the simplest way I could think of to calculate the correct offsets for the keys. See this code block for context.

When I calculated the offsets using the current dataset's sizes, the last offsets always came out wrong (see the code block linked above). The simplest fix I could think of was to cache the first dataset's sizes and let the 0-indexed dim.index handle the rest.

From what I can tell, this data is safe to cache. Those are, however, famous last words in parallel programming.
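
As an illustration, a minimal sketch of the offset computation described above (a hypothetical reconstruction, not the PR's exact code; key_indexes and everything other than _max_sizes is assumed):

# Cache the sizes of the first dataset opened; subsequent offset
# calculations use these base sizes, not the current dataset's.
if not self._max_sizes:
    self._max_sizes = dataset.sizes

# Hypothetical: each key carries a 0-indexed position per concat
# dimension, so the starting offset is simply index * base_size.
offsets = {
    dim: index * self._max_sizes[dim]
    for dim, index in key_indexes.items()
}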

Co-authored-by: Stephan Hoyer <shoyer@google.com>
alxmrs (Contributor Author) commented Oct 7, 2021

I'm testing this PR end-to-end with a script that uses this file pattern for a GRIB 2 dataset (using the cfgrib backend in xarray), deployed on GCP's Dataflow. Right now I'm hitting what appears to be a deadlock. My pipeline ends with this error:

Root cause: The worker lost contact with the service.

Traces in the logs show threads acquiring a lock, though it's unclear whether this is a deadlock or just a large dataset taking a long time to process.

Log 1:
Operation ongoing for over 1665.73 seconds in state process-msecs in step FilePatternToChunks/FlatMapTuple(_open_chunks)-ptransform-764 without returning. Current Traceback:
  File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()

  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()

  File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
    self._work_item.run()

  File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 356, in task
    self._execute(

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
    response = task()

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 601, in do_instruction
    return getattr(self, request_type)(

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
    bundle_processor.process_bundle(instruction_id))

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 996, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 222, in process_encoded
    self.output(decoded_value)

  File "/Users/alxrsngrtn/Github/xarray-beam/xarray_beam/_src/pangeo_forge.py", line 152, in _open_chunks
    with self._open_dataset(path) as dataset:

  File "/usr/local/lib/python3.8/contextlib.py", line 113, in __enter__
    return next(self.gen)

  File "/usr/local/lib/python3.8/site-packages/xarray_beam/_src/pangeo_forge.py", line 137, in _open_dataset
    local_file = fsspec.open_local(

  File "/usr/local/lib/python3.8/site-packages/fsspec/core.py", line 487, in open_local
    with of as files:

  File "/usr/local/lib/python3.8/site-packages/fsspec/core.py", line 184, in __enter__
    self.files = fs.open_many(self)

  File "/usr/local/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 394, in <lambda>
    return lambda *args, **kw: getattr(type(self), item).__get__(self)(

  File "/usr/local/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 503, in open_many
    self.fs.get(downpath, downfn)

  File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 91, in wrapper
    return sync(self.loop, func, *args, **kwargs)

  File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 59, in sync
    if event.wait(1):

  File "/usr/local/lib/python3.8/threading.py", line 558, in wait
    signaled = self._cond.wait(timeout)

  File "/usr/local/lib/python3.8/threading.py", line 306, in wait
    gotit = waiter.acquire(True, timeout)
Log 2:
Operation ongoing for over 743.75 seconds in state process-msecs in step FilePatternToChunks/FlatMapTuple(_open_chunks)-ptransform-224 without returning. Current Traceback:
  File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()

  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()

  File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
    self._work_item.run()

  File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 356, in task
    self._execute(

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
    response = task()

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 601, in do_instruction
    return getattr(self, request_type)(

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
    bundle_processor.process_bundle(instruction_id))

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 996, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 222, in process_encoded
    self.output(decoded_value)

  File "/Users/alxrsngrtn/Github/xarray-beam/xarray_beam/_src/pangeo_forge.py", line 185, in _open_chunks
    new_key, chunk.compute(num_workers=num_threads)

  File "/usr/local/lib/python3.8/site-packages/xarray/core/dataset.py", line 1031, in compute
    return new.load(**kwargs)

  File "/usr/local/lib/python3.8/site-packages/xarray/core/dataset.py", line 865, in load
    evaluated_data = da.compute(*lazy_data.values(), **kwargs)

  File "/usr/local/lib/python3.8/site-packages/dask/base.py", line 570, in compute
    results = schedule(dsk, keys, **kwargs)

  File "/usr/local/lib/python3.8/site-packages/dask/threaded.py", line 79, in get
    results = get_async(

  File "/usr/local/lib/python3.8/site-packages/dask/local.py", line 506, in get_async
    for key, res_info, failed in queue_get(queue).result():

  File "/usr/local/lib/python3.8/site-packages/dask/local.py", line 134, in queue_get
    return q.get()

  File "/usr/local/lib/python3.8/queue.py", line 170, in get
    self.not_empty.wait()

  File "/usr/local/lib/python3.8/threading.py", line 302, in wait
    waiter.acquire()

This looks to me like an instance of pydata/xarray#4591. I'm going to experiment with changing the scheduler to use a single thread in the compute call inside _open_chunks().
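
For reference, a minimal sketch of that experiment (a hypothetical variant of the compute call shown in the traceback above, assuming chunk is a dask-backed xarray.Dataset):

# Run dask's single-threaded ('synchronous') scheduler instead of the
# threaded one, sidestepping lock contention with Beam's worker threads.
yield new_key, chunk.compute(scheduler='synchronous')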

alxmrs (Contributor Author) commented Oct 7, 2021

I should mention: the Dataflow diagnostics for the report above are showing unresponsive threads, which makes a deadlock scenario more plausible.

grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "keepalive watchdog timeout" debug_error_string = "{"created":"@1630579134.284653312","description":"Error received from peer ipv6:[::1]:12371","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"keepalive watchdog timeout","grpc_status":14}" >
at _next (/usr/local/lib/python3.8/site-packages/grpc/_channel.py:803)
at __next__ (/usr/local/lib/python3.8/site-packages/grpc/_channel.py:416)
at run (/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:251)
at main (/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker_main.py:182)

shoyer (Member) commented Oct 7, 2021

Deadlocking seems plausible, but this is different from pydata/xarray#4591, which describes a serialization failure.

…f with fsspec). This avoids contention issues with dask, fsspec, and xarray (including a possible deadlock).
shoyer (Member) commented Jan 5, 2024

This has gone stale.

shoyer closed this on Jan 5, 2024
Successfully merging this pull request may close these issues.

Support pangeo_forge_recipes.patterns.MergeDim in FilePatternToChunks