Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster Serving http py and inference exception support and config parsing fix #2631

Merged
merged 5 commits into from
Jul 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions pyzoo/zoo/serving/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.
#

from http.client import HTTPSConnection
import redis
import time
from zoo.serving.schema import *
Expand Down Expand Up @@ -44,15 +45,31 @@ def __init__(self, host=None, port=None):


class InputQueue(API):
def __init__(self, host=None, port=None):
def __init__(self, host=None, port=None, sync=False):
super().__init__(host, port)
self.c, self.h, self.w = None, None, None
self.sync = sync
if sync:
try:
self.conn = HTTPSConnection("localhost", 8081)
self.conn.request("GET", "/")
self.conn.getresponse()
except Exception as e:
print("Connection error, please check your HTTP server. Error msg is ", e)
self.conn.close()
self.stream_name = "serving_stream"

# TODO: these params can be read from config in future
self.input_threshold = 0.6
self.interval_if_error = 1

def predict(self, request_str):
    """
    Synchronous API: POST the request payload to the serving HTTP
    frontend and block until the response arrives.

    :param request_str: request body string sent as-is to the server
    :return: the http.client.HTTPResponse object returned by the server

    NOTE(review): self.conn is only created when this InputQueue was
    constructed with sync=True -- calling predict() on a non-sync queue
    will raise AttributeError; confirm callers guard on the sync flag.
    NOTE(review): the request path "predict" has no leading "/";
    HTTPConnection.request does not prepend one -- verify the HTTP
    frontend actually routes this path.
    """
    self.conn.request("POST", "predict", request_str)
    return self.conn.getresponse()

def enqueue(self, uri, **data):
sink = pa.BufferOutputStream()
field_list = []
Expand Down
75 changes: 40 additions & 35 deletions scripts/cluster-serving/cluster-serving-start
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,28 @@ function parse_yaml {
}
}'
}
# Parse command-line options for cluster-serving-start.
#   -e | --engine       backend engine to run on ("spark"; default is Flink)
#   -c | --config_path  path to the serving config YAML (defaults to config.yaml)
#   -h | --help         print usage and exit
# Any unrecognized argument prints usage and exits with status 1.
# NOTE(review): assumes a usage() function is defined elsewhere in this
# script -- TODO confirm, otherwise -h and the error path fail with
# "command not found".
while [ "$1" != "" ]; do
case $1 in
# -e/--engine: consume the flag, then capture its value into $backend
-e | --engine ) shift
backend=$1
;;
# -c/--config_path: consume the flag, then capture its value into $config_path
-c | --config_path ) shift
config_path=$1
;;
-h | --help ) usage
exit
;;
# Unknown flag: show usage and fail
* ) usage
exit 1
esac
# Advance past the value (or the flag itself for no-value options)
shift
done

if [ -z "${config_path// }" ]; then
config_path=config.yaml

parse_yaml config.yaml
eval $(parse_yaml config.yaml)
parse_yaml $config_path
eval $(parse_yaml $config_path)

if [ -z "${spark_master}" ]; then
echo "master of spark cluster not set, using default value local[*]"
Expand Down Expand Up @@ -86,40 +105,26 @@ fi
if [ -z "${ZOO_JAR}" ]; then
ZOO_JAR=zoo.jar
fi
if [ -z "${BIGDL_JAR}" ]; then
if [ -f "bigdl.jar" ]; then
BIGDL_JAR=bigdl.jar
mkdir tmp
(cd tmp; unzip -uo ../zoo.jar)
(cd tmp; unzip -uo ../bigdl.jar)
jar -cvf all-in-one.jar -C tmp .
else
BIGDL_JAR=
fi
fi
if [ -z "${SPARK_REDIS_JAR}" ]; then
SPARK_REDIS_JAR=spark-redis-2.4.0-jar-with-dependencies.jar
fi
#if [ -z "${BIGDL_JAR}" ]; then
# if [ -f "bigdl.jar" ]; then
# BIGDL_JAR=bigdl.jar
# mkdir tmp
# (cd tmp; unzip -uo ../zoo.jar)
# (cd tmp; unzip -uo ../bigdl.jar)
# jar -cvf all-in-one.jar -C tmp .
# else
# BIGDL_JAR=
# fi
#fi
#if [ -z "${SPARK_REDIS_JAR}" ]; then
# SPARK_REDIS_JAR=spark-redis-2.4.0-jar-with-dependencies.jar
#fi


