Skip to content

Commit

Permalink
Merge branch 'branch-24.06' into jawe/ops-2621
Browse files Browse the repository at this point in the history
  • Loading branch information
AyodeAwe authored Apr 18, 2024
2 parents 5f989af + 63198ea commit e000aa7
Show file tree
Hide file tree
Showing 18 changed files with 219 additions and 72 deletions.
22 changes: 22 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,25 @@
# dask-cuda 24.04.00 (10 Apr 2024)

## 🐛 Bug Fixes

- handle more RAPIDS version formats in update-version.sh ([#1307](https://github.com/rapidsai/dask-cuda/pull/1307)) [@jameslamb](https://github.com/jameslamb)

## 🚀 New Features

- Allow using pandas 2 ([#1308](https://github.com/rapidsai/dask-cuda/pull/1308)) [@vyasr](https://github.com/vyasr)
- Support CUDA 12.2 ([#1302](https://github.com/rapidsai/dask-cuda/pull/1302)) [@jameslamb](https://github.com/jameslamb)

## 🛠️ Improvements

- Use `conda env create --yes` instead of `--force` ([#1326](https://github.com/rapidsai/dask-cuda/pull/1326)) [@bdice](https://github.com/bdice)
- Add upper bound to prevent usage of NumPy 2 ([#1320](https://github.com/rapidsai/dask-cuda/pull/1320)) [@bdice](https://github.com/bdice)
- Generalize GHA selectors for pure Python testing ([#1318](https://github.com/rapidsai/dask-cuda/pull/1318)) [@jakirkham](https://github.com/jakirkham)
- Requre NumPy 1.23+ ([#1316](https://github.com/rapidsai/dask-cuda/pull/1316)) [@jakirkham](https://github.com/jakirkham)
- Add support for Python 3.11 ([#1315](https://github.com/rapidsai/dask-cuda/pull/1315)) [@jameslamb](https://github.com/jameslamb)
- target branch-24.04 for GitHub Actions workflows ([#1314](https://github.com/rapidsai/dask-cuda/pull/1314)) [@jameslamb](https://github.com/jameslamb)
- Filter dd deprecation ([#1312](https://github.com/rapidsai/dask-cuda/pull/1312)) [@rjzamora](https://github.com/rjzamora)
- Update ops-bot.yaml ([#1310](https://github.com/rapidsai/dask-cuda/pull/1310)) [@AyodeAwe](https://github.com/AyodeAwe)

# dask-cuda 24.02.00 (12 Feb 2024)

## 🚨 Breaking Changes
Expand Down
2 changes: 1 addition & 1 deletion ci/build_docs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ rapids-dependency-file-generator \
--file_key docs \
--matrix "cuda=${RAPIDS_CUDA_VERSION%.*};arch=$(arch);py=${RAPIDS_PY_VERSION}" | tee env.yaml

rapids-mamba-retry env create --force -f env.yaml -n docs
rapids-mamba-retry env create --yes -f env.yaml -n docs
conda activate docs

rapids-print-env
Expand Down
2 changes: 1 addition & 1 deletion ci/check_style.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ rapids-dependency-file-generator \
--file_key checks \
--matrix "cuda=${RAPIDS_CUDA_VERSION%.*};arch=$(arch);py=${RAPIDS_PY_VERSION}" | tee env.yaml

rapids-mamba-retry env create --force -f env.yaml -n checks
rapids-mamba-retry env create --yes -f env.yaml -n checks
conda activate checks

# Run pre-commit checks
Expand Down
5 changes: 5 additions & 0 deletions ci/release/update-version.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,8 @@ for FILE in .github/workflows/*.yaml; do
sed_runner "/shared-workflows/ s/@.*/@branch-${NEXT_SHORT_TAG}/g" "${FILE}"
done
sed_runner "s/RAPIDS_VERSION_NUMBER=\".*/RAPIDS_VERSION_NUMBER=\"${NEXT_SHORT_TAG}\"/g" ci/build_docs.sh

# Docs referencing source code
find docs/source/ -type f -name *.rst -print0 | while IFS= read -r -d '' filename; do
sed_runner "s|/branch-[^/]*/|/branch-${NEXT_SHORT_TAG}/|g" "${filename}"
done
53 changes: 48 additions & 5 deletions ci/test_python.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ rapids-dependency-file-generator \
--file_key test_python \
--matrix "cuda=${RAPIDS_CUDA_VERSION%.*};arch=$(arch);py=${RAPIDS_PY_VERSION}" | tee env.yaml

rapids-mamba-retry env create --force -f env.yaml -n test
rapids-mamba-retry env create --yes -f env.yaml -n test

# Temporarily allow unbound variables for conda activation.
set +u
Expand All @@ -35,11 +35,16 @@ rapids-logger "Check GPU usage"
nvidia-smi

EXITCODE=0
trap "EXITCODE=1" ERR
set_exit_code() {
EXITCODE=$?
rapids-logger "Test failed with error ${EXITCODE}"
}
trap set_exit_code ERR
set +e

rapids-logger "pytest dask-cuda"
rapids-logger "pytest dask-cuda (dask-expr)"
pushd dask_cuda
DASK_DATAFRAME__QUERY_PLANNING=True \
DASK_CUDA_TEST_SINGLE_GPU=1 \
DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
UCXPY_IFNAME=eth0 \
Expand All @@ -58,18 +63,56 @@ timeout 60m pytest \
tests -k "not ucxx"
popd

rapids-logger "Run local benchmark"
rapids-logger "pytest explicit-comms (legacy dd)"
pushd dask_cuda
DASK_DATAFRAME__QUERY_PLANNING=False \
DASK_CUDA_TEST_SINGLE_GPU=1 \
DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
UCXPY_IFNAME=eth0 \
UCX_WARN_UNUSED_ENV_VARS=n \
UCX_MEMTYPE_CACHE=n \
timeout 30m pytest \
-vv \
--durations=0 \
--capture=no \
--cache-clear \
--junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cuda-legacy.xml" \
--cov-config=../pyproject.toml \
--cov=dask_cuda \
--cov-report=xml:"${RAPIDS_COVERAGE_DIR}/dask-cuda-coverage-legacy.xml" \
--cov-report=term \
tests/test_explicit_comms.py -k "not ucxx"
popd

rapids-logger "Run local benchmark (dask-expr)"
DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend dask

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

rapids-logger "Run local benchmark (legacy dd)"
DASK_DATAFRAME__QUERY_PLANNING=False \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend dask

DASK_DATAFRAME__QUERY_PLANNING=False \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

rapids-logger "Test script exiting with value: $EXITCODE"
rapids-logger "Test script exiting with latest error code: $EXITCODE"
exit ${EXITCODE}
12 changes: 12 additions & 0 deletions dask_cuda/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,18 @@
from .proxify_device_objects import proxify_decorator, unproxify_decorator


if dask.config.get("dataframe.query-planning", None) is not False and dask.config.get(
"explicit-comms", False
):
raise NotImplementedError(
"The 'explicit-comms' config is not yet supported when "
"query-planning is enabled in dask. Please use the shuffle "
"API directly, or use the legacy dask-dataframe API "
"(set the 'dataframe.query-planning' config to `False`"
"before importing `dask.dataframe`).",
)


# Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True`
dask.dataframe.shuffle.rearrange_by_column = get_rearrange_by_column_wrapper(
dask.dataframe.shuffle.rearrange_by_column
Expand Down
47 changes: 27 additions & 20 deletions dask_cuda/benchmarks/local_cudf_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
import pandas as pd

import dask
from dask.base import tokenize
from dask.dataframe.core import new_dd_object
import dask.dataframe as dd
from dask.distributed import performance_report, wait
from dask.utils import format_bytes, parse_bytes

Expand All @@ -25,12 +24,20 @@
# <https://gist.github.com/rjzamora/0ffc35c19b5180ab04bbf7c793c45955>


def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu):
# Set default shuffle method to "tasks"
if dask.config.get("dataframe.shuffle.method", None) is None:
dask.config.set({"dataframe.shuffle.method": "tasks"})


def generate_chunk(input):
i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu = input

# Setting a seed that triggers max amount of comm in the two-GPU case.
if gpu:
import cupy as xp

import cudf as xdf
import dask_cudf # noqa: F401
else:
import numpy as xp
import pandas as xdf
Expand Down Expand Up @@ -105,25 +112,25 @@ def get_random_ddf(chunk_size, num_chunks, frac_match, chunk_type, args):

parts = [chunk_size for _ in range(num_chunks)]
device_type = True if args.type == "gpu" else False
meta = generate_chunk(0, 4, 1, chunk_type, None, device_type)
meta = generate_chunk((0, 4, 1, chunk_type, None, device_type))
divisions = [None] * (len(parts) + 1)

name = "generate-data-" + tokenize(chunk_size, num_chunks, frac_match, chunk_type)

graph = {
(name, i): (
generate_chunk,
i,
part,
len(parts),
chunk_type,
frac_match,
device_type,
)
for i, part in enumerate(parts)
}

ddf = new_dd_object(graph, name, meta, divisions)
ddf = dd.from_map(
generate_chunk,
[
(
i,
part,
len(parts),
chunk_type,
frac_match,
device_type,
)
for i, part in enumerate(parts)
],
meta=meta,
divisions=divisions,
)

if chunk_type == "build":
if not args.no_shuffle:
Expand Down
26 changes: 16 additions & 10 deletions dask_cuda/benchmarks/local_cudf_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

import dask
import dask.dataframe
from dask.dataframe.core import new_dd_object
from dask.dataframe.shuffle import shuffle
from dask.distributed import Client, performance_report, wait
from dask.utils import format_bytes, parse_bytes

Expand All @@ -33,7 +31,7 @@


def shuffle_dask(df, args):
result = shuffle(df, index="data", shuffle="tasks", ignore_index=args.ignore_index)
result = df.shuffle("data", shuffle_method="tasks", ignore_index=args.ignore_index)
if args.backend == "dask-noop":
result = as_noop(result)
t1 = perf_counter()
Expand Down Expand Up @@ -94,18 +92,24 @@ def create_data(
)

# Create partition based to the specified partition distribution
dsk = {}
futures = []
for i, part_size in enumerate(dist):
for _ in range(part_size):
# We use `client.submit` to control placement of the partition.
dsk[(name, len(dsk))] = client.submit(
create_df, chunksize, args.type, workers=[workers[i]], pure=False
futures.append(
client.submit(
create_df, chunksize, args.type, workers=[workers[i]], pure=False
)
)
wait(dsk.values())
wait(futures)

df_meta = create_df(0, args.type)
divs = [None] * (len(dsk) + 1)
ret = new_dd_object(dsk, name, df_meta, divs).persist()
divs = [None] * (len(futures) + 1)
ret = dask.dataframe.from_delayed(
futures,
meta=df_meta,
divisions=divs,
).persist()
wait(ret)

data_processed = args.in_parts * args.partition_size
Expand Down Expand Up @@ -254,7 +258,9 @@ def parse_args():
]

return parse_benchmark_args(
description="Distributed shuffle (dask/cudf) benchmark", args_list=special_args
description="Distributed shuffle (dask/cudf) benchmark",
args_list=special_args,
check_explicit_comms=False,
)


Expand Down
25 changes: 24 additions & 1 deletion dask_cuda/benchmarks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import numpy as np
import pandas as pd

from dask import config
from dask.distributed import Client, SSHCluster
from dask.utils import format_bytes, format_time, parse_bytes
from distributed.comm.addressing import get_address_host
Expand Down Expand Up @@ -47,7 +48,11 @@ def as_noop(dsk):
raise RuntimeError("Requested noop computation but dask-noop not installed.")


def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[]):
def parse_benchmark_args(
description="Generic dask-cuda Benchmark",
args_list=[],
check_explicit_comms=True,
):
parser = argparse.ArgumentParser(description=description)
worker_args = parser.add_argument_group(description="Worker configuration")
worker_args.add_argument(
Expand Down Expand Up @@ -317,6 +322,24 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[]
if args.multi_node and len(args.hosts.split(",")) < 2:
raise ValueError("--multi-node requires at least 2 hosts")

# Raise error early if "explicit-comms" is not allowed
if (
check_explicit_comms
and args.backend == "explicit-comms"
and config.get(
"dataframe.query-planning",
None,
)
is not False
):
raise NotImplementedError(
"The 'explicit-comms' config is not yet supported when "
"query-planning is enabled in dask. Please use the legacy "
"dask-dataframe API by setting the following environment "
"variable before executing:",
" DASK_DATAFRAME__QUERY_PLANNING=False",
)

return args


Expand Down
Loading

0 comments on commit e000aa7

Please sign in to comment.