Skip to content

Commit

Permalink
add multi thread sample to minibatch (#1589)
Browse files Browse the repository at this point in the history
* add mt sample to minibatch

* revert log level

* delete some debug code

* add comments

* fix style check

* update pom
  • Loading branch information
qiuxin2012 committed Sep 11, 2019
1 parent 50aa324 commit 4d0d42e
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 34 deletions.
2 changes: 1 addition & 1 deletion zoo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<scala.macros.version>2.1.0</scala.macros.version>
<scalatest.version>2.2.4</scalatest.version>
<spark.version>2.1.0</spark.version>
<bigdl.version>0.9.0</bigdl.version>
<bigdl.version>0.9.1</bigdl.version>
<core.artifactId>zoo-core-dist-all</core.artifactId>
<core.dependencyType>pom</core.dependencyType>
<data-store-url>http://download.tensorflow.org</data-store-url>
Expand Down
58 changes: 31 additions & 27 deletions zoo/src/main/scala/com/intel/analytics/zoo/common/NNContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -209,35 +209,39 @@ object NNContext {
private[zoo] def initConf(zooConf: SparkConf) : Unit = {
// check env and set spark conf
// Set default value
var kmpAffinity = "granularity=fine,compact,1,0"
var kmpBlockTime = "0"
var kmpSettings = "1"
var ompNumThreads = "1"
// Set value with env
if (env.contains("KMP_AFFINITY")) {
kmpAffinity = env("KMP_AFFINITY")
}
if (env.contains("KMP_BLOCKTIME")) {
kmpBlockTime = env("KMP_BLOCKTIME")
}
if (env.contains("KMP_SETTINGS")) {
kmpSettings = env("KMP_SETTINGS")
}
if (env.contains("OMP_NUM_THREADS")) {
ompNumThreads = env("OMP_NUM_THREADS")
} else if (env.contains("ZOO_NUM_MKLTHREADS")) {
if (env("ZOO_NUM_MKLTHREADS").equalsIgnoreCase("all")) {
val cores = Runtime.getRuntime.availableProcessors()
ompNumThreads = cores.toString
} else {
ompNumThreads = env("ZOO_NUM_MKLTHREADS")
// We should skip this env, when engineType is mkldnn.
if (System.getProperty("bigdl.engineType", "mklblas")
.toLowerCase() == "mklblas") {
var kmpAffinity = "granularity=fine,compact,1,0"
var kmpBlockTime = "0"
var kmpSettings = "1"
var ompNumThreads = "1"
// Set value with env
if (env.contains("KMP_AFFINITY")) {
kmpAffinity = env("KMP_AFFINITY")
}
if (env.contains("KMP_BLOCKTIME")) {
kmpBlockTime = env("KMP_BLOCKTIME")
}
if (env.contains("KMP_SETTINGS")) {
kmpSettings = env("KMP_SETTINGS")
}
if (env.contains("OMP_NUM_THREADS")) {
ompNumThreads = env("OMP_NUM_THREADS")
} else if (env.contains("ZOO_NUM_MKLTHREADS")) {
if (env("ZOO_NUM_MKLTHREADS").equalsIgnoreCase("all")) {
val cores = Runtime.getRuntime.availableProcessors()
ompNumThreads = cores.toString
} else {
ompNumThreads = env("ZOO_NUM_MKLTHREADS")
}
}
// Set Spark Conf
zooConf.setExecutorEnv("KMP_AFFINITY", kmpAffinity)
zooConf.setExecutorEnv("KMP_BLOCKTIME", kmpBlockTime)
zooConf.setExecutorEnv("KMP_SETTINGS", kmpSettings)
zooConf.setExecutorEnv("OMP_NUM_THREADS", ompNumThreads)
}
// Set Spark Conf
zooConf.setExecutorEnv("KMP_AFFINITY", kmpAffinity)
zooConf.setExecutorEnv("KMP_BLOCKTIME", kmpBlockTime)
zooConf.setExecutorEnv("KMP_SETTINGS", kmpSettings)
zooConf.setExecutorEnv("OMP_NUM_THREADS", ompNumThreads)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.intel.analytics.bigdl.transform.vision.image.ImageFeature
import com.intel.analytics.bigdl.transform.vision.image.augmentation.RandomAlterAspect
import com.intel.analytics.bigdl.utils.T
import com.intel.analytics.zoo.feature.FeatureSet
import com.intel.analytics.zoo.feature.common.MTSampleToMiniBatch
import com.intel.analytics.zoo.feature.image._
import com.intel.analytics.zoo.feature.pmem.{DRAM, MemoryType}
import com.intel.analytics.zoo.pipeline.api.keras.layers.utils.EngineRef
Expand Down Expand Up @@ -163,9 +164,9 @@ object Utils {
ImageMatToTensor[Float](toRGB = false) ->
ImageSetToSample[Float](inputKeys = Array(ImageFeature.imageTensor),
targetKeys = Array(ImageFeature.label)) ->
ImageFeatureToSample[Float]() ->
SampleToMiniBatch[Float](batchSize)
featureSet.transform(transformer)
ImageFeatureToSample[Float]()
val toMiniBatch = MTSampleToMiniBatch[ImageFeature, Float](batchSize, transformer)
featureSet -> toMiniBatch
}

def loadImageNetValDataSet(path: String, sc: SparkContext, imageSize: Int, batchSize: Int,
Expand All @@ -183,9 +184,9 @@ object Utils {
ImageMatToTensor[Float](toRGB = false) ->
ImageSetToSample[Float](inputKeys = Array(ImageFeature.imageTensor),
targetKeys = Array(ImageFeature.label)) ->
ImageFeatureToSample[Float]() ->
SampleToMiniBatch[Float](batchSize)
featureSet.transform(transformer)
ImageFeatureToSample[Float]()
val toMiniBatch = MTSampleToMiniBatch[ImageFeature, Float](batchSize, transformer)
featureSet -> toMiniBatch
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright 2018 Analytics Zoo Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.intel.analytics.zoo.feature.common

import com.intel.analytics.bigdl.dataset._
import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric
import com.intel.analytics.zoo.pipeline.api.keras.layers.utils.EngineRef

import scala.reflect.ClassTag

/**
* Convert a sequence of [[Sample]] to a sequence of [[MiniBatch]]
* through function toMiniBatch using multi thread.
*/
class MTSampleToMiniBatch[A: ClassTag, T: ClassTag] (
totalBatch: Int,
transformer: Transformer[A, Sample[T]],
miniBatch: Option[MiniBatch[T]] = None,
featurePaddingParam: Option[PaddingParam[T]] = None,
labelPaddingParam: Option[PaddingParam[T]] = None,
partitionNum: Option[Int] = None)
(implicit ev: TensorNumeric[T]) extends Transformer[A, MiniBatch[T]] {

private val batchPerPartition = Utils.getBatchSize(totalBatch, partitionNum)
var miniBatchBuffer = miniBatch.orNull
private val batchSize = batchPerPartition
private val sampleData = new Array[Sample[T]](batchSize)

private val parallelism = EngineRef.getCoreNumber()

private val transformers = (0 until parallelism).map(
_ => transformer.cloneTransformer()
).toArray

private val rawDataCache = new Array[Iterator[A]](batchSize)

override def apply(prev: Iterator[A]): Iterator[MiniBatch[T]] = {
new Iterator[MiniBatch[T]] {

override def hasNext: Boolean = prev.hasNext

override def next(): MiniBatch[T] = {
if (prev.hasNext) {
// prefetch
var count = 0
while (count < batchSize && prev.hasNext) {
val raw = prev.next()
rawDataCache(count) = Iterator.single(raw)
count += 1
}

// multi thread processing
(0 until parallelism).toParArray.foreach{tid =>
var j = tid
while (j < count) {
sampleData(j) = transformers(tid).apply(rawDataCache(j)).next()
j += parallelism
}
}

if (null == miniBatchBuffer) {
val firstSample = sampleData(0)
miniBatchBuffer = if (firstSample.isInstanceOf[TensorSample[T]]) {
SparseMiniBatch(firstSample.numFeature(), firstSample.numLabel())
} else {
MiniBatch(firstSample.numFeature(), firstSample.numLabel(),
featurePaddingParam, labelPaddingParam)
}
}

if (count < batchSize) {
miniBatchBuffer.set(sampleData.slice(0, count))
} else {
miniBatchBuffer.set(sampleData)
}
} else {
null
}
}
}
}
}

object MTSampleToMiniBatch {
/**
* Apply an MTSampleToMiniBatch transformer.
*
* @param batchSize total batch size
* @param transformer transformer who rawData to Sample
* @param featurePaddingParam feature padding strategy, see
* [[com.intel.analytics.bigdl.dataset.PaddingParam]] for details.
* @param labelPaddingParam label padding strategy, see
* [[com.intel.analytics.bigdl.dataset.PaddingParam]] for details.
* @return
*/
def apply[A: ClassTag, T: ClassTag](
batchSize : Int,
transformer: Transformer[A, Sample[T]],
featurePaddingParam: Option[PaddingParam[T]] = None,
labelPaddingParam: Option[PaddingParam[T]] = None,
partitionNum: Option[Int] = None
)(implicit ev: TensorNumeric[T]): MTSampleToMiniBatch[A, T] = {
new MTSampleToMiniBatch[A, T](batchSize,
transformer,
None, featurePaddingParam, labelPaddingParam, partitionNum)
}

/**
* Apply an MTSampleToMiniBatch transformer with UDF MiniBatch.
*
* @param batchSize total batch size
* @param miniBatch An User-Defined MiniBatch to construct a mini batch.
* @param transformer transformer who rawData to Sample
* @return
*/
def apply[A: ClassTag, T: ClassTag](
miniBatch: MiniBatch[T],
batchSize : Int,
transformer: Transformer[A, Sample[T]],
partitionNum: Option[Int])
(implicit ev: TensorNumeric[T]): MTSampleToMiniBatch[A, T] = {
new MTSampleToMiniBatch[A, T](batchSize,
transformer,
Some(miniBatch), partitionNum = partitionNum)
}
}

0 comments on commit 4d0d42e

Please sign in to comment.