
Move xarray tests into a single file (#1537)
phofl authored Sep 13, 2024
1 parent 7dbb2c9 commit 7273ef9
Showing 2 changed files with 92 additions and 88 deletions.
87 changes: 0 additions & 87 deletions tests/benchmarks/test_array.py
@@ -3,48 +3,18 @@
import time

import dask.array as da
import numpy as np
import pytest
from dask.utils import parse_bytes

from ..utils_test import (
    arr_to_devnull,
    cluster_memory,
    print_size_info,
    run_up_to_nthreads,
    scaled_array_shape,
    scaled_array_shape_quadratic,
    wait,
)


def test_anom_mean(small_client, new_array):
    """From https://github.com/dask/distributed/issues/2602#issuecomment-498718651"""
    xarray = pytest.importorskip("xarray")

    memory = cluster_memory(small_client)  # 76.66 GiB
    target_nbytes = memory // 2
    data = new_array(
        scaled_array_shape(target_nbytes, ("x", "10MiB")),
        chunks=(1, parse_bytes("10MiB") // 8),
    )
    print_size_info(memory, target_nbytes, data)
    # 38.32 GiB - 3925 10.00 MiB chunks

    ngroups = data.shape[0] // 100
    arr = xarray.DataArray(
        data,
        dims=["time", "x"],
        coords={"day": ("time", np.arange(data.shape[0]) % ngroups)},
    )
    with xarray.set_options(use_flox=False):
        clim = arr.groupby("day").mean(dim="time")
        anom = arr.groupby("day") - clim
        anom_mean = anom.mean(dim="time")

    wait(anom_mean, small_client, 10 * 60)


@pytest.mark.parametrize(
    "speed,chunk_shape",
    [
@@ -114,63 +84,6 @@ def slow_map(x):
    wait(result, small_client, 10 * 60)


@pytest.mark.skip(
    "fails in actual CI; see https://github.com/coiled/benchmarks/issues/253"
)
def test_climatic_mean(small_client, new_array):
    """From https://github.com/dask/distributed/issues/2602#issuecomment-535009454"""
    xarray = pytest.importorskip("xarray")

    memory = cluster_memory(small_client)  # 76.66 GiB
    target_nbytes = memory * 2
    chunks = (1, 1, 96, 21, 90, 144)
    shape = (28, "x", 96, 21, 90, 144)
    data = new_array(scaled_array_shape(target_nbytes, shape), chunks=chunks)
    print_size_info(memory, target_nbytes, data)
    # 152.62 GiB - 784 199.34 MiB chunks

    array = xarray.DataArray(
        data,
        dims=["ensemble", "init_date", "lat", "lead_time", "level", "lon"],
        # coords={"init_date": pd.date_range(start="1960", periods=arr.shape[1])},
        coords={"init_date": np.arange(data.shape[1]) % 10},
    )
    # arr_clim = array.groupby("init_date.month").mean(dim="init_date")
    with xarray.set_options(use_flox=False):
        arr_clim = array.groupby("init_date").mean(dim="init_date")

    wait(arr_clim, small_client, 15 * 60)


@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset")
@pytest.mark.parametrize("backend", ["dataframe", "array"])
def test_quadratic_mean(small_client, backend):
    # https://github.com/pangeo-data/distributed-array-examples/issues/2
    # See https://github.com/dask/dask/issues/10384
    xr = pytest.importorskip("xarray")
    size = 5000
    ds = xr.Dataset(
        dict(
            anom_u=(
                ["time", "face", "j", "i"],
                da.random.random((size, 1, 987, 1920), chunks=(10, 1, -1, -1)),
            ),
            anom_v=(
                ["time", "face", "j", "i"],
                da.random.random((size, 1, 987, 1920), chunks=(10, 1, -1, -1)),
            ),
        )
    )

    quad = ds**2
    quad["uv"] = ds.anom_u * ds.anom_v
    mean = quad.mean("time")
    if backend == "dataframe":
        mean = mean.to_dask_dataframe()

    wait(mean, small_client, 10 * 60)


def test_vorticity(small_client, new_array):
    # From https://github.com/dask/distributed/issues/6571

93 changes: 92 additions & 1 deletion tests/benchmarks/test_xarray.py
@@ -1,12 +1,22 @@
import uuid

import dask.array as da
import fsspec
import numpy as np
import pytest
from coiled import Cluster
from dask.utils import parse_bytes
from distributed import Client

from tests.conftest import dump_cluster_kwargs
from tests.utils_test import wait

from ..utils_test import (
    cluster_memory,
    print_size_info,
    run_up_to_nthreads,
    scaled_array_shape,
    wait,
)

xr = pytest.importorskip("xarray")
pytest.importorskip("flox")
@@ -67,3 +77,84 @@ def test_xarray_groupby_reduction(group_reduction_client, func):
    subset = subset.isel(x=slice(0, 350 * 8), y=slice(0, 350 * 8))
    result = func(subset)
    wait(result, group_reduction_client, 10 * 60)


@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset")
@pytest.mark.parametrize("backend", ["dataframe", "array"])
def test_quadratic_mean(small_client, backend):
    # https://github.com/pangeo-data/distributed-array-examples/issues/2
    # See https://github.com/dask/dask/issues/10384
    size = 5000
    ds = xr.Dataset(
        dict(
            anom_u=(
                ["time", "face", "j", "i"],
                da.random.random((size, 1, 987, 1920), chunks=(10, 1, -1, -1)),
            ),
            anom_v=(
                ["time", "face", "j", "i"],
                da.random.random((size, 1, 987, 1920), chunks=(10, 1, -1, -1)),
            ),
        )
    )

    quad = ds**2
    quad["uv"] = ds.anom_u * ds.anom_v
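    # quad now holds elementwise squares plus the u*v cross term; the time
    # mean below yields the second-moment statistics the benchmark measures.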
    mean = quad.mean("time")
    if backend == "dataframe":
        mean = mean.to_dask_dataframe()

    wait(mean, small_client, 10 * 60)


def test_anom_mean(small_client, new_array):
    """From https://github.com/dask/distributed/issues/2602#issuecomment-498718651"""

    memory = cluster_memory(small_client)  # 76.66 GiB
    target_nbytes = memory // 2
    data = new_array(
        scaled_array_shape(target_nbytes, ("x", "10MiB")),
        chunks=(1, parse_bytes("10MiB") // 8),
    )
    print_size_info(memory, target_nbytes, data)
    # 38.32 GiB - 3925 10.00 MiB chunks

    ngroups = data.shape[0] // 100
    arr = xr.DataArray(
        data,
        dims=["time", "x"],
        coords={"day": ("time", np.arange(data.shape[0]) % ngroups)},
    )
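    # Anomaly-from-climatology pattern: average each "day" group over time,
    # subtract the matching group mean, then reduce the residual again.
    # use_flox=False opts out of flox and exercises xarray's own groupby path.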
    with xr.set_options(use_flox=False):
        clim = arr.groupby("day").mean(dim="time")
        anom = arr.groupby("day") - clim
        anom_mean = anom.mean(dim="time")

    wait(anom_mean, small_client, 10 * 60)


@pytest.mark.skip(
    "fails in actual CI; see https://github.com/coiled/benchmarks/issues/253"
)
def test_climatic_mean(small_client, new_array):
    """From https://github.com/dask/distributed/issues/2602#issuecomment-535009454"""

    memory = cluster_memory(small_client)  # 76.66 GiB
    target_nbytes = memory * 2
    chunks = (1, 1, 96, 21, 90, 144)
    shape = (28, "x", 96, 21, 90, 144)
    data = new_array(scaled_array_shape(target_nbytes, shape), chunks=chunks)
    print_size_info(memory, target_nbytes, data)
    # 152.62 GiB - 784 199.34 MiB chunks

    array = xr.DataArray(
        data,
        dims=["ensemble", "init_date", "lat", "lead_time", "level", "lon"],
        # coords={"init_date": pd.date_range(start="1960", periods=arr.shape[1])},
        coords={"init_date": np.arange(data.shape[1]) % 10},
    )
    # arr_clim = array.groupby("init_date.month").mean(dim="init_date")
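    # The integer init_date labels (0-9) stand in for real dates; grouping on
    # them mimics the month-based climatology shown in the commented lines.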
    with xr.set_options(use_flox=False):
        arr_clim = array.groupby("init_date").mean(dim="init_date")

    wait(arr_clim, small_client, 15 * 60)
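
To try the moved anomaly-mean pattern outside the benchmark harness, the sketch below is a minimal stand-in, assuming only dask, numpy, and xarray. The toy shape and chunking replace the cluster-scaled sizes that scaled_array_shape computes, compute() replaces the repo's wait helper, and the small_client and new_array fixtures are omitted.

import dask.array as da
import numpy as np
import xarray as xr

# Toy stand-in for the cluster-scaled array: 1000 time steps, 10 groups.
data = da.random.random((1000, 10_000), chunks=(1, 10_000))
ngroups = data.shape[0] // 100
arr = xr.DataArray(
    data,
    dims=["time", "x"],
    coords={"day": ("time", np.arange(data.shape[0]) % ngroups)},
)
with xr.set_options(use_flox=False):
    clim = arr.groupby("day").mean(dim="time")  # per-group climatology
    anom = arr.groupby("day") - clim            # subtract matching group mean
    anom_mean = anom.mean(dim="time")           # reduce the residual
anom_mean.compute()  # the benchmark instead calls wait(anom_mean, small_client, 10 * 60)

At this scale the graph has the same structure the benchmark stresses on a cluster, just small enough to run in local memory.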
