diff --git a/action_files/test_dask.py b/action_files/test_dask.py
index 3d0189076..f19eff405 100644
--- a/action_files/test_dask.py
+++ b/action_files/test_dask.py
@@ -1,12 +1,24 @@
 import dask.dataframe as dd
+import pytest
 
 from statsforecast.utils import generate_series
-from .utils import pipeline
+from .utils import pipeline, pipeline_with_level
 
-def test_dask_flow():
-    n_series = 2
-    horizon = 7
+@pytest.fixture()
+def n_series():
+    return 2
+
+@pytest.fixture()
+def sample_data(n_series):
     series = generate_series(n_series).reset_index()
     series['unique_id'] = series['unique_id'].astype(str)
     series = dd.from_pandas(series, npartitions=2)
-    pipeline(series, n_series, horizon)
+    return series
+
+def test_dask_flow(sample_data, n_series):
+    horizon = 7
+    pipeline(sample_data, n_series, horizon)
+
+def test_dask_flow_with_level(sample_data, n_series):
+    horizon = 7
+    pipeline_with_level(sample_data, n_series, horizon)
\ No newline at end of file
diff --git a/action_files/test_ray.py b/action_files/test_ray.py
index 84e7a0a70..cbc8c6f96 100644
--- a/action_files/test_ray.py
+++ b/action_files/test_ray.py
@@ -1,14 +1,24 @@
+import pytest
 import ray
 
 from statsforecast.utils import generate_series
-from .utils import pipeline
+from .utils import pipeline, pipeline_with_level
 
-def test_ray_flow():
-    n_series = 2
-    horizon = 7
+@pytest.fixture()
+def n_series():
+    return 2
+
+@pytest.fixture()
+def sample_data(n_series):
     series = generate_series(n_series).reset_index()
     series['unique_id'] = series['unique_id'].astype(str)
-    ctx = ray.data.context.DatasetContext.get_current()
-    ctx.use_streaming_executor = False
     series = ray.data.from_pandas(series).repartition(2)
-    pipeline(series, n_series, horizon)
+    return series
+
+def test_ray_flow(sample_data, n_series):
+    horizon = 7
+    pipeline(sample_data, n_series, horizon)
+
+def test_ray_flow_with_level(sample_data, n_series):
+    horizon = 7
+    pipeline_with_level(sample_data, n_series, horizon)
\ No newline at end of file
diff --git a/action_files/test_spark.py b/action_files/test_spark.py
index 6c213898a..23e4b8b6d 100644
--- a/action_files/test_spark.py
+++ b/action_files/test_spark.py
@@ -1,13 +1,25 @@
+import pytest
 from pyspark.sql import SparkSession
 
 from statsforecast.utils import generate_series
-from .utils import pipeline
+from .utils import pipeline, pipeline_with_level
 
-def test_spark_flow():
-    n_series = 2
-    horizon = 7
+@pytest.fixture()
+def n_series():
+    return 2
+
+@pytest.fixture()
+def sample_data(n_series):
     series = generate_series(n_series).reset_index()
     series['unique_id'] = series['unique_id'].astype(str)
     spark = SparkSession.builder.getOrCreate()
     series = spark.createDataFrame(series).repartition(2, 'unique_id')
-    pipeline(series, n_series, horizon)
+    return series
+
+def test_spark_flow(sample_data, n_series):
+    horizon = 7
+    pipeline(sample_data, n_series, horizon)
+
+def test_spark_flow_with_level(sample_data, n_series):
+    horizon = 7
+    pipeline_with_level(sample_data, n_series, horizon)
diff --git a/action_files/utils.py b/action_files/utils.py
index 29d2eddf2..75d6965a7 100644
--- a/action_files/utils.py
+++ b/action_files/utils.py
@@ -41,7 +41,19 @@ def pipeline(series, n_series, horizon):
         models=models,
         freq='D',
     )
-    forecast = fa.as_pandas(sf.forecast(df=series, h=horizon, level=[80, 90]))
+    forecast = fa.as_pandas(sf.forecast(df=series, h=horizon))
     print(forecast)
     assert forecast.shape == (n_series * horizon, len(models) + 2)
 
+def pipeline_with_level(series, n_series, horizon):
+    models = [
+        AutoARIMA(season_length=7),
+    ]
+    sf = StatsForecast(
+        models=models,
+        freq='D',
+    )
+    forecast = fa.as_pandas(sf.forecast(df=series, h=horizon, level=[80, 90]))
+    print(forecast.columns)
+    expected = ["unique_id", "ds", "AutoARIMA", "AutoARIMA-lo-90", "AutoARIMA-hi-90", "AutoARIMA-lo-80", "AutoARIMA-hi-80"]
+    assert forecast.shape == (n_series * horizon, len(expected))
\ No newline at end of file
diff --git a/nbs/src/core/core.ipynb b/nbs/src/core/core.ipynb
index 067cc3a8a..68aba889b 100644
--- a/nbs/src/core/core.ipynb
+++ b/nbs/src/core/core.ipynb
@@ -88,7 +88,7 @@
     "from typing import Any, List, Optional, Union, Dict\n",
     "import pkg_resources\n",
     "\n",
