feat: multi models support with MKL-DNN backend (intel-analytics#2936)
* feat: multi models support with MKL-DNN backend
i8run committed Oct 26, 2019
1 parent 0915e53 commit eff462c
Showing 14 changed files with 300 additions and 21 deletions.
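Pieced together, the hunks below add a path where several MKL-DNN model replicas on one node share a single set of broadcast native weights. What follows is a minimal sketch of how that path is driven, not part of the commit: the object name, the sc/model/samples values, and the import paths for ModelBroadcast, NumericFloat and ConversionUtils are illustrative assumptions, and the broadcast/value overloads that take a dummy input are private[bigdl], so real code would have to live inside that package.

// Sketch only (not committed code); assumes the standard BigDL package layout.
import com.intel.analytics.bigdl.Module
import com.intel.analytics.bigdl.dataset.{Sample, SampleToMiniBatch}
import com.intel.analytics.bigdl.models.utils.ModelBroadcast   // package path assumed
import com.intel.analytics.bigdl.numeric.NumericFloat          // implicit TensorNumeric[Float]
import com.intel.analytics.bigdl.utils.Engine
import com.intel.analytics.bigdl.utils.intermediate.ConversionUtils // package path assumed
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object MultiModelSketch {
  def run(sc: SparkContext, model: Module[Float], samples: RDD[Sample[Float]]): Unit = {
    // Opt in to the new mode: MKL-DNN backend plus several model replicas per node.
    // Both properties must be set before Engine is first touched.
    System.setProperty("bigdl.engineType", "mkldnn")
    System.setProperty("bigdl.multiModels", "true")
    Engine.init

    // A small real batch lets the DNN primitives and native weights be created once
    // on the driver before broadcasting (mirroring Predictor.getDummyData below).
    val toBatch = SampleToMiniBatch[Float](batchSize = 4)
    val dummyInput = toBatch(samples.take(4).toIterator).next().getInput()

    val modelBroadcast = ModelBroadcast[Float]()
      .broadcast(sc, ConversionUtils.convert(model.evaluate()), dummyInput)

    val partitions = samples.mapPartitions { _ =>
      // Each task gets a clone whose MKL-DNN weights point at the broadcast copies.
      val localModel = modelBroadcast.value(false, true, dummyInput)
      Iterator.single(localModel.getName())
    }.count()
    println(s"initialized one local replica on each of $partitions partitions")
  }
}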
----------------------------------------
@@ -21,13 +21,17 @@ import java.util.UUID

import com.intel.analytics.bigdl.Module
import com.intel.analytics.bigdl.nn.Container
import com.intel.analytics.bigdl.nn.abstractnn.Activity
import com.intel.analytics.bigdl.nn.mkldnn.{MklDnnLayer, TensorMMap}
import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric
import com.intel.analytics.bigdl.tensor._
import com.intel.analytics.bigdl.utils.Engine
import com.intel.analytics.bigdl.utils.{Engine, MklDnn}
import com.intel.analytics.bigdl.utils.Util._
import com.intel.analytics.bigdl.utils.intermediate.IRGraph
import org.apache.commons.lang3.SerializationUtils
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.zookeeper.KeeperException.UnimplementedException

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
@@ -46,6 +50,11 @@ trait ModelBroadcast[T] extends Serializable {
*/
def broadcast(sc: SparkContext, model: Module[T]): this.type

private[bigdl] def broadcast(sc: SparkContext, model: Module[T],
dummyInput: Activity): this.type = {
throw new UnimplementedException
}

/**
* Get the broadcast model on worker
*
@@ -55,6 +64,11 @@ trait ModelBroadcast[T] extends Serializable {
*/
def value(initGradient: Boolean = false, shareWeight: Boolean = true): Module[T]

private[bigdl] def value(initGradient: Boolean, shareWeight: Boolean,
dummyInput: Activity): Module[T] = {
throw new UnimplementedException
}

def uuid(): String = _uuid
}

@@ -81,9 +95,11 @@ object ModelBroadcast {
private[bigdl] class ModelBroadcastImp[T: ClassTag](applyProtoBuffer: Boolean = false)
(implicit ev: TensorNumeric[T]) extends ModelBroadcast[T] {

private type NativeType = (String, (Array[TensorMMap], Array[TensorMMap]))
private var broadcastModel: Broadcast[ModelInfo[T]] = _
private var broadcastConsts: Broadcast[Map[String, Tensor[_]]] = _
private var broadcastParameters: Broadcast[Array[Tensor[T]]] = _
private var broadcastParametersNative: Broadcast[Array[NativeType]] = _
private var nodeNumber : Int = _
private var coreNumber : Int = _

@@ -209,6 +225,54 @@ private[bigdl] class ModelBroadcastImp[T: ClassTag](applyProtoBuffer: Boolean =
Array()
}
}

private def getTensorMMaps(ir: IRGraph[T]) = {
ir.graph
.getSortedForwardExecutions()
.filter(_.element.isInstanceOf[MklDnnLayer])
.map { node =>
val element = node.element
val name = element.getName()
val tensorMMap = element.asInstanceOf[MklDnnLayer].paramsMMap()
(name, tensorMMap)
}
}

override def broadcast(sc: SparkContext, model: Module[T],
dummyInput: Activity): this.type = {
if (model.isInstanceOf[IRGraph[T]] && Engine.getEngineType() == MklDnn &&
Engine.isMultiModels) {
val clonedModel = model.asInstanceOf[IRGraph[T]].cloneModule()
clonedModel.forward(dummyInput)

broadcastParametersNative = sc.broadcast(getTensorMMaps(clonedModel))
}

this.broadcast(sc, model)
this
}

override def value(initGradient: Boolean, shareWeight: Boolean,
dummyInput: Activity): Module[T] = {
val model = value(initGradient, shareWeight)

if (model.isInstanceOf[IRGraph[T]] && Engine.getEngineType() == MklDnn &&
Engine.isMultiModels) {
model.forward(dummyInput)

if (shareWeight) {
getTensorMMaps(model.asInstanceOf[IRGraph[T]]).zip(broadcastParametersNative.value)
.foreach { case (src, dst) =>
if (src._1 == dst._1) {
src._2._1.zip(dst._2._1)
.filter(x => x._1 != null && x._2 != null)
.foreach{ case (x, y) => x.setNative(y) }
}
}
}
}
model
}
}

private[bigdl] class ModelInfo[T: ClassTag](val uuid: String, @transient var model: Module[T])(
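For reference, the name-matching step inside value(initGradient, shareWeight, dummyInput) above, rewritten as a standalone helper with explicit destructuring. This is a sketch only: the object and method are hypothetical, and it assumes code living inside the com.intel.analytics.bigdl package because TensorMMap is now private[bigdl].

// Sketch of the weight-sharing step in ModelBroadcastImp.value (hypothetical helper).
import com.intel.analytics.bigdl.nn.mkldnn.TensorMMap

object NativeWeightSharing {
  // (layer name, (weights, gradWeights)), as produced by getTensorMMaps above.
  type NativeParams = (String, (Array[TensorMMap], Array[TensorMMap]))

  // For every layer whose name matches, re-point the local weights' native
  // DnnTensors at the broadcast ones; the gradient TensorMMaps stay local.
  def shareNativeWeights(local: Array[NativeParams],
                         broadcast: Array[NativeParams]): Unit = {
    local.zip(broadcast).foreach {
      case ((localName, (localWeights, _)), (bcastName, (bcastWeights, _))) =>
        if (localName == bcastName) {
          localWeights.zip(bcastWeights)
            .filter { case (l, b) => l != null && b != null }
            .foreach { case (l, b) => l.setNative(b) }
        }
    }
  }
}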
----------------------------------------
@@ -284,6 +284,11 @@ trait MklDnnLayer extends AbstractModule[Activity, Activity, Float] with MklDnnM
}

override def setQuantize(value: Boolean): MklDnnLayer.this.type = this

def paramsMMap(): (Array[TensorMMap], Array[TensorMMap]) = {
// return empty weight and gradWeight arrays by default
(Array.empty[TensorMMap], Array.empty[TensorMMap])
}
}

/**
----------------------------------------
@@ -294,6 +294,10 @@ class Linear(
(Array(weight.dense, bias.dense), Array(gradWeight.dense, gradBias.dense))
}

override def paramsMMap(): (Array[TensorMMap], Array[TensorMMap]) = {
(Array(weight, bias), Array(gradWeight, gradBias))
}

override def zeroGradParameters(): Unit = {
}

----------------------------------------
@@ -413,6 +413,10 @@ class SpatialBatchNormalization(
(Array(weightAndBias.dense), Array(gradWeightAndBias.dense))
}

override def paramsMMap(): (Array[TensorMMap], Array[TensorMMap]) = {
(Array(weightAndBias), Array(gradWeightAndBias))
}

override def getExtraParameter(): Array[Tensor[Float]] = {
if (needScale) {
runningMeanScaled.copy(runningMean.dense).div(scaleFactor)
----------------------------------------
@@ -641,6 +641,10 @@ class SpatialConvolution(

}

override def paramsMMap(): (Array[TensorMMap], Array[TensorMMap]) = {
(Array(weight, bias), Array(gradWeight, gradBias))
}

// no need to implement it, because the grad parameters will be cleaned by mkldnn
override def zeroGradParameters(): Unit = {
}
----------------------------------------
@@ -21,6 +21,8 @@ import com.intel.analytics.bigdl.nn.abstractnn.Activity
import com.intel.analytics.bigdl.nn.mkldnn.Phase.{InferencePhase, TrainingPhase}
import com.intel.analytics.bigdl.tensor.{DnnTensor, FloatType, Tensor}

import scala.reflect.ClassTag

/**
* `TensorMMap` contains two tensors, dense and native, which mirror each other.
* It's used in the layer which contains weights. For the weight, we should sync the
@@ -29,7 +31,7 @@ import com.intel.analytics.bigdl.tensor.{DnnTensor, FloatType, Tensor}
*
* @param _size the shape of Tensor, such as Array(4, 3, 224, 224)
*/
private[mkldnn] class TensorMMap(_size: Array[Int])(implicit owner: MemoryOwner)
private[bigdl] class TensorMMap(_size: Array[Int])(implicit owner: MemoryOwner)
extends Serializable {
// the dense weight on the heap is used by the optimizer and so on, and is exposed at the
// AbstractModule level.
@@ -115,4 +117,15 @@ private[mkldnn] class TensorMMap(_size: Array[Int])(implicit owner: MemoryOwner)
dense.size(index)
}

def release(): Unit = {
if (native != null) {
native.release()
}
}

def setNative(another: TensorMMap): Unit = {
if (native != null && another.native != null) {
native.set(another.native.asInstanceOf[Tensor[_]])
}
}
}
----------------------------------------
@@ -52,17 +52,19 @@ class Evaluator[T: ClassTag] private[optim](model: Module[T])(implicit ev: Tenso
vMethods: Array[ValidationMethod[T]],
batchSize: Option[Int] = None): Array[(ValidationResult, ValidationMethod[T])] = {

val modelBroad = ModelBroadcast[T]().broadcast(dataset.sparkContext,
ConversionUtils.convert(model.evaluate()))
val partitionNum = dataset.partitions.length

val totalBatch = batchSize.getOrElse(batchPerPartition * partitionNum)

val dummyInput = Predictor.getDummyData(dataset, totalBatch / partitionNum)

val modelBroad = ModelBroadcast[T]().broadcast(dataset.sparkContext,
ConversionUtils.convert(model.evaluate()), dummyInput)
val rdd = ConversionUtils.coalesce(dataset)
val otherBroad = rdd.sparkContext.broadcast(vMethods, SampleToMiniBatch(
batchSize = totalBatch, partitionNum = Some(rdd.partitions.length)))

rdd.mapPartitions(partition => {
val localModel = modelBroad.value()
val localModel = modelBroad.value(false, true, dummyInput)
val localMethod = otherBroad.value._1.map(_.clone())
val localTransformer = otherBroad.value._2.cloneTransformer()
val miniBatch = localTransformer(partition)
@@ -86,14 +88,14 @@ class Evaluator[T: ClassTag] private[optim](model: Module[T])(implicit ev: Tenso
vMethods: Array[ValidationMethod[T]]
): Array[(ValidationResult, ValidationMethod[T])] = {

val dummyInput = dataset.takeSample(withReplacement = false, num = 1).head.getInput()
val rdd = ConversionUtils.coalesce(dataset)
val modelBroad = ModelBroadcast[T]().broadcast(rdd.sparkContext,
ConversionUtils.convert(model.evaluate()))
ConversionUtils.convert(model.evaluate()), dummyInput)
val otherBroad = rdd.sparkContext.broadcast(vMethods)


rdd.mapPartitions(miniBatch => {
val localModel = modelBroad.value()
val localModel = modelBroad.value(false, true, dummyInput)
val localMethod = otherBroad.value
miniBatch.map(batch => {
val output = localModel.forward(batch.getInput())
----------------------------------------
@@ -124,11 +124,11 @@ object Predictor {
model: Module[T],
featurePaddingParam: Option[PaddingParam[T]])(
implicit ev: TensorNumeric[T]): DistributedImageFrame = {

val dummyInput = getDummyData(imageFrame.rdd, batchPerPartition)
val totalBatch = imageFrame.rdd.partitions.length * batchPerPartition
val rdd = ConversionUtils.coalesce(imageFrame.asInstanceOf[DistributedImageFrame].rdd)
val modelBroad = ModelBroadcast[T]().broadcast(rdd.sparkContext,
ConversionUtils.convert(model.evaluate()))
val totalBatch = imageFrame.rdd.partitions.length * batchPerPartition
ConversionUtils.convert(model.evaluate()), dummyInput)

val realPartitionLength = rdd.partitions.length
val toBatchBroad = rdd.sparkContext.broadcast(SampleToMiniBatch(
@@ -138,7 +138,7 @@
val localBatchPerPartition = totalBatch / realPartitionLength

val result = rdd.mapPartitions(partition => {
val localModel = modelBroad.value()
val localModel = modelBroad.value(false, true, dummyInput)
val localToBatch = toBatchBroad.value._1.cloneTransformer()
partition.grouped(localBatchPerPartition).flatMap(imageFeatures => {
Predictor.predictImageBatch[T](localModel, imageFeatures, outputLayer, predictKey,
@@ -152,8 +152,6 @@
def predict[T: ClassTag](dataSet: RDD[Sample[T]], batchSize: Int = -1,
shareBuffer: Boolean = false, model: Module[T], batchPerPartition: Int,
featurePaddingParam: Option[PaddingParam[T]])(implicit ev: TensorNumeric[T]): RDD[Activity] = {
val modelBroad = ModelBroadcast[T]().broadcast(dataSet.sparkContext,
ConversionUtils.convert(model.evaluate()))
val partitionNum = dataSet.partitions.length
val totalBatch = if (batchSize > 0) {
require(batchSize % partitionNum == 0, s"Predictor.predict: total batch size $batchSize " +
@@ -162,13 +160,16 @@
} else {
batchPerPartition * partitionNum
}
val dummyInput = getDummyData(dataSet, totalBatch / partitionNum)
val modelBroad = ModelBroadcast[T]().broadcast(dataSet.sparkContext,
ConversionUtils.convert(model.evaluate()), dummyInput)
val rdd = ConversionUtils.coalesce(dataSet)
val otherBroad = rdd.sparkContext.broadcast(SampleToMiniBatch(
batchSize = totalBatch,
partitionNum = Some(rdd.partitions.length),
featurePaddingParam = featurePaddingParam))
rdd.mapPartitions { partition =>
val localModel = modelBroad.value()
val localModel = modelBroad.value(false, true, dummyInput)
val localTransformer = otherBroad.value.cloneTransformer()
val miniBatch = localTransformer(partition)
miniBatch.flatMap(batch => {
@@ -192,6 +193,19 @@
})
}
}

// Evaluator uses this too, so its scope is widened beyond Predictor
private[optim] def getDummyData[T: ClassTag, R](dataSet: RDD[R],
batchSize: Int)(implicit ev: TensorNumeric[T]): Activity = {
// assumption: the per-partition batch size is not very large.
val samples = dataSet.takeSample(withReplacement = false, num = batchSize)
.map {
case feature: ImageFeature => feature[Sample[T]](ImageFeature.sample)
case sample => sample.asInstanceOf[Sample[T]]
}
val sampleToMiniBatch = SampleToMiniBatch(batchSize)
sampleToMiniBatch(samples.toIterator).toSeq.head.getInput()
}
}

/**
----------------------------------------
@@ -162,6 +162,13 @@ class DnnTensor[T: ClassTag](
this
}

override def set(other: Tensor[T]): Tensor[T] = {
require(other.isInstanceOf[DnnTensor[T]], s"only support to set DnnTensor")
this._storage.release()
this._storage = other.storage().asInstanceOf[DnnStorage[T]]
this
}

override def toString: String = {
ev.getType() match {
case FloatType =>
----------------------------------------
@@ -42,7 +42,8 @@ object Engine {

// Initialize some properties for mkldnn engine. We should call it at the beginning.
// Otherwise some properties will have no effect.
if (System.getProperty("bigdl.engineType") == "mkldnn") {
if (System.getProperty("bigdl.engineType") == "mkldnn" &&
System.getProperty("bigdl.multiModels", "false") == "false") {
setMklDnnEnvironments()
}

@@ -326,6 +327,13 @@
this.engineType
}

private[bigdl] def isMultiModels: Boolean = {
getEngineType() match {
case MklBlas => true
case MklDnn => System.getProperty("bigdl.multiModels", "false").toBoolean
}
}

private[bigdl] def model: ThreadPool = {
_model
}
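A small, hypothetical illustration of the new flag: with the MKL-BLAS backend multiple local models are always allowed, while for MKL-DNN isMultiModels follows the bigdl.multiModels property, which is re-read on every call; setting it also skips the single-model MKL-DNN environment setup at startup (see the hunk above). The demo object is an assumption and, since isMultiModels is private[bigdl], would have to live inside that package.

// Hypothetical demo only (not committed code).
import com.intel.analytics.bigdl.utils.Engine

object MultiModelFlagDemo {
  def main(args: Array[String]): Unit = {
    // MKL-DNN backend; with bigdl.multiModels unset, the default is single-model mode.
    // The engine type property must be set before Engine is first touched.
    System.setProperty("bigdl.engineType", "mkldnn")
    println(Engine.isMultiModels) // false

    // Opting in allows several MKL-DNN model replicas per node.
    System.setProperty("bigdl.multiModels", "true")
    println(Engine.isMultiModels) // true
  }
}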
----------------------------------------
@@ -63,7 +63,7 @@ class ThreadPool(private var poolSize: Int) {
threadPool = Executors.newFixedThreadPool(poolSize, new ThreadFactory {
override def newThread(r: Runnable): Thread = {
val t = Executors.defaultThreadFactory().newThread(r)
t.setName("default-thread-computing")
t.setName("default-thread-computing " + t.getId)
t.setDaemon(true)
t
}
@@ -91,6 +91,7 @@ class ThreadPool(private var poolSize: Int) {
mklPoolSize = Some(size)
(1 to poolSize).map(i => Future {
MKL.setNumThreads(size)
BackendMklDnn.setNumThreads(size)
val tid = Thread.currentThread().getId()
logger.info(s"Set mkl threads to $size on thread $tid")
}(context)).foreach(Await.result(_, Duration.Inf))
----------------------------------------
@@ -65,7 +65,7 @@ private[bigdl] object ConversionUtils {
*/
def coalesce[T: ClassTag](dataset: RDD[T]): RDD[T] = {
if (dataset.partitions.length != Engine.nodeNumber()
&& Engine.getEngineType() == MklDnn) {
&& !Engine.isMultiModels) {
dataset.coalesce(Engine.nodeNumber(), false)
} else dataset
}
----------------------------------------
@@ -63,10 +63,17 @@ private[bigdl] class IRGraph[T: ClassTag](
throw new UnsupportedOperationException("forward not supported, Please build graph first")
}
if (graph.isInstanceOf[DnnGraph]) {
Engine.dnnComputing.invokeAndWait2(Array(0).map(_ => () => {
// if multiple MKL-DNN models are used, run directly in the current thread,
// because MKL and MKL-DNN are in sequential mode
if (Engine.isMultiModels) {
initPrimitives(input)
graph.updateOutput(input)
}))
} else {
Engine.dnnComputing.invokeAndWait2(Array(0).map(_ => () => {
initPrimitives(input)
graph.updateOutput(input)
}))
}
} else graph.updateOutput(input)
output = graph.output
output
(diff for one more changed file not shown in this view)
