From 2bfcb2c6d220942ca1231e1b9379c84d93630505 Mon Sep 17 00:00:00 2001
From: Yang Wang
Date: Fri, 8 Nov 2019 13:23:27 +0800
Subject: [PATCH] Support iterating a dataset in sequential order when training (#1743)

* support iterating a dataset in sequential order when training

add unit test

fix style

* unpersist

* fix bug
---
 FeatureSet.scala                              | 80 ++++++++++++++-----
 .../bigdl/dllib/feature/pmem/FeatureSet.scala | 20 +++--
 .../feature/python/PythonFeatureSet.scala     | 23 ++++--
 3 files changed, 92 insertions(+), 31 deletions(-)

diff --git a/FeatureSet.scala b/FeatureSet.scala
index 1af9e13c2f1..fffe7ca67e7 100644
--- a/FeatureSet.scala
+++ b/FeatureSet.scala
@@ -220,7 +220,9 @@ private[zoo] class DistributedDataSetWrapper[T: ClassTag](featureSet: Distribute
  */
 // T is the returning value type. like ByteRecord
 class CachedDistributedFeatureSet[T: ClassTag]
-(buffer: RDD[ArrayLike[T]])
+(buffer: RDD[ArrayLike[T]],
+ sequentialOrder: Boolean = false,
+ shouldShuffle: Boolean = true)
 extends DistributedFeatureSet[T]{
 
   protected lazy val count: Long = buffer.mapPartitions(iter => {
@@ -230,24 +232,37 @@ class CachedDistributedFeatureSet[T: ClassTag]
     Iterator.single(array.length)
   }).reduce(_ + _)
 
-  protected var indexes: RDD[Array[Int]] = buffer.mapPartitions(iter => {
-    Iterator.single[Array[Int]]((0 until iter.next().length).toArray[Int])
+  protected var indexes: RDD[(Array[Int], AtomicInteger)] = buffer.mapPartitions(iter => {
+    Iterator.single[(Array[Int], AtomicInteger)](((0 until iter.next().length).toArray[Int],
+      new AtomicInteger(0)))
   }).setName(s"origin index of ${buffer.name}").cache()
 
+  protected var offsets: RDD[AtomicInteger] = buffer.mapPartitions(iter => {
+    Iterator.single[AtomicInteger](new AtomicInteger(0))
+  }).setName(s"offsets of ${buffer.name}")
+
   override def data(train: Boolean): RDD[T] = {
     val _train = train
+    val _seq = sequentialOrder
     buffer.zipPartitions(indexes)((dataIter, indexIter) => {
-      val indexes = indexIter.next()
-      val indexOffset = math.max(1, indexes.length)
+      val (indexes, seqOffset) = indexIter.next()
+      val maxOffset = math.max(1, indexes.length)
       val localData = dataIter.next()
-      val offset = if (_train) {
-        RandomGenerator.RNG.uniform(0, indexOffset).toInt
+      val offset = if (_train && !_seq) {
+        RandomGenerator.RNG.uniform(0, maxOffset).toInt
+      } else if (_train && _seq) {
+        seqOffset.get()
       } else {
         0
       }
+      seqOffset.set(offset)
+
       new Iterator[T] {
-        private val _offset = new AtomicInteger(offset)
+        private val _offset = seqOffset
 
         override def hasNext: Boolean = {
           if (_train) true else _offset.get() < localData.length
@@ -255,6 +270,14 @@ class CachedDistributedFeatureSet[T: ClassTag]
 
         override def next(): T = {
           val i = _offset.getAndIncrement()
+          if (_train && i >= localData.length) {
+            this.synchronized {
+              val value = _offset.get()
+              if (value >= localData.length) {
+                _offset.set(value % localData.length)
+              }
+            }
+          }
           if (_train) {
             // indexes is an Array, we should improve this
             // as the maximum length is limited by Int.max
@@ -274,10 +297,13 @@ class CachedDistributedFeatureSet[T: ClassTag]
   override def size(): Long = count
 
   override def shuffle(): Unit = {
-    indexes.unpersist()
-    indexes = buffer.mapPartitions(iter => {
-      Iterator.single(RandomGenerator.shuffle((0 until iter.next().length).toArray))
-    }).setName(s"shuffled index of ${buffer.name}").cache()
+    if (shouldShuffle) {
+      indexes.unpersist()
+      indexes = buffer.mapPartitions(iter => {
+        Iterator.single((RandomGenerator.shuffle((0 until iter.next().length).toArray),
+          new AtomicInteger(0)))
+      }).setName(s"shuffled index of ${buffer.name}").cache()
+    }
   }
 
   override def originRDD(): RDD[_] = buffer
@@ -389,12 +415,14 @@ class DiskFeatureSet[T: ClassTag]
 }
 
 object DRAMFeatureSet {
-  def rdd[T: ClassTag](data: RDD[T]): DistributedFeatureSet[T] = {
+  def rdd[T: ClassTag](data: RDD[T],
+      sequentialOrder: Boolean = false,
+      shuffle: Boolean = true): DistributedFeatureSet[T] = {
     val arrayLikeRDD = data.mapPartitions(iter => {
       Iterator.single(new ArrayLikeWrapper(iter.toArray))
     }).setName(s"cached feature set: ${data.name} in DRAM" )
       .cache().asInstanceOf[RDD[ArrayLike[T]]]
-    new CachedDistributedFeatureSet[T](arrayLikeRDD)
+    new CachedDistributedFeatureSet[T](arrayLikeRDD, sequentialOrder, shuffle)
   }
 }
 
@@ -403,22 +431,36 @@ object FeatureSet {
   def rdd[T: ClassTag](
       data: RDD[T],
       memoryType: MemoryType = DRAM,
-      dataStrategy: DataStrategy = PARTITIONED): DistributedFeatureSet[T] = {
+      dataStrategy: DataStrategy = PARTITIONED,
+      sequentialOrder: Boolean = false,
+      shuffle: Boolean = true): DistributedFeatureSet[T] = {
     dataStrategy match {
       case PARTITIONED =>
         val nodeNumber = EngineRef.getNodeNumber()
-        val repartitionedData = data.coalesce(nodeNumber, true).setName(data.name)
+        val repartitionedData = if (sequentialOrder && data.partitions.length == nodeNumber) {
+          data
+        } else {
+          data.coalesce(nodeNumber, true).setName(data.name)
+        }
         memoryType match {
           case DRAM =>
-            DRAMFeatureSet.rdd(repartitionedData)
+            DRAMFeatureSet.rdd(repartitionedData, sequentialOrder, shuffle)
           case PMEM =>
             logger.info("~~~~~~~ Caching with AEP ~~~~~~~")
-            PmemFeatureSet.rdd(repartitionedData, PMEM)
+            PmemFeatureSet.rdd(repartitionedData, PMEM, sequentialOrder, shuffle)
           case DIRECT =>
             logger.info("~~~~~~~ Caching with DIRECT ~~~~~~~")
-            PmemFeatureSet.rdd[T](repartitionedData, DIRECT)
+            PmemFeatureSet.rdd[T](repartitionedData, DIRECT, sequentialOrder, shuffle)
           case diskM: DISK_AND_DRAM =>
             logger.info(s"~~~~~~~ Caching with DISK_AND_DRAM(${diskM.numSlice}) ~~~~~~~")
+            if (sequentialOrder) {
+              throw new IllegalArgumentException("DiskFeatureSet does not support" +
+                " sequentialOrder.")
+            }
+
+            if (!shuffle) {
+              throw new IllegalArgumentException("DiskFeatureSet must use shuffle.")
+            }
             new DiskFeatureSet[T](data, diskM.numSlice)
           case _ =>
             throw new IllegalArgumentException(
diff --git a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/feature/pmem/FeatureSet.scala b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/feature/pmem/FeatureSet.scala
index 1dac284d05c..7493e9ea1f5 100644
--- a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/feature/pmem/FeatureSet.scala
+++ b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/feature/pmem/FeatureSet.scala
@@ -171,7 +171,9 @@ private[zoo] case class ImageFeatureArray(
 object PmemFeatureSet {
 
   private def rdd[T: ClassTag](data: RDD[T],
-      nativeArrayConverter: NativeArrayConverter[T]):
+      nativeArrayConverter: NativeArrayConverter[T],
+      sequentialOrder: Boolean,
+      shuffle: Boolean):
   DistributedFeatureSet[T] = {
     val countPerPartition = data.mapPartitions { iter =>
       require(iter.hasNext)
@@ -192,22 +194,28 @@ object PmemFeatureSet {
       nativeArrayConverter.toArray(dataIter, countIter)
     }.setName(s"FeatureSet: ${data.name} cached in PMEM")
       .cache()
-    new CachedDistributedFeatureSet[T](arrayRDD.asInstanceOf[RDD[ArrayLike[T]]])
+    new CachedDistributedFeatureSet[T](arrayRDD.asInstanceOf[RDD[ArrayLike[T]]],
+      sequentialOrder, shuffle)
   }
 
   def rdd[T: ClassTag](data: RDD[T],
-      memoryType: MemoryType = PMEM): DistributedFeatureSet[T] = {
+      memoryType: MemoryType = PMEM,
+      sequentialOrder: Boolean = false,
+      shuffle: Boolean = true): DistributedFeatureSet[T] = {
     var clazz: ClassTag[T] = implicitly[ClassTag[T]]
     implicitly[ClassTag[T]].runtimeClass match {
       case t if t == classOf[ByteRecord] =>
         rdd[ByteRecord](data.asInstanceOf[RDD[ByteRecord]],
-          new ByteRecordConverter(memoryType)).asInstanceOf[DistributedFeatureSet[T]]
+          new ByteRecordConverter(memoryType),
+          sequentialOrder, shuffle).asInstanceOf[DistributedFeatureSet[T]]
       case t if t == classOf[Sample[Float]] =>
         rdd[Sample[Float]](data.asInstanceOf[RDD[Sample[Float]]],
-          new SampleConverter(memoryType)).asInstanceOf[DistributedFeatureSet[T]]
+          new SampleConverter(memoryType),
+          sequentialOrder, shuffle).asInstanceOf[DistributedFeatureSet[T]]
       case t if t == classOf[ImageFeature] =>
         rdd[ImageFeature](data.asInstanceOf[RDD[ImageFeature]],
-          new ImageFeatureConverter(memoryType)).asInstanceOf[DistributedFeatureSet[T]]
+          new ImageFeatureConverter(memoryType),
+          sequentialOrder, shuffle).asInstanceOf[DistributedFeatureSet[T]]
       case _ =>
         throw new IllegalArgumentException(
           s"${implicitly[ClassTag[T]].runtimeClass} is not supported for now")
diff --git a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/feature/python/PythonFeatureSet.scala b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/feature/python/PythonFeatureSet.scala
index e4e0cf9b6c8..9d13e19eb28 100644
--- a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/feature/python/PythonFeatureSet.scala
+++ b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/feature/python/PythonFeatureSet.scala
@@ -38,20 +38,31 @@ object PythonFeatureSet {
 class PythonFeatureSet[T: ClassTag](implicit ev: TensorNumeric[T]) extends PythonZoo[T] {
   def createFeatureSetFromImageFrame(
       imageFrame: ImageFrame,
-      memoryType: String): FeatureSet[ImageFeature] = {
+      memoryType: String,
+      sequentialOrder: Boolean, shuffle: Boolean): FeatureSet[ImageFeature] = {
     require(imageFrame.isDistributed(), "Only support distributed ImageFrame")
-    FeatureSet.rdd(imageFrame.toDistributed().rdd, MemoryType.fromString(memoryType))
+    FeatureSet.rdd(imageFrame.toDistributed().rdd, MemoryType.fromString(memoryType),
+      sequentialOrder = sequentialOrder, shuffle = shuffle)
   }
 
   def createFeatureSetFromRDD(
       data: JavaRDD[Any],
-      memoryType: String): FeatureSet[Any] = {
-    FeatureSet.rdd(data, MemoryType.fromString(memoryType))
+      memoryType: String,
+      sequentialOrder: Boolean,
+      shuffle: Boolean): FeatureSet[Any] = {
+    FeatureSet.rdd(data, MemoryType.fromString(memoryType),
+      sequentialOrder = sequentialOrder, shuffle = shuffle)
   }
 
-  def createSampleFeatureSetFromRDD(data: JavaRDD[Sample], memoryType: String)
+  def createSampleFeatureSetFromRDD(data: JavaRDD[Sample],
+      memoryType: String,
+      sequentialOrder: Boolean,
+      shuffle: Boolean)
   : FeatureSet[JSample[T]] = {
-    FeatureSet.rdd(toJSample(data), MemoryType.fromString(memoryType))
+    FeatureSet.rdd(toJSample(data),
+      MemoryType.fromString(memoryType),
+      sequentialOrder = sequentialOrder,
+      shuffle = shuffle)
   }
 
   def transformFeatureSet(featureSet: FeatureSet[Any],
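
The patch threads two new flags, `sequentialOrder` and `shuffle`, from the Python bindings down to `CachedDistributedFeatureSet`. A minimal usage sketch of the new `FeatureSet.rdd` parameters; `samples` stands for any existing `RDD[Sample[Float]]`, and the import paths are assumptions that may differ by release:

```scala
// Sketch only: import paths assume the pre-dllib package layout.
import com.intel.analytics.bigdl.dataset.Sample
import com.intel.analytics.zoo.feature.FeatureSet
import org.apache.spark.rdd.RDD

def buildFeatureSets(samples: RDD[Sample[Float]]): Unit = {
  // Default training behaviour: each partition starts at a random offset
  // and shuffle() re-randomizes the index array between epochs.
  val randomized = FeatureSet.rdd(samples)

  // Behaviour added by this patch: partitions keep their original order
  // (coalesce is skipped when the partition count already matches the node
  // number), iteration starts at offset 0, and each epoch resumes from the
  // persisted cursor; shuffle = false additionally makes shuffle() a no-op.
  val sequential = FeatureSet.rdd(samples,
    sequentialOrder = true,
    shuffle = false)
}
```

Note that the two flags are independent: leaving `shuffle = true` still walks the (re-shuffled) index array with the persistent cursor, while `shuffle = false` fixes the visit order across epochs.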
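The heart of the change is the per-partition cursor in `data(train = true)`: the `AtomicInteger` cached alongside each index array survives across epochs, and `next()` clamps it back into range once it runs past the partition length. A self-contained sketch of that cursor logic, with no Spark and with hypothetical names (`SequentialCursor`, `localData`); it also simplifies by indexing the data directly rather than through the shuffled index array:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Standalone illustration of the cursor logic in data(train = true): a
// cursor shared across epochs yields elements in sequential order and is
// clamped back into range once it runs past the end of the local array.
class SequentialCursor[T](localData: Array[T]) {
  private val cursor = new AtomicInteger(0)

  def next(): T = {
    val i = cursor.getAndIncrement()
    if (i >= localData.length) {
      // Mirrors the patch: clamp the shared cursor so the next caller
      // continues from the wrapped position instead of overflowing.
      this.synchronized {
        val value = cursor.get()
        if (value >= localData.length) {
          cursor.set(value % localData.length)
        }
      }
    }
    localData(i % localData.length)
  }
}

object SequentialCursorDemo extends App {
  // A four-record "partition" read six times wraps around: a b c d a b
  val it = new SequentialCursor(Array("a", "b", "c", "d"))
  println((1 to 6).map(_ => it.next()).mkString(" "))
}
```

The `synchronized` block mirrors the patch: several task threads may share one cursor, and clamping with `value % localData.length` keeps the stored offset in range without discarding increments from concurrent readers.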