Adding levels to distributed backends (#581)
* Finished cross validation

* Fixing exogenous variables

* Running tests

* Adding tests

* Cleaned notebook

* Cleaning notebook
kvnkho authored Jul 11, 2023
1 parent a6eff82 commit 6ed5bd3
Showing 9 changed files with 285 additions and 151 deletions.
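
The net effect of the changes below is that the distributed backends now honor the `level` argument of `forecast`. A minimal end-to-end sketch with the Dask backend (assuming the statsforecast public API and a local dask installation; this snippet is illustrative, not part of the diff):

import dask.dataframe as dd

from statsforecast import StatsForecast
from statsforecast.models import AutoARIMA
from statsforecast.utils import generate_series

# Two daily series, distributed over two partitions, as in the tests below.
series = generate_series(2).reset_index()
series['unique_id'] = series['unique_id'].astype(str)
series = dd.from_pandas(series, npartitions=2)

sf = StatsForecast(models=[AutoARIMA(season_length=7)], freq='D')
# With this commit, `level` is forwarded to the distributed backend, so the
# result should carry AutoARIMA-lo-90/hi-90 and AutoARIMA-lo-80/hi-80 columns.
forecast = sf.forecast(df=series, h=7, level=[80, 90])
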
22 changes: 17 additions & 5 deletions action_files/test_dask.py
@@ -1,12 +1,24 @@
 import dask.dataframe as dd
+import pytest
 
 from statsforecast.utils import generate_series
-from .utils import pipeline
+from .utils import pipeline, pipeline_with_level
 
-def test_dask_flow():
-    n_series = 2
-    horizon = 7
+@pytest.fixture()
+def n_series():
+    return 2
+
+@pytest.fixture()
+def sample_data(n_series):
     series = generate_series(n_series).reset_index()
     series['unique_id'] = series['unique_id'].astype(str)
     series = dd.from_pandas(series, npartitions=2)
-    pipeline(series, n_series, horizon)
+    return series
+
+def test_dask_flow(sample_data, n_series):
+    horizon = 7
+    pipeline(sample_data, n_series, horizon)
+
+def test_dask_flow_with_level(sample_data, n_series):
+    horizon = 7
+    pipeline_with_level(sample_data, n_series, horizon)
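
The ray and spark tests below get the same restructuring: series construction moves into a shared `sample_data` fixture that depends on the `n_series` fixture, and each backend gains a `_with_level` test. One way the pattern could be extended is to parametrize the series count, e.g. (a hypothetical extension, not part of this commit):

import pytest

@pytest.fixture(params=[2, 5])
def n_series(request):
    # pytest re-runs every test that (transitively) uses this fixture once
    # per param, so sample_data would be rebuilt for 2 and for 5 series.
    return request.param
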
24 changes: 17 additions & 7 deletions action_files/test_ray.py
@@ -1,14 +1,24 @@
 import pytest
 import ray
 
 from statsforecast.utils import generate_series
-from .utils import pipeline
+from .utils import pipeline, pipeline_with_level
 
-def test_ray_flow():
-    n_series = 2
-    horizon = 7
+@pytest.fixture()
+def n_series():
+    return 2
+
+@pytest.fixture()
+def sample_data(n_series):
     series = generate_series(n_series).reset_index()
     series['unique_id'] = series['unique_id'].astype(str)
     ctx = ray.data.context.DatasetContext.get_current()
     ctx.use_streaming_executor = False
     series = ray.data.from_pandas(series).repartition(2)
-    pipeline(series, n_series, horizon)
+    return series
+
+def test_ray_flow(sample_data, n_series):
+    horizon = 7
+    pipeline(sample_data, n_series, horizon)
+
+def test_ray_flow_with_level(sample_data, n_series):
+    horizon = 7
+    pipeline_with_level(sample_data, n_series, horizon)
21 changes: 17 additions & 4 deletions action_files/test_spark.py
@@ -1,13 +1,26 @@
+import pytest
 from pyspark.sql import SparkSession
 
 from statsforecast.utils import generate_series
-from .utils import pipeline
+from .utils import pipeline, pipeline_with_level
 
-def test_spark_flow():
+@pytest.fixture()
+def n_series():
+    return 2
+
+@pytest.fixture()
+def sample_data(n_series):
     n_series = 2
     horizon = 7
     series = generate_series(n_series).reset_index()
     series['unique_id'] = series['unique_id'].astype(str)
     spark = SparkSession.builder.getOrCreate()
     series = spark.createDataFrame(series).repartition(2, 'unique_id')
-    pipeline(series, n_series, horizon)
+    return series
+
+def test_spark_flow(sample_data, n_series):
+    horizon = 7
+    pipeline(sample_data, n_series, horizon)
+
+def test_spark_flow_with_level(sample_data, n_series):
+    horizon = 7
+    pipeline_with_level(sample_data, n_series, horizon)
14 changes: 13 additions & 1 deletion action_files/utils.py
@@ -41,7 +41,19 @@ def pipeline(series, n_series, horizon):
         models=models,
         freq='D',
     )
-    forecast = fa.as_pandas(sf.forecast(df=series, h=horizon, level=[80, 90]))
+    forecast = fa.as_pandas(sf.forecast(df=series, h=horizon))
     print(forecast)
     assert forecast.shape == (n_series * horizon, len(models) + 2)
 
+def pipeline_with_level(series, n_series, horizon):
+    models = [
+        AutoARIMA(season_length=7),
+    ]
+    sf = StatsForecast(
+        models=models,
+        freq='D',
+    )
+    forecast = fa.as_pandas(sf.forecast(df=series, h=horizon, level=[80, 90]))
+    print(forecast.columns)
+    expected = ["unique_id", "ds", "AutoARIMA", "AutoARIMA-lo-90", "AutoARIMA-hi-90", "AutoARIMA-lo-80", "AutoARIMA-hi-80"]
+    assert forecast.shape == (n_series * horizon, len(expected))
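
Note that `pipeline_with_level` asserts only the frame's shape, since column order may vary across backends; the printed columns are for debugging. For arbitrary models and levels, the expected column set can be derived as follows (a sketch; `expected_columns` is an illustrative helper, not part of the repo):

def expected_columns(model_names, levels):
    # unique_id/ds, then per model: the mean forecast plus lo/hi per level.
    cols = ['unique_id', 'ds']
    for name in model_names:
        cols.append(name)
        for lvl in sorted(levels, reverse=True):
            cols += [f'{name}-lo-{lvl}', f'{name}-hi-{lvl}']
    return cols

assert len(expected_columns(['AutoARIMA'], [80, 90])) == 7
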
65 changes: 34 additions & 31 deletions nbs/src/core/core.ipynb
@@ -88,7 +88,7 @@
 "from typing import Any, List, Optional, Union, Dict\n",
 "import pkg_resources\n",
 "\n",
-"import fugue.api as fa\n",
+"from fugue.execution.factory import make_execution_engine\n",
 "import matplotlib.pyplot as plt\n",
 "import matplotlib.colors as cm \n",
 "import numpy as np\n",
@@ -1386,6 +1386,9 @@
 "    \n",
 "    def _parse_X_level(self, h, X, level):\n",
 "        if X is not None:\n",
+"            if isinstance(X, pd.DataFrame):\n",
+"                if X.index.name != \"unique_id\":\n",
+"                    X = X.set_index(\"unique_id\")\n",
 "            expected_shape_rows = h * len(self.ga)\n",
 "            ga_shape = self.ga.data.shape[1]\n",
 "            # Polars doesn't have index, hence, extra \"column\"\n",
@@ -2198,19 +2201,19 @@
 "            prediction_intervals=prediction_intervals,\n",
 "        )\n",
 "        assert df is not None\n",
-"        with fa.engine_context(infer_by=[df]) as e:\n",
-"            backend = make_backend(e)\n",
-"            return backend.forecast(\n",
-"                df=df,\n",
-"                models=self.models,\n",
-"                freq=self.freq,\n",
-"                fallback_model=self.fallback_model,\n",
-"                h=h,\n",
-"                X_df=X_df,\n",
-"                level=level,\n",
-"                fitted=fitted,\n",
-"                prediction_intervals=prediction_intervals,\n",
-"            )\n",
+"        engine = make_execution_engine(infer_by=[df])\n",
+"        backend = make_backend(engine)\n",
+"        return backend.forecast(\n",
+"            df=df,\n",
+"            models=self.models,\n",
+"            freq=self.freq,\n",
+"            fallback_model=self.fallback_model,\n",
+"            h=h,\n",
+"            X_df=X_df,\n",
+"            level=level,\n",
+"            fitted=fitted,\n",
+"            prediction_intervals=prediction_intervals,\n",
+"        )\n",
 "    \n",
 "    def cross_validation(\n",
 "        self,\n",
@@ -2241,23 +2244,23 @@
 "            prediction_intervals=prediction_intervals,\n",
 "        )\n",
 "        assert df is not None\n",
-"        with fa.engine_context(infer_by=[df]) as e:\n",
-"            backend = make_backend(e)\n",
-"            return backend.cross_validation(\n",
-"                df=df,\n",
-"                models=self.models,\n",
-"                freq=self.freq,\n",
-"                fallback_model=self.fallback_model,\n",
-"                h=h,\n",
-"                n_windows=n_windows,\n",
-"                step_size=step_size,\n",
-"                test_size=test_size,\n",
-"                input_size=input_size,\n",
-"                level=level,\n",
-"                refit=refit,\n",
-"                fitted=fitted,\n",
-"                prediction_intervals=prediction_intervals,\n",
-"            )\n",
+"        engine = make_execution_engine(infer_by=[df])\n",
+"        backend = make_backend(engine)\n",
+"        return backend.cross_validation(\n",
+"            df=df,\n",
+"            models=self.models,\n",
+"            freq=self.freq,\n",
+"            fallback_model=self.fallback_model,\n",
+"            h=h,\n",
+"            n_windows=n_windows,\n",
+"            step_size=step_size,\n",
+"            test_size=test_size,\n",
+"            input_size=input_size,\n",
+"            level=level,\n",
+"            refit=refit,\n",
+"            fitted=fitted,\n",
+"            prediction_intervals=prediction_intervals,\n",
+"        )\n",
 "\n",
 "    def _is_native(self, df) -> bool:\n",
 "        engine = try_get_context_execution_engine()\n",
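
Both distributed entry points above now build the fugue engine with a direct `make_execution_engine(infer_by=[df])` call instead of wrapping the dispatch in `fa.engine_context`. The engine is inferred from the type of `df`; a minimal sketch of the inference (assuming fugue and its dask extras are installed):

import dask.dataframe as dd
import pandas as pd
from fugue.execution.factory import make_execution_engine

df = dd.from_pandas(pd.DataFrame({'y': [1.0, 2.0, 3.0]}), npartitions=1)
# For a dask collection this should resolve to fugue's Dask execution
# engine; a plain pandas DataFrame resolves to the local engine instead.
engine = make_execution_engine(infer_by=[df])
print(type(engine).__name__)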