while [ "$1" != "" ]; do
case $1 in
-e | --engine ) shift
backend=$1
;;
-c | --config_path ) shift
config_path=$1
;;
-h | --help ) usage
exit
;;
* ) usage
exit 1
esac
shift
done
if [ -z "${backend// }" ]; then
echo "Using default setting, starting Cluster Serving with Flink backend"
${FLINK_HOME}/bin/flink run -c com.intel.analytics.zoo.serving.ClusterServing -p 1 zoo.jar -c $config_path
echo "Starting Cluster Serving with Flink backend"

${FLINK_HOME}/bin/flink run -c com.intel.analytics.zoo.serving.ClusterServing -p 1 zoo.jar -c $config_path
else
if [ $backend != "spark" ]; then
echo "You provided invalid parameter when start serving, supported parameter is spark, your is "$backend
Expand Down Expand Up @@ -154,7 +159,7 @@ else
echo -e "\033[31m${PARAMS[1]}, use local[*] instead \033[0m"
fi
fi
${SPARK_HOME}/bin/spark-submit --master ${spark_master} --driver-memory ${spark_driver_memory} --executor-memory ${spark_executor_memory} --num-executors ${spark_num_executors} --executor-cores ${spark_executor_cores} --total-executor-cores ${spark_total_executor_cores} --jars ${SPARK_REDIS_JAR},${BIGDL_JAR} --class com.intel.analytics.zoo.serving.SparkStreamingClusterServing ${ZOO_JAR}
${SPARK_HOME}/bin/spark-submit --master ${spark_master} --driver-memory ${spark_driver_memory} --executor-memory ${spark_executor_memory} --num-executors ${spark_num_executors} --executor-cores ${spark_executor_cores} --total-executor-cores ${spark_total_executor_cores} --class com.intel.analytics.zoo.serving.SparkStreamingClusterServing ${ZOO_JAR}
fi


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,57 +31,70 @@ object InferenceSupportive {
val postProcessed = preProcessed.grouped(params.coreNum).flatMap(pathByteBatch => {
val thisBatchSize = pathByteBatch.size
(0 until thisBatchSize).toParArray.map(idx => {
val t = pathByteBatch(idx)._2
val result = params.model.doPredict(t)
val value = PostProcessing(result.toTensor[Float], params.filter)
(pathByteBatch(idx)._1, value)
})
try {
val t = pathByteBatch(idx)._2
val result = params.model.doPredict(t)
val value = PostProcessing(result.toTensor[Float], params.filter)
(pathByteBatch(idx)._1, value)
} catch {
case _ =>
logger.info("Your input format is invalid to your model, this batch is skipped")
null
}
}).filter(x => x != null)
})
postProcessed
}
def multiThreadInference(preProcessed: Iterator[(String, Activity)],
params: SerParams): Iterator[(String, String)] = {
println("Inference new batch ..................")
val postProcessed = preProcessed.grouped(params.coreNum).flatMap(pathByteBatch => {
val thisBatchSize = pathByteBatch.size
try {
val thisBatchSize = pathByteBatch.size

val t1 = System.nanoTime()
val t = batchInput(pathByteBatch, params)
val t2 = System.nanoTime()
println(s"Batch input (copy, resize) time ${(t2 - t1) / 1e9} s")
/**
* addSingletonDimension method will modify the
* original Tensor, thus if reuse of Tensor is needed,
* have to squeeze it back.
*/
// dimCheck(t, "add", params)
val result = params.model.doPredict(t)
dimCheck(result, "remove", params)
val t3 = System.nanoTime()
println(s"Inference and Dim check time ${(t3 - t2) / 1e9} s")
val kvResult = if (result.isTensor) {
(0 until thisBatchSize).toParArray.map(i => {
val value = PostProcessing(result.toTensor[Float].select(1, i + 1), params.filter)
(pathByteBatch(i)._1, value)
})
} else if (result.isTable) {
val dataTable = result.toTable
(0 until thisBatchSize).toParArray.map(i => {
var value = ""
dataTable.keySet.foreach(key => {
value += PostProcessing(dataTable(key).asInstanceOf[Tensor[Float]]
.select(1, i + 1), params.filter)
val t1 = System.nanoTime()
val t = batchInput(pathByteBatch, params)
val t2 = System.nanoTime()
println(s"Batch input (copy, resize) time ${(t2 - t1) / 1e9} s")
/**
* addSingletonDimension method will modify the
* original Tensor, thus if reuse of Tensor is needed,
* have to squeeze it back.
*/
// dimCheck(t, "add", params)
val result = params.model.doPredict(t)
dimCheck(result, "remove", params)
val t3 = System.nanoTime()
println(s"Inference and Dim check time ${(t3 - t2) / 1e9} s")
val kvResult = if (result.isTensor) {
(0 until thisBatchSize).toParArray.map(i => {
val value = PostProcessing(result.toTensor[Float].select(1, i + 1), params.filter)
(pathByteBatch(i)._1, value)
})
} else if (result.isTable) {
val dataTable = result.toTable
(0 until thisBatchSize).toParArray.map(i => {
var value = ""
dataTable.keySet.foreach(key => {
value += PostProcessing(dataTable(key).asInstanceOf[Tensor[Float]]
.select(1, i + 1), params.filter)
})
(pathByteBatch(i)._1, value)
})
(pathByteBatch(i)._1, value)
})
} else {
throw new Exception("Wrong output format, neither Tensor nor Table.")
} else {
throw new Exception("Wrong output format, neither Tensor nor Table.")
}
val t4 = System.nanoTime()
println(s"Post-processing time ${(t4 - t3) / 1e9} s")
println(s"Inference logic total time ${(t4 - t1) / 1e9} s")
kvResult
} catch {
case _ =>
logger.info("Your input format is invalid to your model, this batch is skipped")
null
}
val t4 = System.nanoTime()
println(s"Post-processing time ${(t4 - t3) / 1e9} s")
println(s"Inference logic total time ${(t4 - t1) / 1e9} s")
kvResult
})
postProcessed
postProcessed.filter(x => x != null)
}
def batchInput(seq: Seq[(String, Activity)],
params: SerParams): Activity = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ class PreProcessing(param: SerParams) {
else {
(kv._1, decodeTensor(kv._2.asInstanceOf[(
ArrayBuffer[Int], ArrayBuffer[Float], ArrayBuffer[Int], ArrayBuffer[Int])]))
}).toList
}
).toList
// Seq(T(oneInsMap.head, oneInsMap.tail: _*))
val arr = oneInsMap.map(x => x._2)
Seq(T.array(arr.toArray))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,22 +210,16 @@ class ClusterServingHelper(_configPath: String = "config.yaml") {
* @return
*/
def getYaml(configList: HM, key: String, default: Any): Any = {
val configValue = try {
configList.get(key)
} catch {
case _ => null
}

val configValue: Any = configList.get(key)
if (configValue == null) {
if (default == null) throw new Error(configList.toString + key + " must be provided")
else {
// println(configList.toString + key + " is null, using default.")
println(configList.toString + key + " is null, using default.")
return default
}
}
else {
// println(configList.toString + key + " getted: " + configValue)
logger.info(configList.toString + key + " getted: " + configValue)
println(configList.toString + key + " getted: " + configValue)
return configValue
}
}
Expand Down