-    "import fugue.api as fa\n",
+    "from fugue.execution.factory import make_execution_engine\n",
     "import matplotlib.pyplot as plt\n",
     "import matplotlib.colors as cm \n",
     "import numpy as np\n",
@@ -1386,6 +1386,9 @@
     "    \n",
     "    def _parse_X_level(self, h, X, level):\n",
     "        if X is not None:\n",
+    "            if isinstance(X, pd.DataFrame):\n",
+    "                if X.index.name != \"unique_id\":\n",
+    "                    X = X.set_index(\"unique_id\")\n",
     "            expected_shape_rows = h * len(self.ga)\n",
     "            ga_shape = self.ga.data.shape[1]\n",
     "            # Polars doesn't have index, hence, extra \"column\"\n",
@@ -2198,19 +2201,19 @@
     "            prediction_intervals=prediction_intervals,\n",
     "        )\n",
     "        assert df is not None\n",
-    "        with fa.engine_context(infer_by=[df]) as e:\n",
-    "            backend = make_backend(e)\n",
-    "            return backend.forecast(\n",
-    "                df=df,\n",
-    "                models=self.models,\n",
-    "                freq=self.freq,\n",
-    "                fallback_model=self.fallback_model,\n",
-    "                h=h,\n",
-    "                X_df=X_df,\n",
-    "                level=level,\n",
-    "                fitted=fitted,\n",
-    "                prediction_intervals=prediction_intervals,\n",
-    "            )\n",
+    "        engine = make_execution_engine(infer_by=[df])\n",
+    "        backend = make_backend(engine)\n",
+    "        return backend.forecast(\n",
+    "            df=df,\n",
+    "            models=self.models,\n",
+    "            freq=self.freq,\n",
+    "            fallback_model=self.fallback_model,\n",
+    "            h=h,\n",
+    "            X_df=X_df,\n",
+    "            level=level,\n",
+    "            fitted=fitted,\n",
+    "            prediction_intervals=prediction_intervals,\n",
+    "        )\n",
     "    \n",
     "    def cross_validation(\n",
     "        self,\n",
@@ -2241,23 +2244,23 @@
     "            prediction_intervals=prediction_intervals,\n",
     "        )\n",
     "        assert df is not None\n",
-    "        with fa.engine_context(infer_by=[df]) as e:\n",
-    "            backend = make_backend(e)\n",
-    "            return backend.cross_validation(\n",
-    "                df=df,\n",
-    "                models=self.models,\n",
-    "                freq=self.freq,\n",
-    "                fallback_model=self.fallback_model,\n",
-    "                h=h,\n",
-    "                n_windows=n_windows,\n",
-    "                step_size=step_size,\n",
-    "                test_size=test_size,\n",
-    "                input_size=input_size,\n",
-    "                level=level,\n",
-    "                refit=refit,\n",
-    "                fitted=fitted,\n",
-    "                prediction_intervals=prediction_intervals,\n",
-    "            )\n",
+    "        engine = make_execution_engine(infer_by=[df])\n",
+    "        backend = make_backend(engine)\n",
+    "        return backend.cross_validation(\n",
+    "            df=df,\n",
+    "            models=self.models,\n",
+    "            freq=self.freq,\n",
+    "            fallback_model=self.fallback_model,\n",
+    "            h=h,\n",
+    "            n_windows=n_windows,\n",
+    "            step_size=step_size,\n",
+    "            test_size=test_size,\n",
+    "            input_size=input_size,\n",
+    "            level=level,\n",
+    "            refit=refit,\n",
+    "            fitted=fitted,\n",
+    "            prediction_intervals=prediction_intervals,\n",
+    "        )\n",
     "\n",
     "    def _is_native(self, df) -> bool:\n",
     "        engine = try_get_context_execution_engine()\n",
