diff --git a/python/orca/src/bigdl/orca/learn/bigdl/estimator.py b/python/orca/src/bigdl/orca/learn/bigdl/estimator.py
index d55043d4024..0e43144df0e 100644
--- a/python/orca/src/bigdl/orca/learn/bigdl/estimator.py
+++ b/python/orca/src/bigdl/orca/learn/bigdl/estimator.py
@@ -210,7 +210,7 @@ def predict(self, data, batch_size=8, feature_cols="features", sample_preprocess
             return self.nn_model.transform(data)
         elif isinstance(data, SparkXShards):
             from zoo.orca.data.utils import to_sample
-            from zoo.orca.learn.tf.utils import convert_predict_to_xshard
+            from zoo.orca.learn.utils import convert_predict_to_xshard
             sample_rdd = data.rdd.flatMap(to_sample)
             result_rdd = self.model.predict(sample_rdd)
             return convert_predict_to_xshard(result_rdd)
diff --git a/python/orca/src/bigdl/orca/learn/openvino/__init__.py b/python/orca/src/bigdl/orca/learn/openvino/__init__.py
new file mode 100644
index 00000000000..0b441c95014
--- /dev/null
+++ b/python/orca/src/bigdl/orca/learn/openvino/__init__.py
@@ -0,0 +1,16 @@
+#
+# Copyright 2018 Analytics Zoo Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from .estimator import Estimator
diff --git a/python/orca/src/bigdl/orca/learn/openvino/estimator.py b/python/orca/src/bigdl/orca/learn/openvino/estimator.py
new file mode 100644
index 00000000000..c752ecad0be
--- /dev/null
+++ b/python/orca/src/bigdl/orca/learn/openvino/estimator.py
@@ -0,0 +1,170 @@
+#
+# Copyright 2018 Analytics Zoo Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from zoo.pipeline.inference import InferenceModel
+from zoo.orca.data import SparkXShards
+from zoo import get_node_and_core_number
+from zoo.util import nest
+from zoo.common.nncontext import init_nncontext
+
+import numpy as np
+
+
+class Estimator(object):
+    def fit(self, data, epochs, **kwargs):
+        pass
+
+    def predict(self, data, **kwargs):
+        pass
+
+    def evaluate(self, data, **kwargs):
+        pass
+
+    def get_model(self):
+        pass
+
+    def save(self, model_path):
+        pass
+
+    def load(self, model_path):
+        pass
+
+    @staticmethod
+    def from_openvino(*, model_path, batch_size=0):
+        """
+        Create an Estimator from an OpenVINO IR model.
+
+        :param model_path: String. The file path to the OpenVINO IR xml file.
+        :param batch_size: Int. The inference batch size. Default is 0, in which
+               case the batch size is read from the model's IR xml file.
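+
+        Example (illustrative; the model path below is a placeholder):
+
+        >>> est = Estimator.from_openvino(model_path="/path/to/model.xml")
+        >>> predictions = est.predict(input_data)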
+        """
+        return OpenvinoEstimatorWrapper(model_path=model_path, batch_size=batch_size)
+
+
+class OpenvinoEstimatorWrapper(Estimator):
+    def __init__(self,
+                 *,
+                 model_path,
+                 batch_size=0):
+        self.node_num, self.core_num = get_node_and_core_number()
+        self.path = model_path
+        if batch_size != 0:
+            self.batch_size = batch_size
+        else:
+            # Fall back to the batch size recorded in the IR xml file
+            # (the first dim of the first layer's output port).
+            import xml.etree.ElementTree as ET
+            tree = ET.parse(model_path)
+            root = tree.getroot()
+            shape_item = root.find('./layers/layer/output/port/dim[1]')
+            if shape_item is None:
+                raise ValueError("Invalid OpenVINO IR xml file, please check your model_path")
+            self.batch_size = int(shape_item.text)
+        self.model = InferenceModel(supported_concurrent_num=self.core_num)
+        self.model.load_openvino(model_path=model_path,
+                                 weight_path=model_path[:model_path.rindex(".")] + ".bin",
+                                 batch_size=batch_size)
+
+    def fit(self, data, epochs, **kwargs):
+        raise NotImplementedError
+
+    def predict(self, data, **kwargs):
+        def predict_transform(dict_data, batch_size):
+            assert isinstance(dict_data, dict), "each shard should be a dict"
+            assert "x" in dict_data, "key 'x' should be in each shard"
+            feature_data = dict_data["x"]
+            if isinstance(feature_data, np.ndarray):
+                assert feature_data.shape[1] <= batch_size, \
+                    "The batch size of input data (the second dim) should not exceed the " \
+                    "model batch size, otherwise some inputs will be ignored."
+            elif isinstance(feature_data, list):
+                for elem in feature_data:
+                    assert isinstance(elem, np.ndarray), \
+                        "Each element in the x list should be an ndarray, but got " + \
+                        elem.__class__.__name__
+                    assert elem.shape[1] <= batch_size, \
+                        "The batch size of each input (the second dim) should not exceed " \
+                        "the model batch size, otherwise some inputs will be ignored."
+            else:
+                raise ValueError("x in each shard should be an ndarray or a list of ndarrays.")
+            return feature_data
+
+        sc = init_nncontext()
+
+        if isinstance(data, SparkXShards):
+            assert sc is not None, "SparkContext should be initialized when data is an XShards."
+            from zoo.orca.learn.utils import convert_predict_to_xshard
+            data = data.transform_shard(predict_transform, self.batch_size)
+            result_rdd = self.model.distributed_predict(data.rdd, sc)
+            return convert_predict_to_xshard(result_rdd)
+        elif isinstance(data, (np.ndarray, list)):
+            total_core_num = self.core_num * self.node_num
+            if isinstance(data, np.ndarray):
+                assert data.shape[1] <= self.batch_size, \
+                    "The batch size of input data (the second dim) should not exceed the " \
+                    "model batch size, otherwise some inputs will be ignored."
+                split_num = min(total_core_num, data.shape[0])
+                arrays = np.array_split(data, split_num)
+                data_rdd = sc.parallelize(arrays, numSlices=split_num)
+            elif isinstance(data, list):
+                flattened = nest.flatten(data)
+                data_length = len(flattened[0])
+                data_to_be_rdd = []
+                split_num = min(total_core_num, flattened[0].shape[0])
+                for i in range(split_num):
+                    data_to_be_rdd.append([])
+                for x in flattened:
+                    assert isinstance(x, np.ndarray), \
+                        "The data in the data list should be ndarrays, but got " + \
+                        x.__class__.__name__
+                    assert len(x) == data_length, \
+                        "The ndarrays in data must all have the same size in the first " \
+                        "dimension; got first ndarray of size {} and another of size {}" \
+                        .format(data_length, len(x))
+                    assert x.shape[1] <= self.batch_size, \
+                        "The batch size of each input (the second dim) should not exceed " \
+                        "the model batch size, otherwise some inputs will be ignored."
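+                    # Split each input along its first dim so that every partition
+                    # receives one slice of every input tensor.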
+                    x_parts = np.array_split(x, split_num)
+                    for idx, x_part in enumerate(x_parts):
+                        data_to_be_rdd[idx].append(x_part)
+
+                data_to_be_rdd = [nest.pack_sequence_as(data, shard) for shard in data_to_be_rdd]
+                data_rdd = sc.parallelize(data_to_be_rdd, numSlices=split_num)
+
+            result_rdd = self.model.distributed_predict(data_rdd, sc)
+            result_arr_list = result_rdd.collect()
+            result_arr = np.concatenate(result_arr_list, axis=0)
+            return result_arr
+        else:
+            raise ValueError("Only XShards, a numpy array and a list of numpy arrays are "
+                             "supported as input data, but got " + data.__class__.__name__)
+
+    def evaluate(self, data, **kwargs):
+        raise NotImplementedError
+
+    def get_model(self):
+        raise NotImplementedError
+
+    def save(self, model_path):
+        raise NotImplementedError
+
+    def load(self, model_path):
+        """
+        Load an OpenVINO model.
+
+        :param model_path: String. The file path to the OpenVINO IR xml file.
+        :return:
+        """
+        self.model.load_openvino(model_path=model_path,
+                                 weight_path=model_path[:model_path.rindex(".")] + ".bin")
diff --git a/python/orca/src/bigdl/orca/learn/tf/estimator.py b/python/orca/src/bigdl/orca/learn/tf/estimator.py
index cfa0b6131dc..06633023af7 100644
--- a/python/orca/src/bigdl/orca/learn/tf/estimator.py
+++ b/python/orca/src/bigdl/orca/learn/tf/estimator.py
@@ -19,7 +19,7 @@
 from zoo.orca.data.tf.data import Dataset, TFDataDataset2
 from zoo.orca.learn.tf.utils import *
-from zoo.orca.learn.utils import find_latest_checkpoint
+from zoo.orca.learn.utils import find_latest_checkpoint, convert_predict_to_xshard
 from zoo.tfpark import KerasModel
 from zoo.tfpark import TFOptimizer, TFNet, ZooOptimizer
 from zoo.tfpark.tf_optimizer import StatelessMetric
diff --git a/python/orca/src/bigdl/orca/learn/tf/utils.py b/python/orca/src/bigdl/orca/learn/tf/utils.py
index b27f69eb445..d09ec555052 100644
--- a/python/orca/src/bigdl/orca/learn/tf/utils.py
+++ b/python/orca/src/bigdl/orca/learn/tf/utils.py
@@ -90,21 +90,6 @@ def combine(pair):
     return result_df
 
 
-def convert_predict_to_xshard(prediction_rdd):
-    def transform_predict(iter):
-        predictions = list(iter)
-        # list of np array
-        if isinstance(predictions[0], list):
-            predictions = np.array(predictions).T.tolist()
-            result = [np.array(predict) for predict in predictions]
-            return [{'prediction': result}]
-        # np array
-        else:
-            return [{'prediction': np.array(predictions)}]
-
-    return SparkXShards(prediction_rdd.mapPartitions(transform_predict))
-
-
 def find_latest_checkpoint(model_dir):
     import os
     import re
diff --git a/python/orca/src/bigdl/orca/learn/utils.py b/python/orca/src/bigdl/orca/learn/utils.py
index be71c8354d4..203731dbcb8 100644
--- a/python/orca/src/bigdl/orca/learn/utils.py
+++ b/python/orca/src/bigdl/orca/learn/utils.py
@@ -58,3 +58,21 @@ def find_latest_checkpoint(model_dir, model_type="bigdl"):
             optim_prefix = prefix
             break
     return ckpt_path, optim_prefix, latest_version
+
+
+def convert_predict_to_xshard(prediction_rdd):
+    import numpy as np
+    from zoo.orca.data import SparkXShards
+
+    def transform_predict(iter):
+        predictions = list(iter)
+        # list of np array
+        if isinstance(predictions[0], list):
+            predictions = np.array(predictions).T.tolist()
+            result = [np.array(predict) for predict in predictions]
+            return [{'prediction': result}]
+        # np array
+        else:
+            return [{'prediction': np.array(predictions)}]
+
+    return SparkXShards(prediction_rdd.mapPartitions(transform_predict))
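The relocated convert_predict_to_xshard helper is consumed as in the call sites above; a minimal sketch of the pattern (model and sample_rdd are illustrative placeholders):

    from zoo.orca.learn.utils import convert_predict_to_xshard

    result_rdd = model.predict(sample_rdd)  # RDD of per-record predictions
    # Each partition becomes one shard: a dict {'prediction': ndarray or list of ndarrays}.
    result_shards = convert_predict_to_xshard(result_rdd)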
diff --git a/python/orca/test/bigdl/orca/learn/spark/test_estimator_openvino.py b/python/orca/test/bigdl/orca/learn/spark/test_estimator_openvino.py
new file mode 100644
index 00000000000..ffe1718cbd5
--- /dev/null
+++ b/python/orca/test/bigdl/orca/learn/spark/test_estimator_openvino.py
@@ -0,0 +1,75 @@
+#
+# Copyright 2018 Analytics Zoo Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import os
+import tempfile
+import subprocess
+from unittest import TestCase
+
+import numpy as np
+from zoo import init_nncontext
+from zoo.orca.data import SparkXShards
+from zoo.orca.learn.openvino.estimator import Estimator
+from bigdl.dataset.base import maybe_download
+
+property_path = os.path.join(os.path.split(__file__)[0],
+                             "../../../../../../zoo/target/classes/app.properties")
+data_url = "http://10.239.45.10:8081/repository/raw"
+
+with open(property_path) as f:
+    for _ in range(2):  # skip the first two lines
+        next(f)
+    for line in f:
+        if "inner-ftp-uri" in line:
+            line = line.strip()
+            data_url = line.split("=")[1].replace("\\", "")
+
+
+class TestEstimatorForOpenVINO(TestCase):
+    def test_openvino(self):
+        with tempfile.TemporaryDirectory() as local_path:
+            model_url = data_url + "/analytics-zoo-data/openvino2020_resnet50.tar"
+            model_path = maybe_download("openvino2020_resnet50.tar",
+                                        local_path, model_url)
+            cmd = "tar -xvf " + model_path + " -C " + local_path
+            # Block until extraction completes so the model files exist before loading.
+            subprocess.check_call(cmd.split())
+            model_path = os.path.join(local_path, "openvino2020_resnet50/resnet_v1_50.xml")
+            est = Estimator.from_openvino(model_path=model_path)
+
+            # ndarray
+            input_data = np.random.random([20, 4, 3, 224, 224])
+            result = est.predict(input_data)
+            print(result)
+
+            # xshards
+            input_data_list = [np.random.random([1, 4, 3, 224, 224]),
+                               np.random.random([2, 4, 3, 224, 224])]
+            sc = init_nncontext()
+            rdd = sc.parallelize(input_data_list, numSlices=2)
+            shards = SparkXShards(rdd)
+
+            def pre_processing(images):
+                return {"x": images}
+
+            shards = shards.transform_shard(pre_processing)
+            result = est.predict(shards)
+            result_c = result.collect()
+            print(result_c)
+
+
+if __name__ == "__main__":
+    import pytest
+
+    pytest.main([__file__])
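A minimal end-to-end sketch of the new OpenVINO Estimator API introduced by this patch (the model path is a placeholder; input shapes mirror the test above, which assumes an IR model with batch size 4):

    import numpy as np
    from zoo import init_nncontext
    from zoo.orca.data import SparkXShards
    from zoo.orca.learn.openvino.estimator import Estimator

    sc = init_nncontext()
    est = Estimator.from_openvino(model_path="/path/to/resnet_v1_50.xml")

    # Local ndarray input: the first dim is split across partitions; the second dim
    # (per-inference batch) must not exceed the model batch size.
    predictions = est.predict(np.random.random([20, 4, 3, 224, 224]))

    # Distributed XShards input: each shard is a dict with key "x".
    rdd = sc.parallelize([np.random.random([2, 4, 3, 224, 224])])
    shards = SparkXShards(rdd).transform_shard(lambda arr: {"x": arr})
    prediction_shards = est.predict(shards)
    print(prediction_shards.collect())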