diff --git a/zoo/pom.xml b/zoo/pom.xml
index 85174476b08..3e3a1db219d 100644
--- a/zoo/pom.xml
+++ b/zoo/pom.xml
@@ -24,7 +24,7 @@
2.1.0
2.2.4
2.1.0
- 0.9.0
+ 0.9.1
zoo-core-dist-all
pom
http://download.tensorflow.org
diff --git a/zoo/src/main/scala/com/intel/analytics/zoo/common/NNContext.scala b/zoo/src/main/scala/com/intel/analytics/zoo/common/NNContext.scala
index 9d7f8d9c49e..7c0f41bef55 100644
--- a/zoo/src/main/scala/com/intel/analytics/zoo/common/NNContext.scala
+++ b/zoo/src/main/scala/com/intel/analytics/zoo/common/NNContext.scala
@@ -209,35 +209,39 @@ object NNContext {
private[zoo] def initConf(zooConf: SparkConf) : Unit = {
// check env and set spark conf
// Set default value
- var kmpAffinity = "granularity=fine,compact,1,0"
- var kmpBlockTime = "0"
- var kmpSettings = "1"
- var ompNumThreads = "1"
- // Set value with env
- if (env.contains("KMP_AFFINITY")) {
- kmpAffinity = env("KMP_AFFINITY")
- }
- if (env.contains("KMP_BLOCKTIME")) {
- kmpBlockTime = env("KMP_BLOCKTIME")
- }
- if (env.contains("KMP_SETTINGS")) {
- kmpSettings = env("KMP_SETTINGS")
- }
- if (env.contains("OMP_NUM_THREADS")) {
- ompNumThreads = env("OMP_NUM_THREADS")
- } else if (env.contains("ZOO_NUM_MKLTHREADS")) {
- if (env("ZOO_NUM_MKLTHREADS").equalsIgnoreCase("all")) {
- val cores = Runtime.getRuntime.availableProcessors()
- ompNumThreads = cores.toString
- } else {
- ompNumThreads = env("ZOO_NUM_MKLTHREADS")
+ // We should skip this env, when engineType is mkldnn.
+ if (System.getProperty("bigdl.engineType", "mklblas")
+ .toLowerCase() == "mklblas") {
+ var kmpAffinity = "granularity=fine,compact,1,0"
+ var kmpBlockTime = "0"
+ var kmpSettings = "1"
+ var ompNumThreads = "1"
+ // Set value with env
+ if (env.contains("KMP_AFFINITY")) {
+ kmpAffinity = env("KMP_AFFINITY")
+ }
+ if (env.contains("KMP_BLOCKTIME")) {
+ kmpBlockTime = env("KMP_BLOCKTIME")
+ }
+ if (env.contains("KMP_SETTINGS")) {
+ kmpSettings = env("KMP_SETTINGS")
+ }
+ if (env.contains("OMP_NUM_THREADS")) {
+ ompNumThreads = env("OMP_NUM_THREADS")
+ } else if (env.contains("ZOO_NUM_MKLTHREADS")) {
+ if (env("ZOO_NUM_MKLTHREADS").equalsIgnoreCase("all")) {
+ val cores = Runtime.getRuntime.availableProcessors()
+ ompNumThreads = cores.toString
+ } else {
+ ompNumThreads = env("ZOO_NUM_MKLTHREADS")
+ }
}
+ // Set Spark Conf
+ zooConf.setExecutorEnv("KMP_AFFINITY", kmpAffinity)
+ zooConf.setExecutorEnv("KMP_BLOCKTIME", kmpBlockTime)
+ zooConf.setExecutorEnv("KMP_SETTINGS", kmpSettings)
+ zooConf.setExecutorEnv("OMP_NUM_THREADS", ompNumThreads)
}
- // Set Spark Conf
- zooConf.setExecutorEnv("KMP_AFFINITY", kmpAffinity)
- zooConf.setExecutorEnv("KMP_BLOCKTIME", kmpBlockTime)
- zooConf.setExecutorEnv("KMP_SETTINGS", kmpSettings)
- zooConf.setExecutorEnv("OMP_NUM_THREADS", ompNumThreads)
}
diff --git a/zoo/src/main/scala/com/intel/analytics/zoo/examples/resnet/Utils.scala b/zoo/src/main/scala/com/intel/analytics/zoo/examples/resnet/Utils.scala
index c8de61f0910..b95a96d3e23 100644
--- a/zoo/src/main/scala/com/intel/analytics/zoo/examples/resnet/Utils.scala
+++ b/zoo/src/main/scala/com/intel/analytics/zoo/examples/resnet/Utils.scala
@@ -25,6 +25,7 @@ import com.intel.analytics.bigdl.transform.vision.image.ImageFeature
import com.intel.analytics.bigdl.transform.vision.image.augmentation.RandomAlterAspect
import com.intel.analytics.bigdl.utils.T
import com.intel.analytics.zoo.feature.FeatureSet
+import com.intel.analytics.zoo.feature.common.MTSampleToMiniBatch
import com.intel.analytics.zoo.feature.image._
import com.intel.analytics.zoo.feature.pmem.{DRAM, MemoryType}
import com.intel.analytics.zoo.pipeline.api.keras.layers.utils.EngineRef
@@ -163,9 +164,9 @@ object Utils {
ImageMatToTensor[Float](toRGB = false) ->
ImageSetToSample[Float](inputKeys = Array(ImageFeature.imageTensor),
targetKeys = Array(ImageFeature.label)) ->
- ImageFeatureToSample[Float]() ->
- SampleToMiniBatch[Float](batchSize)
- featureSet.transform(transformer)
+ ImageFeatureToSample[Float]()
+ val toMiniBatch = MTSampleToMiniBatch[ImageFeature, Float](batchSize, transformer)
+ featureSet -> toMiniBatch
}
def loadImageNetValDataSet(path: String, sc: SparkContext, imageSize: Int, batchSize: Int,
@@ -183,9 +184,9 @@ object Utils {
ImageMatToTensor[Float](toRGB = false) ->
ImageSetToSample[Float](inputKeys = Array(ImageFeature.imageTensor),
targetKeys = Array(ImageFeature.label)) ->
- ImageFeatureToSample[Float]() ->
- SampleToMiniBatch[Float](batchSize)
- featureSet.transform(transformer)
+ ImageFeatureToSample[Float]()
+ val toMiniBatch = MTSampleToMiniBatch[ImageFeature, Float](batchSize, transformer)
+ featureSet -> toMiniBatch
}
}
diff --git a/zoo/src/main/scala/com/intel/analytics/zoo/feature/common/MTSampleToMiniBatch.scala b/zoo/src/main/scala/com/intel/analytics/zoo/feature/common/MTSampleToMiniBatch.scala
new file mode 100644
index 00000000000..0d5b1e386a4
--- /dev/null
+++ b/zoo/src/main/scala/com/intel/analytics/zoo/feature/common/MTSampleToMiniBatch.scala
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2018 Analytics Zoo Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.intel.analytics.zoo.feature.common
+
+import com.intel.analytics.bigdl.dataset._
+import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric
+import com.intel.analytics.zoo.pipeline.api.keras.layers.utils.EngineRef
+
+import scala.reflect.ClassTag
+
+/**
+ * Convert a sequence of [[Sample]] to a sequence of [[MiniBatch]]
+ * through function toMiniBatch using multi thread.
+ */
+class MTSampleToMiniBatch[A: ClassTag, T: ClassTag] (
+ totalBatch: Int,
+ transformer: Transformer[A, Sample[T]],
+ miniBatch: Option[MiniBatch[T]] = None,
+ featurePaddingParam: Option[PaddingParam[T]] = None,
+ labelPaddingParam: Option[PaddingParam[T]] = None,
+ partitionNum: Option[Int] = None)
+ (implicit ev: TensorNumeric[T]) extends Transformer[A, MiniBatch[T]] {
+
+ private val batchPerPartition = Utils.getBatchSize(totalBatch, partitionNum)
+ var miniBatchBuffer = miniBatch.orNull
+ private val batchSize = batchPerPartition
+ private val sampleData = new Array[Sample[T]](batchSize)
+
+ private val parallelism = EngineRef.getCoreNumber()
+
+ private val transformers = (0 until parallelism).map(
+ _ => transformer.cloneTransformer()
+ ).toArray
+
+ private val rawDataCache = new Array[Iterator[A]](batchSize)
+
+ override def apply(prev: Iterator[A]): Iterator[MiniBatch[T]] = {
+ new Iterator[MiniBatch[T]] {
+
+ override def hasNext: Boolean = prev.hasNext
+
+ override def next(): MiniBatch[T] = {
+ if (prev.hasNext) {
+ // prefetch
+ var count = 0
+ while (count < batchSize && prev.hasNext) {
+ val raw = prev.next()
+ rawDataCache(count) = Iterator.single(raw)
+ count += 1
+ }
+
+ // multi thread processing
+ (0 until parallelism).toParArray.foreach{tid =>
+ var j = tid
+ while (j < count) {
+ sampleData(j) = transformers(tid).apply(rawDataCache(j)).next()
+ j += parallelism
+ }
+ }
+
+ if (null == miniBatchBuffer) {
+ val firstSample = sampleData(0)
+ miniBatchBuffer = if (firstSample.isInstanceOf[TensorSample[T]]) {
+ SparseMiniBatch(firstSample.numFeature(), firstSample.numLabel())
+ } else {
+ MiniBatch(firstSample.numFeature(), firstSample.numLabel(),
+ featurePaddingParam, labelPaddingParam)
+ }
+ }
+
+ if (count < batchSize) {
+ miniBatchBuffer.set(sampleData.slice(0, count))
+ } else {
+ miniBatchBuffer.set(sampleData)
+ }
+ } else {
+ null
+ }
+ }
+ }
+ }
+}
+
+object MTSampleToMiniBatch {
+ /**
+ * Apply an MTSampleToMiniBatch transformer.
+ *
+ * @param batchSize total batch size
+ * @param transformer transformer who rawData to Sample
+ * @param featurePaddingParam feature padding strategy, see
+ * [[com.intel.analytics.bigdl.dataset.PaddingParam]] for details.
+ * @param labelPaddingParam label padding strategy, see
+ * [[com.intel.analytics.bigdl.dataset.PaddingParam]] for details.
+ * @return
+ */
+ def apply[A: ClassTag, T: ClassTag](
+ batchSize : Int,
+ transformer: Transformer[A, Sample[T]],
+ featurePaddingParam: Option[PaddingParam[T]] = None,
+ labelPaddingParam: Option[PaddingParam[T]] = None,
+ partitionNum: Option[Int] = None
+ )(implicit ev: TensorNumeric[T]): MTSampleToMiniBatch[A, T] = {
+ new MTSampleToMiniBatch[A, T](batchSize,
+ transformer,
+ None, featurePaddingParam, labelPaddingParam, partitionNum)
+ }
+
+ /**
+ * Apply an MTSampleToMiniBatch transformer with UDF MiniBatch.
+ *
+ * @param batchSize total batch size
+ * @param miniBatch An User-Defined MiniBatch to construct a mini batch.
+ * @param transformer transformer who rawData to Sample
+ * @return
+ */
+ def apply[A: ClassTag, T: ClassTag](
+ miniBatch: MiniBatch[T],
+ batchSize : Int,
+ transformer: Transformer[A, Sample[T]],
+ partitionNum: Option[Int])
+ (implicit ev: TensorNumeric[T]): MTSampleToMiniBatch[A, T] = {
+ new MTSampleToMiniBatch[A, T](batchSize,
+ transformer,
+ Some(miniBatch), partitionNum = partitionNum)
+ }
+}