diff --git a/nbs/src/core/distributed.fugue.ipynb b/nbs/src/core/distributed.fugue.ipynb
index 267bcdd8f..016d6f38b 100644
--- a/nbs/src/core/distributed.fugue.ipynb
+++ b/nbs/src/core/distributed.fugue.ipynb
@@ -52,6 +52,7 @@
    "outputs": [],
    "source": [
     "#| export\n",
+    "import inspect\n",
     "from typing import Any, Dict, List\n",
     "\n",
     "import numpy as np\n",
@@ -113,16 +114,16 @@
     "    [Source code](https://github.com/Nixtla/statsforecast/blob/main/statsforecast/distributed/fugue.py).\n",
     "\n",
     "    This class uses [Fugue](https://github.com/fugue-project/fugue) backend capable of distributing \n",
-    "    computation on Spark and Dask without any rewrites.\n",
+    "    computation on Spark, Dask and Ray without any rewrites.\n",
     "\n",
     "    **Parameters:**<br>\n",
-    "    `engine`: fugue.ExecutionEngine, a selection between spark and dask.<br>\n",
+    "    `engine`: fugue.ExecutionEngine, a selection between Spark, Dask, and Ray.<br>\n",
     "    `conf`: fugue.Config, engine configuration.<br>\n",
     "    `**transform_kwargs`: additional kwargs for Fugue's transform method.<br>\n",
     "\n",
     "    **Notes:**<br>\n",
-    "    A short introduction to Fugue, with examples on how to scale pandas code to scale pandas \n",
-    "    based code to Spark or Dask is available [here](https://fugue-tutorials.readthedocs.io/tutorials/quick_look/ten_minutes.html).\n",
+    "    A short introduction to Fugue, with examples on how to scale pandas code to Spark, Dask or Ray\n",
+    "    is available [here](https://fugue-tutorials.readthedocs.io/tutorials/quick_look/ten_minutes.html).\n",
     "    \"\"\"\n",
     "    def __init__(\n",
     "        self, \n",
@@ -153,7 +154,7 @@
     "\n",
     "        **Parameters:**<br>\n",
     "        `df`: pandas.DataFrame, with columns [`unique_id`, `ds`, `y`] and exogenous.<br>\n",
-    "        `freq`: str, frequency of the data, [panda's available frequencies](https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases).<br>\n",
+    "        `freq`: str, frequency of the data, [pandas' available frequencies](https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases).<br>\n",
     "        `models`: List[typing.Any], list of instantiated objects `StatsForecast.models`.<br>\n",
     "        `fallback_model`: Any, Model to be used if a model fails.<br>\n",
     "        `X_df`: pandas.DataFrame, with [unique_id, ds] columns and df’s future exogenous.\n",
@@ -165,13 +166,14 @@
     "        \n",
     "        **References:**<br>\n",
     "        For more information check the \n",
-    "        [Fugue's transform](https://fugue-tutorials.readthedocs.io/tutorials/beginner/introduction.html#fugue-transform)\n",
+    "        [Fugue's transform](https://fugue-tutorials.readthedocs.io/tutorials/beginner/transform.html)\n",
     "        tutorial.<br>\n",
     "        The [core.StatsForecast's forecast](https://nixtla.github.io/statsforecast/core.html#statsforecast.forecast)\n",
     "        method documentation.<br>\n",
-    "        Or the list of available [StatsForecast's models](https://nixtla.github.io/statsforecast/models.html).\n",
+    "        Or the list of available [StatsForecast's models](https://nixtla.github.io/statsforecast/src/core/models.html).\n",
     "        \"\"\"\n",
-    "        schema = \"*-y+\" + str(self._get_output_schema(models))\n",
+    "        level = kwargs.get(\"level\", [])\n",
+    "        schema = \"*-y+\" + str(self._get_output_schema(models, level))\n",
     "        if X_df is None:\n",
     "            return transform(\n",
     "                df,\n",
@@ -185,7 +187,7 @@
     "                **self._transform_kwargs,\n",
     "            )\n",
     "        else:\n",
-    "            schema = \"unique_id:str,ds:str,\" + str(self._get_output_schema(models))\n",
+    "            schema = \"unique_id:str,ds:str,\" + str(self._get_output_schema(models, level))\n",
     "            return _cotransform(\n",
     "                df,\n",
     "                X_df,\n",
@@ -234,7 +236,8 @@
     "        method documentation.<br>\n",
     "        [Rob J. Hyndman and George Athanasopoulos (2018). \"Forecasting principles and practice, Temporal Cross-Validation\"](https://otexts.com/fpp3/tscv.html).\n",
     "        \"\"\"\n",
-    "        schema = \"*-y+\" + str(self._get_output_schema(models, mode=\"cv\"))\n",
+    "        level = kwargs.get(\"level\", [])\n",
+    "        schema = \"*-y+\" + str(self._get_output_schema(models, level, mode=\"cv\"))\n",
     "        return transform(\n",
     "            df,\n",
     "            self._cv,\n",
@@ -269,9 +272,19 @@
     "                           fallback_model=fallback_model, n_jobs=1)\n",
     "        return model.cross_validation(**kwargs).reset_index()\n",
     "\n",
-    "    def _get_output_schema(self, models, mode=\"forecast\") -> Schema:\n",
-    "        cols: List[Any]\n",
-    "        cols = [(repr(model), np.float32) for model in models]\n",
+    "    def _get_output_schema(self, models, level=None, mode=\"forecast\") -> Schema:\n",
+    "        cols: List[Any] = []\n",
+    "        if level is None:\n",
+    "            level = []\n",
+    "        for model in models:\n",
+    "            has_levels = (\n",
+    "                \"level\" in inspect.signature(getattr(model, \"forecast\")).parameters\n",
+    "                and len(level) > 0\n",
+    "            )\n",
+    "            cols.append((repr(model), np.float32))\n",
+    "            if has_levels:\n",
+    "                cols.extend([(f\"{repr(model)}-lo-{l}\", np.float32) for l in reversed(level)])\n",
+    "                cols.extend([(f\"{repr(model)}-hi-{l}\", np.float32) for l in level])\n",
     "        if mode == \"cv\":\n",
    "            cols = [(\"cutoff\", \"datetime\"), (\"y\", np.float32)] + cols\n",
     "        return Schema(cols)\n",
@@ -282,6 +295,57 @@
     "        return FugueBackend(obj, **kwargs)"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "b5369129",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from statsforecast.core import StatsForecast\n",
+    "from statsforecast.models import (\n",
+    "    AutoARIMA,\n",
+    "    AutoETS,\n",
+    ")\n",
+    "from statsforecast.utils import generate_series\n",
+    "\n",
+    "n_series = 4\n",
+    "horizon = 7\n",
+    "\n",
+    "series = generate_series(n_series)\n",
+    "\n",
+    "sf = StatsForecast(\n",
+    "    models=[AutoETS(season_length=7)],\n",
+    "    freq='D',\n",
+    ")\n",
+    "\n",
+    "sf.cross_validation(df=series, h=horizon, step_size=24,\n",
+    "                    n_windows=2, level=[90]).head()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "3d84def8",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from pyspark.sql import SparkSession\n",
+    "\n",
+    "spark = SparkSession.builder.getOrCreate()\n",
+    "\n",
+    "# Make unique_id a column\n",
+    "series = series.reset_index()\n",
+    "series['unique_id'] = series['unique_id'].astype(str)\n",
+    "\n",
+    "# Convert to Spark\n",
+    "sdf = spark.createDataFrame(series)\n",
+    "\n",
+    "# Returns a Spark DataFrame\n",
+    "sf.cross_validation(df=sdf, h=horizon, step_size=24,\n",
+    "                    n_windows=2, level=[90]).show()"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": null,
@@ -391,14 +455,14 @@
     "sf = StatsForecast(models=[Naive()], freq='D', fallback_model=Naive())\n",
     "dask_fcst = sf.forecast(df=df, h=12).compute()\n",
     "fcst_stats = sf.forecast(df=df.compute(), h=12)\n",
-    "test_eq(dask_fcst.sort_values(by=['unique_id', 'ds']).reset_index(drop=True), \n",
-    "        fcst_stats.reset_index())"
+    "test_eq(dask_fcst.sort_values(by=['unique_id', 'ds']).reset_index(drop=True).astype({\"unique_id\": str}), \n",
+    "        fcst_stats.reset_index().astype({\"unique_id\": str}))"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "964caa9d-d580-44a7-9115-2522e9b64ee9",
+   "id": "69b6bbb5",
    "metadata": {},
    "outputs": [],
    "source": [
@@ -510,14 +574,14 @@
     "# Distribute predictions.\n",
     "sf = StatsForecast(models=[Naive()], freq='D')\n",
     "fcst_fugue = sf.forecast(df=df, h=12).compute().sort_values(['unique_id', 'ds']).reset_index(drop=True)\n",
-    "fcst_stats = sf.forecast(df=df.compute(), h=12)\n",
-    "test_eq(fcst_fugue, fcst_stats.reset_index())\n",
+    "fcst_stats = sf.forecast(df=df.compute(), h=12).reset_index().astype({\"unique_id\": str})\n",
+    "test_eq(fcst_fugue, fcst_stats)\n",
     "\n",
     "# Distribute cross-validation predictions.\n",
     "sf = StatsForecast(models=[Naive()], freq='D')\n",
     "fcst_fugue = sf.cross_validation(df=df, h=12).compute().sort_values(['unique_id', 'ds', 'cutoff']).reset_index(drop=True)\n",
-    "fcst_stats = sf.cross_validation(df=df.compute(), h=12)\n",
-    "test_eq(fcst_fugue, fcst_stats.reset_index())\n",
+    "fcst_stats = sf.cross_validation(df=df.compute(), h=12).reset_index().astype({\"unique_id\": str})\n",
+    "test_eq(fcst_fugue, fcst_stats)\n",
     "\n",
     "# fallback model\n",
     "class FailNaive:\n",
@@ -529,8 +593,8 @@
     "#cross validation fallback model\n",
     "fcst = StatsForecast(models=[FailNaive()], freq='D', fallback_model=Naive())\n",
     "fcst_fugue = fcst.cross_validation(df=df, h=12).compute().sort_values(['unique_id', 'ds', 'cutoff']).reset_index(drop=True)\n",
-    "fcst_stats = sf.cross_validation(df=df.compute(), h=12)\n",
-    "test_eq(fcst_fugue, fcst_stats.reset_index())"
+    "fcst_stats = sf.cross_validation(df=df.compute(), h=12).reset_index().astype({\"unique_id\": str})\n",
+    "test_eq(fcst_fugue, fcst_stats)"
    ]
   },
   {
@@ -555,14 +619,14 @@
     "# Distribute predictions.\n",
     "sf = StatsForecast(models=[Naive()], freq='D')\n",
     "fcst_fugue = sf.forecast(df=df, h=12).to_pandas().sort_values(['unique_id', 'ds']).reset_index(drop=True)\n",
-    "fcst_stats = sf.forecast(df=df.to_pandas(), h=12)\n",
-    "test_eq(fcst_fugue, fcst_stats.reset_index())\n",
+    "fcst_stats = sf.forecast(df=df.to_pandas(), h=12).reset_index().astype({\"unique_id\": str})\n",
+    "test_eq(fcst_fugue, fcst_stats)\n",
     "\n",
     "# Distribute cross-validation predictions.\n",
     "fcst = StatsForecast(models=[Naive()], freq='D')\n",
     "fcst_fugue = fcst.cross_validation(df=df, h=12).to_pandas().sort_values(['unique_id', 'ds', 'cutoff']).reset_index(drop=True)\n",
-    "fcst_stats = sf.cross_validation(df=df.to_pandas(), h=12)\n",
-    "test_eq(fcst_fugue, fcst_stats.reset_index())\n",
+    "fcst_stats = sf.cross_validation(df=df.to_pandas(), h=12).reset_index().astype({\"unique_id\": str})\n",
+    "test_eq(fcst_fugue, fcst_stats)\n",
     "\n",
     "# fallback model\n",
     "class FailNaive:\n",
@@ -574,8 +638,8 @@
     "#cross validation fallback model\n",
     "sf = StatsForecast(models=[FailNaive()], freq='D', fallback_model=Naive())\n",
     "fcst_fugue = sf.cross_validation(df=df, h=12).to_pandas().sort_values(['unique_id', 'ds', 'cutoff']).reset_index(drop=True)\n",
-    "fcst_stats = sf.cross_validation(df=df.to_pandas(), h=12)\n",
-    "test_eq(fcst_fugue, fcst_stats.reset_index())"
+    "fcst_stats = sf.cross_validation(df=df.to_pandas(), h=12).reset_index().astype({\"unique_id\": str})\n",
+    "test_eq(fcst_fugue, fcst_stats)"
    ]
   },
   {
diff --git a/nbs/src/core/models_intro.qmd b/nbs/src/core/models_intro.qmd
index 96d45fb22..b28601e01 100644
--- a/nbs/src/core/models_intro.qmd
+++ b/nbs/src/core/models_intro.qmd
@@ -8,10 +8,10 @@ Automatic forecasting tools search for the best parameters and select the best p
 |Model | Point Forecast | Probabilistic Forecast | Insample fitted values | Probabilistic fitted values |
 |:------|:-------------:|:----------------------:|:---------------------:|:----------------------------:|
-|[`AutoARIMA`](../models.html#autoarima)|✅|✅|✅|✅|✅|
-|[`AutoETS`](../models.html#autoets)|✅|✅|✅|✅|✅|
-|[`AutoCES`](../models.html#autoces)|✅|✅|✅|✅||
-|[`AutoTheta`](../models.html#autotheta)|✅|✅|✅|✅|✅|
+|[`AutoARIMA`](./models.html#autoarima)|✅|✅|✅|✅|✅|
+|[`AutoETS`](./models.html#autoets)|✅|✅|✅|✅|✅|
+|[`AutoCES`](./models.html#autoces)|✅|✅|✅|✅||
+|[`AutoTheta`](./models.html#autotheta)|✅|✅|✅|✅|✅|
 
 : {tbl-colwidths="[75,25]"}
 
 ## ARIMA Family
@@ -19,8 +19,8 @@ These models exploit the existing autocorrelations in the time series.
 |Model | Point Forecast | Probabilistic Forecast | Insample fitted values | Probabilistic fitted values |
 |:------|:-------------:|:----------------------:|:---------------------:|:----------------------------:|
-|[`ARIMA`](../models.html#arima)|✅|✅|✅|✅|✅|
-|[`AutoRegressive`](../models.html#autoregressive)|✅|✅|✅|✅|✅|
+|[`ARIMA`](./models.html#arima)|✅|✅|✅|✅|✅|
+|[`AutoRegressive`](./models.html#autoregressive)|✅|✅|✅|✅|✅|
 
 : {tbl-colwidths="[75,25]"}
 
 ## Theta Family
@@ -28,10 +28,10 @@ Fit two theta lines to a deseasonalized time series, using different techniques
 |Model | Point Forecast | Probabilistic Forecast | Insample fitted values | Probabilistic fitted values |
 |:------|:-------------:|:----------------------:|:---------------------:|:----------------------------:|
-|[`Theta`](../models.html#theta)|✅|✅|✅|✅|✅|
-|[`OptimizedTheta`](../models.html#optimizedtheta)|✅|✅|✅|✅|✅|
-|[`DynamicTheta`](../models.html#dynamictheta)|✅|✅|✅|✅|✅|
-|[`DynamicOptimizedTheta`](../models.html#dynamicoptimizedtheta)|✅|✅|✅|✅|✅|
+|[`Theta`](./models.html#theta)|✅|✅|✅|✅|✅|
+|[`OptimizedTheta`](./models.html#optimizedtheta)|✅|✅|✅|✅|✅|
+|[`DynamicTheta`](./models.html#dynamictheta)|✅|✅|✅|✅|✅|
+|[`DynamicOptimizedTheta`](./models.html#dynamicoptimizedtheta)|✅|✅|✅|✅|✅|
 
 : {tbl-colwidths="[75,25]"}
 
 ## Multiple Seasonalities
@@ -39,7 +39,7 @@ Suited for signals with more than one clear seasonality. Useful for low-frequenc
 |Model | Point Forecast | Probabilistic Forecast | Insample fitted values | Probabilistic fitted values |
 |:------|:-------------:|:----------------------:|:---------------------:|:----------------------------:|
-|[`MSTL`](../models.html#mstl)|✅|✅|✅|✅|✅|
+|[`MSTL`](./models.html#mstl)|✅|✅|✅|✅|✅|
 
 : {tbl-colwidths="[75,25]"}
 
 ## GARCH and ARCH Models
@@ -47,8 +47,8 @@ Suited for modeling time series that exhibit non-constant volatility over time.
 |Model | Point Forecast | Probabilistic Forecast | Insample fitted values | Probabilistic fitted values |
 |:------|:-------------:|:----------------------:|:---------------------:|:----------------------------:|
-|[`GARCH`](../models.html#garch)|✅|✅|✅|✅|✅|
-|[`ARCH`](../models.html#arch)|✅|✅|✅|✅|✅|
+|[`GARCH`](./models.html#garch)|✅|✅|✅|✅|✅|
+|[`ARCH`](./models.html#arch)|✅|✅|✅|✅|✅|
 
 : {tbl-colwidths="[75,25]"}
 
 ## Baseline Models
@@ -56,12 +56,12 @@ Classical models for establishing baseline.
 |Model | Point Forecast | Probabilistic Forecast | Insample fitted values | Probabilistic fitted values |
 |:------|:-------------:|:----------------------:|:---------------------:|:----------------------------:|
-|[`HistoricAverage`](../models.html#historicaverage)|✅|✅|✅|✅|✅|
-|[`Naive`](../models.html#naive)|✅|✅|✅|✅|✅|
-|[`RandomWalkWithDrift`](../models.html#randomwalkwithdrift)|✅|✅|✅|✅|✅|
-|[`SeasonalNaive`](../models.html#seasonalnaive)|✅|✅|✅|✅|✅|
-|[`WindowAverage`](../models.html#windowaverage)|✅|||||
-|[`SeasonalWindowAverage`](../models.html#seasonalwindowaverage)|✅|||||
+|[`HistoricAverage`](./models.html#historicaverage)|✅|✅|✅|✅|✅|
+|[`Naive`](./models.html#naive)|✅|✅|✅|✅|✅|
+|[`RandomWalkWithDrift`](./models.html#randomwalkwithdrift)|✅|✅|✅|✅|✅|
+|[`SeasonalNaive`](./models.html#seasonalnaive)|✅|✅|✅|✅|✅|
+|[`WindowAverage`](./models.html#windowaverage)|✅|||||
+|[`SeasonalWindowAverage`](./models.html#seasonalwindowaverage)|✅|||||
 
 : {tbl-colwidths="[75,25]"}
 
 ## Exponential Smoothing
@@ -69,12 +69,12 @@ Uses a weighted average of all past observations where the weights decrease expo
 |Model | Point Forecast | Probabilistic Forecast | Insample fitted values | Probabilistic fitted values |
 |:------|:-------------:|:----------------------:|:---------------------:|:----------------------------:|
-|[`SimpleExponentialSmoothing`](../models.html#simpleexponentialsmoothing)|✅|||||
-|[`SimpleExponentialSmoothingOptimized`](../models.html#simpleexponentialsmoothingoptimized)|✅|||||
-|[`SeasonalExponentialSmoothing`](../models.html#seasonalexponentialsmoothing)|✅|||||
-|[`SeasonalExponentialSmoothingOptimized`](../models.html#seasonalexponentialsmoothingoptimized)|✅|||||
-|[`Holt`](../models.html#holt)|✅|✅|✅|✅|✅|
-|[`HoltWinters`](../models.html#holtwinters)|✅|✅|✅|✅|✅|
+|[`SimpleExponentialSmoothing`](./models.html#simpleexponentialsmoothing)|✅|||||
+|[`SimpleExponentialSmoothingOptimized`](./models.html#simpleexponentialsmoothingoptimized)|✅|||||
+|[`SeasonalExponentialSmoothing`](./models.html#seasonalexponentialsmoothing)|✅|||||
+|[`SeasonalExponentialSmoothingOptimized`](./models.html#seasonalexponentialsmoothingoptimized)|✅|||||
+|[`Holt`](./models.html#holt)|✅|✅|✅|✅|✅|
+|[`HoltWinters`](./models.html#holtwinters)|✅|✅|✅|✅|✅|
 
 : {tbl-colwidths="[75,25]"}
 
 ## Sparse or Intermittent
@@ -82,12 +82,12 @@ Suited for series with very few non-zero observations
 
 |Model | Point Forecast | Probabilistic Forecast | Insample fitted values | Probabilistic fitted values |
 |:------|:-------------:|:----------------------:|:---------------------:|:----------------------------:|
-|[`ADIDA`](../models.html#adida)|✅|||||
-|[`CrostonClassic`](../models.html#crostonclassic)|✅|||||
-|[`CrostonOptimized`](../models.html#crostonoptimized)|✅|||||
-|[`CrostonSBA`](../models.html#crostonsba)|✅|||||
-|[`IMAPA`](../models.html#imapa)|✅|||||
-|[`TSB`](../models.html#tsb)|✅|||||
+|[`ADIDA`](./models.html#adida)|✅|||||
+|[`CrostonClassic`](./models.html#crostonclassic)|✅|||||
+|[`CrostonOptimized`](./models.html#crostonoptimized)|✅|||||
+|[`CrostonSBA`](./models.html#crostonsba)|✅|||||
+|[`IMAPA`](./models.html#imapa)|✅|||||
+|[`TSB`](./models.html#tsb)|✅|||||
 
 : {tbl-colwidths="[75,25]"}
 
diff --git a/statsforecast/core.py b/statsforecast/core.py
index 7f3880263..9ea0c9abd 100644
--- a/statsforecast/core.py
+++ b/statsforecast/core.py
@@ -13,7 +13,7 @@
 from typing import Any, List, Optional, Union, Dict
 import pkg_resources
 
-import fugue.api as fa
+from fugue.execution.factory import make_execution_engine
 import matplotlib.pyplot as plt
 import matplotlib.colors as cm
 import numpy as np
@@ -899,6 +899,9 @@ def _make_future_df(self, h: int):
 
     def _parse_X_level(self, h, X, level):
         if X is not None:
+            if isinstance(X, pd.DataFrame):
+                if X.index.name != "unique_id":
+                    X = X.set_index("unique_id")
             expected_shape_rows = h * len(self.ga)
             ga_shape = self.ga.data.shape[1]
             # Polars doesn't have index, hence, extra "column"
@@ -1838,19 +1841,19 @@ def forecast(
             prediction_intervals=prediction_intervals,
         )
         assert df is not None
-        with fa.engine_context(infer_by=[df]) as e:
-            backend = make_backend(e)
-            return backend.forecast(
-                df=df,
-                models=self.models,
-                freq=self.freq,
-                fallback_model=self.fallback_model,
-                h=h,
-                X_df=X_df,
-                level=level,
-                fitted=fitted,
-                prediction_intervals=prediction_intervals,
-            )
+        engine = make_execution_engine(infer_by=[df])
+        backend = make_backend(engine)
+        return backend.forecast(
+            df=df,
+            models=self.models,
+            freq=self.freq,
+            fallback_model=self.fallback_model,
+            h=h,
+            X_df=X_df,
+            level=level,
+            fitted=fitted,
+            prediction_intervals=prediction_intervals,
+        )
 
     def cross_validation(
         self,
@@ -1881,23 +1884,23 @@ def cross_validation(
             prediction_intervals=prediction_intervals,
         )
         assert df is not None
-        with fa.engine_context(infer_by=[df]) as e:
-            backend = make_backend(e)
-            return backend.cross_validation(
-                df=df,
-                models=self.models,
-                freq=self.freq,
-                fallback_model=self.fallback_model,
-                h=h,
-                n_windows=n_windows,
-                step_size=step_size,
-                test_size=test_size,
-                input_size=input_size,
-                level=level,
-                refit=refit,
-                fitted=fitted,
-                prediction_intervals=prediction_intervals,
-            )
+        engine = make_execution_engine(infer_by=[df])
+        backend = make_backend(engine)
+        return backend.cross_validation(
+            df=df,
+            models=self.models,
+            freq=self.freq,
+            fallback_model=self.fallback_model,
+            h=h,
+            n_windows=n_windows,
+            step_size=step_size,
+            test_size=test_size,
+            input_size=input_size,
+            level=level,
+            refit=refit,
+            fitted=fitted,
+            prediction_intervals=prediction_intervals,
+        )
 
     def _is_native(self, df) -> bool:
         engine = try_get_context_execution_engine()
diff --git a/statsforecast/distributed/fugue.py b/statsforecast/distributed/fugue.py
index e84745042..8d844fd24 100644
--- a/statsforecast/distributed/fugue.py
+++ b/statsforecast/distributed/fugue.py
@@ -4,6 +4,7 @@
 __all__ = ['FugueBackend']
 
 # %% ../../nbs/src/core/distributed.fugue.ipynb 4
+import inspect
 from typing import Any, Dict, List
 
 import numpy as np
@@ -49,16 +50,16 @@ class FugueBackend(ParallelBackend):
     [Source code](https://github.com/Nixtla/statsforecast/blob/main/statsforecast/distributed/fugue.py).
 
     This class uses [Fugue](https://github.com/fugue-project/fugue) backend capable of distributing
-    computation on Spark and Dask without any rewrites.
+    computation on Spark, Dask and Ray without any rewrites.
 
     **Parameters:**<br>
-    `engine`: fugue.ExecutionEngine, a selection between spark and dask.<br>
+    `engine`: fugue.ExecutionEngine, a selection between Spark, Dask, and Ray.<br>
     `conf`: fugue.Config, engine configuration.<br>
     `**transform_kwargs`: additional kwargs for Fugue's transform method.<br>
 
     **Notes:**<br>
-    A short introduction to Fugue, with examples on how to scale pandas code to scale pandas
-    based code to Spark or Dask is available [here](https://fugue-tutorials.readthedocs.io/tutorials/quick_look/ten_minutes.html).
+    A short introduction to Fugue, with examples on how to scale pandas code to Spark, Dask or Ray
+    is available [here](https://fugue-tutorials.readthedocs.io/tutorials/quick_look/ten_minutes.html).
     """
 
     def __init__(self, engine: Any = None, conf: Any = None, **transform_kwargs: Any):
@@ -85,7 +86,7 @@ def forecast(
 
         **Parameters:**<br>
         `df`: pandas.DataFrame, with columns [`unique_id`, `ds`, `y`] and exogenous.<br>
-        `freq`: str, frequency of the data, [panda's available frequencies](https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases).<br>
+        `freq`: str, frequency of the data, [pandas' available frequencies](https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases).<br>
         `models`: List[typing.Any], list of instantiated objects `StatsForecast.models`.<br>
         `fallback_model`: Any, Model to be used if a model fails.<br>
         `X_df`: pandas.DataFrame, with [unique_id, ds] columns and df’s future exogenous.
@@ -97,13 +98,14 @@ def forecast(
 
         **References:**<br>
         For more information check the
-        [Fugue's transform](https://fugue-tutorials.readthedocs.io/tutorials/beginner/introduction.html#fugue-transform)
+        [Fugue's transform](https://fugue-tutorials.readthedocs.io/tutorials/beginner/transform.html)
         tutorial.<br>
         The [core.StatsForecast's forecast](https://nixtla.github.io/statsforecast/core.html#statsforecast.forecast)
         method documentation.<br>
-        Or the list of available [StatsForecast's models](https://nixtla.github.io/statsforecast/models.html).
+        Or the list of available [StatsForecast's models](https://nixtla.github.io/statsforecast/src/core/models.html).
         """
-        schema = "*-y+" + str(self._get_output_schema(models))
+        level = kwargs.get("level", [])
+        schema = "*-y+" + str(self._get_output_schema(models, level))
         if X_df is None:
             return transform(
                 df,
@@ -121,7 +123,9 @@ def forecast(
                 **self._transform_kwargs,
             )
         else:
-            schema = "unique_id:str,ds:str," + str(self._get_output_schema(models))
+            schema = "unique_id:str,ds:str," + str(
+                self._get_output_schema(models, level)
+            )
             return _cotransform(
                 df,
                 X_df,
@@ -173,7 +177,8 @@ def cross_validation(
         method documentation.<br>
         [Rob J. Hyndman and George Athanasopoulos (2018). "Forecasting principles and practice, Temporal Cross-Validation"](https://otexts.com/fpp3/tscv.html).
         """
-        schema = "*-y+" + str(self._get_output_schema(models, mode="cv"))
+        level = kwargs.get("level", [])
+        schema = "*-y+" + str(self._get_output_schema(models, level, mode="cv"))
         return transform(
             df,
             self._cv,
@@ -217,9 +222,21 @@ def _cv(
         )
         return model.cross_validation(**kwargs).reset_index()
 
-    def _get_output_schema(self, models, mode="forecast") -> Schema:
-        cols: List[Any]
-        cols = [(repr(model), np.float32) for model in models]
+    def _get_output_schema(self, models, level=None, mode="forecast") -> Schema:
+        cols: List[Any] = []
+        if level is None:
+            level = []
+        for model in models:
+            has_levels = (
+                "level" in inspect.signature(getattr(model, "forecast")).parameters
+                and len(level) > 0
+            )
+            cols.append((repr(model), np.float32))
+            if has_levels:
+                cols.extend(
+                    [(f"{repr(model)}-lo-{l}", np.float32) for l in reversed(level)]
+                )
+                cols.extend([(f"{repr(model)}-hi-{l}", np.float32) for l in level])
        if mode == "cv":
             cols = [("cutoff", "datetime"), ("y", np.float32)] + cols
         return Schema(cols)