Flink example #1586

Merged · 8 commits · Aug 29, 2019
7 changes: 6 additions & 1 deletion apps/model-inference-examples/model-inference-flink/pom.xml
@@ -42,7 +42,12 @@
</dependency>
<dependency>
<groupId>com.intel.analytics.zoo</groupId>
-<artifactId>analytics-zoo-bigdl_0.8.0-spark_2.4.3</artifactId>
+<artifactId>analytics-zoo-bigdl_0.9.0-spark_2.4.3</artifactId>
<version>0.6.0-SNAPSHOT</version>
</dependency>
+<dependency>
+<groupId>com.intel.analytics.zoo</groupId>
+<artifactId>zoo-core-dist-all</artifactId>
+<version>0.6.0-SNAPSHOT</version>
+</dependency>
<dependency>
Binary file not shown.
@@ -0,0 +1,96 @@
package com.intel.analytics.zoo.apps.model.inference.flink.Resnet50ImageClassification

import java.nio.file.{Files, Paths}
import java.util.{Arrays, List => JList}

import com.intel.analytics.zoo.pipeline.inference.JTensor
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.DataStreamUtils
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}

import scala.collection.JavaConverters._

object ImageClassificationStreaming {

def main(args: Array[String]): Unit = {
var modelType = "resnet_v1_50"
var checkpointPath: String = "/path/to/models/resnet_v1_50.ckpt"
var ifReverseInputChannels = true
var inputShape = Array(1, 224, 224, 3)
var meanValues = Array(123.68f, 116.78f, 103.94f)
var scale = 1.0f

try {
val params = ParameterTool.fromArgs(args)
modelType = params.get("modelType", modelType)
checkpointPath = params.get("checkpointPath", checkpointPath)
inputShape = if (params.has("inputShape")) {
val inputShapeStr = params.get("inputShape")
inputShapeStr.split(",").map(_.toInt).toArray
} else Array(1, 224, 224, 3)
ifReverseInputChannels = if (params.has("ifReverseInputChannels")) params.getBoolean("ifReverseInputChannels") else true
meanValues = if (params.has("meanValues")) {
val meanValuesStr = params.get("meanValues")
meanValuesStr.split(",").map(_.toFloat).toArray
} else Array(123.68f, 116.78f, 103.94f)
scale = if (params.has("scale")) params.getFloat("scale") else 1.0f
} catch {
case e: Exception => {
System.err.println("Please run 'ImageClassificationStreaming --modelType <modelType> --checkpointPath <checkpointPath> " +
"--inputShape <inputShape> --ifReverseInputChannels <ifReverseInputChannels> --meanValues <meanValues> --scale <scale> " +
"--parallelism <parallelism>'.")
return
}
}

println("start ImageClassificationStreaming job...")
println("params resolved", modelType, checkpointPathcheckpointPath, inputShape.mkString(","), ifReverseInputChannels, meanValues.mkString(","), scale)

// load the sample image from the classpath and preprocess it (decode, center crop, convert to tensor)
val classLoader = this.getClass.getClassLoader
val content = classLoader.getResourceAsStream("n02110063_11239.JPEG")
val imageBytes = Stream.continually(content.read).takeWhile(_ != -1).map(_.toByte).toArray
val imageProcess = new ImageProcesser(imageBytes, 224, 224)
val res = imageProcess.preProcess(imageBytes, 224, 224)
// flatten the preprocessed image tensor into a float array and replicate it 100 times as the demo input
val input = res.storage().array()
val inputs = List.fill(100)(input)

// read the TensorFlow checkpoint bytes once so they can be shipped to each Flink task
val modelBytes = Files.readAllBytes(Paths.get(checkpointPath))

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
println(env.getConfig)

val dataStream: DataStream[Array[Float]] = env.fromCollection(inputs)
// wrap each float array in the JList[JList[JTensor]] batch format expected by InferenceModel
val tensorStream: DataStream[JList[JList[JTensor]]] = dataStream.map(value => {
val input = new JTensor(value, Array(1, 224, 224, 3))
val data = Arrays.asList(input)
List(data).asJava
})

// run inference on the stream: each batch is mapped through the model on the task managers
val resultStream = tensorStream.map(new ModelPredictionMapFunction(modelType, modelBytes, inputShape, ifReverseInputChannels, meanValues, scale))

env.execute("ImageClassificationStreaming")

// collect the prediction results back to the client
val results = DataStreamUtils.collect(resultStream.javaStream).asScala

println("Printing results to stdout.")
results.foreach(println)
}
}

// RichMapFunction that loads the inference model once per task instance and reuses it for every record
class ModelPredictionMapFunction(modelType: String, modelBytes: Array[Byte], inputShape: Array[Int], ifReverseInputChannels: Boolean, meanValues: Array[Float], scale: Float) extends RichMapFunction[JList[JList[JTensor]], JList[JList[JTensor]]] {
var resnet50InferenceModel: Resnet50InferenceModel = _
override def open(parameters: Configuration): Unit = {
// load the model when the task starts
resnet50InferenceModel = new Resnet50InferenceModel(1, modelType, modelBytes, inputShape, ifReverseInputChannels, meanValues, scale)
}
override def close(): Unit = {
// release native resources when the task shuts down
resnet50InferenceModel.doRelease()
}
override def map(in: JList[JList[JTensor]]): JList[JList[JTensor]] = {
resnet50InferenceModel.doPredict(in)
}
}
@@ -0,0 +1,10 @@
package com.intel.analytics.zoo.apps.model.inference.flink.Resnet50ImageClassification

// Preprocessing helper: decode image bytes, center-crop, and convert to an NCHW/RGB tensor
class ImageProcesser(bytes: Array[Byte], cropWidth: Int, cropHeight: Int) extends ImageProcessing {
def preProcess(bytes: Array[Byte], cropWidth: Int, cropHeight: Int) = {
val imageMat = byteArrayToMat(bytes)
val imageCent = centerCrop(imageMat, cropWidth, cropHeight)
val imageTensor = matToNCHWAndRGBTensor(imageCent)
imageTensor
}
}
@@ -0,0 +1,88 @@
package com.intel.analytics.zoo.apps.model.inference.flink.Resnet50ImageClassification

import com.intel.analytics.bigdl.tensor.Tensor
import com.intel.analytics.bigdl.transform.vision.image.opencv.OpenCVMat
import com.intel.analytics.bigdl.transform.vision.image.util.BoundingBox
import com.intel.analytics.zoo.feature.image.OpenCVMethod
import org.apache.commons.io.FileUtils
import org.opencv.core.Rect
import org.opencv.imgcodecs.Imgcodecs
import org.slf4j.LoggerFactory

// Common OpenCV-based image preprocessing utilities used by the image classification example
trait ImageProcessing {
val logger = LoggerFactory.getLogger(getClass)

def byteArrayToMat(bytes: Array[Byte], imageCodec: Int = Imgcodecs.CV_LOAD_IMAGE_UNCHANGED): OpenCVMat = {
OpenCVMethod.fromImageBytes(bytes, imageCodec)
}

def centerCrop(mat: OpenCVMat, cropWidth: Int, cropHeight: Int, normalized: Boolean = false, isClip: Boolean = false): OpenCVMat = {
val height = mat.height().toFloat
val width = mat.width().toFloat
val startH = (height - cropHeight) / 2
val startW = (width - cropWidth) / 2
val box = BoundingBox(startW, startH, startW + cropWidth, startH + cropHeight, normalized)
val (wStart: Float, hStart: Float, wEnd: Float, hEnd: Float) = (box.x1, box.y1, box.x2, box.y2)
var (x1, y1, x2, y2) = if (normalized) {
(wStart * width, hStart * height, wEnd * width, hEnd * height)
} else {
(wStart, hStart, wEnd, hEnd)
}
if (isClip) {
x1 = Math.max(Math.min(x1, width), 0f)
y1 = Math.max(Math.min(y1, height), 0f)
x2 = Math.max(Math.min(x2, width), 0f)
y2 = Math.max(Math.min(y2, height), 0f)
}
val rect = new Rect(x1.toInt, y1.toInt, (x2 - x1).toInt, (y2 - y1).toInt)
val croppedMat = new OpenCVMat()
mat.submat(rect).copyTo(croppedMat)
croppedMat
}

def matToNCHWAndRGBTensor(mat: OpenCVMat) = {
val (height, width, channel) = (mat.height(), mat.width(), mat.channels())
val data = new Array[Float](height * width * channel)
OpenCVMat.toFloatPixels(mat, data)
val imageTensor: Tensor[Float] = Tensor[Float]()
imageTensor.resize(channel, height, width)
val storage = imageTensor.storage().array()
// rearrange the interleaved HWC BGR pixel data into planar CHW RGB order in the loop below
val offset = 0
val frameLength = width * height
var j = 0
while (j < frameLength) {
storage(offset + j) = data(j * 3 + 2)
storage(offset + j + frameLength) = data(j * 3 + 1)
storage(offset + j + frameLength * 2) = data(j * 3)
j += 1
}
imageTensor
}

// subtract the per-channel means and multiply by the scale factor, in place
def channelScaledNormalize(tensor: Tensor[Float], meanR: Int, meanG: Int, meanB: Int, scale: Double) = {
val content = tensor.storage().array()
val frameLength = content.length / 3
val channel = tensor.size(1)
val height = tensor.size(2)
val width = tensor.size(3)

val channels = 3
val mean = Array(meanR, meanG, meanB)
var c = 0
while (c < channels) {
var i = 0
while (i < frameLength) {
val data_index = c * frameLength + i
content(data_index) = ((content(data_index) - mean(c)) * scale).toFloat
i += 1
}
c += 1
}
tensor
}

}
@@ -0,0 +1,45 @@
## Analytics Zoo InferenceModel with OpenVINO acceleration on Flink Streaming
This example runs batch and streaming image classification with Flink and a ResNet-50 model, using the Analytics Zoo InferenceModel (backed by OpenVINO) to accelerate prediction.
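Under the hood, the job wraps an Analytics Zoo `InferenceModel` in a Flink `RichMapFunction`: the model is loaded once per task in `open()`, reused for every record in `map()`, and released in `close()`. A minimal sketch of that pattern (class and variable names here are illustrative, not part of the example code):

```scala
import java.util.{List => JList}

import com.intel.analytics.zoo.pipeline.inference.{InferenceModel, JTensor}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Sketch only: load the model once per task, reuse it for every record, release it on shutdown.
class InferenceMapSketch(modelType: String, modelBytes: Array[Byte], inputShape: Array[Int])
  extends RichMapFunction[JList[JList[JTensor]], JList[JList[JTensor]]] {

  @transient private var model: InferenceModel = _

  override def open(parameters: Configuration): Unit = {
    model = new InferenceModel(1)
    // same doLoadTF call the example uses: checkpoint bytes, input shape, channel order, mean values, scale
    model.doLoadTF(null, modelType, modelBytes, inputShape, true, Array(123.68f, 116.78f, 103.94f), 1.0f)
  }

  override def map(in: JList[JList[JTensor]]): JList[JList[JTensor]] = model.doPredict(in)

  override def close(): Unit = model.doRelease()
}
```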
### Requirements
* JDK 1.8
* Flink 1.8.1
* Scala 2.11/2.12
* Python 3.x

### Environment
Install the following dependencies on each Flink node.
```
sudo apt install python3-pip
pip3 install numpy
pip3 install networkx
pip3 install tensorflow
```
### Start and stop Flink
Start a Flink cluster if there is no running one:
```
./bin/start-cluster.sh
```
Check the Dispatcher's web frontend at http://localhost:8081 and make sure everything is up and running.
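You can also check the cluster from the command line through Flink's REST API (assuming the default port 8081):
```
curl http://localhost:8081/overview
```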
To stop Flink when you're done, run:
```
./bin/stop-cluster.sh
```

### Run the Example
* Run `export FLINK_HOME=<the root directory of Flink>`.
* Run `export ANALYTICS_ZOO_HOME=<the root directory of the Analytics Zoo project>`.
* Download the [resnet_v1_50 model](http://download.tensorflow.org/models/resnet_v1_50_2016_08_28.tar.gz) and extract it. Run `export MODEL_PATH=<path to the extracted checkpoint>`.
* Prepare the prediction dataset from [imagenet](http://www.image-net.org/) and extract it.
* Go to the root directory of model-inference-flink and execute the `mvn clean package` command, which prepares the jar file for model-inference-flink.
* Edit `flink-conf.yaml` to set the heap size and the number of task slots as needed, e.g. `jobmanager.heap.size: 10g` (a sample snippet follows the submit command below).
* Run the following command to submit the Flink program. Change the parameter settings as needed.

```bash
${FLINK_HOME}/bin/flink run \
-m localhost:8081 -p ${task_slot_num} \
-c com.intel.analytics.zoo.apps.model.inference.flink.Resnet50ImageClassification.ImageClassificationStreaming \
${ANALYTICS_ZOO_HOME}/apps/model-inference-examples/model-inference-flink/target/model-inference-flink-0.1.0-SNAPSHOT-jar-with-dependencies.jar \
--modelType resnet_v1_50 --checkpointPath ${MODEL_PATH} \
--inputShape "1,224,224,3" --ifReverseInputChannels true --meanValues "123.68,116.78,103.94" --scale 1
```
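The exact heap size and slot count depend on your machine; the snippet below only illustrates the relevant `flink-conf.yaml` keys and is not a recommended configuration.

```
# illustrative flink-conf.yaml settings; adjust to your cluster
jobmanager.heap.size: 10g
taskmanager.heap.size: 10g
taskmanager.numberOfTaskSlots: 2
```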

@@ -0,0 +1,9 @@
package com.intel.analytics.zoo.apps.model.inference.flink.Resnet50ImageClassification

import com.intel.analytics.zoo.pipeline.inference.InferenceModel

// InferenceModel subclass that loads the ResNet-50 TensorFlow checkpoint from the given bytes when constructed
class Resnet50InferenceModel(var concurrentNum: Int = 1, modelType: String, modelBytes: Array[Byte], inputShape: Array[Int], ifReverseInputChannels: Boolean, meanValues: Array[Float], scale: Float) extends InferenceModel(concurrentNum) with Serializable {
doLoadTF(null, modelType, modelBytes, inputShape, ifReverseInputChannels, meanValues, scale)
println(this)
}

@@ -1,4 +1,4 @@
-package com.intel.analytics.zoo.apps.model.inference.flink
+package com.intel.analytics.zoo.apps.model.inference.flink.TextClassification

import com.intel.analytics.zoo.pipeline.inference.InferenceModel

@@ -8,4 +8,4 @@ class TextClassificationInferenceModel(val supportedConcurrentNum: Int, val stop
val textProcessor = new TextProcessor(stopWordsCount, sequenceLength, embeddingFilePath)

def preprocess(text: String) = textProcessor.preprocess(text)
-}
+}
@@ -1,4 +1,4 @@
-package com.intel.analytics.zoo.apps.model.inference.flink
+package com.intel.analytics.zoo.apps.model.inference.flink.TextClassification

import scala.collection.JavaConverters._
import java.util.{List => JList}
@@ -1,4 +1,4 @@
-package com.intel.analytics.zoo.apps.model.inference.flink
+package com.intel.analytics.zoo.apps.model.inference.flink.TextClassification

import java.io.File

@@ -42,7 +42,7 @@
</dependency>
<dependency>
<groupId>com.intel.analytics.zoo</groupId>
-<artifactId>analytics-zoo-bigdl_0.8.0-spark_2.4.3</artifactId>
+<artifactId>analytics-zoo-bigdl_0.9.0-spark_2.4.3</artifactId>
<version>0.6.0-SNAPSHOT</version>
</dependency>
<dependency>