Support multivariate in autots #2936

Merged: 10 commits, Oct 13, 2020
41 changes: 25 additions & 16 deletions pyzoo/zoo/automl/feature/time_sequence.py
@@ -34,7 +34,7 @@ class TimeSequenceFeatureTransformer(BaseFeatureTransformer):

     def __init__(self, future_seq_len=1,
                  dt_col="datetime",
-                 target_col="value",
+                 target_col=["value"],
                  extra_features_col=None,
                  drop_missing=True):
         """
@@ -199,10 +199,11 @@ def transform(self, input_df, is_train=True):
         return output_x, output_y

     def _unscale(self, y):
-        # for standard scalar
-        value_mean = self.scaler.mean_[0]
-        value_scale = self.scaler.scale_[0]
-        y_unscale = y * value_scale + value_mean
+        target_col_indexes = list(range(len(self.target_col)))
+        dummy_feature_shape = self.scaler.scale_.shape[0]
+        y_dummy = np.zeros((y.shape[0], dummy_feature_shape))
+        y_dummy[:, target_col_indexes] = y
+        y_unscale = self.scaler.inverse_transform(y_dummy)[:, target_col_indexes]
         return y_unscale

     def unscale_uncertainty(self, y_uncertainty):
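For context on the change above: instead of manually rescaling with mean_[0] and scale_[0], the multivariate _unscale pads the 2-D target array back to the scaler's full feature width so the fitted scaler's inverse_transform can be reused, then slices the target columns back out. A minimal standalone sketch of the same trick with scikit-learn's StandardScaler (data and column positions here are illustrative):

import numpy as np
from sklearn.preprocessing import StandardScaler

# Illustrative data: 2 target columns followed by 1 extra feature column.
data = np.arange(12, dtype=float).reshape(4, 3)
scaler = StandardScaler().fit(data)

y = scaler.transform(data)[:, :2]              # scaled target columns only
y_dummy = np.zeros((y.shape[0], scaler.scale_.shape[0]))
y_dummy[:, [0, 1]] = y                         # pad to the scaler's width
y_unscale = scaler.inverse_transform(y_dummy)[:, [0, 1]]
assert np.allclose(y_unscale, data[:, :2])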
@@ -221,10 +222,12 @@ def _get_y_pred_df(self, y_pred_dt_df, y_pred_unscale):
"""
y_pred_df = y_pred_dt_df
if self.future_seq_len > 1:
columns = ["{}_{}".format(self.target_col, i) for i in range(self.future_seq_len)]
y_pred_df[columns] = pd.DataFrame(y_pred_unscale)
for i in range(self.future_seq_len):
for j in range(self.target_col):
column = self.target_col[j] + str(i)
y_pred_df[column] = pd.DataFrame(y_pred_unscale[j])
else:
y_pred_df[self.target_col] = y_pred_unscale
y_pred_df[self.target_col] = pd.DataFrame(y_pred_unscale)
return y_pred_df

def post_processing(self, input_df, y_pred, is_train):
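To illustrate the new naming scheme (an assumed example, not taken from the PR): with target_col=["AvgRate", "total"] and future_seq_len=2, the loop above produces one prediction column per (horizon step, target) pair:

cols = [t + str(i) for i in range(2) for t in ["AvgRate", "total"]]
# -> ['AvgRate0', 'total0', 'AvgRate1', 'total1']

With future_seq_len == 1, the original target column names are kept unchanged.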
@@ -246,13 +249,13 @@ def post_processing(self, input_df, y_pred, is_train):
         if isinstance(input_df, list):
             y_unscale_list = []
             for df in input_df:
-                _, y_unscale = self._roll_train(df[[self.target_col]],
+                _, y_unscale = self._roll_train(df[self.target_col],
                                                 self.past_seq_len,
                                                 self.future_seq_len)
                 y_unscale_list.append(y_unscale)
             output_y_unscale = np.concatenate(y_unscale_list, axis=0)
         else:
-            _, output_y_unscale = self._roll_train(input_df[[self.target_col]],
+            _, output_y_unscale = self._roll_train(input_df[self.target_col],
                                                    self.past_seq_len,
                                                    self.future_seq_len)
         return output_y_unscale, y_pred_unscale
@@ -415,8 +418,12 @@ def _check_input(self, input_df, mode="train"):
     def _roll_data(self, data, seq_len):
         result = []
         mask = []

         for i in range(len(data) - seq_len + 1):
-            result.append(data[i: i + seq_len])
+            if seq_len == 1 and len(self.target_col) > 1:
+                result.append(data[i])
+            else:
+                result.append(data[i: i + seq_len])

             if pd.isna(data[i: i + seq_len]).any(axis=None):
                 mask.append(0)
@@ -442,7 +449,10 @@ def _roll_train(self, dataframe, past_seq_len, future_seq_len):
         length = 1
         """
         x = dataframe[0:-future_seq_len].values
-        y = dataframe.iloc[past_seq_len:, 0].values
+        if len(self.target_col) == 1:
+            y = dataframe.iloc[past_seq_len:, 0].values
+        else:
+            y = dataframe.iloc[past_seq_len:, list(range(0, len(self.target_col)))].values
         output_x, mask_x = self._roll_data(x, past_seq_len)
         output_y, mask_y = self._roll_data(y, future_seq_len)
         # assert output_x.shape[0] == output_y.shape[0],
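To make the rolling behaviour concrete, here is a small sketch that mirrors _roll_train and _roll_data for the multivariate, horizon-1 case (toy column names and lengths, not the PR's code):

import numpy as np
import pandas as pd

df = pd.DataFrame({"AvgRate": np.arange(6.0), "total": np.arange(6.0) * 10})
past_seq_len, future_seq_len = 3, 1

x = df[0:-future_seq_len].values                        # shape (5, 2)
y = df.iloc[past_seq_len:, [0, 1]].values               # shape (3, 2)
output_x = np.stack([x[i:i + past_seq_len]
                     for i in range(len(x) - past_seq_len + 1)])
# future_seq_len == 1 with several targets: each y sample is one row.
output_y = np.stack([y[i] for i in range(len(y))])
print(output_x.shape, output_y.shape)                   # (3, 3, 2) (3, 2)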
@@ -517,9 +527,8 @@ def _rearrange_data(self, input_df):
         :return:
         """
         cols = input_df.columns.tolist()
-        new_cols = [self.dt_col,
-                    self.target_col] + [col for col in cols
-                                        if col != self.dt_col and col != self.target_col]
+        new_cols = [self.dt_col] + self.target_col + \
+            [col for col in cols if col != self.dt_col and col not in self.target_col]
         rearranged_data = input_df[new_cols].copy()
         return rearranged_data

@@ -561,7 +570,7 @@ def _get_features(self, input_df, config):
         feature_cols = np.asarray(json.loads(config.get("selected_features")))
         # we do not include target col in candidates.
         # the first column is designed to be the default position of target column.
-        target_col = np.array([self.target_col])
+        target_col = np.array(self.target_col)
         cols = np.concatenate([target_col, feature_cols])
         target_feature_matrix = feature_matrix[cols]
         return target_feature_matrix.astype(float)
1 change: 1 addition & 0 deletions pyzoo/zoo/automl/model/time_sequence.py
@@ -73,6 +73,7 @@ def fit_eval(self, x, y, validation_data=None, mc=False, metric="mse", verbose=0
         :return: the resulting metric
         """
         if not self.model:
+            config["output_dim"] = y.shape[-1]
             self._sel_model(config, verbose=1)

         return self.model.fit_eval(x, y,
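The added line infers the model's output dimension from the rolled labels, so the selected model emits one value per target column. A sketch of the assumed shapes (not from the PR itself):

import numpy as np

y = np.zeros((128, 2))                  # e.g. horizon 1 with two target columns
config = {"output_dim": y.shape[-1]}    # -> 2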
4 changes: 2 additions & 2 deletions pyzoo/zoo/automl/regression/time_sequence_predictor.py
@@ -44,7 +44,7 @@ def __init__(self,
                  logs_dir="~/zoo_automl_logs",
                  future_seq_len=1,
                  dt_col="datetime",
-                 target_col="value",
+                 target_col=["value"],
                  extra_features_col=None,
                  drop_missing=True,
                  ):
@@ -167,7 +167,7 @@ def _check_input_format(self, input_df):
"input_df should be a data frame or a list of data frames")

def _check_missing_col(self, input_df):
cols_list = [self.dt_col, self.target_col]
cols_list = [self.dt_col] + self.target_col
if self.extra_features_col is not None:
if not isinstance(self.extra_features_col, (list,)):
raise ValueError(
5 changes: 4 additions & 1 deletion pyzoo/zoo/zouwu/autots/forecast.py
@@ -38,9 +38,12 @@ def __init__(self,
         :param target_col: the target column to forecast
         :param extra_features_col: extra feature columns
         """
+        target_col_list = target_col
+        if isinstance(target_col, str):
+            target_col_list = [target_col]
         self.internal = TimeSequencePredictor(
             dt_col=dt_col,
-            target_col=target_col,
+            target_col=target_col_list,
             future_seq_len=horizon,
             extra_features_col=extra_features_col,
         )
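With the normalization above, AutoTSTrainer accepts the target column either as a single string or as a list. A brief usage sketch (constructor arguments as used later in this PR):

from zoo.zouwu.autots.forecast import AutoTSTrainer

trainer_uni = AutoTSTrainer(dt_col="datetime", target_col="AvgRate",
                            horizon=1, extra_features_col=None)
trainer_multi = AutoTSTrainer(dt_col="datetime",
                              target_col=["AvgRate", "total"],
                              horizon=1, extra_features_col=None)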
@@ -0,0 +1,93 @@
def get_drop_dates_and_len(df, allow_missing_num=3):
    """
    Find runs of consecutive missing values and return the records to drop.
    """
    # The cumulative sum of the not-null indicator is constant within a run
    # of NaNs, so grouping the null indicator by it gives each run's length.
    missing_num = df.total.isnull().astype(int).groupby(
        df.total.notnull().astype(int).cumsum()).sum()
    drop_missing_num = missing_num[missing_num > allow_missing_num]
    drop_datetimes = df.iloc[drop_missing_num.index].index
    drop_len = drop_missing_num.values
    return drop_datetimes, drop_len

def rm_missing_weeks(start_dts, missing_lens, df):
    """
    Drop weeks that contain more than 3 consecutive missing values
    (as found by get_drop_dates_and_len). If a run of missing values
    spans several weeks, all of the affected weeks are removed.
    """
    for start_time, l in zip(start_dts, missing_lens):
        # Snap to Monday 00:00 of the week containing the run start.
        start = start_time - pd.Timedelta(days=start_time.dayofweek)
        start = start.replace(hour=0, minute=0, second=0)
        # Last 2-hourly sample of that week: Sunday 22:00.
        start_week_end = start + pd.Timedelta(days=6)
        start_week_end = start_week_end.replace(hour=22, minute=0, second=0)

        end_time = start_time + l * pd.Timedelta(hours=2)
        if start_week_end < end_time:
            # The run crosses a week boundary: extend to the end of its last week.
            end = end_time + pd.Timedelta(days=6 - end_time.dayofweek)
            end = end.replace(hour=22, minute=0, second=0)
        else:
            end = start_week_end
        df = df.drop(df[start:end].index)
    return df
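A small self-contained check of the consecutive-missing-run detection above (toy 2-hourly series with illustrative values):

import pandas as pd

idx = pd.date_range("2020-01-01", periods=10, freq="2H")
toy = pd.DataFrame({"total": [1, None, None, None, None, 2, None, 3, 4, 5]},
                   index=idx)
dts, lens = get_drop_dates_and_len(toy)
# Only the run of 4 consecutive NaNs exceeds allow_missing_num=3;
# the single NaN later on is left for forward-filling.
print(lens)   # [4]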

import os
import numpy as np
import pandas as pd

raw_df = pd.read_csv("data/data.csv")

df = pd.DataFrame(pd.to_datetime(raw_df.StartTime))
# Normalize AvgRate to Mbps: keep "Mbps" values as-is and scale the
# remaining values (assumed Gbps) by 1000.
df['AvgRate'] = raw_df.AvgRate.apply(
    lambda x: float(x[:-4]) if x.endswith("Mbps") else float(x[:-4]) * 1000)
df["total"] = raw_df["total"]
df.set_index("StartTime", inplace=True)
# Reindex onto a complete 2-hourly range so that gaps become explicit NaNs.
full_idx = pd.date_range(start=df.index.min(), end=df.index.max(), freq='2H')
df = df.reindex(full_idx)
drop_dts, drop_len = get_drop_dates_and_len(df)
df = rm_missing_weeks(drop_dts, drop_len, df)
# Forward-fill the short gaps that survived the week-level dropping.
df.ffill(inplace=True)
df.index.name = "datetime"
df = df.reset_index()

from zoo import init_spark_on_local
from zoo.ray import RayContext
sc = init_spark_on_local(cores=4, spark_log_level="INFO")
ray_ctx = RayContext(sc=sc, object_store_memory="1g")
ray_ctx.init()

[Review comment, Contributor] We can use init_orca_context() instead.


from zoo.zouwu.autots.forecast import AutoTSTrainer
from zoo.automl.config.recipe import *

trainer = AutoTSTrainer(dt_col="datetime",
                        target_col=["AvgRate", "total"],
                        horizon=1,
                        extra_features_col=None)

# look_back range; its minimum is reserved as history for the val/test splits.
look_back = (36, 84)
from zoo.automl.common.util import train_val_test_split
train_df, val_df, test_df = train_val_test_split(df,
                                                 val_ratio=0.1,
                                                 test_ratio=0.1,
                                                 look_back=look_back[0])

# List-valued recipe arguments are the candidate values for the search.
ts_pipeline = trainer.fit(train_df, val_df,
                          recipe=MTNetGridRandomRecipe(
                              num_rand_samples=1,
                              time_step=[12],
                              long_num=[6],
                              ar_size=[6],
                              cnn_height=[4],
                              cnn_hid_size=[32],
                              training_iteration=1,
                              epochs=20,
                              batch_size=[1024]),
                          metric="mse")

ts_pipeline.internal.config

[Review comment, Contributor] You may want to print out the result or this might be a useless line.

pred_df = ts_pipeline.predict(test_df)

mse, smape = ts_pipeline.evaluate(test_df, metrics=["mse", "smape"])
print("Evaluate: the mean square error is", mse)
print("Evaluate: the smape value is", smape)

ray_ctx.stop()
sc.stop()