Machine Learning with Dask

Now that you know Dask's many different data types, computation patterns, deployment options, and libraries, we are ready to tackle machine learning. You will quickly find that ML with Dask is quite intuitive, as it runs in the same Python environment as many other popular ML libraries. Much of the heavy lifting is done by Dask's built-in data types and distributed schedulers, making writing code an enjoyable experience for the user.[1]

This chapter will primarily use the Dask-ML library, a robustly supported ML library from the Dask open source project, but we will also highlight other libraries, such as XGBoost and scikit-learn. The Dask-ML library is designed to run both in clusters and locally.[2] Dask-ML provides familiar interfaces by extending many common ML libraries. ML is different from many of the tasks discussed so far, as it requires the framework (here Dask-ML) to coordinate work more closely. In this chapter we’ll show some of the ways you can use it in your own programs, and we’ll also offer tips.

Since ML is such a wide and varied discipline, we are able to cover only some of the situations where Dask-ML is useful. This chapter will discuss some common work patterns, such as exploratory data analysis, random splits, featurization, regression, and deep learning inference, from the perspective of a practitioner ramping up on Dask. If you don't see your particular library or use case represented, it may still be possible to accelerate it with Dask, and you should look at Dask-ML's API guide. However, ML is not Dask's primary focus, so you may find that you need to use other tools, like Ray.

Parallelizing ML

Many ML workloads face scaling challenges in two dimensions: model size and data size. Training models with many features or components, like many deep learning models, often becomes compute-bound: training, predicting, and evaluating the model become slow and hard to manage. On the other hand, many ML models, even seemingly simple ones like regression, are often stretched to their limits by training datasets too large to fit on one machine, making their scaling challenges memory-bound.

For memory-bound workloads, Dask's high-level collections that we have covered (such as Dask array, DataFrame, and bag) combine with the Dask-ML libraries to offer native scaling. For compute-bound workloads, Dask parallelizes training through integrations such as Dask-ML and Dask-joblib. In the case of scikit-learn, Dask can manage cluster-wide work allocation using Dask-joblib. You might notice that each workflow requires a different library; this is because each ML tool uses its own method of parallelization, which Dask extends.

You can use Dask in conjunction with many popular machine learning libraries, including scikit-learn and XGBoost. You may already be familiar with single-machine parallelism inside your favorite ML library. Dask takes these single-machine parallelism frameworks, like joblib, and extends them to machines connected over the network.

When to Use Dask-ML

Dask excels at parallel tasks with limited distributed mutable state (like large model weights). Dask is commonly used for inference/predictions on ML models, which is simpler than training. Training models, on the other hand, often requires more inter-worker communication, in the form of model weight updates and repeated loops, with sometimes variable amounts of computation per training cycle. You can use Dask for both of these use cases, but adoption and tooling for training are not as widespread.

Dask’s integration with common data preparation tools—including pandas, NumPy, PyTorch, and TensorFlow—makes it easier to build inference pipelines. In JVM-based tools, like Spark, working with those libraries comes at a higher overhead.

Another great use case for Dask is feature engineering and plotting large datasets before training. Dask's pre-processing functions often share the same signatures and behave the same way as their scikit-learn counterparts, but they distribute the work across machines. Similarly, for plotting and visualization, Dask can generate a beautiful plot of a large dataset beyond the usual limits of matplotlib/seaborn.

For more involved ML and deep learning work, some users opt to train PyTorch or TensorFlow models separately and then use the generated models for inference workloads with Dask. This keeps the workload on the Dask side embarrassingly parallel. Alternatively, some users opt to write the training data as a Dask DataFrame using the delayed pattern, which is then fed into Keras or Torch. Be warned that this takes a medium level of effort.

As discussed in previous chapters, the Dask project is still in the early phase of its life, and some of these libraries are still works in progress and come with disclaimers. We took extra caution to validate most of the numerical methods used in the Dask-ML library to make sure the logic and mathematics are sound and work as expected. However, some dependent libraries come with warnings that they are not yet ready for prime time, especially as it relates to GPU-aware and massively distributed workloads. We expect these to get sorted out as the community grows and users contribute their feedback.

Getting Started with Dask-ML and XGBoost

Dask-ML is the officially supported ML library for Dask. Here, we will walk through the functionality provided in the Dask-ML API; how it connects Dask, pandas, and scikit-learn in its functions; and some differences between Dask and its scikit-learn equivalents. Additionally, we will walk through a few XGBoost gradient boosting integrations. We will primarily use the New York City yellow taxicab data we used previously to walk through examples. You can access the dataset directly from the New York City website.

Feature Engineering

As with any good data science workflow, we start with clean-up, applying scalers, and transforms. Dask-ML has drop-in replacements for most of the pre-processing API from scikit-learn, including StandardScaler, PolynomialFeatures, MinMaxScaler, etc.

You can pass multiple columns to the transformers, and each will be normalized, resulting in a lazy Dask DataFrame on which you should call compute.

In Dask DataFrame pre-processing with StandardScaler, we scale trip distances, which are in miles, and total amount, which is in dollars, to their own scaled variables. This is a continuation of the exploratory data analysis we did in [ch04].

Example 1. Dask DataFrame pre-processing with StandardScaler
link:./examples/dask/Dask-Ch13_ML.py[role=include]
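
Since the example file is included by reference, here is a minimal sketch of what this kind of pre-processing can look like. The file path and the column names (trip_distance, total_amount) are assumptions about the NYC yellow taxicab dataset.

[source,python]
----
import dask.dataframe as dd
from dask_ml.preprocessing import StandardScaler

# Load the NYC yellow taxicab data (path and column names are assumptions).
ddf = dd.read_parquet("yellow_tripdata.parquet")

# Fit and transform both columns; the result is a lazy Dask DataFrame.
scaler = StandardScaler()
scaled = scaler.fit_transform(ddf[["trip_distance", "total_amount"]])

# Nothing is materialized until we ask for it.
print(scaled.head())
----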

For categorical variables, while Dask-ML does have a OneHotEncoder, it is not as efficient, nor as direct a drop-in replacement, as its scikit-learn equivalent. At this point we recommend using Categorizer to encode a categorical dtype.[3]

Dask DataFrame pre-processing as categorical variable using Dask-ML shows how you would categorize a particular column while preserving the existing DataFrame. We select payment_type, which is originally encoded as an integer but is really a four-category categorical variable. We call Dask-ML's Categorizer and use pandas's CategoricalDtype to give type hints. While Dask does have type inference and can often auto-infer the type, it is always better to be explicit in your program.

Example 2. Dask DataFrame pre-processing as categorical variable using Dask-ML
link:./examples/dask/Dask-Ch13_ML.py[role=include]
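
A minimal sketch of the Categorizer usage just described; the assumption here is that payment_type takes the integer codes 1 through 4, and ddf is the taxi DataFrame from the earlier sketch.

[source,python]
----
import pandas as pd
from dask_ml.preprocessing import Categorizer

# payment_type is stored as an integer but is really a categorical variable;
# the four category codes below are an assumption about the dataset.
payment_dtype = pd.CategoricalDtype(categories=[1, 2, 3, 4])

categorizer = Categorizer(categories={"payment_type": payment_dtype})
ddf = categorizer.fit_transform(ddf)

print(ddf.dtypes["payment_type"])  # category
----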

Alternatively, you can opt to use Dask DataFrame's built-in categorizer. While pandas is permissive with Object and String as categorical data types, Dask will reject these columns unless they are first read as categorical variables. There are two ways you can do this: declare a column as categorical when reading the data, with dtype={col: "category"}, or convert it before invoking get_dummies, using df.categorize("col1"). The reasoning here is that Dask is lazily evaluated and cannot create a dummy variable out of a column without having seen the full list of unique values. Calling .categorize() is convenient and allows for dynamic handling of additional categories, but keep in mind that it needs to scan the entire column first to get the categories and then do another full pass to transform the column. So if you already know the categories and they won't change, you should just invoke DummyEncoder.

Dask DataFrame pre-processing as categorical variable using the Dask DataFrame built-in categorizes multiple columns at once. Nothing is materialized until you call compute, so you can chain many of these pre-processing steps together.

Example 3. Dask DataFrame pre-processing as categorical variable using the Dask DataFrame built-in
link:./examples/dask/Dask-Ch13_ML.py[role=include]
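
As a sketch of the two approaches just described, assuming the taxi data is available as CSV for the dtype-at-read option and that the column names used here exist in the dataset:

[source,python]
----
import dask.dataframe as dd

# Option 1: declare the dtype when reading the data (works with dd.read_csv).
ddf = dd.read_csv("yellow_tripdata.csv",
                  dtype={"payment_type": "category"})

# Option 2: scan the columns for their unique values and convert them;
# this triggers work, since the categories must be discovered first.
ddf = ddf.categorize(columns=["payment_type", "store_and_fwd_flag"])

# With known categories, get_dummies can now expand the column.
dummies = dd.get_dummies(ddf, columns=["payment_type"])
----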

DummyEncoder is the Dask-ML equivalent of scikit-learn's OneHotEncoder; it turns the variables into uint8, an eight-bit unsigned integer, which is more memory efficient.

Again, there is a Dask DataFrame function that gives you a similar result. Dask DataFrame pre-processing as dummy variable using the Dask DataFrame built-in demonstrates this on categorical columns, and Dask DataFrame pre-processing datetime as dummy variable using the Dask DataFrame built-in pre-processes datetimes. Datetimes can be tricky to work with; in this case, Python natively deserializes the datetime. Remember to always check datetime conversions and apply the necessary transforms beforehand.

Example 4. Dask DataFrame pre-processing as dummy variable using the Dask DataFrame built-in
link:./examples/dask/Dask-Ch13_ML.py[role=include]
Example 5. Dask DataFrame pre-processing datetime as dummy variable using the Dask DataFrame built-in
link:./examples/dask/Dask-Ch13_ML.py[role=include]
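
Here is a hedged sketch of the datetime handling: the column name tpep_pickup_datetime is an assumption about the dataset, and the hour of day stands in for whatever datetime-derived feature you actually need.

[source,python]
----
import dask.dataframe as dd

# Make sure the pickup timestamp is a real datetime before deriving features.
ddf["tpep_pickup_datetime"] = dd.to_datetime(ddf["tpep_pickup_datetime"])

# Derive an hour-of-day feature and mark it as categorical.
ddf["pickup_hour"] = ddf["tpep_pickup_datetime"].dt.hour
ddf = ddf.categorize(columns=["pickup_hour"])

# Expand it into dummy columns, one per observed hour.
hour_dummies = dd.get_dummies(ddf, columns=["pickup_hour"])
----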

Dask-ML's train_test_split method has more flexibility than the Dask DataFrame version. Both are partition-aware, and we recommend using them instead of the scikit-learn equivalent. scikit-learn's train_test_split can be called here, but it is not partition-aware and can result in large data movement between workers, whereas the Dask implementations split the train and test sets within each partition, avoiding the shuffle (see Dask DataFrame pseudorandom split).

Example 6. Dask DataFrame pseudorandom split
link:./examples/dask/Dask-Ch13_ML.py[role=include]
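
A minimal sketch of the partition-aware split; the feature and label columns follow the earlier sketches and are assumptions.

[source,python]
----
from dask_ml.model_selection import train_test_split

X = ddf[["trip_distance", "pickup_hour"]]
y = ddf["total_amount"]

# The split happens within each partition, avoiding a cluster-wide shuffle.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, shuffle=True, random_state=42
)
----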

Because random splits happen per partition block, random behavior is not uniformly guaranteed over the whole DataFrame. If you suspect that some partitions may be skewed, you should compute, redistribute, and then shuffle-split.

Model Selection and Training

Many of scikit-learn's model-selection-related functions, including cross-validation, hyperparameter search, clustering, regression, imputation, and scoring methods, have been ported to Dask as drop-in replacements. There are a few noteworthy improvements that make these functions run more efficiently than a naive parallel computing architecture, by using Dask's task-graph views.

Most regression-based models have been implemented for Dask and can be used as a replacement for scikit-learn.[4] Many scikit-learn users will be familiar with needing to convert a pandas DataFrame into a 2D array before scikit-learn can work with it. For some Dask-ML functions you similarly need to call ddf.to_dask_array() to convert a DataFrame to an array before training. Lately, some Dask-ML functions have been improved to work directly on Dask DataFrames, but not all of them.

Linear regression with Dask-ML runs through a straightforward multivariate linear regression using Dask-ML. Say you want to build a regression model on two predictor columns and one output column. You would call .to_dask_array() to convert the data to Dask arrays and then pass them into Dask-ML's implementation of LinearRegression. Note how we needed to materialize the conversion into arrays and give explicit chunk sizes. This is because Dask-ML's underlying implementation of linear models is not quite able to fully infer chunk sizes from previous steps. We also purposefully use scikit-learn's scoring library, not Dask-ML's. We are noticing implementation issues where Dask-ML doesn't play well with chunk sizes.[5] Thankfully, at this point the calculation is a reduce step, which works without any Dask-specific logic.[6]

Example 7. Linear regression with Dask-ML
link:./examples/dask/Dask-Ch13_ML.py[role=include]
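
A sketch of that flow, assuming the columns from the earlier examples; passing lengths=True asks Dask to compute the chunk sizes so the linear model gets explicit chunks.

[source,python]
----
from dask_ml.linear_model import LinearRegression
from sklearn.metrics import r2_score

# Materialize Dask arrays with known chunk sizes; the Dask-ML linear models
# cannot always infer chunks from upstream DataFrame steps.
X = ddf[["trip_distance", "pickup_hour"]].to_dask_array(lengths=True)
y = ddf["total_amount"].to_dask_array(lengths=True)

lr = LinearRegression()
lr.fit(X, y)

# Score with scikit-learn on the materialized results; this reduce step
# needs no Dask-specific logic.
predictions = lr.predict(X)
print(r2_score(y.compute(), predictions.compute()))
----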

Note that the model parameters for scikit-learn and Dask-ML are identical, but some are not yet supported. For example, LogisticRegression is available in Dask-ML, but there is no exact equivalent of scikit-learn's multi-class solvers implemented in Dask-ML yet. So if you want to use the multinomial loss solvers newton-cg or newton-cholesky, they might not work. For most uses of LogisticRegression, the default liblinear solver will do the trick. In practice, this limitation pertains only to more niche and advanced use cases.

For hyperparameter search, Dask-ML has the scikit-learn equivalents of GridSearchCV, for exhaustive search over parameter values, and RandomizedSearchCV, for randomly sampling hyperparameters from a list. These can be run directly, much like their scikit-learn variants, if the data and resulting model do not require much scaling.

Cross-validation and hyperparameter tuning are often costly, even with a smaller dataset, as anyone who has run scikit-learn's cross-validation can attest. Dask users often deal with datasets large enough that exhaustive search algorithms are not feasible. As an alternative, Dask-ML implements several additional adaptive and hyperband-based algorithms that approach the tuned parameters more quickly, with a robust mathematical foundation.[7] The authors of the HyperbandSearchCV method ask that its use be cited.[8]
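
A hedged sketch of HyperbandSearchCV: it requires an estimator that implements partial_fit (scikit-learn's SGDRegressor here), and the parameter ranges are illustrative only.

[source,python]
----
import numpy as np
from sklearn.linear_model import SGDRegressor
from dask_ml.model_selection import HyperbandSearchCV

# Hyperband needs an estimator with partial_fit so it can train incrementally.
params = {
    "alpha": np.logspace(-5, -1, 10),
    "penalty": ["l1", "l2"],
}

search = HyperbandSearchCV(SGDRegressor(), params, max_iter=81, random_state=42)
search.fit(X, y)  # the Dask arrays from the earlier regression sketch

print(search.best_params_)
print(search.best_score_)
----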

When There Is No Dask-ML Equivalent

If there is a function that exists in scikit-learn or other data science libraries but not in Dask-ML, you can write the distributed version of your desired code. After all, Dask-ML can be thought of as a convenience wrapper around scikit-learn.

Linear regression with Dask-ML uses scikit-learn's SGDRegressor and LinearRegression and uses dask.delayed to wrap delayed functionality around the methods. You can do this with any piece of code you want to parallelize.

Example 8. Linear regression with Dask-ML
link:./examples/dask/Dask-Ch13_ML.py[role=include]
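
As a minimal sketch of the wrapping pattern (not the book's exact example): each partition is converted to a delayed pandas DataFrame and an independent scikit-learn model is fit on it. This keeps the Dask-side work embarrassingly parallel rather than training a single shared model; the feature and label columns are assumptions.

[source,python]
----
import dask
from sklearn.linear_model import SGDRegressor

@dask.delayed
def fit_partition(pdf):
    # Fit an independent scikit-learn model on one pandas partition.
    model = SGDRegressor()
    model.fit(pdf[["trip_distance"]], pdf["total_amount"])
    return model

# One delayed fit per partition; nothing runs until compute() is called.
delayed_models = [fit_partition(part) for part in ddf.to_delayed()]
models = dask.compute(*delayed_models)
----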

Use with Dask’s joblib

Alternatively, you can use scikit-learn along with joblib (see Parallelizing computation using joblib), a package that can run any Python function as pipelined steps to be computed on a single machine. Joblib works well for many parallel computations that are not dependent on each other. In that case, having hundreds of cores on a single machine would be helpful. While a typical laptop does not have hundreds of cores, using the four or so that it does have can still be beneficial. With Dask's version of joblib, you can use cores from multiple machines. This works well for ML workloads that are compute-bound on a single machine.

Example 9. Parallelizing computation using joblib
link:./examples/dask/Dask-Ch13_ML.py[role=include]
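
A sketch of routing scikit-learn's internal parallelism to a Dask cluster through joblib; the estimator and the in-memory training data (X_small, y_small) are placeholders.

[source,python]
----
import joblib
from dask.distributed import Client
from sklearn.ensemble import RandomForestRegressor

client = Client()  # connect to your cluster; defaults to a local one

# The training data must still fit in memory; only the compute is farmed out.
model = RandomForestRegressor(n_estimators=200, n_jobs=-1)
with joblib.parallel_backend("dask"):
    model.fit(X_small, y_small)
----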

XGBoost with Dask

XGBoost is a popular Python gradient boosting library, used for parallel tree boosting. Gradient boosting is an ensemble technique, related to but distinct from bootstrap aggregation (bagging). Gradient boosting methods have been used in high-energy physics data analysis at the Large Hadron Collider, including in analyses supporting the discovery of the Higgs boson, and they are currently used in scientific areas such as geological and climate studies. Given its importance, we found XGBoost on Dask to be a well-implemented library, ready for users.

Dask-ML has built-in support for XGBoost to work with Dask arrays and DataFrames. By using XGBClassifier within Dask-ML, you set up XGBoost in distributed mode, working with your Dask cluster. When you do so, XGBoost's master process lives in the Dask scheduler, and XGBoost's worker processes run on Dask's worker processes. The data is distributed using Dask DataFrame, split into pandas DataFrames, with the Dask worker and the XGBoost worker on the same machine communicating with each other.

XGBoost uses the DMatrix (data matrix) as its standard data format. XGBoost has a built-in Dask-compatible DMatrix, which takes in Dask arrays and Dask DataFrames. Once the Dask environment is set up, using the gradient booster is as you would expect: specify the learning rate, threads, and objective function as usual. Gradient-boosted trees with Dask-ML works with a Dask CUDA cluster and runs a standard gradient booster training.

Example 10. Gradient-boosted trees with Dask-ML
link:./examples/dask/Dask-Ch13_ML.py[role=include]
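
A hedged sketch of distributed training with XGBoost's Dask integration (xgboost.dask); the hyperparameters are illustrative, and X_train/y_train are the Dask collections from the earlier split.

[source,python]
----
import xgboost as xgb
from dask.distributed import Client

client = Client()  # or a Dask CUDA cluster for GPU-accelerated training

# Dask-aware DMatrix built directly from Dask collections.
dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)

params = {
    "objective": "reg:squarederror",
    "learning_rate": 0.1,
    "max_depth": 6,
    "nthread": 4,
}

output = xgb.dask.train(client, params, dtrain,
                        num_boost_round=100, evals=[(dtrain, "train")])
booster = output["booster"]   # the trained model
history = output["history"]   # per-round evaluation metrics
----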

In Dask-ML using the XGBoost library, we run a simple training run and plot feature importance. Note that when we define the DMatrix, we explicitly specify the labels, and the label names are carried over from the Dask DataFrame into the DMatrix.

Example 11. Dask-ML using the XGBoost library
link:./examples/dask/Dask-Ch13_ML.py[role=include]

Putting the previous examples together, you can now compose a function that fits a model, provides early stopping arguments, and also runs predictions using XGBoost for Dask (see Gradient-boosted tree training and inference using the Dask XGBoost library). These would be called from your main client code.

Example 12. Gradient-boosted tree training and inference using the Dask XGBoost library
link:./examples/dask/Dask-Ch13_ML.py[role=include]
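
One way to put those pieces together could look like the following sketch; the function name and parameter values are hypothetical.

[source,python]
----
import xgboost as xgb

def fit_and_predict(client, X_train, y_train, X_test, y_test):
    dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)
    dtest = xgb.dask.DaskDMatrix(client, X_test, y_test)

    output = xgb.dask.train(
        client,
        {"objective": "reg:squarederror", "learning_rate": 0.1},
        dtrain,
        num_boost_round=500,
        evals=[(dtest, "validation")],
        early_stopping_rounds=10,  # stop once the validation score stalls
    )

    # Predictions come back as a lazy Dask collection.
    predictions = xgb.dask.predict(client, output["booster"], X_test)
    return output["booster"], predictions
----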

ML Models with Dask-SQL

A much newer addition is another library, Dask-SQL, which provides a convenient wrapper around simple ML model training workloads. Registering datasets into Dask-SQL loads the same NYC yellow taxicab data as a Dask DataFrame and then registers the view with the Dask-SQL context.

Example 13. Registering datasets into Dask-SQL
link:./examples/dask/Dask-Ch13_ML.py[role=include]
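
A minimal sketch of registering the DataFrame with a Dask-SQL context; the table name and file path are assumptions.

[source,python]
----
import dask.dataframe as dd
from dask_sql import Context

c = Context()

# Register the taxi DataFrame as a SQL-addressable table.
taxi = dd.read_parquet("yellow_tripdata.parquet")
c.create_table("taxi", taxi)

# Ordinary SQL queries now return lazy Dask DataFrames.
result = c.sql("SELECT AVG(trip_distance) AS avg_distance FROM taxi")
print(result.compute())
----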

Dask-SQL implements an ML SQL syntax similar to BigQuery ML, allowing you to simply define models, define the training data as a SQL SELECT statement, and then run inference on a different SELECT statement.

You can define the model with most of the ML models we have discussed, which run the scikit-learn ML models in the background. In Defining, training, and predicting a linear regression on Dask-SQL, we train the same LinearRegression model we trained earlier, this time using Dask-SQL. We first define the model, telling it to use scikit-learn's LinearRegression and which target column to use. Then we feed in the training data with the requisite columns. You can inspect the trained model using the DESCRIBE statement; then you can see in the FROM PREDICT statement how the model is used to run inference on another SQL-defined dataset.

Example 14. Defining, training, and predicting a linear regression on Dask-SQL
link:./examples/dask/Dask-Ch13_ML.py[role=include]
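
Sketching that flow with Dask-SQL's CREATE MODEL, DESCRIBE MODEL, and PREDICT statements as we understand them; the model name, table name, and columns are assumptions carried over from the earlier sketches.

[source,python]
----
# Train a linear regression with Dask-SQL's BigQuery ML-like syntax.
c.sql("""
CREATE MODEL fare_model WITH (
    model_class = 'sklearn.linear_model.LinearRegression',
    wrap_predict = True,
    target_column = 'total_amount'
) AS (
    SELECT trip_distance, total_amount FROM taxi
)
""")

# Inspect the trained model.
print(c.sql("DESCRIBE MODEL fare_model").compute())

# Run inference on another SQL-defined dataset.
predictions = c.sql("""
SELECT * FROM PREDICT (
    MODEL fare_model,
    SELECT trip_distance FROM taxi LIMIT 100
)
""")
print(predictions.compute())
----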

Similarly, as shown in Defining, training, and predicting a classifier built using XGBoost with Dask-SQL, you can run classification models, similar to the XGBoost model we have discussed earlier using the Dask-ML library.

Example 15. Defining, training, and predicting a classifier built using XGBoost with Dask-SQL
link:./examples/dask/Dask-Ch13_ML.py[role=include]

Inference and Deployment

Regardless of the libraries chosen to train and validate your model (which could use some of the Dask-ML libraries, or could be trained without using Dask at all), here are some considerations to keep in mind when using Dask to deploy models for inference.

Distributing Data and Models Manually

When loading data and pre-trained models onto Dask workers, dask.delayed is the main tool (see Loading large files on Dask workers). When distributing data, you should use Dask's collections: array and DataFrame. As you recall from [ch04], each Dask DataFrame is made up of pandas DataFrames. This is useful since you can write a method that takes each smaller DataFrame and returns a computed output. Custom functions and tasks can also be applied per partition using Dask DataFrame's map_partitions function.

Remember to use delayed notation if you are reading in a large dataset, to delay materialization and avoid reading in unnecessarily early.

Tip

map_partitions applies your function to each partition (often wrapping a row-wise operation), and that function must be serializable code that can be marshaled to workers. You can define a custom class to handle inference, but it needs to be called via a static method, not an instance-dependent method. We covered this further in [ch04].

Example 16. Loading large files on Dask workers
link:./examples/dask/Dask-Ch13_ML.py[role=include]
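
A sketch of the pattern: the model path, column name, and joblib serialization are assumptions; the key point is that the load happens lazily inside a delayed task, so the file is read on a worker rather than on the client.

[source,python]
----
import dask
import dask.dataframe as dd
import joblib

@dask.delayed
def load_model(path):
    # Runs on a worker, so the large model file never passes through the client.
    return joblib.load(path)

model = load_model("/data/models/fare_model.joblib")

@dask.delayed
def predict_partition(model, pdf):
    pdf = pdf.copy()  # avoid mutating the input partition
    pdf["prediction"] = model.predict(pdf[["trip_distance"]])
    return pdf

parts = [predict_partition(model, part) for part in ddf.to_delayed()]
predictions = dd.from_delayed(parts)
----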

Large-Scale Inferences with Dask

When using Dask for inference at scale, you distribute trained models to each worker and then distribute Dask collections (DataFrame or array) over the partitions, working on a portion of the collection at a time and parallelizing the workflow. This strategy works well for a straightforward inference deployment. We will cover one way to achieve this: defining the workflow manually using map_partitions and wrapping existing functions around PyTorch or Keras/TensorFlow models. For PyTorch-based models, you can wrap the model with Skorch, which allows it to be used with the Dask-ML API. For TensorFlow models, you would use SciKeras to create a scikit-learn-compatible model, which allows it to be used with Dask-ML. For PyTorch, the dask-pytorch-ddp library from SaturnCloud is currently the most widely used. As for Keras and TensorFlow, be aware that while it's doable, there are some issues with TensorFlow not liking some of its threads being moved to other workers.

The most generic way to deploy inference is using Dask DataFrame’s map_partitions (see Distributed inference using Dask DataFrame). You can take your custom inference function that will be run on each row, with the data mapped over each worker by partition.

Example 17. Distributed inference using Dask DataFrame
link:./examples/dask/Dask-Ch13_ML.py[role=include]
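
A minimal sketch of partitioned inference with map_partitions, assuming a pre-trained scikit-learn-style model object and the column names used earlier; meta tells Dask the output schema without running the function eagerly.

[source,python]
----
def predict_batch(partition, model):
    # Each partition arrives as a plain pandas DataFrame.
    partition = partition.copy()  # keep the function free of side effects
    partition["prediction"] = model.predict(partition[["trip_distance"]])
    return partition

# Describe the output schema up front so Dask does not have to guess.
meta = ddf._meta.assign(prediction=0.0)

predictions = ddf.map_partitions(predict_batch, model, meta=meta)
predictions.head()
----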

One interesting way that Dask offers more than other scalable libraries is flexibility in parallel behavior. In the preceding example, we define a function that works row-wise and then give that function to partition-wise logic that is run by each partition over the entire DataFrame. We can use this as a boilerplate to define more fine-grained batched functions (see Distributed inference using Dask DataFrame). Keep in mind that behaviors you define within the row-wise function should be free of side effects; that is, you should avoid mutating the inputs to the function, as is general best practice in Dask's distributed delayed computations. Also, as the comments in the preceding example say, if you call .apply within a partition-wise lambda, this calls pandas's .apply(). In pandas, .apply() defaults to axis=0, so if you want otherwise, remember to specify axis=1.

Example 18. Distributed inference using Dask DataFrame
link:./examples/dask/Dask-Ch13_ML.py[role=include]

Conclusion

In this chapter, you have learned how to use the building blocks of Dask to write data science and ML workflows, combining core Dask libraries with other ML libraries you might be familiar with to achieve your desired task. You have also learned how you can use Dask to scale both compute- and memory-bound ML workloads.

Dask-ML provides an almost functionally equivalent library to scikit-learn, oftentimes calling scikit-learn with the additional awareness of task and data parallelism that Dask brings. Dask-ML is actively being developed by the community and will evolve to add more use cases and examples. Check the Dask documentation for the latest updates.

In addition, you have learned methods of parallelizing ML training with models from scikit-learn libraries by using joblib for compute-intensive workloads, and batched operations for data-intensive workloads, so that you can write any custom implementations yourself.

Finally, you have learned the use cases for Dask-SQL and its SQL ML statements in providing high-level abstraction for model creation, hyperparameter tuning, and inference.

Since ML can be very computation- and memory-heavy, it’s important to deploy your ML work on a correctly configured cluster and monitor the progress and output closely. We will cover deployment, profiling, and troubleshooting in the next chapter.


1. For those inclined to think that writing data engineering code is “fun.”
2. This is especially important for non-batch inference, where being able to use the same code can be of great benefit.
3. For performance reasons—at the time of writing, Dask’s OneHotEncoder calls the get_dummies method from pandas, which is a slower implementation than scikit-learn’s OneHotEncoder. Categorizer, on the other hand, uses a Dask DataFrame aggregation method to scan through categories efficiently.
4. Most linear models in Dask-ML use a base implementation of the Generalized Linear Model library that has been implemented for Dask. We have verified the code for mathematical correctness, but the writers of this library have not endorsed the use of their GLM library for prime time yet.
5. Dask-ML version 2023.3.24; some of the generalized linear models used rely on dask-glm 0.1.0.
6. Because it’s a simple reduce operation, we don’t need to preserve the chunking from previous steps.
7. Dask-ML’s own documentation has more info on adaptive and approximation CV methods implemented and use cases.
8. They note in the documentation that the following paper should be cited if using this method: S. Sievert, T. Augspurger, and M. Rocklin, “Better and Faster Hyperparameter Optimization with Dask,” Proceedings of the 18th Python in Science Conference (2019), https://doi.org/10.25080/Majora-7ddc1dd1-011.