Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lastfill #2523

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/docs/powered-by.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ Learning on Apache Spark and Analytics Zoo](https://www.dellemc.com/resources/en
<br>[Image Similarity-Based House Recommendations and Search](https://software.intel.com/content/www/us/en/develop/articles/using-bigdl-to-build-image-similarity-based-house-recommendations.html)
* __NeuSoft/BMW__
<br>[Neusoft RealSight APM partners with Intel to create an application performance management platform with active defense capabilities](https://platform.neusoft.com/2020/01/17/xw-intel.html) (in Chinese)
* __NeuSoft/Mazda__
<br>[JD, Neusoft and Intel Jointly Building Intelligent and Connected Vehicle Cloud for Hainan-Mazda](https://platform.neusoft.com/2020/06/11/jjfa-haimaqiche.html) (in Chinese)
* __Office Depot__
<br>[Real-time Product Recommendations for Office Depot Using Apache Spark and Analytics Zoo on AWS](https://software.intel.com/en-us/articles/real-time-product-recommendations-for-office-depot-using-apache-spark-and-analytics-zoo-on)
<br>[Office Depot product recommender using Analytics Zoo on AWS](https://conferences.oreilly.com/strata/strata-ca/public/schedule/detail/73079)
Expand Down
3 changes: 3 additions & 0 deletions pyzoo/test/zoo/orca/learn/spark/test_estimator_for_spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ def transform(df):
steps=10,
validation_data=dataset)

result = est.evaluate(dataset, batch_size=4)
assert 'loss' in result


def test_estimator_graph_predict_dataset(estimator_for_spark_fixture):

Expand Down
56 changes: 56 additions & 0 deletions pyzoo/test/zoo/zouwu/test_impute.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import numpy as np
import pandas as pd
import pytest

from test.zoo.pipeline.utils.test_utils import ZooTestCase
from zoo.automl.feature.time_sequence import TimeSequenceFeatureTransformer
from zoo.zouwu.preprocessing.impute.LastFill import LastFill


class TestDataImputation(ZooTestCase):

def setup_method(self, method):
self.ft = TimeSequenceFeatureTransformer()
self.create_data()

def teardown_method(self, method):
pass

def create_data(self):
data = np.random.random_sample((5, 5))
mask = np.random.random_sample((5, 5))
mask[mask >= 0.2] = 1
mask[mask < 0.2] = 0
data[mask == 0] = None
df = pd.DataFrame(data)
idx = pd.date_range(start='2020-07-01 00:00:00', end='2020-07-01 08:00:00', freq='2H')
df.index = idx
self.data = df

def test_lastfill(self):
last_fill = LastFill()
last_fill = LastFill()
mse_missing = last_fill.evaluate(self.data, 0.1)
imputed_data = last_fill.impute(self.data)
assert imputed_data.isna().sum().sum() == 0
mse = last_fill.evaluate(imputed_data, 0.1)

if __name__ == "__main__":
pytest.main([__file__])

17 changes: 10 additions & 7 deletions pyzoo/zoo/automl/model/tcmf/DeepGLO.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,21 +634,25 @@ def get_time_covs(self, start_date, num_ts):
covariates = self.covariates
return covariates

def get_prediction_time_covs(self, rg, horizon):
covs_past = self.Yseq.covariates[:, -rg:]
future_start_date = pd.Timestamp(self.start_date) + pd.Timedelta('1h') * self.Ymat.shape[1]
def get_prediction_time_covs(self, rg, horizon, last_step):
covs_past = self.Yseq.covariates[:, last_step - rg: last_step]
if self.freq[0].isalpha():
freq = "1" + self.freq
else:
freq = self.freq
future_start_date = pd.Timestamp(self.start_date) + pd.Timedelta(freq) * last_step
covs_future = self.get_time_covs(start_date=future_start_date, num_ts=horizon)
covs = np.concatenate([covs_past, covs_future], axis=1)
return covs

def predict_horizon(
self, ind=None, future=10, normalize=False, bsize=90
):

last_step = self.end_index
if ind is None:
ind = np.arange(self.Ymat.shape[0])

self.Xseq = self.Xseq
self.Xseq = self.Xseq.cpu()

self.Yseq.seq = self.Yseq.seq.eval()
self.Xseq = self.Xseq.eval()
Expand All @@ -657,8 +661,7 @@ def predict_horizon(
1 + 2 * (self.kernel_size - 1) * 2 ** (len(self.num_channels_X) - 1),
1 + 2 * (self.kernel_size_Y - 1) * 2 ** (len(self.num_channels_Y) - 1),
)
covs = self.get_prediction_time_covs(rg, future)
last_step = self.Ymat.shape[1]
covs = self.get_prediction_time_covs(rg, future, last_step)

yc = self.predict_global(
ind=ind,
Expand Down
3 changes: 2 additions & 1 deletion pyzoo/zoo/feature/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,8 @@ def pytorch_dataloader(cls, dataloader, bigdl_type="float"):
if dataloader.batch_size % node_num != 0:
true_bs = math.ceil(dataloader.batch_size / node_num) * node_num
warning_msg = "Detect dataloader's batch_size is not divisible by node number(" + \
node_num + "), will adjust batch_size to " + true_bs + " automatically"
str(node_num) + "), will adjust batch_size to " + str(true_bs) + \
" automatically"
warnings.warn(warning_msg)

bys = CloudPickleSerializer.dumps(CloudPickleSerializer, dataloader)
Expand Down
7 changes: 4 additions & 3 deletions pyzoo/zoo/orca/data/tf/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,10 @@ def _get_prediction_data(self):
return rdd

def _get_evaluation_data(self):

feature_length = len(nest.flatten(self.tensor_structure[0]))
jvalue = callZooFunc("float", "createMiniBatchRDDFromTFDatasetEval",
self.rdd.map(lambda x: x[0]), self.init_op_name, self.table_init_op,
self.output_names,
self.output_types, self.shard_index_op_name, feature_length)
self.output_types, self.shard_index_op_name)
rdd = jvalue.value().toJavaRDD()
return rdd

Expand All @@ -109,6 +107,9 @@ def _get_validation_data(self):
return FeatureSet(jvalue=jvalue)
return None

def get_num_partitions(self):
return self.rdd.getNumPartitions()


class Dataset(object):

Expand Down
71 changes: 71 additions & 0 deletions pyzoo/zoo/zouwu/preprocessing/impute/LastFill.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import numpy as np
import pandas as pd
import sklearn.metrics as metrics

from zoo.zouwu.preprocessing.impute.abstract import BaseImpute


class LastFill(BaseImpute):
"""
Impute missing data with last seen value
"""
def __init__(self):
"""
Construct model for last filling method
"""
pass

def impute(self, df):
"""
impute data
:params df: input dataframe
:return: imputed dataframe
"""
df.iloc[0] = df.iloc[0].fillna(0)
return df.fillna(method='pad')

def evaluate(self, df, drop_rate):
"""
evaluate model by randomly drop some value
:params df: input dataframe
:params drop_rate: percentage value that will be randomly dropped
:return: MSE results
"""
missing = df.isna()*1
df1 = self.impute(df)
print("na", df.isna().sum().sum())
missing = missing.to_numpy()
mask = np.zeros(df.shape[0]*df.shape[1])
idx = np.random.choice(mask.shape[0], int(mask.shape[0]*drop_rate), replace=False)
mask[idx] = 1
mask = np.reshape(mask, (df.shape[0], df.shape[1]))
np_df = df.to_numpy()
np_df[mask == 1] = None
new_df = pd.DataFrame(np_df)
impute_df = self.impute(new_df)
print("NA", impute_df.isna().sum().sum())
pred = impute_df.to_numpy()
true = df1.to_numpy()
print(pred)
print("true", true)
pred[missing == 1] = 0
true[missing == 1] = 0
a = metrics.mean_squared_error(true[:, 0], pred[:, 0])
b = metrics.mean_squared_error(true[:, 1], pred[:, 1])
return [a, b]
42 changes: 42 additions & 0 deletions pyzoo/zoo/zouwu/preprocessing/impute/abstract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from abc import ABC, abstractmethod


class BaseImpute(ABC):
"""
base model for data imputation
"""

@abstractmethod
def impute(self, df):
"""
fill in missing values in the dataframe
:param df: dataframe containing missing values
:return: dataframe without missing values
"""
raise NotImplementError

@abstractmethod
def evaluate(self, df, drop_rate):
"""
randomly drop some values and evaluate the data imputation method
:param df: input dataframe (better without missing values)
:param drop_rate: percentage value of randomly dropping data
:return: MSE results
"""
raise NotImplementError
Loading