
Cluster Serving ref hot fix #2965

Merged 12 commits on Oct 16, 2020
@@ -91,4 +91,4 @@ object ClusterServingInferenceBlockOthersExample {
object Model {
var queueing: Int = 0
var model: String = "MyModel"
}
}
@@ -22,7 +22,6 @@ import java.util.concurrent.LinkedBlockingQueue
import java.util.{List => JList}

import com.intel.analytics.bigdl.nn.abstractnn.Activity
import com.intel.analytics.zoo.serving.engine.ModelHolder

import scala.collection.JavaConverters._

@@ -105,40 +105,35 @@ object MockParallelPipelineBaseline extends Supportive {
a = a :+ (i.toString(), b64string)
)
(0 until param.testNum).grouped(sParam.coreNum).flatMap(i => {
val preprocessed = timer.timing(s"Thread ${Thread.currentThread().getId} Preprocess", sParam.coreNum) {
val preprocessed = timer.timing(
s"Thread ${Thread.currentThread().getId} Preprocess", sParam.coreNum) {
a.map(item => {
// println(s"${System.currentTimeMillis()} Thread ${Thread.currentThread().getId} attempting to preprocess")
//
// while (ModelHolder.modelQueueing != 0) {
//// println(s"${System.currentTimeMillis()} Thread ${Thread.currentThread().getId} waiting at preprocess")
// ModelHolder.lock.lock()
// ModelHolder.modelAvailable.awaitUninterruptibly()
// ModelHolder.lock.unlock()
// }
println(s"${ModelHolder.modelQueueing} threads are queueing inference")
// println(s"${System.currentTimeMillis()} Thread ${Thread.currentThread().getId} preprocess lock checked")
val tensor = timer.timing(s"Thread ${Thread.currentThread().getId} Preprocess one record", sParam.coreNum) {
val tensor = timer.timing(
s"Thread ${Thread.currentThread().getId} Preprocess one record", sParam.coreNum) {
val deserializer = new ArrowDeserializer()
val arr = deserializer.create(b64string)
Tensor(arr(0)._1, arr(0)._2)
}

// println(s"${System.currentTimeMillis()} Thread ${Thread.currentThread().getId} preprocess finished")
(item._1, T(tensor))

})
}

val t = timer.timing(s"Thread ${Thread.currentThread().getId} Batch input", sParam.coreNum) {
clusterServingInference.batchInput(preprocessed, sParam.coreNum, false, sParam.resize)
val t = timer.timing(
s"Thread ${Thread.currentThread().getId} Batch input", sParam.coreNum) {
clusterServingInference.batchInput(
preprocessed, sParam.coreNum, false, sParam.resize)
}
clusterServingInference.dimCheck(t, "add", sParam.modelType)
val result = timer.timing(s"Thread ${Thread.currentThread().getId} Inference", sParam.coreNum) {
val result = timer.timing(
s"Thread ${Thread.currentThread().getId} Inference", sParam.coreNum) {
ModelHolder.model.doPredict(t)
}
clusterServingInference.dimCheck(t, "remove", sParam.modelType)
clusterServingInference.dimCheck(result, "remove", sParam.modelType)
val postprocessed = timer.timing(s"Thread ${Thread.currentThread().getId} Postprocess", sParam.coreNum) {
val postprocessed = timer.timing(
s"Thread ${Thread.currentThread().getId} Postprocess", sParam.coreNum) {
(0 until sParam.coreNum).map(i => {
ArrowSerializer.activityBatchToByte(result, i + 1)
})
@@ -148,11 +143,6 @@ object MockParallelPipelineBaseline extends Supportive {
}).toArray
timer.print()
})

}




}
}
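
Note on the pattern above: every pipeline stage (preprocess, batch input, inference, postprocess) is wrapped in `timer.timing(label, num) { ... }`. A minimal sketch of what such a by-name timing helper might look like; the actual `Supportive` trait in analytics-zoo may differ in signature and logging backend:

```scala
import org.slf4j.LoggerFactory

// Hypothetical stand-in for the timing helper used in the hunks above.
trait TimingSupport {
  private val logger = LoggerFactory.getLogger(getClass)

  // Evaluates the by-name block `f`, logs its wall-clock cost under
  // `label`, and returns the block's result unchanged.
  def timing[T](label: String, num: Int)(f: => T): T = {
    val start = System.nanoTime()
    val result = f
    val costMs = (System.nanoTime() - start) / 1e6
    logger.info(s"$label (num $num) cost $costMs ms")
    result
  }
}
```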
@@ -116,26 +116,32 @@ object OpenVINOBaseline extends Supportive {
a = a :+ (i.toString(), b64string)
)
(0 until param.testNum).grouped(sParam.coreNum).flatMap(i => {
val preprocessed = timer.timing(s"Thread ${Thread.currentThread().getId} Preprocess", sParam.coreNum) {
val preprocessed = timer.timing(
s"Thread ${Thread.currentThread().getId} Preprocess", sParam.coreNum) {
a.map(item => {
val deserializer = new ArrowDeserializer()
val arr = deserializer.create(b64string)
val tensor = Tensor(arr(0)._1, arr(0)._2)
println(s"${System.currentTimeMillis()} Thread ${Thread.currentThread().getId} preprocess finished")
println(s"${System.currentTimeMillis()} " +
s"Thread ${Thread.currentThread().getId} preprocess finished")
(item._1, T(tensor))
})
}
val t = timer.timing(s"Thread ${Thread.currentThread().getId} Batch input", sParam.coreNum) {
clusterServingInference.batchInput(preprocessed, sParam.coreNum, false, sParam.resize)
val t = timer.timing(
s"Thread ${Thread.currentThread().getId} Batch input", sParam.coreNum) {
clusterServingInference.batchInput(
preprocessed, sParam.coreNum, false, sParam.resize)
}
clusterServingInference.dimCheck(t, "add", sParam.modelType)
val result = timer.timing(s"Thread ${Thread.currentThread().getId} Inference", sParam.coreNum) {
val result = timer.timing(
s"Thread ${Thread.currentThread().getId} Inference", sParam.coreNum) {
model.predict(t)
// model.forward(t)
}
clusterServingInference.dimCheck(t, "remove", sParam.modelType)
clusterServingInference.dimCheck(result, "remove", sParam.modelType)
val postprocessed = timer.timing(s"Thread ${Thread.currentThread().getId} Postprocess", sParam.coreNum) {
val postprocessed = timer.timing(
s"Thread ${Thread.currentThread().getId} Postprocess", sParam.coreNum) {
(0 until sParam.coreNum).map(i => {
ArrowSerializer.activityBatchToByte(result, i + 1)
})
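
Each record in the hunk above is decoded from a base64-encoded Arrow payload into a `(data, shape)` pair and wrapped in a `Tensor`. A hedged sketch of that step, with the decoder abstracted as a plain function, since only its observed return shape is known from this diff:

```scala
import com.intel.analytics.bigdl.numeric.NumericFloat
import com.intel.analytics.bigdl.tensor.Tensor

// Sketch of the preprocess step: decode one base64 Arrow record into a
// Tensor. The (data, shape) tuple layout is inferred from the calls in
// this diff; the real ArrowDeserializer API may differ.
def toTensor(b64string: String,
             decode: String => Array[(Array[Float], Array[Int])]): Tensor[Float] = {
  val arr = decode(b64string)   // one (data, shape) pair per tensor
  Tensor(arr(0)._1, arr(0)._2)  // wrap the first record as a Tensor
}
```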
@@ -93,15 +93,17 @@ object PreprocessingBaseline extends Supportive {
a = a :+ (i.toString(), b64string)
)
(0 until param.testNum).grouped(sParam.coreNum).flatMap(i => {
val preprocessed = timer.timing(s"Thread ${Thread.currentThread().getId} Preprocess", sParam.coreNum) {
val preprocessed = timer.timing(
s"Thread ${Thread.currentThread().getId} Preprocess", sParam.coreNum) {
a.map(item => {
ModelHolder.synchronized{
while (ModelHolder.modelQueueing != 0) {
ModelHolder.wait()
}
ModelHolder.nonOMP += 1
}
val tensor = timer.timing(s"Thread ${Thread.currentThread().getId} Preprocess one record", sParam.coreNum) {
val tensor = timer.timing(
s"Thread ${Thread.currentThread().getId} Preprocess one record", sParam.coreNum) {
val deserializer = new ArrowDeserializer()
val arr = deserializer.create(b64string)
Tensor(arr(0)._1, arr(0)._2)
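
The hunk above gates preprocessing on `ModelHolder`: a thread waits on the monitor while `modelQueueing` is nonzero, then bumps `nonOMP`. A minimal sketch of the full wait/notify handshake this implies; the inference-side counterpart shown here is an assumption, not code from this PR:

```scala
// Hypothetical mirror of the monitor-based gating seen above: preprocessing
// threads park while any inference is queued, and the inference side wakes
// them when the queue drains.
object GateSketch {
  object Holder {
    var modelQueueing: Int = 0
    var nonOMP: Int = 0
  }

  def enterPreprocess(): Unit = Holder.synchronized {
    while (Holder.modelQueueing != 0) Holder.wait() // park until inference drains
    Holder.nonOMP += 1
  }

  def enterInference(): Unit = Holder.synchronized {
    Holder.modelQueueing += 1
  }

  def exitInference(): Unit = Holder.synchronized {
    Holder.modelQueueing -= 1
    if (Holder.modelQueueing == 0) Holder.notifyAll() // release waiting preprocessors
  }
}
```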
@@ -40,8 +40,8 @@ class ClusterServingInference(preProcessing: PreProcessing,
def multiThreadPipeline(in: List[(String, String)]): List[(String, String)] = {
multiThreadInference(preProcess(in, true))
}
def preProcess(in: List[(String, String)],

def preProcess(in: List[(String, String)],
multiThread: Boolean = false): List[(String, Activity)] = {
if (!multiThread) {
in.map(item => {
@@ -61,7 +61,7 @@ class ClusterServingInference(preProcessing: PreProcessing,
}
}
def singleThreadInference(in: List[(String, Activity)]): List[(String, String)] = {

val postProcessed = in.map(pathByte => {
try {
val t = typeCheck(pathByte._2)
@@ -88,7 +88,7 @@ class ClusterServingInference(preProcessing: PreProcessing,
* fixed size of input, thus mutable batch size is not supported
*/
def singleThreadBatchInference(in: List[(String, Activity)]): List[(String, String)] = {

val postProcessed = in.grouped(batchSize).flatMap(pathByte => {
try {
val thisBatchSize = pathByte.size
@@ -121,7 +121,7 @@ class ClusterServingInference(preProcessing: PreProcessing,
}

def multiThreadInference(in: List[(String, Activity)]): List[(String, String)] = {

val postProcessed = in.grouped(batchSize).flatMap(itemBatch => {
try {
val size = itemBatch.size
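
All three inference paths in this file share one skeleton: group the input by `batchSize`, run each batch through the model inside a try/catch, and flatten the results. A generic sketch of that loop, with `batchInput`, `dimCheck`, prediction, and postprocessing abstracted into a single hypothetical `predict` function:

```scala
// Sketch of the batch-then-predict loop these methods implement.
def batchedInference[A, B](in: List[(String, A)],
                           batchSize: Int,
                           predict: List[(String, A)] => List[(String, B)])
  : List[(String, B)] = {
  in.grouped(batchSize).flatMap { batch =>
    try {
      predict(batch)
    } catch {
      case e: Exception =>
        // On failure, drop this batch but keep the pipeline alive.
        e.printStackTrace()
        List.empty[(String, B)]
    }
  }.toList
}
```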
@@ -72,4 +72,4 @@ class FlinkRedisSink(params: SerParams) extends RichSinkFunction[List[(String, S
}
object JedisPoolHolder {
var jedisPool: JedisPool = null
}
}
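
`JedisPoolHolder` above keeps a mutable, initially-null `JedisPool` shared across sink instances. A hedged sketch of how such a holder is typically initialized once per JVM, for example from the sink's `open()`; the `init` signature here is an assumption, not part of this PR:

```scala
import redis.clients.jedis.{JedisPool, JedisPoolConfig}

// Hypothetical lazy initializer for a JVM-wide Jedis connection pool.
object PoolHolderSketch {
  @volatile var jedisPool: JedisPool = null

  // Creates the pool on first call; later calls return the shared instance.
  def init(host: String, port: Int): JedisPool = this.synchronized {
    if (jedisPool == null) {
      jedisPool = new JedisPool(new JedisPoolConfig(), host, port)
    }
    jedisPool
  }
}
```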