
Commit

Use SKPlumberFitState object in SKPlumber.fit and return SearchResult tuple
epeters3 committed Apr 22, 2020
1 parent da61b69 commit a6de18a
Showing 1 changed file with 67 additions and 35 deletions.
102 changes: 67 additions & 35 deletions skplumber/skplumber.py
@@ -1,4 +1,5 @@
import typing as t
from typing import NamedTuple
from time import time

import pandas as pd
@@ -11,16 +12,35 @@
from skplumber.primitives.sk_primitives.classifiers import classifiers
from skplumber.primitives.sk_primitives.regressors import regressors
from skplumber.primitives.sk_primitives.transformers import transformers
from skplumber.metrics import default_metrics, metrics
from skplumber.metrics import default_metrics, metrics, Metric
from skplumber.utils import logger
from skplumber.evaluators import make_train_test_evaluator
from skplumber.tuners.ga import ga_tune
from skplumber.progress import EVProgress


class SKPlumberFitState:
def __init__(self, budget: int, metric: Metric) -> None:
self.starttime = time()
self.endbytime = self.starttime + budget
self.best_pipeline_min_tune_time = 0.0
self.best_score = metric.worst_value


class SearchResult(NamedTuple):
# total train time in seconds
time: float
# total number of pipelines the sampler tried
n_sample_iters: int
# total number of pipelines the hyperparameter tuner tried
n_tune_iters: int
# the best score SKPlumber was able to find
best_score: float
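
Since SearchResult is a typing.NamedTuple, callers can read the value that fit now returns by field name or unpack it like a plain tuple. A rough illustration (the field values below are invented, not produced by this commit):

# Illustrative only: the field values are made up.
result = SearchResult(time=42.7, n_sample_iters=12, n_tune_iters=3, best_score=0.91)
print(result.best_score)                     # access by field name
elapsed, n_sampled, n_tuned, score = result  # or unpack as a plain tuple

SKPlumberFitState, by contrast, is mutable bookkeeping that only lives for the duration of a single fit call.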


class SKPlumber:

models_map: t.Dict[ProblemType, t.List[t.Type[Primitive]]] = {
_models_map: t.Dict[ProblemType, t.List[t.Type[Primitive]]] = {
ProblemType.CLASSIFICATION: list(classifiers.values()),
ProblemType.REGRESSION: list(regressors.values()),
}
@@ -157,7 +177,7 @@ def __init__(
if callback:
self.sampler_cbs.append(callback)

def fit(self, X: pd.DataFrame, y: pd.Series) -> None:
def fit(self, X: pd.DataFrame, y: pd.Series) -> SearchResult:
"""
The main runtime method of the package. Given a dataset, problem type,
and sampling strategy, it tries to find, in a limited amount of time,
@@ -170,26 +190,28 @@ def fit(self, X: pd.DataFrame, y: pd.Series) -> None:
y : pandas.Series
The target vector of your dataset. The indices of `X` and `y`
should match up.
Returns
-------
result : SearchResult
A named tuple containing data about how the fit process went.
"""

# Initialize

if len(X.index) != y.size:
raise ValueError(f"X and y must have the same number of instances")

# The time we need to finish fitting by
self.starttime = time()
self.endtime = self.starttime + self.budget
self.best_pipeline_min_tune_time = 0.0
self.best_score = self.metric.worst_value
# A little encapsulation to make this `fit` method's code less huge.
self.state = SKPlumberFitState(self.budget, self.metric)

# Run

self.progress.start()
best_pipeline, best_score = self.sampler.run(
best_pipeline, best_score, n_sample_iters = self.sampler.run(
X,
y,
models=self.models_map[self.problem_type],
models=self._models_map[self.problem_type],
transformers=list(transformers.values()),
problem_type=self.problem_type,
metric=self.metric,
@@ -199,7 +221,7 @@ def fit(self, X: pd.DataFrame, y: pd.Series) -> None:
exit_on_pipeline_error=self.exit_on_pipeline_error,
)
self.best_pipeline = best_pipeline
self.best_score = best_score
self.state.best_score = best_score

logger.info(f"found best validation score of {best_score}")
logger.info("best pipeline:")
@@ -209,7 +231,7 @@ def fit(self, X: pd.DataFrame, y: pd.Series) -> None:
logger.info(
"now performing hyperparameter tuning on best found pipeline..."
)
best_tuning_score, best_tuning_params = ga_tune(
best_tuning_score, best_tuning_params, n_tune_iters = ga_tune(
self.best_pipeline,
X,
y,
@@ -221,11 +243,13 @@ def fit(self, X: pd.DataFrame, y: pd.Series) -> None:
),
callback=self._tuner_callback,
)
if self.metric.is_better_than(best_tuning_score, self.best_score):
if self.metric.is_better_than(best_tuning_score, self.state.best_score):
# The hyperparameter tuning was able to find an
# improvement.
self.best_score = best_tuning_score
self.state.best_score = best_tuning_score
self.best_pipeline.set_params(best_tuning_params)
else:
n_tune_iters = 0

# Now that we have the "best" model, train it on
# the full dataset so it can see as much of the
@@ -234,17 +258,21 @@ def fit(self, X: pd.DataFrame, y: pd.Series) -> None:
self.best_pipeline.fit(*shuffle(X, y))
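
The refit above presumably relies on sklearn.utils.shuffle (its import is not visible in this hunk): it permutes the rows of X and y with the same ordering, so the starred call expands to roughly:

# Assumes shuffle is sklearn.utils.shuffle, imported elsewhere in this file.
from sklearn.utils import shuffle

X_shuffled, y_shuffled = shuffle(X, y)  # same row permutation applied to both
# then: self.best_pipeline.fit(X_shuffled, y_shuffled)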

logger.info(
f"finished. total execution time: {time() - self.starttime:.2f} seconds."
"finished. total execution time: "
f"{time() - self.state.starttime:.2f} seconds."
)
logger.info(f"final best score found: {self.best_score}")
logger.info(f"final best score found: {self.state.best_score}")

# Decomission temporary variables
del self.starttime
del self.endtime
del self.best_pipeline_min_tune_time
result = SearchResult(
time() - self.state.starttime,
n_sample_iters,
n_tune_iters,
self.state.best_score,
)

# Fitting completed successfully
self.is_fitted = True
return result

def predict(self, X: pd.DataFrame) -> pd.Series:
"""
@@ -257,32 +285,34 @@ def predict(self, X: pd.DataFrame) -> pd.Series:
)
return self.best_pipeline.predict(X)

def _sampler_cb(self, state: SamplerState) -> bool:
def _sampler_cb(self, sampler_state: SamplerState) -> bool:
# Decide how much time is left available to us in
# the sampling phase.
if self.tune:
# We want to leave enough time in the budget to be able
# to complete at least one generation of hyperparameter tuning.
if self.metric.is_better_than(state.score, self.best_score):
self.best_score = state.score
if self.metric.is_better_than(sampler_state.score, self.state.best_score):
self.state.best_score = sampler_state.score
# An estimate of how long it will take to complete one
# generation of hyperparameter tuning on this current
# best pipeline.
self.best_pipeline_min_tune_time = (
state.train_time
* state.pipeline.num_params
self.state.best_pipeline_min_tune_time = (
sampler_state.train_time
* sampler_state.pipeline.num_params
* self.tuning_mult_factor
)
sampling_endtime = self.endtime - self.best_pipeline_min_tune_time
sampling_endtime = (
self.state.endbytime - self.state.best_pipeline_min_tune_time
)
else:
sampling_endtime = self.endtime
sampling_endtime = self.state.endbytime

now = time()
logger.info(f"{sampling_endtime - now:.2f} seconds left in sampling budget")

# Logic for tracking sampler progress and exiting when the cost
# of finding a new best score is too great.
self.progress.observe(state.score)
self.progress.observe(sampler_state.score)

exit_early = False
if now > sampling_endtime:
@@ -305,13 +335,15 @@ def _sampler_cb(self, state: SamplerState) -> bool:

return exit_early
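
For intuition about the time reserve computed above: one tuning generation is estimated as train_time * num_params * tuning_mult_factor seconds, and that amount is held back from the hard deadline. With made-up numbers:

# Hypothetical values, purely for illustration.
train_time = 2.0        # seconds to train the current best pipeline once
num_params = 5          # tunable hyperparameters in that pipeline
tuning_mult_factor = 3  # whatever value self.tuning_mult_factor was given in __init__
reserve = train_time * num_params * tuning_mult_factor  # 30.0 seconds held back for tuning
# so sampling would be cut off 30 seconds before self.state.endbytime in this scenario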

def _tuner_callback(self, state: dict) -> bool:
def _tuner_callback(self, tuner_state: dict) -> bool:
now = time()
logger.info(
f"candidate pipeline in generation {state['nit']} finished. "
f"{self.endtime - now:.2f} seconds left in budget."
f"candidate pipeline in generation {tuner_state['nit']} finished. "
f"{self.state.endbytime - now:.2f} seconds left in budget."
)
logger.info(f"best score found so far: {tuner_state['fun']}")
logger.info(
f"best hyperparameter config found so far: {tuner_state['kwargs_opt']}"
)
logger.info(f"best score found so far: {state['fun']}")
logger.info(f"best hyperparameter config found so far: {state['kwargs_opt']}")
# We need to quit early if our time budget is used up.
return True if time() > self.endtime else False
return True if time() > self.state.endbytime else False
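
Judging by the keys this callback reads, the dict that ga_tune passes to it is assumed to carry at least the generation counter, the best objective value, and the best hyperparameter configuration found so far. A sketch of that assumed shape (key names come from the code above; the values are invented):

# Assumed shape only; the real contents come from skplumber.tuners.ga.ga_tune.
tuner_state = {
    "nit": 4,                                             # generations completed so far
    "fun": 0.87,                                          # best objective value found
    "kwargs_opt": {"max_depth": 6, "n_estimators": 200},  # best hyperparameters (invented names)
}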
