Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add example of a sklearn like pipeline #294

Merged
merged 7 commits into from
Dec 7, 2023
Merged
104 changes: 104 additions & 0 deletions spec/API_specification/examples/03_sklearn_like_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
"""
This is an example of how a (possibly) lazy data frame may be used
in a sklearn-like pipeline.

The example is motivated by the prospect of a fully lazy, ONNX-based
data frame implementation. The concept is that calls to `fit` are
eager. They compute some state that is later meant to be transferred
into the ONNX graph/model. That transfer happens when calling
`transform`. The logic within the `transform` methods is "traced"
lazily and the resulting lazy object is then exported to ONNX.
"""
from __future__ import annotations

from typing import Any, TYPE_CHECKING, Self

from dataframe_api.dataframe_object import DataFrame


class Scaler:
"""Apply a standardization scaling factor to `column_names`."""
scalings_: dict[str, float]

def __init__(self, column_names: list[str]):
self.column_names = column_names

def fit(self, df: DataFrame) -> Self:
"""Compute scaling factors from given data frame.

Calling this function requires collecting values.
"""
scalings = df.select(self.column_names).std()
if hasattr(scalings, 'collect'):
scalings = scalings.collect()

self.scalings_ = {
# Note: `get_value` returns an implemenation-defined,
# duck-typed scalar which may be lazy.
column_name: scalings.get_column_by_name(column_name).get_value(0)
for column_name in self.column_names
}

return self

def transform(self, df: DataFrame) -> DataFrame:
"""Apply the "trained" scaling values.

This function is guaranteed to not collect values.
"""
for column_name in df.column_names:
if not column_name in self.column_names:
continue
column = df.get_column_by_name(column_name) / self.scalings_[column_name]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kkraus14 just to clarify about scalar issue - should this be written as

column = df.get_column_by_name(column_name) / float(self.scalings_[column_name])

instead, so that __float__ is called and each implementation knows what to do (e.g. trigger compute, raise, return eager value, ...)?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, that would force materialization unnecessarily. self.scalings_ items get defined via scalings.get_column_by_name(column_name).get_value(0) which could return a ducktyped lazy scalar, and then this division could be lazy as well.

I.E. how this works in cuDF today is that the get_value(0) call returns a cudf.Scalar object which lives on the GPU, and then the __div__ operation with the column can use that on-GPU value directly.

Adding float(...) requires returning a Python float, which then prevents doing things like the above.

Copy link
Contributor

@MarcoGorelli MarcoGorelli Oct 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for explaining, OK with not using 'float' here

Sorry to belabour the point, but the "lazy scalar" here is the part which I'm just not getting

A fairly common pattern I used to work with when I did data science was:

  • train model in some environment (the fit part, potentially expensive)
  • deploy model somewhere (the transform / predict part, usually much cheaper)

E.g.: train a model somewhere, and save some weights. Then, say on a device (like a smart watch), make predictions using that model (and the saved weights)

By the time you're doing inference (.transform), doesn't exist any more - at least, not in the environment you're doing inference on

So how can something calculated during the fit phase stay lazy?

I think this is what @cbourjau was getting to as well, saying that for fit, they need to materialise values

Copy link
Collaborator

@kkraus14 kkraus14 Oct 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's an implementation detail of the library in question? The output of fit is effectively self.scalings_ which is a dictionary of ducktyped scalars. Those ducktyped scalars could effectively be an expression and then the __div__ in .transform just continues to build expressions until something triggers materialization, whether that's a call to something like __float__ or __bool__ or something else.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue I have is that I would typically do:

  • fit in one environment
  • predict in a different environment (e.g. on a smartwatch)

If the "something triggers materialization" happens during predict, then the expression is going to fail, because the dataframe which is meant to be used for training isn't available any more

What I want to do is:

  • fit in one environment, and force materialisation so I can export the model weights
  • predict in a different environment, using the model weights calculated during the fit phase

The way the code is written in this PR achieves this by doing

        if hasattr(scalings, 'collect'):
            scalings = scalings.collect()

during the fit phase. And we may have to be OK with this if we can't agree on a solution which can be part of the standard

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that's the only option. I think we want a .may_execute() type method (ignore the exact method name here), which in the example above (and also @cbourjau's ONNX library which has lazy arrays) can simply be do-nothing, while Polars could alias that to .collect(). It would address the concern about implicit materialization, making it explicit.

It wouldn't be super ergonomic to write it as .may_execute().to_array(), but probably better than throwing - which isn't a solution really.

This solution is kinda circling back to a small part of Marco's lazy/eager design, without the separate classes or expression objects.

Other thought on Polars there: it could also choose to do an optimization pass, where e.g. it could figure out for a block of code with multiple may_execute's which ones have to collect and which ones don't. That'd actually be generically useful outside of dataframe API support, because the manual "must execute" imperative nature of .collect is not so easy to write optimally (as we already discovered in another discussion on this repo).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may_execute sounds fine, thanks

btw yesterday I saw a colleague write this

Has anyone seen hvplot causing 3 identical computes when working off of a dask dataframe? (version 0.9.0) I dont know if I should be calling something differently or if this is a bug?

. If I was -1 on automated collection before, now I'm more like -10 😄

Copy link
Contributor

@MarcoGorelli MarcoGorelli Oct 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that in skrub.DatetimeEncoder.fit, even Dask would throw an error telling the user to call .compute

I've added an example of this in #307, as well as a proposal to add maybe_execute as a hint rather than as a directive

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or is it rather a predefined relationship where any dataframe implementation specifies exactly one array API implementation?

I think it's this - each implementation can choose which array library to defer to when someone calls to_array. For cudf it would probably be cupy, for pandas/polars numpy

If we have a 1:1 relationship between the dataframe and the array API standard then we have successfully kicked the can down the road (i.e. tech-stack) into the array API standard. Calling to_array will always trigger materialization if the associated array-API implementation is eager. If the associated array-API implementation is lazy, then it will remain lazy. Would this be acceptable for the Pandas/Polars-backed implementations? I'm afraid I don't quite see the intention behind may_execute?

The above-described semantics would work well with ONNX as far as the dataframe API is concerned. We are left with the same question of explicit materialization on the array API level, though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling to_array will always trigger materialization if the associated array-API implementation is eager. [...] Would this be acceptable for the Pandas/Polars-backed implementations?

I'm afraid not, sorry: #307 (comment)

# Note: `assign` is not in-place
df = df.assign(column)

return df

class FeatureSelector:
"""Limit columns to those seen in training including their order."""

def fit(self, df: DataFrame) -> Self:
"""Record the observed columns and their order.

This function is guaranteed to not collect values.
"""
self.columns_ = df.column_names
return self

def transform(self, df: DataFrame) -> DataFrame:
"""Select and sort the columns as observed in training.

This function is guaranteed to not collect values.
"""
# Note: This assumes that select ensures the column order.
return df.select(self.columns_)


class Pipeline:
"""Linear pipeline of transformers."""

def __init__(self, steps: list[Any]):
self.steps = steps

def fit(self, df: DataFrame) -> Self:
"""Call fit on the steps of the pipeline subsequently.

Calling this function may trigger a collection.
"""
for step in self.steps:
step.fit(df)

self.steps_ = self.steps
return self

def transform(self, df: DataFrame) -> DataFrame:
"""Call transform on all steps of this pipeline subsequently.

This function is guaranteed to not trigger a collection.
"""
for step in self.steps_:
df = step.transform(df)

return df