Fix zouwu TCMFForecaster interface (#2949)
* move some params from constructor to fit for zouwu TCMFForecaster

* fix style

* update doc and add covariates/dti in evaluate

* add check and UT

* remove unsupport input x in tcmfforecaster

* revert num_levels and channel_size to num_channels
shanyu-sys committed Oct 15, 2020
1 parent 40138b2 commit 4b325dd
Showing 8 changed files with 590 additions and 288 deletions.
114 changes: 71 additions & 43 deletions docs/docs/Zouwu/API/TCMFForecaster.md
@@ -23,29 +23,18 @@ __Ubuntu 16.04 or later__ and __macOS 10.12.6 or later__.
```python
from zoo.zouwu.model.forecast.tcmf_forecaster import TCMFForecaster
model = TCMFForecaster(
vbsize=128,
hbsize=256,
num_channels_X=[32, 32, 32, 32, 32, 1],
num_channels_Y=[16, 16, 16, 16, 16, 1],
kernel_size=7,
vbsize=128,
hbsize=256,
num_channels_X=[32, 32, 32, 32, 32, 1],
num_channels_Y=[16, 16, 16, 16, 16, 1],
kernel_size=7,
dropout=0.1,
rank=64,
kernel_size_Y=7,
learning_rate=0.0005,
val_len=24,
normalize=False,
start_date="2020-4-1",
freq="1H",
covariates=None,
use_time=True,
dti=None,
svd=True,
period=24,
y_iters=10,
init_FX_epoch=100,
max_FX_epoch=300,
max_TCN_epoch=300,
alt_iters=10)
svd=True,)
```
* `vbsize`: int, default is 128.
Vertical batch size, which is the number of cells per batch.
@@ -64,25 +53,46 @@ model = TCMFForecaster(
* `kernel_size_Y`: int, default is 7.
Kernel size of hybrid model
* `learning_rate`: float, default is 0.0005
* `val_len`: int, default is 24.
Validation length. We will use the last val_len time points as validation data.
* `normalize`: boolean, false by default.
Whether to normalize input data for training.
* `use_time`: boolean, default is True.
Whether to use time covariates.
* `svd`: boolean, default is False.
Whether factor matrices are initialized by NMF

### Use TCMFForecaster
#### **Train model**
After a TCMFForecaster is created, you can call the forecaster API to train a TCMF model:
```python
model.fit(x,
val_len=24,
start_date="2020-4-1",
freq="1H",
covariates=None,
dti=None,
period=24,
y_iters=10,
init_FX_epoch=100,
max_FX_epoch=300,
max_TCN_epoch=300,
alt_iters=10,
num_workers=None)
```
* `x`: the input for fit. Only dict of ndarray and SparkXShards of dict of ndarray
are supported. Example: {'id': id_arr, 'y': data_ndarray}. If input is SparkXShards, each partition will use one model to fit.
* `val_len`: int, default is 24.
Validation length. We will use the last val_len time points as validation data.
* `start_date`: str or datetime-like.
Start date time for the time-series. e.g. "2020-01-01"
* `freq`: str or DateOffset, default is 'H'
Frequency of data
* `use_time`: boolean, default is True.
Whether to use time covariates.
* `covariates`: 2-D ndarray or None. The shape of ndarray should be (r, T), where r is
the number of covariates and T is the number of time points.
Global covariates for all time series. If None, only default time covariates will be
used while use_time is True. Otherwise, the time covariates used are the stack of the input
covariates and the default time covariates.
* `dti`: DatetimeIndex or None.
If None, use default fixed frequency DatetimeIndex generated with start_date and freq.
* `svd`: boolean, default is False.
Whether factor matrices are initialized by NMF
* `period`: int, default is 24.
Periodicity of input time series, leave it out if not known
* `y_iters`: int, default is 10.
@@ -95,60 +105,78 @@ model = TCMFForecaster(
Max number of iterations while training the local model.
* `alt_iters`: int, default is 10.
Number of iterations while alternate training.

### Use TCMFForecaster
#### **Train model**
After a TCMFForecaster is created, you can call the forecaster API to train a TCMF model:
```
model.fit(x,
num_workers=None)
```
* `x`: the input for fit. Only dict of ndarray and SparkXShards of dict of ndarray
are supported. Example: {'id': id_arr, 'y': data_ndarray}. If input is SparkXShards, each partition will use one model to fit.

* `num_workers`: the number of workers you want to use for fit. It is only effective while input x is dict of ndarray. If None, it defaults to
num_ray_nodes in the created RayContext or 1 if there is no active RayContext.
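
As a rough illustration of the input shapes described above, the sketch below builds an `x` dict, a covariates array, and a `DatetimeIndex` with NumPy and pandas, then checks them against the documented shape rules. The helper `check_fit_inputs` and the sizes `n`, `T`, `r` are hypothetical and not part of zouwu. The `model.fit` call is left as a comment because it needs a working zouwu installation.

```python
import numpy as np
import pandas as pd

# Hypothetical sizes: n time series, T time points, r global covariates.
n, T, r = 4, 48, 2
rng = np.random.default_rng(0)

x = {
    "id": np.arange(n),        # one identifier per time series
    "y": rng.random((n, T)),   # target values, shape (n, T)
}
covariates = rng.random((r, T))  # shape (r, T), shared by all series

# An hourly index of length T; a Timedelta is used as the frequency here
# instead of a string alias such as "1H".
dti = pd.date_range(start="2020-04-01", periods=T, freq=pd.Timedelta(hours=1))


def check_fit_inputs(x, covariates=None, dti=None):
    """Hypothetical helper: check inputs against the documented shapes."""
    num_points = x["y"].shape[1]
    if covariates is not None:
        if not isinstance(covariates, np.ndarray) or covariates.ndim != 2:
            raise ValueError("covariates must be a 2-D ndarray")
        if covariates.shape[1] != num_points:
            raise ValueError("covariates must have shape (r, T)")
    if dti is not None:
        if not isinstance(dti, pd.DatetimeIndex):
            raise ValueError("dti must be a pandas DatetimeIndex")
        if len(dti) != num_points:
            raise ValueError("dti length must equal T")
    return num_points


check_fit_inputs(x, covariates, dti)
# With zouwu installed, the call would then look like:
# model.fit(x, covariates=covariates, dti=dti, val_len=24)
```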

#### **Get prediction results of model**
After training, you can call the forecaster API to get the prediction result of the TCMF model. `model.predict` will output the prediction results of future `horizon` steps after `x` in `fit`.
```
model.predict(x=None,
horizon=24,
covariates=None,
model.predict(horizon=24,
future_covariates=None,
future_dti=None,
num_workers=None,
)
```
* `x`: the input. We don't support input x directly. This is just for consistency with the forecaster API.
* `covariates`: the global covariates. Defaults to None.
* `future_covariates`: covariates corresponding to future horizon steps data to predict.
2-D ndarray or None.
The shape of ndarray should be (r, horizon), where r is the number of covariates.
Global covariates for all time series. If None, only default time covariates will be
used while use_time is True. Otherwise, the time covariates used are the stack of the input
covariates and the default time covariates.
* `future_dti`: dti corresponding to future horizon steps data to predict.
DatetimeIndex or None.
If None, use default fixed frequency DatetimeIndex generated with the last date of x in
fit and freq.
* `num_workers`: the number of workers to use in predict. It is only effective while input `x` in `fit` is dict of ndarray. If None, it defaults to
num_ray_nodes in the created RayContext or 1 if there is no active RayContext.
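
The following sketch shows one way to prepare `future_covariates` and `future_dti` for a given `horizon`. It assumes hourly data and that the future index should continue one step after the last timestamp used in `fit`. The sizes and the zero-filled covariates are placeholders, and the `model.predict` call is commented out because it requires a fitted model.

```python
import numpy as np
import pandas as pd

# Hypothetical sizes: T training points, 24 future steps, r covariates.
T, horizon, r = 48, 24, 2
step = pd.Timedelta(hours=1)

# Index that would have been passed as `dti` in fit.
dti = pd.date_range(start="2020-04-01", periods=T, freq=step)

# Assumption: the future index starts one step after the last fit timestamp.
future_dti = pd.date_range(start=dti[-1] + step, periods=horizon, freq=step)

# Future covariates need shape (r, horizon); zeros are placeholders only.
future_covariates = np.zeros((r, horizon))

# With a fitted model, the call would then look like:
# yhat = model.predict(horizon=horizon,
#                      future_covariates=future_covariates,
#                      future_dti=future_dti)
```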

#### **Evaluate model**
After training, you can call the forecaster API to evaluate the TCMF model. `model.evaluate` will output the evaluation results for future `horizon` steps after `x` in `fit`.
```
model.evaluate(target_value,
x=None,
metric=['mae'],
covariates=None,
target_covariates=None,
target_dti=None,
num_workers=None,
)
```
* `target_value`: target value for evaluation. It should be of the same format as input x in fit, which is a dict of ndarray or SparkXShards of dict of ndarray.
We interpret the second dimension of y in target value as the horizon length for evaluation.
* `covariates`: global covariates corresponding to target value. Defaults to None.
* `metric`: the metrics. A list of metric names.
* `target_covariates`: covariates corresponding to target_value.
2-D ndarray or None.
The shape of ndarray should be (r, horizon), where r is the number of covariates.
Global covariates for all time series. If None, only default time covariates will be
used while use_time is True. Otherwise, the time covariates used are the stack of the input
covariates and the default time covariates.
* `target_dti`: dti corresponding to target_value.
DatetimeIndex or None.
If None, use default fixed frequency DatetimeIndex generated with the last date of x in
fit and freq.
* `num_workers`: the number of workers to use in evaluate. It is only effective while input target value is dict of ndarray. If None, it defaults to
num_ray_nodes in the created RayContext or 1 if there is no active RayContext.
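
To make the `'mae'` metric and the role of the second dimension of `y` concrete, here is a small NumPy sketch that computes mean absolute error by hand. The function `mean_absolute_error` and the sample arrays are illustrative; averaging over all series and time steps is an assumption and may differ from how zouwu aggregates the metric internally.

```python
import numpy as np


def mean_absolute_error(y_true, y_pred):
    """Average absolute difference over all series and time steps."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    if y_true.shape != y_pred.shape:
        raise ValueError("y_true and y_pred must have the same shape")
    return float(np.mean(np.abs(y_true - y_pred)))


# target_value has the same format as x in fit: a dict of ndarrays.
target_value = {
    "id": np.arange(2),
    "y": np.array([[1.0, 2.0, 3.0],
                   [4.0, 5.0, 6.0]]),
}
# The second dimension of y is read as the evaluation horizon.
horizon = target_value["y"].shape[1]

# Made-up predictions standing in for the output of model.predict.
yhat = np.array([[1.5, 2.0, 2.0],
                 [4.0, 4.0, 6.0]])

mae = mean_absolute_error(target_value["y"], yhat)
```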

#### **Incrementally fit the model with additional data**
Incrementally fit the model. Note that only X_seq (the TCN in the global model) is fitted incrementally. fit_incremental is not yet enabled for SparkXShards input.
```python
model.fit_incremental(x_incr)
model.fit_incremental(x_incr,
covariates_incr=None,
dti_incr=None
)
```
* `x_incr`: incremental data to be fitted. It should be of the same format as input x in fit, which is a dict of ndarray or SparkXShards of dict of ndarray.
Example: {'id': id_arr, 'y': incr_ndarray}, and incr_ndarray is of shape (n, T_incr), where
n is the number of target time series, T_incr is the number of time steps incremented. You
can choose not to input 'id' in x_incr, but if you do, the elements of id in x_incr should
be the same as id in x of fit.
* `covariates_incr`: covariates corresponding to x_incr. 2-D ndarray or None.
The shape of ndarray should be (r, T_incr), where r is the number of covariates.
Global covariates for all time series. If None, only default time covariates will be
used while use_time is True. Otherwise, the time covariates used are the stack of the input
covariates and the default time covariates.
* `dti_incr`: dti corresponding to the x_incr. DatetimeIndex or None.
If None, use default fixed frequency DatetimeIndex generated with the last date of x in
fit and freq.
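
The constraints on `x_incr` can be checked before calling `fit_incremental`. The sketch below is a hypothetical pre-check, not zouwu's own validation: it verifies that the incremental data has the same number of series as the data used in `fit`, and that any `id` array matches.

```python
import numpy as np


def check_incremental(x_fit, x_incr):
    """Hypothetical pre-check; returns T_incr if x_incr is consistent."""
    n = x_fit["y"].shape[0]
    y_incr = x_incr["y"]
    if y_incr.ndim != 2 or y_incr.shape[0] != n:
        raise ValueError(
            f"expected incremental input with {n} time series, "
            f"got shape {y_incr.shape}"
        )
    # 'id' is optional in x_incr, but must match x in fit when present.
    if "id" in x_incr and not np.array_equal(x_incr["id"], x_fit["id"]):
        raise ValueError("id in x_incr must match id in x of fit")
    return y_incr.shape[1]


rng = np.random.default_rng(0)
x = {"id": np.arange(3), "y": rng.random((3, 48))}
x_incr = {"id": np.arange(3), "y": rng.random((3, 12))}

t_incr = check_incremental(x, x_incr)
# With a fitted model, the call would then look like:
# model.fit_incremental(x_incr, covariates_incr=None, dti_incr=None)
```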

#### **Save model**
You can save the model after fitting for future deployment.
54 changes: 32 additions & 22 deletions docs/docs/Zouwu/tutorials/TCMFForecaster.md
@@ -73,51 +73,61 @@ model = TCMFForecaster(
num_channels_X=[32, 32, 32, 32, 32, 1],
num_channels_Y=[16, 16, 16, 16, 16, 1],
kernel_size=7,
dropout=0.1,
rank=64,
kernel_size_Y=7,
learning_rate=0.0005,
val_len=24,
normalize=False,
start_date="2020-4-1",
freq="1H",
covariates=None,
use_time=True,
dti=None,
svd=True,
period=24,
y_iters=10,
init_FX_epoch=100,
max_FX_epoch=300,
max_TCN_epoch=300,
alt_iters=10)
dropout=0.1,
rank=64,
kernel_size_Y=7,
learning_rate=0.0005,
normalize=False,
use_time=True,
svd=True,)
```
### **Step 3: Use TCMFForecaster**

#### **Fit with TCMFForecaster**

```
model.fit(x, num_workers=num_workers_for_fit)
model.fit(
x,
val_len=24,
start_date="2020-4-1",
freq="1H",
covariates=None,
dti=None,
period=24,
y_iters=10,
init_FX_epoch=100,
max_FX_epoch=300,
max_TCN_epoch=300,
alt_iters=10,
num_workers=num_workers_for_fit)
```

#### **Evaluate with TCMFForecaster**
You can either directly call `model.evaluate` as
```
model.evaluate(target_value,metric=['mae'], num_workers=num_workers_for_predict)
model.evaluate(target_value,
metric=['mae'],
target_covariates=None,
target_dti=None,
num_workers=num_workers_for_predict,
)
```

Or you could predict first and then evaluate with metric name.

```
yhat = model.predict(horizon, num_workers=num_workers_for_predict)
yhat = model.predict(horizon,
future_covariates=None,
future_dti=None,
num_workers=num_workers_for_predict)
from zoo.automl.common.metrics import Evaluator
evaluate_mse = Evaluator.evaluate("mse", target_data, yhat)
```

#### **Incremental fit TCMFForecaster**
```python
model.fit_incremental(x_incr)
model.fit_incremental(x_incr, covariates_incr=None, dti_incr=None)
```

#### **Save and Load**
90 changes: 87 additions & 3 deletions pyzoo/test/zoo/automl/model/test_TCMF.py
@@ -21,12 +21,13 @@
import numpy as np
import os
from numpy.testing import assert_array_almost_equal
import pandas as pd


class TestTCMF(ZooTestCase):

def setup_method(self, method):
seq_len = 480
self.seq_len = 480
self.num_samples = 300
self.config = {
"y_iters": 1,
@@ -36,7 +37,7 @@ def setup_method(self, method):
"alt_iters": 2,
}
self.model = TCMF()
self.Ymat = np.random.rand(self.num_samples, seq_len)
self.Ymat = np.random.rand(self.num_samples, self.seq_len)
self.horizon = np.random.randint(2, 50)

def teardown_method(self, method):
@@ -62,6 +63,74 @@ def test_tcmf(self):
incr_result = self.model.predict(horizon=self.horizon)
np.testing.assert_raises(AssertionError, np.testing.assert_array_equal, result, incr_result)

def test_tcmf_covariates_dti(self):
# test wrong format in fit
with pytest.raises(ValueError, match="Input covariates must be a ndarray. Got"):
self.model.fit_eval(x=self.Ymat,
covariates='None',
**self.config)

with pytest.raises(ValueError, match="The second dimension shape of covariates should be"):
self.model.fit_eval(x=self.Ymat,
covariates=np.random.randn(3, self.seq_len - 1),
**self.config)

with pytest.raises(ValueError, match="You should input a 2-D ndarray of covariates."):
self.model.fit_eval(x=self.Ymat,
covariates=np.random.randn(3, 4, 5),
**self.config)

with pytest.raises(ValueError, match="Input dti must be a pandas DatetimeIndex. Got"):
self.model.fit_eval(x=self.Ymat,
covariates=np.random.randn(3, self.seq_len),
dti='None',
**self.config)

with pytest.raises(ValueError, match="Input dti length should be equal to"):
self.model.fit_eval(x=self.Ymat,
covariates=np.random.randn(3, self.seq_len),
dti=pd.date_range('20130101', periods=self.seq_len-1),
**self.config)

# valid covariates and dti in fit_eval
self.model.fit_eval(x=self.Ymat, y=None,
covariates=np.random.rand(3, self.seq_len),
dti=pd.date_range('20130101', periods=self.seq_len),
**self.config)

# inconsistent covariates and dti
with pytest.raises(ValueError, match="Find valid covariates in fit but invalid covariates "
"in predict."):
self.model.predict(horizon=self.horizon)

with pytest.raises(ValueError, match="be the same as the input covariates number in fit."):
self.model.predict(horizon=self.horizon,
future_covariates=np.random.randn(2, self.horizon),
)
with pytest.raises(ValueError, match="Find valid dti in fit but invalid dti in"):
self.model.predict(horizon=self.horizon,
future_covariates=np.random.randn(3, self.horizon),
)

with pytest.raises(ValueError, match="Find valid covariates in fit but invalid covariates "
"in fit_incremental."):
self.model.fit_incremental(x=np.random.rand(self.num_samples, self.horizon))

with pytest.raises(ValueError, match="be the same as the input covariates number in fit."):
self.model.fit_incremental(x=np.random.rand(self.num_samples, self.horizon),
covariates_new=np.random.randn(2, self.horizon),)
with pytest.raises(ValueError, match="Find valid dti in fit but invalid dti in"):
self.model.fit_incremental(x=np.random.rand(self.num_samples, self.horizon),
covariates_new=np.random.randn(3, self.horizon), )

self.model.predict(horizon=self.horizon,
future_covariates=np.random.randn(3, self.horizon),
future_dti=pd.date_range('20130101', periods=self.horizon),
)
self.model.fit_incremental(x=np.random.rand(self.num_samples, self.horizon),
covariates_new=np.random.randn(3, self.horizon),
dti_new=pd.date_range('20130101', periods=self.horizon),)

def test_error(self):
with pytest.raises(ValueError, match="We don't support input x directly"):
self.model.predict(x=1)
Expand All @@ -87,11 +156,26 @@ def test_error(self):
"calling fit_incremental"):
self.model.fit_incremental(x=np.random.rand(self.num_samples, self.horizon))

self.model.fit_eval(x=self.Ymat, y=None, **self.config)
with pytest.raises(Exception, match=f"Expected incremental input with {self.num_samples} "
f"time series, got {self.num_samples - 1} instead"):
self.model.fit_eval(x=self.Ymat, y=None, **self.config)
self.model.fit_incremental(x=np.random.rand(self.num_samples - 1, self.horizon))

with pytest.raises(ValueError, match="but invalid covariates in fit. "):
self.model.predict(horizon=self.horizon,
future_covariates=np.random.randn(3, self.horizon),
)
with pytest.raises(ValueError, match="but invalid dti in fit. "):
self.model.predict(horizon=self.horizon,
future_dti=pd.date_range('20130101', periods=self.horizon),
)
with pytest.raises(ValueError, match="but invalid covariates in fit. "):
self.model.fit_incremental(x=np.random.rand(self.num_samples, self.horizon),
covariates_new=np.random.randn(3, self.horizon),)
with pytest.raises(ValueError, match="but invalid dti in fit. "):
self.model.fit_incremental(x=np.random.rand(self.num_samples, self.horizon),
dti_new=pd.date_range('20130101', periods=self.horizon), )

def test_save_restore(self):
self.model.fit_eval(x=self.Ymat, y=None, **self.config)
result_save = self.model.predict(horizon=self.horizon)