diff --git a/CHANGELOG.md b/CHANGELOG.md
index 126cf46c2..9ffd9105b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,25 @@
+# dask-cuda 24.04.00 (10 Apr 2024)
+
+## 🐛 Bug Fixes
+
+- handle more RAPIDS version formats in update-version.sh ([#1307](https://github.com/rapidsai/dask-cuda/pull/1307)) [@jameslamb](https://github.com/jameslamb)
+
+## 🚀 New Features
+
+- Allow using pandas 2 ([#1308](https://github.com/rapidsai/dask-cuda/pull/1308)) [@vyasr](https://github.com/vyasr)
+- Support CUDA 12.2 ([#1302](https://github.com/rapidsai/dask-cuda/pull/1302)) [@jameslamb](https://github.com/jameslamb)
+
+## 🛠️ Improvements
+
+- Use `conda env create --yes` instead of `--force` ([#1326](https://github.com/rapidsai/dask-cuda/pull/1326)) [@bdice](https://github.com/bdice)
+- Add upper bound to prevent usage of NumPy 2 ([#1320](https://github.com/rapidsai/dask-cuda/pull/1320)) [@bdice](https://github.com/bdice)
+- Generalize GHA selectors for pure Python testing ([#1318](https://github.com/rapidsai/dask-cuda/pull/1318)) [@jakirkham](https://github.com/jakirkham)
+- Require NumPy 1.23+ ([#1316](https://github.com/rapidsai/dask-cuda/pull/1316)) [@jakirkham](https://github.com/jakirkham)
+- Add support for Python 3.11 ([#1315](https://github.com/rapidsai/dask-cuda/pull/1315)) [@jameslamb](https://github.com/jameslamb)
+- target branch-24.04 for GitHub Actions workflows ([#1314](https://github.com/rapidsai/dask-cuda/pull/1314)) [@jameslamb](https://github.com/jameslamb)
+- Filter dd deprecation ([#1312](https://github.com/rapidsai/dask-cuda/pull/1312)) [@rjzamora](https://github.com/rjzamora)
+- Update ops-bot.yaml ([#1310](https://github.com/rapidsai/dask-cuda/pull/1310)) [@AyodeAwe](https://github.com/AyodeAwe)
+
 # dask-cuda 24.02.00 (12 Feb 2024)
 
 ## 🚨 Breaking Changes
diff --git a/ci/build_docs.sh b/ci/build_docs.sh
index 6a53fe47c..a727d6daf 100755
--- a/ci/build_docs.sh
+++ b/ci/build_docs.sh
@@ -10,7 +10,7 @@ rapids-dependency-file-generator \
   --file_key docs \
   --matrix "cuda=${RAPIDS_CUDA_VERSION%.*};arch=$(arch);py=${RAPIDS_PY_VERSION}" | tee env.yaml
 
-rapids-mamba-retry env create --force -f env.yaml -n docs
+rapids-mamba-retry env create --yes -f env.yaml -n docs
 
 conda activate docs
 rapids-print-env
diff --git a/ci/check_style.sh b/ci/check_style.sh
index be3ac3f4b..9bc26fe71 100755
--- a/ci/check_style.sh
+++ b/ci/check_style.sh
@@ -11,7 +11,7 @@ rapids-dependency-file-generator \
   --file_key checks \
   --matrix "cuda=${RAPIDS_CUDA_VERSION%.*};arch=$(arch);py=${RAPIDS_PY_VERSION}" | tee env.yaml
 
-rapids-mamba-retry env create --force -f env.yaml -n checks
+rapids-mamba-retry env create --yes -f env.yaml -n checks
 
 conda activate checks
 
 # Run pre-commit checks
diff --git a/ci/release/update-version.sh b/ci/release/update-version.sh
index 9f40318b2..0d1b8b1a5 100755
--- a/ci/release/update-version.sh
+++ b/ci/release/update-version.sh
@@ -56,3 +56,8 @@ for FILE in .github/workflows/*.yaml; do
   sed_runner "/shared-workflows/ s/@.*/@branch-${NEXT_SHORT_TAG}/g" "${FILE}"
 done
 sed_runner "s/RAPIDS_VERSION_NUMBER=\".*/RAPIDS_VERSION_NUMBER=\"${NEXT_SHORT_TAG}\"/g" ci/build_docs.sh
+
+# Docs referencing source code
+find docs/source/ -type f -name *.rst -print0 | while IFS= read -r -d '' filename; do
+  sed_runner "s|/branch-[^/]*/|/branch-${NEXT_SHORT_TAG}/|g" "${filename}"
+done
diff --git a/ci/test_python.sh b/ci/test_python.sh
index f700c935b..b52cbb6d4 100755
--- a/ci/test_python.sh
+++ b/ci/test_python.sh
@@ -11,7 +11,7 @@ rapids-dependency-file-generator \
   --file_key test_python \
   --matrix "cuda=${RAPIDS_CUDA_VERSION%.*};arch=$(arch);py=${RAPIDS_PY_VERSION}" | tee env.yaml
 
-rapids-mamba-retry env create --force -f env.yaml -n test
+rapids-mamba-retry env create --yes -f env.yaml -n test
 
 # Temporarily allow unbound variables for conda activation.
 set +u
@@ -35,11 +35,16 @@ rapids-logger "Check GPU usage"
 nvidia-smi
 
 EXITCODE=0
-trap "EXITCODE=1" ERR
+set_exit_code() {
+    EXITCODE=$?
+    rapids-logger "Test failed with error ${EXITCODE}"
+}
+trap set_exit_code ERR
 set +e
 
-rapids-logger "pytest dask-cuda"
+rapids-logger "pytest dask-cuda (dask-expr)"
 pushd dask_cuda
+DASK_DATAFRAME__QUERY_PLANNING=True \
 DASK_CUDA_TEST_SINGLE_GPU=1 \
 DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
 UCXPY_IFNAME=eth0 \
@@ -58,18 +63,56 @@ timeout 60m pytest \
   tests -k "not ucxx"
 popd
 
-rapids-logger "Run local benchmark"
+rapids-logger "pytest explicit-comms (legacy dd)"
+pushd dask_cuda
+DASK_DATAFRAME__QUERY_PLANNING=False \
+DASK_CUDA_TEST_SINGLE_GPU=1 \
+DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
+UCXPY_IFNAME=eth0 \
+UCX_WARN_UNUSED_ENV_VARS=n \
+UCX_MEMTYPE_CACHE=n \
+timeout 30m pytest \
+  -vv \
+  --durations=0 \
+  --capture=no \
+  --cache-clear \
+  --junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cuda-legacy.xml" \
+  --cov-config=../pyproject.toml \
+  --cov=dask_cuda \
+  --cov-report=xml:"${RAPIDS_COVERAGE_DIR}/dask-cuda-coverage-legacy.xml" \
+  --cov-report=term \
+  tests/test_explicit_comms.py -k "not ucxx"
+popd
+
+rapids-logger "Run local benchmark (dask-expr)"
+DASK_DATAFRAME__QUERY_PLANNING=True \
+python dask_cuda/benchmarks/local_cudf_shuffle.py \
+  --partition-size="1 KiB" \
+  -d 0 \
+  --runs 1 \
+  --backend dask
+
+DASK_DATAFRAME__QUERY_PLANNING=True \
+python dask_cuda/benchmarks/local_cudf_shuffle.py \
+  --partition-size="1 KiB" \
+  -d 0 \
+  --runs 1 \
+  --backend explicit-comms
+
+rapids-logger "Run local benchmark (legacy dd)"
+DASK_DATAFRAME__QUERY_PLANNING=False \
 python dask_cuda/benchmarks/local_cudf_shuffle.py \
   --partition-size="1 KiB" \
   -d 0 \
   --runs 1 \
   --backend dask
 
+DASK_DATAFRAME__QUERY_PLANNING=False \
 python dask_cuda/benchmarks/local_cudf_shuffle.py \
   --partition-size="1 KiB" \
   -d 0 \
   --runs 1 \
   --backend explicit-comms
 
-rapids-logger "Test script exiting with value: $EXITCODE"
+rapids-logger "Test script exiting with latest error code: $EXITCODE"
 exit ${EXITCODE}
diff --git a/dask_cuda/__init__.py b/dask_cuda/__init__.py
index 30f987ac4..516599da3 100644
--- a/dask_cuda/__init__.py
+++ b/dask_cuda/__init__.py
@@ -20,6 +20,18 @@ from .proxify_device_objects import proxify_decorator, unproxify_decorator
 
+if dask.config.get("dataframe.query-planning", None) is not False and dask.config.get(
+    "explicit-comms", False
+):
+    raise NotImplementedError(
+        "The 'explicit-comms' config is not yet supported when "
+        "query-planning is enabled in dask. Please use the shuffle "
+        "API directly, or use the legacy dask-dataframe API "
+        "(set the 'dataframe.query-planning' config to `False` "
+        "before importing `dask.dataframe`).",
+    )
+
+
 # Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True`
 dask.dataframe.shuffle.rearrange_by_column = get_rearrange_by_column_wrapper(
     dask.dataframe.shuffle.rearrange_by_column
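The guard added to `dask_cuda/__init__.py` above refuses to combine the `explicit-comms` config with query planning. For readers unfamiliar with the workaround named in its error message, below is a minimal sketch of user code that opts into the legacy dask-dataframe API — illustrative only, not part of the patch; it relies solely on the `dataframe.query-planning` config key and the `DASK_DATAFRAME__QUERY_PLANNING` variable referenced above.

```python
# Minimal sketch: opt into the legacy dask-dataframe API so that the
# explicit-comms config remains usable. The setting must be applied
# before dask.dataframe (or dask_cuda) is imported.
import dask

dask.config.set({"dataframe.query-planning": False})

import dask.dataframe as dd  # noqa: E402
import dask_cuda  # noqa: E402,F401

# Equivalent environment-variable form, as used in ci/test_python.sh above:
#   DASK_DATAFRAME__QUERY_PLANNING=False python my_script.py
```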
diff --git a/dask_cuda/benchmarks/local_cudf_merge.py b/dask_cuda/benchmarks/local_cudf_merge.py
index ba3a9d56d..6a68ad788 100644
--- a/dask_cuda/benchmarks/local_cudf_merge.py
+++ b/dask_cuda/benchmarks/local_cudf_merge.py
@@ -7,8 +7,7 @@
 import pandas as pd
 
 import dask
-from dask.base import tokenize
-from dask.dataframe.core import new_dd_object
+import dask.dataframe as dd
 from dask.distributed import performance_report, wait
 from dask.utils import format_bytes, parse_bytes
 
@@ -25,12 +24,20 @@
 #
 
 
-def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu):
+# Set default shuffle method to "tasks"
+if dask.config.get("dataframe.shuffle.method", None) is None:
+    dask.config.set({"dataframe.shuffle.method": "tasks"})
+
+
+def generate_chunk(input):
+    i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu = input
+
     # Setting a seed that triggers max amount of comm in the two-GPU case.
     if gpu:
         import cupy as xp
 
         import cudf as xdf
+        import dask_cudf  # noqa: F401
     else:
         import numpy as xp
         import pandas as xdf
@@ -105,25 +112,25 @@ def get_random_ddf(chunk_size, num_chunks, frac_match, chunk_type, args):
     parts = [chunk_size for _ in range(num_chunks)]
     device_type = True if args.type == "gpu" else False
-    meta = generate_chunk(0, 4, 1, chunk_type, None, device_type)
+    meta = generate_chunk((0, 4, 1, chunk_type, None, device_type))
     divisions = [None] * (len(parts) + 1)
 
-    name = "generate-data-" + tokenize(chunk_size, num_chunks, frac_match, chunk_type)
-
-    graph = {
-        (name, i): (
-            generate_chunk,
-            i,
-            part,
-            len(parts),
-            chunk_type,
-            frac_match,
-            device_type,
-        )
-        for i, part in enumerate(parts)
-    }
-
-    ddf = new_dd_object(graph, name, meta, divisions)
+    ddf = dd.from_map(
+        generate_chunk,
+        [
+            (
+                i,
+                part,
+                len(parts),
+                chunk_type,
+                frac_match,
+                device_type,
+            )
+            for i, part in enumerate(parts)
+        ],
+        meta=meta,
+        divisions=divisions,
+    )
 
     if chunk_type == "build":
         if not args.no_shuffle:
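The hunk above swaps a hand-built task graph for `dd.from_map`, which builds one partition per element of an input list. Here is a self-contained sketch of the same pattern with plain pandas — illustrative only; the names are hypothetical and not taken from the benchmark.

```python
# Sketch of the dd.from_map pattern: one partition per argument tuple,
# with `meta` describing the empty schema and unknown divisions.
import pandas as pd
import dask.dataframe as dd


def make_chunk(spec):
    start, size = spec
    return pd.DataFrame({"key": range(start, start + size)})


specs = [(0, 4), (4, 4), (8, 4)]
meta = make_chunk((0, 1)).iloc[:0]  # empty frame with the right dtypes

ddf = dd.from_map(
    make_chunk,
    specs,
    meta=meta,
    divisions=[None] * (len(specs) + 1),
)
print(ddf.compute())
```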
diff --git a/dask_cuda/benchmarks/local_cudf_shuffle.py b/dask_cuda/benchmarks/local_cudf_shuffle.py
index a3492b664..a1129dd37 100644
--- a/dask_cuda/benchmarks/local_cudf_shuffle.py
+++ b/dask_cuda/benchmarks/local_cudf_shuffle.py
@@ -8,8 +8,6 @@
 import dask
 import dask.dataframe
-from dask.dataframe.core import new_dd_object
-from dask.dataframe.shuffle import shuffle
 from dask.distributed import Client, performance_report, wait
 from dask.utils import format_bytes, parse_bytes
 
@@ -33,7 +31,7 @@
 
 
 def shuffle_dask(df, args):
-    result = shuffle(df, index="data", shuffle="tasks", ignore_index=args.ignore_index)
+    result = df.shuffle("data", shuffle_method="tasks", ignore_index=args.ignore_index)
     if args.backend == "dask-noop":
         result = as_noop(result)
     t1 = perf_counter()
@@ -94,18 +92,24 @@ def create_data(
     )
 
     # Create partition based to the specified partition distribution
-    dsk = {}
+    futures = []
     for i, part_size in enumerate(dist):
         for _ in range(part_size):
             # We use `client.submit` to control placement of the partition.
-            dsk[(name, len(dsk))] = client.submit(
-                create_df, chunksize, args.type, workers=[workers[i]], pure=False
+            futures.append(
+                client.submit(
+                    create_df, chunksize, args.type, workers=[workers[i]], pure=False
+                )
             )
-    wait(dsk.values())
+    wait(futures)
 
     df_meta = create_df(0, args.type)
-    divs = [None] * (len(dsk) + 1)
-    ret = new_dd_object(dsk, name, df_meta, divs).persist()
+    divs = [None] * (len(futures) + 1)
+    ret = dask.dataframe.from_delayed(
+        futures,
+        meta=df_meta,
+        divisions=divs,
+    ).persist()
     wait(ret)
 
     data_processed = args.in_parts * args.partition_size
@@ -254,7 +258,9 @@ def parse_args():
     ]
 
     return parse_benchmark_args(
-        description="Distributed shuffle (dask/cudf) benchmark", args_list=special_args
+        description="Distributed shuffle (dask/cudf) benchmark",
+        args_list=special_args,
+        check_explicit_comms=False,
     )
 
diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py
index 51fae7201..5ac79a88d 100644
--- a/dask_cuda/benchmarks/utils.py
+++ b/dask_cuda/benchmarks/utils.py
@@ -11,6 +11,7 @@
 import numpy as np
 import pandas as pd
 
+from dask import config
 from dask.distributed import Client, SSHCluster
 from dask.utils import format_bytes, format_time, parse_bytes
 from distributed.comm.addressing import get_address_host
@@ -47,7 +48,11 @@ def as_noop(dsk):
         raise RuntimeError("Requested noop computation but dask-noop not installed.")
 
 
-def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[]):
+def parse_benchmark_args(
+    description="Generic dask-cuda Benchmark",
+    args_list=[],
+    check_explicit_comms=True,
+):
     parser = argparse.ArgumentParser(description=description)
     worker_args = parser.add_argument_group(description="Worker configuration")
     worker_args.add_argument(
@@ -317,6 +322,24 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[]
     if args.multi_node and len(args.hosts.split(",")) < 2:
         raise ValueError("--multi-node requires at least 2 hosts")
 
+    # Raise error early if "explicit-comms" is not allowed
+    if (
+        check_explicit_comms
+        and args.backend == "explicit-comms"
+        and config.get(
+            "dataframe.query-planning",
+            None,
+        )
+        is not False
+    ):
+        raise NotImplementedError(
+            "The 'explicit-comms' config is not yet supported when "
+            "query-planning is enabled in dask. Please use the legacy "
+            "dask-dataframe API by setting the following environment "
+            "variable before executing:",
+            " DASK_DATAFRAME__QUERY_PLANNING=False",
+        )
+
     return args
 
 
diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py
index ca69156dd..3f7b79514 100644
--- a/dask_cuda/explicit_comms/dataframe/shuffle.py
+++ b/dask_cuda/explicit_comms/dataframe/shuffle.py
@@ -11,10 +11,12 @@
 import dask
 import dask.config
 import dask.dataframe
+import dask.dataframe as dd
 import dask.utils
 import distributed.worker
 from dask.base import tokenize
-from dask.dataframe.core import DataFrame, Series, _concat as dd_concat, new_dd_object
+from dask.dataframe import DataFrame, Series
+from dask.dataframe.core import _concat as dd_concat
 from dask.dataframe.shuffle import group_split_dispatch, hash_object_dispatch
 from distributed import wait
 from distributed.protocol import nested_deserialize, to_serialize
@@ -468,18 +470,19 @@ def shuffle(
     npartitions = df.npartitions
 
     # Step (a):
-    df = df.persist()  # Make sure optimizations are apply on the existing graph
+    df = df.persist()  # Make sure optimizations are applied on the existing graph
     wait([df])  # Make sure all keys has been materialized on workers
+    persisted_keys = [f.key for f in c.client.futures_of(df)]
     name = (
         "explicit-comms-shuffle-"
-        f"{tokenize(df, column_names, npartitions, ignore_index)}"
+        f"{tokenize(df, column_names, npartitions, ignore_index, batchsize)}"
     )
     df_meta: DataFrame = df._meta
 
     # Stage all keys of `df` on the workers and cancel them, which makes it possible
     # for the shuffle to free memory as the partitions of `df` are consumed.
     # See CommsContext.stage_keys() for a description of staging.
-    rank_to_inkeys = c.stage_keys(name=name, keys=df.__dask_keys__())
+    rank_to_inkeys = c.stage_keys(name=name, keys=persisted_keys)
     c.client.cancel(df)
 
     # Get batchsize
@@ -526,23 +529,26 @@
 
     # TODO: can we do this without using `submit()` to avoid the overhead
     # of creating a Future for each dataframe partition?
-    dsk = {}
+    futures = []
     for rank in ranks:
         for part_id in rank_to_out_part_ids[rank]:
-            dsk[(name, part_id)] = c.client.submit(
-                getitem,
-                shuffle_result[rank],
-                part_id,
-                workers=[c.worker_addresses[rank]],
+            futures.append(
+                c.client.submit(
+                    getitem,
+                    shuffle_result[rank],
+                    part_id,
+                    workers=[c.worker_addresses[rank]],
+                )
             )
 
     # Create a distributed Dataframe from all the pieces
-    divs = [None] * (len(dsk) + 1)
-    ret = new_dd_object(dsk, name, df_meta, divs).persist()
+    divs = [None] * (len(futures) + 1)
+    kwargs = {"meta": df_meta, "divisions": divs, "prefix": "explicit-comms-shuffle"}
+    ret = dd.from_delayed(futures, **kwargs).persist()
     wait([ret])
 
     # Release all temporary dataframes
-    for fut in [*shuffle_result.values(), *dsk.values()]:
+    for fut in [*shuffle_result.values(), *futures]:
         fut.release()
 
     return ret
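The shuffle rewrite above replaces `new_dd_object` with a list of futures wrapped by `dd.from_delayed`. A stripped-down sketch of that futures-to-collection pattern on a local cluster follows — illustrative only; pandas stands in for cudf and the helper names are hypothetical.

```python
# Sketch: submit one task per partition, wait for the futures, then wrap
# them into a dask DataFrame with from_delayed (as the shuffle code above does).
import pandas as pd
import dask.dataframe as dd
from distributed import Client, LocalCluster, wait


def make_part(i):
    return pd.DataFrame({"part": [i] * 3})


if __name__ == "__main__":
    client = Client(LocalCluster(n_workers=2))
    futures = [client.submit(make_part, i, pure=False) for i in range(4)]
    wait(futures)

    meta = make_part(0).iloc[:0]
    ddf = dd.from_delayed(futures, meta=meta, divisions=[None] * (len(futures) + 1))
    print(ddf.compute())
    client.close()
```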
diff --git a/dask_cuda/tests/test_dgx.py b/dask_cuda/tests/test_dgx.py
index d57cf1a3c..41bfa6cb1 100644
--- a/dask_cuda/tests/test_dgx.py
+++ b/dask_cuda/tests/test_dgx.py
@@ -15,6 +15,10 @@
 psutil = pytest.importorskip("psutil")
 
 
+def _is_ucx_116(ucp):
+    return ucp.get_ucx_version()[:2] == (1, 16)
+
+
 class DGXVersion(Enum):
     DGX_1 = auto()
     DGX_2 = auto()
@@ -102,9 +106,11 @@ def check_ucx_options():
 )
 def test_tcp_over_ucx(protocol):
     if protocol == "ucx":
-        pytest.importorskip("ucp")
+        ucp = pytest.importorskip("ucp")
     elif protocol == "ucxx":
-        pytest.importorskip("ucxx")
+        ucp = pytest.importorskip("ucxx")
+    if _is_ucx_116(ucp):
+        pytest.skip("https://github.com/rapidsai/ucx-py/issues/1037")
 
     p = mp.Process(target=_test_tcp_over_ucx, args=(protocol,))
     p.start()
@@ -217,9 +223,11 @@ def check_ucx_options():
 )
 def test_ucx_infiniband_nvlink(protocol, params):
     if protocol == "ucx":
-        pytest.importorskip("ucp")
+        ucp = pytest.importorskip("ucp")
     elif protocol == "ucxx":
-        pytest.importorskip("ucxx")
+        ucp = pytest.importorskip("ucxx")
+    if _is_ucx_116(ucp) and params["enable_infiniband"] is False:
+        pytest.skip("https://github.com/rapidsai/ucx-py/issues/1037")
 
     skip_queue = mp.Queue()
 
diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py
index 1f70fb2ca..f495648e0 100644
--- a/dask_cuda/tests/test_explicit_comms.py
+++ b/dask_cuda/tests/test_explicit_comms.py
@@ -22,14 +22,23 @@
 from dask_cuda.explicit_comms.dataframe.shuffle import shuffle as explicit_comms_shuffle
 from dask_cuda.utils_test import IncreasedCloseTimeoutNanny
 
+mp = mp.get_context("spawn")  # type: ignore
+ucp = pytest.importorskip("ucp")
+
+QUERY_PLANNING_ON = dask.config.get("dataframe.query-planning", None) is not False
+
 # Skip these tests when dask-expr is active (for now)
-pytestmark = pytest.mark.skipif(
-    dask.config.get("dataframe.query-planning", None) is not False,
-    reason="https://github.com/rapidsai/dask-cuda/issues/1311",
+query_planning_skip = pytest.mark.skipif(
+    QUERY_PLANNING_ON,
+    reason=(
+        "The 'explicit-comms' config is not supported "
+        "when query planning is enabled."
+    ),
 )
 
-mp = mp.get_context("spawn")  # type: ignore
-ucp = pytest.importorskip("ucp")
+# Set default shuffle method to "tasks"
+if dask.config.get("dataframe.shuffle.method", None) is None:
+    dask.config.set({"dataframe.shuffle.method": "tasks"})
 
 
 # Notice, all of the following tests is executed in a new process such
@@ -89,6 +98,7 @@ def _test_dataframe_merge_empty_partitions(nrows, npartitions):
     pd.testing.assert_frame_equal(got, expected)
 
 
+@query_planning_skip
 def test_dataframe_merge_empty_partitions():
     # Notice, we use more partitions than rows
     p = mp.Process(target=_test_dataframe_merge_empty_partitions, args=(2, 4))
@@ -227,6 +237,7 @@ def check_shuffle():
         check_shuffle()
 
 
+@query_planning_skip
 @pytest.mark.parametrize("in_cluster", [True, False])
 def test_dask_use_explicit_comms(in_cluster):
     def _timeout(process, function, timeout):
@@ -289,6 +300,7 @@ def _test_dataframe_shuffle_merge(backend, protocol, n_workers):
     assert_eq(got, expected)
 
 
+@query_planning_skip
 @pytest.mark.parametrize("nworkers", [1, 2, 4])
 @pytest.mark.parametrize("backend", ["pandas", "cudf"])
 @pytest.mark.parametrize("protocol", ["tcp", "ucx", "ucxx"])
diff --git a/docs/source/api.rst b/docs/source/api.rst
index b9d9d6dfa..1594594cc 100644
--- a/docs/source/api.rst
+++ b/docs/source/api.rst
@@ -33,3 +33,6 @@ Explicit-comms
 .. currentmodule:: dask_cuda.explicit_comms.comms
 .. autoclass:: CommsContext
     :members:
+
+.. currentmodule:: dask_cuda.explicit_comms.dataframe.shuffle
+.. autofunction:: shuffle
diff --git a/docs/source/examples/ucx.rst b/docs/source/examples/ucx.rst
index 18c569ff1..7a0651173 100644
--- a/docs/source/examples/ucx.rst
+++ b/docs/source/examples/ucx.rst
@@ -2,7 +2,7 @@ Enabling UCX communication
 ==========================
 
 A CUDA cluster using UCX communication can be started automatically with LocalCUDACluster or manually with the ``dask cuda worker`` CLI tool.
-In either case, a ``dask.distributed.Client`` must be made for the worker cluster using the same Dask UCX configuration; see `UCX Integration -- Configuration <../ucx.html#configuration>`_ for details on all available options.
+In either case, a ``dask.distributed.Client`` must be made for the worker cluster using the same Dask UCX configuration; see `UCX Integration -- Configuration <../../ucx/#configuration>`_ for details on all available options.
 
 LocalCUDACluster with Automatic Configuration
 ---------------------------------------------
@@ -29,7 +29,7 @@ To connect a client to a cluster with automatically-configured UCX and an RMM po
 
 LocalCUDACluster with Manual Configuration
 ------------------------------------------
-When using LocalCUDACluster with UCX communication and manual configuration, all required UCX configuration is handled through arguments supplied at construction; see `API -- Cluster <../api.html#cluster>`_ for a complete list of these arguments.
+When using LocalCUDACluster with UCX communication and manual configuration, all required UCX configuration is handled through arguments supplied at construction; see `API -- Cluster <../../api/#cluster>`_ for a complete list of these arguments.
 To connect a client to a cluster with all supported transports and an RMM pool:
 
 .. code-block:: python
@@ -148,7 +148,7 @@ We communicate to the scheduler that we will be using UCX with the ``--protocol`
 Workers
 ^^^^^^^
 
-All UCX configuration options have analogous options in ``dask cuda worker``; see `API -- Worker <../api.html#worker>`_ for a complete list of these options.
+All UCX configuration options have analogous options in ``dask cuda worker``; see `API -- Worker <../../api/#worker>`_ for a complete list of these options.
 To start a cluster with all supported transports and an RMM pool:
 
 .. code-block:: bash
@@ -163,7 +163,7 @@ To start a cluster with all supported transports and an RMM pool:
 Client
 ^^^^^^
 
-A client can be configured to use UCX by using ``dask_cuda.initialize``, a utility which takes the same UCX configuring arguments as LocalCUDACluster and adds them to the current Dask configuration used when creating it; see `API -- Client initialization <../api.html#client-initialization>`_ for a complete list of arguments.
+A client can be configured to use UCX by using ``dask_cuda.initialize``, a utility which takes the same UCX configuring arguments as LocalCUDACluster and adds them to the current Dask configuration used when creating it; see `API -- Client initialization <../../api/#client-initialization>`_ for a complete list of arguments.
 To connect a client to the cluster we have made:
 
 .. code-block:: python
diff --git a/docs/source/explicit_comms.rst b/docs/source/explicit_comms.rst
index 56ad97758..aecbc1fd9 100644
--- a/docs/source/explicit_comms.rst
+++ b/docs/source/explicit_comms.rst
@@ -5,7 +5,7 @@ Communication and scheduling overhead can be a major bottleneck in Dask/Distribu
 The idea is that Dask/Distributed spawns workers and distribute data as usually while the user can submit tasks on the workers that communicate explicitly.
 This makes it possible to bypass Distributed's scheduler and write hand-tuned computation and communication patterns. Currently, Dask-CUDA includes an explicit-comms
-implementation of the Dataframe `shuffle `_ operation used for merging and sorting.
+implementation of the Dataframe `shuffle <../api/#dask_cuda.explicit_comms.dataframe.shuffle.shuffle>`_ operation used for merging and sorting.
 
 
 Usage
 -----
@@ -14,4 +14,4 @@
 In order to use explicit-comms in Dask/Distributed automatically, simply define the environment
 variable ``DASK_EXPLICIT_COMMS=True`` or setting the ``"explicit-comms"`` key in the
 `Dask configuration `_.
 
-It is also possible to use explicit-comms in tasks manually, see the `API `_ and our `implementation of shuffle `_ for guidance.
+It is also possible to use explicit-comms in tasks manually, see the `API <../api/#explicit-comms>`_ and our `implementation of shuffle `_ for guidance.
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 37ba12139..0d415cb0d 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -11,7 +11,7 @@ While Distributed can be used to leverage GPU workloads through libraries such a
 - **Automatic instantiation of per-GPU workers** -- Using Dask-CUDA's LocalCUDACluster or ``dask cuda worker`` CLI will automatically launch one worker for each GPU available on the executing node, avoiding the need to explicitly select GPUs.
 - **Automatic setting of CPU affinity** -- The setting of CPU affinity for each GPU is done automatically, preventing memory transfers from taking suboptimal paths.
-- **Automatic selection of InfiniBand devices** -- When UCX communication is enabled over InfiniBand, Dask-CUDA automatically selects the optimal InfiniBand device for each GPU (see `UCX Integration `_ for instructions on configuring UCX communication).
+- **Automatic selection of InfiniBand devices** -- When UCX communication is enabled over InfiniBand, Dask-CUDA automatically selects the optimal InfiniBand device for each GPU (see `UCX Integration `_ for instructions on configuring UCX communication).
 - **Memory spilling from GPU** -- For memory-intensive workloads, Dask-CUDA supports spilling from GPU to host memory when a GPU reaches the default or user-specified memory utilization limit.
 - **Allocation of GPU memory** -- when using UCX communication, per-GPU memory pools can be allocated using `RAPIDS Memory Manager `_ to circumvent the costly memory buffer mappings that would be required otherwise.
diff --git a/docs/source/spilling.rst b/docs/source/spilling.rst
index 28f3562b9..a237adf74 100644
--- a/docs/source/spilling.rst
+++ b/docs/source/spilling.rst
@@ -37,7 +37,7 @@ JIT-Unspill
 The regular spilling in Dask and Dask-CUDA has some significate issues. Instead of tracking individual objects, it tracks task outputs.
 This means that a task returning a collection of CUDA objects will either spill all of the CUDA objects or none of them.
 Other issues includes *object duplication*, *wrong spilling order*, and *non-tracking of sharing device buffers*
-(see: https://github.com/dask/distributed/issues/4568#issuecomment-805049321).
+(`see discussion <https://github.com/dask/distributed/issues/4568#issuecomment-805049321>`_).
 
 In order to address all of these issues, Dask-CUDA introduces JIT-Unspilling, which can improve
 performance and memory usage significantly. For workloads that require significant spilling
diff --git a/docs/source/ucx.rst b/docs/source/ucx.rst
index d9cacdc77..cf798e5dc 100644
--- a/docs/source/ucx.rst
+++ b/docs/source/ucx.rst
@@ -37,7 +37,7 @@ Automatic
 
 Beginning with Dask-CUDA 22.02 and assuming UCX >= 1.11.1, specifying UCX transports is now optional.
 
-A local cluster can now be started with ``LocalCUDACluster(protocol="ucx")``, implying automatic UCX transport selection (``UCX_TLS=all``). Starting a cluster separately -- scheduler, workers and client as different processes -- is also possible, as long as Dask scheduler is created with ``dask scheduler --protocol="ucx"`` and connecting a ``dask cuda worker`` to the scheduler will imply automatic UCX transport selection, but that requires the Dask scheduler and client to be started with ``DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True``. See `Enabling UCX communication `_ for more details examples of UCX usage with automatic configuration.
+A local cluster can now be started with ``LocalCUDACluster(protocol="ucx")``, implying automatic UCX transport selection (``UCX_TLS=all``). Starting a cluster separately -- scheduler, workers and client as different processes -- is also possible, as long as Dask scheduler is created with ``dask scheduler --protocol="ucx"`` and connecting a ``dask cuda worker`` to the scheduler will imply automatic UCX transport selection, but that requires the Dask scheduler and client to be started with ``DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True``. See `Enabling UCX communication <../examples/ucx/>`_ for more detailed examples of UCX usage with automatic configuration.
 
 Configuring transports manually is still possible, please refer to the subsection below.
 
@@ -79,12 +79,12 @@ However, some will affect related libraries, such as RMM:
 
 .. note::
     These options can be used with mainline Dask.distributed.
    However, some features are exclusive to Dask-CUDA, such as the automatic detection of InfiniBand interfaces.
-    See `Dask-CUDA -- Motivation `_ for more details on the benefits of using Dask-CUDA.
+    See `Dask-CUDA -- Motivation <../#motivation>`_ for more details on the benefits of using Dask-CUDA.
 
 Usage
 -----
 
-See `Enabling UCX communication `_ for examples of UCX usage with different supported transports.
+See `Enabling UCX communication <../examples/ucx/>`_ for examples of UCX usage with different supported transports.
 
 Running in a fork-starved environment
 -------------------------------------
@@ -97,7 +97,7 @@ this when using Dask-CUDA's UCX integration, processes launched via
 multiprocessing should use the start processes using the
 `"forkserver" `_
-method. When launching workers using `dask cuda worker `_, this can be
+method. When launching workers using `dask cuda worker <../quickstart/#dask-cuda-worker>`_, this can be
 achieved by passing ``--multiprocessing-method forkserver`` as an argument.
 In user code, the method can be controlled with the
 ``distributed.worker.multiprocessing-method`` configuration key in