diff --git a/tests/benchmarks/test_array.py b/tests/benchmarks/test_array.py
index 6b207aa27f..0715beebe5 100644
--- a/tests/benchmarks/test_array.py
+++ b/tests/benchmarks/test_array.py
@@ -3,48 +3,18 @@
 import time
 
 import dask.array as da
-import numpy as np
 import pytest
-from dask.utils import parse_bytes
 
 from ..utils_test import (
     arr_to_devnull,
     cluster_memory,
     print_size_info,
-    run_up_to_nthreads,
     scaled_array_shape,
     scaled_array_shape_quadratic,
     wait,
 )
 
 
-def test_anom_mean(small_client, new_array):
-    """From https://github.com/dask/distributed/issues/2602#issuecomment-498718651"""
-    xarray = pytest.importorskip("xarray")
-
-    memory = cluster_memory(small_client)  # 76.66 GiB
-    target_nbytes = memory // 2
-    data = new_array(
-        scaled_array_shape(target_nbytes, ("x", "10MiB")),
-        chunks=(1, parse_bytes("10MiB") // 8),
-    )
-    print_size_info(memory, target_nbytes, data)
-    # 38.32 GiB - 3925 10.00 MiB chunks
-
-    ngroups = data.shape[0] // 100
-    arr = xarray.DataArray(
-        data,
-        dims=["time", "x"],
-        coords={"day": ("time", np.arange(data.shape[0]) % ngroups)},
-    )
-    with xarray.set_options(use_flox=False):
-        clim = arr.groupby("day").mean(dim="time")
-        anom = arr.groupby("day") - clim
-        anom_mean = anom.mean(dim="time")
-
-    wait(anom_mean, small_client, 10 * 60)
-
-
 @pytest.mark.parametrize(
     "speed,chunk_shape",
     [
@@ -114,63 +84,6 @@ def slow_map(x):
     wait(result, small_client, 10 * 60)
 
 
-@pytest.mark.skip(
-    "fails in actual CI; see https://github.com/coiled/benchmarks/issues/253"
-)
-def test_climatic_mean(small_client, new_array):
-    """From https://github.com/dask/distributed/issues/2602#issuecomment-535009454"""
-    xarray = pytest.importorskip("xarray")
-
-    memory = cluster_memory(small_client)  # 76.66 GiB
-    target_nbytes = memory * 2
-    chunks = (1, 1, 96, 21, 90, 144)
-    shape = (28, "x", 96, 21, 90, 144)
-    data = new_array(scaled_array_shape(target_nbytes, shape), chunks=chunks)
-    print_size_info(memory, target_nbytes, data)
-    # 152.62 GiB - 784 199.34 MiB chunks
-
-    array = xarray.DataArray(
-        data,
-        dims=["ensemble", "init_date", "lat", "lead_time", "level", "lon"],
-        # coords={"init_date": pd.date_range(start="1960", periods=arr.shape[1])},
-        coords={"init_date": np.arange(data.shape[1]) % 10},
-    )
-    # arr_clim = array.groupby("init_date.month").mean(dim="init_date")
-    with xarray.set_options(use_flox=False):
-        arr_clim = array.groupby("init_date").mean(dim="init_date")
-
-    wait(arr_clim, small_client, 15 * 60)
-
-
-@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset")
-@pytest.mark.parametrize("backend", ["dataframe", "array"])
-def test_quadratic_mean(small_client, backend):
-    # https://github.com/pangeo-data/distributed-array-examples/issues/2
-    # See https://github.com/dask/dask/issues/10384
-    xr = pytest.importorskip("xarray")
-    size = 5000
-    ds = xr.Dataset(
-        dict(
-            anom_u=(
-                ["time", "face", "j", "i"],
-                da.random.random((size, 1, 987, 1920), chunks=(10, 1, -1, -1)),
-            ),
-            anom_v=(
-                ["time", "face", "j", "i"],
-                da.random.random((size, 1, 987, 1920), chunks=(10, 1, -1, -1)),
-            ),
-        )
-    )
-
-    quad = ds**2
-    quad["uv"] = ds.anom_u * ds.anom_v
-    mean = quad.mean("time")
-    if backend == "dataframe":
-        mean = mean.to_dask_dataframe()
-
-    wait(mean, small_client, 10 * 60)
-
-
 def test_vorticity(small_client, new_array):
     # From https://github.com/dask/distributed/issues/6571
 
diff --git a/tests/benchmarks/test_xarray.py b/tests/benchmarks/test_xarray.py
index d7ed6f705a..35fbaba50e 100644
--- a/tests/benchmarks/test_xarray.py
+++ b/tests/benchmarks/test_xarray.py
@@ -1,12 +1,22 @@
 import uuid
 
+import dask.array as da
 import fsspec
+import numpy as np
 import pytest
 from coiled import Cluster
+from dask.utils import parse_bytes
 from distributed import Client
 
 from tests.conftest import dump_cluster_kwargs
-from tests.utils_test import wait
+
+from ..utils_test import (
+    cluster_memory,
+    print_size_info,
+    run_up_to_nthreads,
+    scaled_array_shape,
+    wait,
+)
 
 xr = pytest.importorskip("xarray")
 pytest.importorskip("flox")
@@ -67,3 +77,84 @@ def test_xarray_groupby_reduction(group_reduction_client, func):
     subset = subset.isel(x=slice(0, 350 * 8), y=slice(0, 350 * 8))
     result = func(subset)
     wait(result, group_reduction_client, 10 * 60)
+
+
+@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset")
+@pytest.mark.parametrize("backend", ["dataframe", "array"])
+def test_quadratic_mean(small_client, backend):
+    # https://github.com/pangeo-data/distributed-array-examples/issues/2
+    # See https://github.com/dask/dask/issues/10384
+    size = 5000
+    ds = xr.Dataset(
+        dict(
+            anom_u=(
+                ["time", "face", "j", "i"],
+                da.random.random((size, 1, 987, 1920), chunks=(10, 1, -1, -1)),
+            ),
+            anom_v=(
+                ["time", "face", "j", "i"],
+                da.random.random((size, 1, 987, 1920), chunks=(10, 1, -1, -1)),
+            ),
+        )
+    )
+
+    quad = ds**2
+    quad["uv"] = ds.anom_u * ds.anom_v
+    mean = quad.mean("time")
+    if backend == "dataframe":
+        mean = mean.to_dask_dataframe()
+
+    wait(mean, small_client, 10 * 60)
+
+
+def test_anom_mean(small_client, new_array):
+    """From https://github.com/dask/distributed/issues/2602#issuecomment-498718651"""
+
+    memory = cluster_memory(small_client)  # 76.66 GiB
+    target_nbytes = memory // 2
+    data = new_array(
+        scaled_array_shape(target_nbytes, ("x", "10MiB")),
+        chunks=(1, parse_bytes("10MiB") // 8),
+    )
+    print_size_info(memory, target_nbytes, data)
+    # 38.32 GiB - 3925 10.00 MiB chunks
+
+    ngroups = data.shape[0] // 100
+    arr = xr.DataArray(
+        data,
+        dims=["time", "x"],
+        coords={"day": ("time", np.arange(data.shape[0]) % ngroups)},
+    )
+    with xr.set_options(use_flox=False):
+        clim = arr.groupby("day").mean(dim="time")
+        anom = arr.groupby("day") - clim
+        anom_mean = anom.mean(dim="time")
+
+    wait(anom_mean, small_client, 10 * 60)
+
+
+@pytest.mark.skip(
+    "fails in actual CI; see https://github.com/coiled/benchmarks/issues/253"
+)
+def test_climatic_mean(small_client, new_array):
+    """From https://github.com/dask/distributed/issues/2602#issuecomment-535009454"""
+
+    memory = cluster_memory(small_client)  # 76.66 GiB
+    target_nbytes = memory * 2
+    chunks = (1, 1, 96, 21, 90, 144)
+    shape = (28, "x", 96, 21, 90, 144)
+    data = new_array(scaled_array_shape(target_nbytes, shape), chunks=chunks)
+    print_size_info(memory, target_nbytes, data)
+    # 152.62 GiB - 784 199.34 MiB chunks
+
+    array = xr.DataArray(
+        data,
+        dims=["ensemble", "init_date", "lat", "lead_time", "level", "lon"],
+        # coords={"init_date": pd.date_range(start="1960", periods=arr.shape[1])},
+        coords={"init_date": np.arange(data.shape[1]) % 10},
+    )
+    # arr_clim = array.groupby("init_date.month").mean(dim="init_date")
+    with xr.set_options(use_flox=False):
+        arr_clim = array.groupby("init_date").mean(dim="init_date")
+
+    wait(arr_clim, small_client, 15 * 60)