Skip to content

Commit

Permalink
Orca openvino estimator (#3011)
Browse files Browse the repository at this point in the history
* init

* add distributed_predict

* javardd

* java spark context

* test

* fix

* remove test

* remove

* add type check

* meet comments

* fix import

* fix

* fix style

* combine testcase

* fix style

* replaceable variable
  • Loading branch information
cyita authored and yangw1234 committed Sep 27, 2021
1 parent b91f4cb commit bf09a66
Show file tree
Hide file tree
Showing 7 changed files with 281 additions and 17 deletions.
2 changes: 1 addition & 1 deletion python/orca/src/bigdl/orca/learn/bigdl/estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ def predict(self, data, batch_size=8, feature_cols="features", sample_preprocess
return self.nn_model.transform(data)
elif isinstance(data, SparkXShards):
from zoo.orca.data.utils import to_sample
from zoo.orca.learn.tf.utils import convert_predict_to_xshard
from zoo.orca.learn.utils import convert_predict_to_xshard
sample_rdd = data.rdd.flatMap(to_sample)
result_rdd = self.model.predict(sample_rdd)
return convert_predict_to_xshard(result_rdd)
Expand Down
16 changes: 16 additions & 0 deletions python/orca/src/bigdl/orca/learn/openvino/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from .estimator import Estimator
170 changes: 170 additions & 0 deletions python/orca/src/bigdl/orca/learn/openvino/estimator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from zoo.pipeline.inference import InferenceModel
from zoo.orca.data import SparkXShards
from zoo import get_node_and_core_number
from zoo.util import nest
from zoo.common.nncontext import init_nncontext

import numpy as np


class Estimator(object):
    """Abstract base class defining the Orca Estimator interface.

    Concrete backends (e.g. the OpenVINO wrapper below) override the
    lifecycle methods; the base implementations are intentional no-ops.
    """

    def fit(self, data, epochs, **kwargs):
        """Train the model on ``data`` for ``epochs`` epochs. No-op here."""

    def predict(self, data, **kwargs):
        """Run inference on ``data``. No-op here."""

    def evaluate(self, data, **kwargs):
        """Evaluate the model on ``data``. No-op here."""

    def get_model(self):
        """Return the underlying model object. No-op here."""

    def save(self, model_path):
        """Persist the model to ``model_path``. No-op here."""

    def load(self, model_path):
        """Load a model from ``model_path``. No-op here."""

    @staticmethod
    def from_openvino(*, model_path, batch_size=0):
        """
        Load an openVINO Estimator.
        :param model_path: String. The file path to the OpenVINO IR xml file.
        :param batch_size: Int.
        """
        return OpenvinoEstimatorWrapper(model_path=model_path, batch_size=batch_size)


class OpenvinoEstimatorWrapper(Estimator):
    """Estimator backed by an OpenVINO IR model, supporting distributed
    inference over SparkXShards or in-memory numpy data."""

    def __init__(self,
                 *,
                 model_path,
                 batch_size=0):
        """
        :param model_path: String. The file path to the OpenVINO IR xml file.
            The companion ``.bin`` weight file is expected next to it with the
            same stem.
        :param batch_size: Int. The model batch size. 0 (default) means the
            batch size is read from the first layer's output shape in the IR
            xml file.
        """
        self.node_num, self.core_num = get_node_and_core_number()
        self.path = model_path
        if batch_size != 0:
            self.batch_size = batch_size
        else:
            # Infer batch size from the IR: first dim of the first layer's
            # output port shape.
            import xml.etree.ElementTree as ET
            tree = ET.parse(model_path)
            root = tree.getroot()
            shape_item = root.find('./layers/layer/output/port/dim[1]')
            if shape_item is None:
                raise ValueError("Invalid openVINO IR xml file, please check your model_path")
            self.batch_size = int(shape_item.text)
        self.model = InferenceModel(supported_concurrent_num=self.core_num)
        # Bug fix: pass the resolved self.batch_size. The original passed the
        # raw batch_size argument, so the value parsed from the xml above was
        # ignored whenever batch_size=0 was given.
        self.model.load_openvino(model_path=model_path,
                                 weight_path=model_path[:model_path.rindex(".")] + ".bin",
                                 batch_size=self.batch_size)

    def fit(self, data, epochs, **kwargs):
        """Training is not supported for OpenVINO inference models."""
        raise NotImplementedError

    def predict(self, data, **kwargs):
        """
        Run distributed inference.

        :param data: a SparkXShards of dicts with key "x" (ndarray or list of
            ndarrays), a numpy ndarray, or a list of numpy ndarrays.
        :return: a SparkXShards of prediction dicts when given XShards, or a
            numpy array of predictions when given ndarray/list input.
        """
        def predict_transform(dict_data, batch_size):
            # Validate one shard and strip it down to the feature data.
            assert isinstance(dict_data, dict), "each shard should be an dict"
            assert "x" in dict_data, "key x should in each shard"
            feature_data = dict_data["x"]
            if isinstance(feature_data, np.ndarray):
                assert feature_data.shape[1] <= batch_size, \
                    "The batch size of input data (the second dim) should be less than the model " \
                    "batch size, otherwise some inputs will be ignored."
            elif isinstance(feature_data, list):
                for elem in feature_data:
                    assert isinstance(elem, np.ndarray), "Each element in the x list should be " \
                                                         "a ndarray, but get " + \
                                                         elem.__class__.__name__
                    assert elem.shape[1] <= batch_size, "The batch size of each input data (the " \
                                                        "second dim) should be less than the " \
                                                        "model batch size, otherwise some inputs " \
                                                        "will be ignored."
            else:
                raise ValueError("x in each shard should be a ndarray or a list of ndarray.")
            return dict_data["x"]

        sc = init_nncontext()

        if isinstance(data, SparkXShards):
            assert sc is not None, "You should pass sc(spark context) if data is a XShards."
            from zoo.orca.learn.utils import convert_predict_to_xshard
            data = data.transform_shard(predict_transform, self.batch_size)
            result_rdd = self.model.distributed_predict(data.rdd, sc)
            return convert_predict_to_xshard(result_rdd)
        elif isinstance(data, (np.ndarray, list)):
            total_core_num = self.core_num * self.node_num
            if isinstance(data, np.ndarray):
                assert data.shape[1] <= self.batch_size, "The batch size of input data (the " \
                                                         "second dim) should be less than the " \
                                                         "model batch size, otherwise some " \
                                                         "inputs will be ignored."
                # Split into at most one partition per core.
                split_num = min(total_core_num, data.shape[0])
                arrays = np.array_split(data, split_num)
                data_rdd = sc.parallelize(arrays, numSlices=split_num)
            elif isinstance(data, list):
                flattened = nest.flatten(data)
                data_length = len(flattened[0])
                data_to_be_rdd = []
                split_num = min(total_core_num, flattened[0].shape[0])
                for i in range(split_num):
                    data_to_be_rdd.append([])
                for x in flattened:
                    assert isinstance(x, np.ndarray), "the data in the data list should be " \
                                                      "ndarrays, but get " + \
                                                      x.__class__.__name__
                    assert len(x) == data_length, \
                        "the ndarrays in data must all have the same size in first dimension" \
                        ", got first ndarray of size {} and another {}".format(data_length, len(x))
                    assert x.shape[1] <= self.batch_size, "The batch size of each input data (" \
                                                          "the second dim) should be less than " \
                                                          "the model batch size, otherwise some " \
                                                          "inputs will be ignored."
                    # Distribute each input across the same partition layout so
                    # corresponding pieces of multi-input data stay together.
                    x_parts = np.array_split(x, split_num)
                    for idx, x_part in enumerate(x_parts):
                        data_to_be_rdd[idx].append(x_part)

                data_to_be_rdd = [nest.pack_sequence_as(data, shard) for shard in data_to_be_rdd]
                data_rdd = sc.parallelize(data_to_be_rdd, numSlices=split_num)

            result_rdd = self.model.distributed_predict(data_rdd, sc)
            result_arr_list = result_rdd.collect()
            result_arr = np.concatenate(result_arr_list, axis=0)
            return result_arr
        else:
            raise ValueError("Only XShards, a numpy array and a list of numpy arrays are supported "
                             "as input data, but get " + data.__class__.__name__)

    def evaluate(self, data, **kwargs):
        """Evaluation is not supported for OpenVINO inference models."""
        raise NotImplementedError

    def get_model(self):
        """Exposing the native model object is not supported."""
        raise NotImplementedError

    def save(self, model_path):
        """Saving is not supported; the IR files on disk are the model."""
        raise NotImplementedError

    def load(self, model_path):
        """
        Load an openVINO model.
        :param model_path: String. The file path to the OpenVINO IR xml file.
        :return:
        """
        self.model.load_openvino(model_path=model_path,
                                 weight_path=model_path[:model_path.rindex(".")] + ".bin")
2 changes: 1 addition & 1 deletion python/orca/src/bigdl/orca/learn/tf/estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from zoo.orca.data.tf.data import Dataset, TFDataDataset2

from zoo.orca.learn.tf.utils import *
from zoo.orca.learn.utils import find_latest_checkpoint
from zoo.orca.learn.utils import find_latest_checkpoint, convert_predict_to_xshard
from zoo.tfpark import KerasModel
from zoo.tfpark import TFOptimizer, TFNet, ZooOptimizer
from zoo.tfpark.tf_optimizer import StatelessMetric
Expand Down
15 changes: 0 additions & 15 deletions python/orca/src/bigdl/orca/learn/tf/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,21 +90,6 @@ def combine(pair):
return result_df


def convert_predict_to_xshard(prediction_rdd):
def transform_predict(iter):
predictions = list(iter)
# list of np array
if isinstance(predictions[0], list):
predictions = np.array(predictions).T.tolist()
result = [np.array(predict) for predict in predictions]
return [{'prediction': result}]
# np array
else:
return [{'prediction': np.array(predictions)}]

return SparkXShards(prediction_rdd.mapPartitions(transform_predict))


def find_latest_checkpoint(model_dir):
import os
import re
Expand Down
18 changes: 18 additions & 0 deletions python/orca/src/bigdl/orca/learn/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,21 @@ def find_latest_checkpoint(model_dir, model_type="bigdl"):
optim_prefix = prefix
break
return ckpt_path, optim_prefix, latest_version


def convert_predict_to_xshard(prediction_rdd):
    """
    Wrap an RDD of model predictions into a SparkXShards where each partition
    becomes one shard dict of the form {'prediction': ...}.

    :param prediction_rdd: RDD whose elements are single predictions
        (np-convertible) or, for multi-output models, lists of per-output
        predictions.
    :return: a SparkXShards of {'prediction': ndarray or list of ndarray}.
    """
    import numpy as np
    from zoo.orca.data import SparkXShards

    def transform_predict(iterator):  # renamed from `iter` to avoid shadowing the builtin
        predictions = list(iterator)
        if not predictions:
            # Robustness fix: an empty partition used to raise IndexError on
            # predictions[0]; emit no shard for it instead.
            return []
        if isinstance(predictions[0], list):
            # Multi-output model: transpose [sample][output] -> [output][sample]
            # so each output becomes one stacked ndarray.
            predictions = np.array(predictions).T.tolist()
            result = [np.array(predict) for predict in predictions]
            return [{'prediction': result}]
        else:
            # Single-output model: stack all samples into one ndarray.
            return [{'prediction': np.array(predictions)}]

    return SparkXShards(prediction_rdd.mapPartitions(transform_predict))
75 changes: 75 additions & 0 deletions python/orca/test/bigdl/orca/learn/spark/test_estimator_openvino.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import tempfile
import subprocess
from unittest import TestCase

import numpy as np
from zoo import init_nncontext
from zoo.orca.data import SparkXShards
from zoo.orca.learn.openvino.estimator import Estimator
from bigdl.dataset.base import maybe_download

# Resolve the data repository URL: default to the internal mirror, but prefer
# the "inner-ftp-uri" value from the build's app.properties when present.
property_path = os.path.join(os.path.split(__file__)[0],
                             "../../../../../../zoo/target/classes/app.properties")
data_url = "http://10.239.45.10:8081/repository/raw"

# Robustness fix: the properties file only exists after a full build; fall
# back to the default URL instead of crashing at import time when it's absent.
if os.path.isfile(property_path):
    with open(property_path) as f:
        for _ in range(2):  # skip the first two lines
            next(f)
        for line in f:
            if "inner-ftp-uri" in line:
                line = line.strip()
                # Fix: split on the first '=' only, so values containing '='
                # (common in URLs) are not truncated.
                data_url = line.split("=", 1)[1].replace("\\", "")


class TestEstimatorForOpenVINO(TestCase):
    """End-to-end test of the OpenVINO Estimator on a downloaded ResNet50 IR."""

    def test_openvino(self):
        with tempfile.TemporaryDirectory() as local_path:
            model_url = data_url + "/analytics-zoo-data/openvino2020_resnet50.tar"
            model_path = maybe_download("openvino2020_resnet50.tar",
                                        local_path, model_url)
            cmd = "tar -xvf " + model_path + " -C " + local_path
            # Bug fix: Popen returned immediately, racing the extraction
            # against from_openvino reading the xml. Wait for tar to finish
            # and fail loudly if extraction fails.
            subprocess.run(cmd.split(), check=True)
            model_path = os.path.join(local_path, "openvino2020_resnet50/resnet_v1_50.xml")
            est = Estimator.from_openvino(model_path=model_path)

            # ndarray input: (num_samples, model_batch, C, H, W)
            input_data = np.random.random([20, 4, 3, 224, 224])
            result = est.predict(input_data)
            print(result)

            # xshards input: each shard is a dict {"x": ndarray}
            input_data_list = [np.random.random([1, 4, 3, 224, 224]),
                               np.random.random([2, 4, 3, 224, 224])]
            sc = init_nncontext()
            rdd = sc.parallelize(input_data_list, numSlices=2)
            shards = SparkXShards(rdd)

            def pre_processing(images):
                return {"x": images}

            shards = shards.transform_shard(pre_processing)
            result = est.predict(shards)
            result_c = result.collect()
            print(result_c)


# Allow running this test module directly: python test_estimator_openvino.py
if __name__ == "__main__":
    import pytest

    pytest.main([__file__])

0 comments on commit bf09a66

Please sign in to comment.