Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster Serving refine Timer #2825

Merged
merged 7 commits into from
Sep 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions scripts/cluster-serving/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ data:
params:
# default: 4
core_number:
# default: auto-adjust
model_number:
# default: OFF
performance_mode:
redis:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,18 @@ class OpenVINOModel(var modelHolder: OpenVINOModelHolder,
transferListOfActivityToActivityOfBatch(outputs, batchSize)
}

override def copy(num: Int): Array[AbstractModel] = Array(this)
/**
 * Produce model copies for parallel serving.
 * Exactly one deep copy (cloned model + weight bytes) is returned regardless
 * of `num` — original behavior preserved; presumably OpenVINO handles its own
 * internal parallelism (TODO confirm with the caller's contract).
 */
override def copy(num: Int): Array[AbstractModel] = {
  val clonedHolder = new OpenVINOModelHolder(
    modelHolder.modelBytes.clone(), modelHolder.weightBytes.clone())
  val cloned = new OpenVINOModel(clonedHolder, isInt8, batchSize, DeviceType.CPU)
  Array[AbstractModel](cloned)
}

override def release(): Unit = {
isReleased match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import scopt.OptionParser

object ClusterServing {
case class ServingParams(configPath: String = "config.yaml", testMode: Boolean = false,
sourceNum: Int = 1)
timerMode: Boolean = false)

val parser = new OptionParser[ServingParams]("Text Classification Example") {
opt[String]('c', "configPath")
Expand All @@ -36,16 +36,22 @@ object ClusterServing {
opt[Boolean]('t', "testMode")
.text("Text Mode of Parallelism 1")
.action((x, params) => params.copy(testMode = x))
opt[Boolean]("timerMode")
.text("Whether to open timer mode")
.action((x, params) => params.copy(timerMode = x))
}

Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("com.intel.analytics.zoo").setLevel(Level.INFO)
var params: SerParams = null
val logger = Logger.getLogger(getClass)
def run(configPath: String = "config.yaml", testMode: Boolean = false): Unit = {
def run(configPath: String = "config.yaml",
testMode: Boolean = false,
timerMode: Boolean = false): Unit = {
val helper = new ClusterServingHelper(configPath)
helper.initArgs()
params = new SerParams(helper)
params.timerMode = timerMode
val serving = StreamExecutionEnvironment.getExecutionEnvironment
serving.registerCachedFile(configPath, Conventions.SERVING_CONF_TMP_PATH)
serving.registerCachedFile(params.modelDir, Conventions.SERVING_MODEL_TMP_DIR)
Expand All @@ -66,6 +72,6 @@ object ClusterServing {
}
/**
 * Entry point: parse CLI arguments and launch the serving pipeline.
 * `parser.parse` returns an Option; the previous `.head` on it threw an
 * opaque NoSuchElementException when arguments were invalid. Pattern match
 * instead — scopt already prints usage on a failed parse, so None just exits.
 */
def main(args: Array[String]): Unit = {
  parser.parse(args, ServingParams()) match {
    case Some(param) =>
      run(param.configPath, param.testMode, param.timerMode)
    case None =>
      // Invalid arguments; usage message was already printed by scopt.
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,23 @@ class FlinkInference(params: SerParams)
val t1 = System.nanoTime()
val postProcessed = if (params.inferenceMode == "single") {
val preProcessed = in.map(item => {
val uri = item._1
val input = pre.decodeArrowBase64(item._2)
(uri, input)
Timer.timing("preprocess one input", 1) {
val uri = item._1
val input = pre.decodeArrowBase64(item._2)
(uri, input)
}
}).toIterator
InferenceSupportive.singleThreadInference(preProcessed, params).toList
if (params.modelType == "openvino") {
InferenceSupportive.singleThreadBatchInference(preProcessed, params).toList
} else {
InferenceSupportive.singleThreadInference(preProcessed, params).toList
}

} else {
val preProcessed = in.grouped(params.coreNum).flatMap(itemBatch => {
Timer.timing("preprocess", itemBatch.size) {
itemBatch.indices.toParArray.map(i => {
Timer.timing("other", 1) {
Timer.timing("preprocess one input", 1) {
val uri = itemBatch(i)._1
val input = pre.decodeArrowBase64(itemBatch(i)._2)
(uri, input)
Expand All @@ -85,7 +92,9 @@ class FlinkInference(params: SerParams)
val t2 = System.nanoTime()
logger.info(s"${postProcessed.size} records backend time ${(t2 - t1) / 1e9} s. " +
s"Throughput ${postProcessed.size / ((t2 - t1) / 1e9)}")
Timer.print()
if (params.timerMode) {
Timer.print()
}
postProcessed
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,18 @@ class FlinkRedisSource(params: SerParams)
val start = System.nanoTime()
val groupName = "serving"
val consumerName = "consumer-" + UUID.randomUUID().toString
val readNumPerTime = if (params.inferenceMode == "single") {
if (params.modelType == "openvino") params.coreNum else 4
} else {
params.coreNum
}
val response = jedis.xreadGroup(
groupName,
consumerName,
params.coreNum,
readNumPerTime,
1,
false,
new SimpleEntry(Conventions.SERVING_STREAM_NAME, StreamEntryID.UNRECEIVED_ENTRY))
// logger.info(s">>> get from source readed redis ${System.currentTimeMillis()} ms")
if (response != null) {
for (streamMessages <- response.asScala) {
val key = streamMessages.getKey
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,36 @@ object InferenceSupportive {
})
postProcessed
}
/**
 * Single-thread batched inference path (used for OpenVINO models in
 * "single" inference mode). Groups inputs into batches of params.coreNum,
 * assembles each group into one batched Activity, runs a single predict
 * call per batch, then post-processes the results in parallel.
 * A batch whose input is invalid for the model is skipped and each of its
 * records is emitted with the value "NaN".
 *
 * @param preProcessed iterator of (uri, decoded input) pairs
 * @param params       serving parameters (coreNum, filter, ...)
 * @return iterator of (uri, prediction string) pairs
 */
def singleThreadBatchInference(preProcessed: Iterator[(String, Activity)],
                               params: SerParams): Iterator[(String, String)] = {
  val postProcessed = preProcessed.grouped(params.coreNum).flatMap(pathByte => {
    try {
      val thisBatchSize = pathByte.size
      val t = Timer.timing("batch", thisBatchSize) {
        batchInputSingleThread(pathByte, params)
      }
      dimCheck(t, "add", params)
      val result = Timer.timing("inference", thisBatchSize) {
        ModelHolder.model.doPredict(t)
      }
      dimCheck(result, "remove", params)
      dimCheck(t, "remove", params)
      val kvResult = Timer.timing("postprocess", thisBatchSize) {
        // PostProcessing uses 1-based batch indices, hence i + 1.
        (0 until thisBatchSize).toParArray.map(i => {
          val value = PostProcessing(result, params.filter, i + 1)
          (pathByte(i)._1, value)
        })
      }
      kvResult
    } catch {
      case e: Exception =>
        // Pass the Throwable to the logger; the original interpolated
        // e.printStackTrace() (a Unit) into the message, which logged "()"
        // and dumped the trace to stderr instead.
        logger.info(
          s"Input format is invalid to your model (${e.getMessage}), " +
            "this batch is skipped", e)
        pathByte.toParArray.map(x => (x._1, "NaN"))
    }
  })
  postProcessed
}
def multiThreadInference(preProcessed: Iterator[(String, Activity)],
params: SerParams): Iterator[(String, String)] = {
val postProcessed = preProcessed.grouped(params.coreNum).flatMap(pathByteBatch => {
Expand Down Expand Up @@ -80,6 +110,43 @@ object InferenceSupportive {
})
postProcessed
}
/**
 * Assemble a sequence of preprocessed single-record inputs into one batched
 * Activity for single-threaded (OpenVINO) inference.
 *
 * Batch tensors are allocated with a fixed first dimension of params.coreNum
 * (the model's configured batch size); per-key shapes are taken from the
 * first sample. If params.resize is set, the batch dimension is shrunk to
 * the actual number of records afterwards.
 * NOTE(review): assumes `seq` is non-empty (callers produce it via
 * `grouped`, which never yields empty groups) and that every sample shares
 * the first sample's key set and tensor shapes — confirm with callers.
 *
 * @param seq    (uri, single-record Activity) pairs to batch
 * @param params serving parameters (coreNum, resize)
 * @return a bare Tensor when the model has a single input, otherwise a Table
 */
def batchInputSingleThread(seq: Seq[(String, Activity)],
    params: SerParams): Activity = {
  val thisBatchSize = seq.size
  println(s"This batch size is ${thisBatchSize.toString}")

  val inputSample = seq.head._2.toTable
  val kvTuples = inputSample.keySet.map(key => {
    (key, Tensor[Float](params.coreNum +:
      inputSample(key).asInstanceOf[Tensor[Float]].size()))
  }).toList
  val t = T(kvTuples.head, kvTuples.tail: _*)
  // Copy each sample into its row of the batch tensor (select is 1-based).
  (0 until thisBatchSize).foreach(i => {
    val dataTable = seq(i)._2.toTable
    t.keySet.foreach(key => {
      t(key).asInstanceOf[Tensor[Float]].select(1, i + 1)
        .copy(dataTable(key).asInstanceOf[Tensor[Float]])
    })
  })
  // Optionally shrink the batch dimension to the actual record count.
  if (params.resize) {
    t.keySet.foreach(key => {
      val singleTensorSize = inputSample(key).asInstanceOf[Tensor[Float]].size()
      // Build the new shape in one expression instead of appending to an
      // Array element-by-element in a loop (each :+ copied the whole array).
      val newSize = thisBatchSize +: singleTensorSize
      t(key).asInstanceOf[Tensor[Float]].resize(newSize)
    })
  }
  // Unwrap single-input models to a bare Tensor. This replaces the
  // original's non-local `return` inside a foreach lambda, which is
  // implemented by throwing NonLocalReturnControl.
  if (t.keySet.size == 1) {
    t(t.keySet.head).asInstanceOf[Tensor[Float]]
  } else {
    t
  }
}
def batchInput(seq: Seq[(String, Activity)],
params: SerParams): Activity = {
val thisBatchSize = seq.size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,17 @@ import java.util.PriorityQueue

import org.apache.log4j.Logger


class Timer(name: String = "") {
/**
* Timer class
* @param _countFlag To determine whether this timer should be count into total
* Sequential operations in main workflow should be count
* but parallel operations in one stage should not
* e.g. The whole cost of preprocessing should be taken,
* but the preprocessing time per image, which is counted in
* parallel manner, should be set false, otherwise the statistic
* count time would be wrong
*/
class Timer(_countFlag: Boolean = true) {
var total: Float = 0 // total cost up to now
var record: Int = 0 // total record number up to now
var batchNum: Int = 0 // total batch number up to now
Expand All @@ -32,6 +41,7 @@ class Timer(name: String = "") {
var min: Long = Long.MaxValue // min cost up to now
val topQ = new PriorityQueue[Long]()
val nQ = 10
val countFlag = _countFlag
topQ.add(Long.MinValue)
def print(): Unit = {
println(s"total cost $total, record num $record, average per input $average, " +
Expand All @@ -49,25 +59,33 @@ class Timer(name: String = "") {
}
}

/**
* Singleton class Timer
* Cluster Serving main workflow contains 4 steps,
* preprocess, batch, inference, postprocess
* these 4 steps are taken into total statistics
* In addition to these, developers could add Timer anywhere in code
* and the statistics would not be taken into total workflow timing statistics
* To timing a piece of code, just use
* Timer.timing(snippet_name, record_number_in_this_snippet)(code_snippet)
*/
object Timer {
var timerMap = Map[String, Timer]()
timerMap += ("preprocess" -> new Timer())
timerMap += ("batch" -> new Timer())
timerMap += ("inference" -> new Timer())
timerMap += ("postprocess" -> new Timer())
timerMap += ("other" -> new Timer())

/**
 * Time a code snippet and fold the elapsed wall-clock milliseconds into the
 * named timer's statistics. A name not present in timerMap gets an ad-hoc
 * Timer created with countFlag = false, so it is excluded from the total
 * workflow statistics.
 *
 * @param name timer name (e.g. "preprocess", "batch", "inference", "postprocess")
 * @param num  number of records processed inside the snippet
 * @param f    the snippet to time (evaluated by-name, exactly once)
 * @return the snippet's result
 */
def timing[T](name: String, num: Int)(f: => T): T = {
  val start = System.currentTimeMillis
  val result = f
  val elapsed = System.currentTimeMillis - start
  if (!timerMap.contains(name)) {
    timerMap += (name -> new Timer(false))
  }
  updateTimer(timerMap(name), elapsed, num)
  result
}
def updateTimer(timer: Timer, cost: Long, num: Int): Unit = {
Expand Down Expand Up @@ -98,8 +116,15 @@ object Timer {
timerMap.foreach(kv => {
println(s"Time stat for ${kv._1} up to now")
kv._2.print()
totalTime += kv._2.total
if (kv._2.countFlag) {
totalTime += kv._2.total
}
})
println(s"Total time of statistic $totalTime ms")
timerMap.foreach(kv => {
if (kv._2.countFlag) {
println(s"${kv._1} time cost total percentage ${(kv._2.total/totalTime) * 100} %")
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import com.intel.analytics.bigdl.transform.vision.image.opencv.OpenCVMat
import com.intel.analytics.zoo.feature.image.OpenCVMethod
import org.opencv.imgcodecs.Imgcodecs
import org.apache.log4j.Logger

import scala.collection.mutable.ArrayBuffer
import com.intel.analytics.bigdl.utils.{T, Table}
import com.intel.analytics.zoo.serving.engine.Timer
import com.intel.analytics.zoo.serving.http.Instances

import com.intel.analytics.zoo.serving.utils.SerParams
import org.opencv.core.Size
import org.opencv.imgproc.Imgproc
Expand All @@ -43,7 +44,7 @@ class PreProcessing(param: SerParams) {
def decodeArrowBase64(s: String): Activity = {
try {
byteBuffer = java.util.Base64.getDecoder.decode(s)
val instance = Instances.fromArrow(byteBuffer)
val instance = Timer.timing("decode arrow", 1)(Instances.fromArrow(byteBuffer))

val kvMap = instance.instances.flatMap(insMap => {
val oneInsMap = insMap.map(kv =>
Expand All @@ -52,7 +53,8 @@ class PreProcessing(param: SerParams) {
(kv._1, decodeString(kv._2.asInstanceOf[String]))
}
else {
(kv._1, decodeImage(kv._2.asInstanceOf[String]))
Timer.timing("decode image", 1)((kv._1, decodeImage(kv._2.asInstanceOf[String])))

}
}
else {
Expand All @@ -65,8 +67,7 @@ class PreProcessing(param: SerParams) {
Seq(T.array(arr.toArray))
})
kvMap.head
}
catch {
} catch {
case e: Exception =>
logger.error(s"Preprocessing error, msg ${e.getMessage}")
logger.error(s"Error stack trace ${e.getStackTrace.mkString("\n")}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class ClusterServingHelper(_configPath: String = "config.yaml", _modelDir: Strin
var redisPort: String = null
var nodeNum: Int = 1
var coreNum: Int = 1
var modelPar: Int = 1
var blasFlag: Boolean = false
var chwFlag: Boolean = true

Expand Down Expand Up @@ -156,6 +157,12 @@ class ClusterServingHelper(_configPath: String = "config.yaml", _modelDir: Strin

val paramsConfig = configList.get("params").asInstanceOf[HM]
coreNum = getYaml(paramsConfig, "core_number", 4).asInstanceOf[Int]
modelPar = if (getYaml(paramsConfig, "model_number", default = -1).asInstanceOf[Int] <= 0) {
if (inferenceMode == "single") coreNum else 1
} else {
getYaml(paramsConfig, "model_number", default = -1).asInstanceOf[Int]
}


if (modelType == "caffe" || modelType == "bigdl") {
if (System.getProperty("bigdl.engineType", "mklblas")
Expand Down Expand Up @@ -239,11 +246,10 @@ class ClusterServingHelper(_configPath: String = "config.yaml", _modelDir: Strin
* backend engine type
* @return
*/
def loadInferenceModel(_parallelNum: Int = 1): InferenceModel = {
val parallelNum = if (inferenceMode == "single") coreNum else 1
val batchSize: Int = coreNum / parallelNum
logger.info(s"Cluster Serving load Inference Model with Parallelism $parallelNum")
val model = new InferenceModel(parallelNum)
def loadInferenceModel(): InferenceModel = {

logger.info(s"Cluster Serving load Inference Model with Parallelism $modelPar")
val model = new InferenceModel(modelPar)

// Used for Tensorflow Model, it could not have intraThreadNum > 2^8
// in some models, thus intraThreadNum should be limited
Expand Down Expand Up @@ -290,8 +296,8 @@ class ClusterServingHelper(_configPath: String = "config.yaml", _modelDir: Strin
case "pytorch" => model.doLoadPyTorch(weightPath)
case "keras" => logError("Keras currently not supported in Cluster Serving")
case "openvino" => modelEncrypted match {
case true => model.doLoadEncryptedOpenVINO(defPath, weightPath, secret, salt, batchSize)
case false => model.doLoadOpenVINO(defPath, weightPath, batchSize)
case true => model.doLoadEncryptedOpenVINO(defPath, weightPath, secret, salt, coreNum)
case false => model.doLoadOpenVINO(defPath, weightPath, coreNum)
}
case _ => logError("Invalid model type, please check your model directory")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class SerParams(helper: ClusterServingHelper) extends Serializable {
val redisSecureEnabled = helper.redisSecureEnabled
val redisSecureTrustStorePath = helper.redisSecureTrustStorePath
val redisSecureTrustStorePassword = helper.redisSecureTrustStorePassword
var timerMode: Boolean = false
println(s"loading params, time is ${sdf.format(lastModified)}")

val resize = helper.resize
Expand Down