Skip to content

Commit

Permalink
Support iterating over a dataset in sequential order when training (intel-an…
Browse files Browse the repository at this point in the history
…alytics#1743)

* support iterating over a dataset in sequential order when training

add unit test

fix style

* unpersist

* fix bug
  • Loading branch information
yangw1234 committed Nov 8, 2019
1 parent 3d3b8db commit bd63fa1
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,9 @@ private[zoo] class DistributedDataSetWrapper[T: ClassTag](featureSet: Distribute
*/
// T is the type of the returned values, e.g. ByteRecord
class CachedDistributedFeatureSet[T: ClassTag]
(buffer: RDD[ArrayLike[T]])
(buffer: RDD[ArrayLike[T]],
sequentialOrder: Boolean = false,
shouldShuffle: Boolean = true)
extends DistributedFeatureSet[T]{

protected lazy val count: Long = buffer.mapPartitions(iter => {
Expand All @@ -230,31 +232,52 @@ class CachedDistributedFeatureSet[T: ClassTag]
Iterator.single(array.length)
}).reduce(_ + _)

protected var indexes: RDD[Array[Int]] = buffer.mapPartitions(iter => {
Iterator.single[Array[Int]]((0 until iter.next().length).toArray[Int])
protected var indexes: RDD[(Array[Int], AtomicInteger)] = buffer.mapPartitions(iter => {
Iterator.single[(Array[Int], AtomicInteger)](((0 until iter.next().length).toArray[Int],
new AtomicInteger(0)))
}).setName(s"origin index of ${buffer.name}").cache()


protected var offsets: RDD[AtomicInteger] = buffer.mapPartitions(iter => {
Iterator.single[AtomicInteger](new AtomicInteger(0))
}).setName(s"offsets of ${buffer.name}")


override def data(train: Boolean): RDD[T] = {
val _train = train
val _seq = sequentialOrder
buffer.zipPartitions(indexes)((dataIter, indexIter) => {
val indexes = indexIter.next()
val indexOffset = math.max(1, indexes.length)
val (indexes, seqOffset) = indexIter.next()


val maxOffset = math.max(1, indexes.length)
val localData = dataIter.next()
val offset = if (_train) {
RandomGenerator.RNG.uniform(0, indexOffset).toInt
val offset = if (_train && !_seq) {
RandomGenerator.RNG.uniform(0, maxOffset).toInt
} else if (_train && _seq) {
seqOffset.get()
} else {
0
}
seqOffset.set(offset)

new Iterator[T] {
private val _offset = new AtomicInteger(offset)
private val _offset = seqOffset

override def hasNext: Boolean = {
if (_train) true else _offset.get() < localData.length
}

override def next(): T = {
val i = _offset.getAndIncrement()
if (_train && i >= localData.length) {
this.synchronized {
val value = _offset.get()
if (value >= localData.length) {
_offset.set(value % localData.length)
}
}
}
if (_train) {
// indexes is an Array; we should improve this,
// as an Array's maximum length is limited by Int.MaxValue
Expand All @@ -274,10 +297,13 @@ class CachedDistributedFeatureSet[T: ClassTag]
override def size(): Long = count

override def shuffle(): Unit = {
indexes.unpersist()
indexes = buffer.mapPartitions(iter => {
Iterator.single(RandomGenerator.shuffle((0 until iter.next().length).toArray))
}).setName(s"shuffled index of ${buffer.name}").cache()
if (shouldShuffle) {
indexes.unpersist()
indexes = buffer.mapPartitions(iter => {
Iterator.single((RandomGenerator.shuffle((0 until iter.next().length).toArray),
new AtomicInteger(0)))
}).setName(s"shuffled index of ${buffer.name}").cache()
}
}

override def originRDD(): RDD[_] = buffer
Expand Down Expand Up @@ -389,12 +415,14 @@ class DiskFeatureSet[T: ClassTag]
}

object DRAMFeatureSet {
def rdd[T: ClassTag](data: RDD[T]): DistributedFeatureSet[T] = {
def rdd[T: ClassTag](data: RDD[T],
sequentialOrder: Boolean = false,
shuffle: Boolean = true): DistributedFeatureSet[T] = {
val arrayLikeRDD = data.mapPartitions(iter => {
Iterator.single(new ArrayLikeWrapper(iter.toArray))
}).setName(s"cached feature set: ${data.name} in DRAM" )
.cache().asInstanceOf[RDD[ArrayLike[T]]]
new CachedDistributedFeatureSet[T](arrayLikeRDD)
new CachedDistributedFeatureSet[T](arrayLikeRDD, sequentialOrder, shuffle)
}
}

Expand All @@ -403,22 +431,36 @@ object FeatureSet {
def rdd[T: ClassTag](
data: RDD[T],
memoryType: MemoryType = DRAM,
dataStrategy: DataStrategy = PARTITIONED): DistributedFeatureSet[T] = {
dataStrategy: DataStrategy = PARTITIONED,
sequentialOrder: Boolean = false,
shuffle: Boolean = true): DistributedFeatureSet[T] = {
dataStrategy match {
case PARTITIONED =>
val nodeNumber = EngineRef.getNodeNumber()
val repartitionedData = data.coalesce(nodeNumber, true).setName(data.name)
val repartitionedData = if (sequentialOrder && data.partitions.length == nodeNumber) {
data
} else {
data.coalesce(nodeNumber, true).setName(data.name)
}
memoryType match {
case DRAM =>
DRAMFeatureSet.rdd(repartitionedData)
DRAMFeatureSet.rdd(repartitionedData, sequentialOrder, shuffle)
case PMEM =>
logger.info("~~~~~~~ Caching with AEP ~~~~~~~")
PmemFeatureSet.rdd(repartitionedData, PMEM)
PmemFeatureSet.rdd(repartitionedData, PMEM, sequentialOrder, shuffle)
case DIRECT =>
logger.info("~~~~~~~ Caching with DIRECT ~~~~~~~")
PmemFeatureSet.rdd[T](repartitionedData, DIRECT)
PmemFeatureSet.rdd[T](repartitionedData, DIRECT, sequentialOrder, shuffle)
case diskM: DISK_AND_DRAM =>
logger.info(s"~~~~~~~ Caching with DISK_AND_DRAM(${diskM.numSlice}) ~~~~~~~")
if (sequentialOrder) {
throw new IllegalArgumentException("DiskFeatureSet does not support" +
" sequentialOrder.")
}

if (!shuffle) {
throw new IllegalArgumentException("DiskFeatureSet must use shuffle.")
}
new DiskFeatureSet[T](data, diskM.numSlice)
case _ =>
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,9 @@ private[zoo] case class ImageFeatureArray(
object PmemFeatureSet {

private def rdd[T: ClassTag](data: RDD[T],
nativeArrayConverter: NativeArrayConverter[T]):
nativeArrayConverter: NativeArrayConverter[T],
sequentialOrder: Boolean,
shuffle: Boolean):
DistributedFeatureSet[T] = {
val countPerPartition = data.mapPartitions { iter =>
require(iter.hasNext)
Expand All @@ -192,22 +194,28 @@ object PmemFeatureSet {
nativeArrayConverter.toArray(dataIter, countIter)
}.setName(s"FeatureSet: ${data.name} cached in PMEM")
.cache()
new CachedDistributedFeatureSet[T](arrayRDD.asInstanceOf[RDD[ArrayLike[T]]])
new CachedDistributedFeatureSet[T](arrayRDD.asInstanceOf[RDD[ArrayLike[T]]],
sequentialOrder, shuffle)
}

def rdd[T: ClassTag](data: RDD[T],
memoryType: MemoryType = PMEM): DistributedFeatureSet[T] = {
memoryType: MemoryType = PMEM,
sequentialOrder: Boolean = false,
shuffle: Boolean = true): DistributedFeatureSet[T] = {
var clazz: ClassTag[T] = implicitly[ClassTag[T]]
implicitly[ClassTag[T]].runtimeClass match {
case t if t == classOf[ByteRecord] =>
rdd[ByteRecord](data.asInstanceOf[RDD[ByteRecord]],
new ByteRecordConverter(memoryType)).asInstanceOf[DistributedFeatureSet[T]]
new ByteRecordConverter(memoryType),
sequentialOrder, shuffle).asInstanceOf[DistributedFeatureSet[T]]
case t if t == classOf[Sample[Float]] =>
rdd[Sample[Float]](data.asInstanceOf[RDD[Sample[Float]]],
new SampleConverter(memoryType)).asInstanceOf[DistributedFeatureSet[T]]
new SampleConverter(memoryType),
sequentialOrder, shuffle).asInstanceOf[DistributedFeatureSet[T]]
case t if t == classOf[ImageFeature] =>
rdd[ImageFeature](data.asInstanceOf[RDD[ImageFeature]],
new ImageFeatureConverter(memoryType)).asInstanceOf[DistributedFeatureSet[T]]
new ImageFeatureConverter(memoryType),
sequentialOrder, shuffle).asInstanceOf[DistributedFeatureSet[T]]
case _ =>
throw new IllegalArgumentException(
s"${implicitly[ClassTag[T]].runtimeClass} is not supported for now")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,31 @@ object PythonFeatureSet {
class PythonFeatureSet[T: ClassTag](implicit ev: TensorNumeric[T]) extends PythonZoo[T] {
def createFeatureSetFromImageFrame(
imageFrame: ImageFrame,
memoryType: String): FeatureSet[ImageFeature] = {
memoryType: String,
sequentialOrder: Boolean, shuffle: Boolean): FeatureSet[ImageFeature] = {
require(imageFrame.isDistributed(), "Only support distributed ImageFrame")
FeatureSet.rdd(imageFrame.toDistributed().rdd, MemoryType.fromString(memoryType))
FeatureSet.rdd(imageFrame.toDistributed().rdd, MemoryType.fromString(memoryType),
sequentialOrder = sequentialOrder, shuffle = shuffle)
}

def createFeatureSetFromRDD(
data: JavaRDD[Any],
memoryType: String): FeatureSet[Any] = {
FeatureSet.rdd(data, MemoryType.fromString(memoryType))
memoryType: String,
sequentialOrder: Boolean,
shuffle: Boolean): FeatureSet[Any] = {
FeatureSet.rdd(data, MemoryType.fromString(memoryType),
sequentialOrder = sequentialOrder, shuffle = shuffle)
}

def createSampleFeatureSetFromRDD(data: JavaRDD[Sample], memoryType: String)
def createSampleFeatureSetFromRDD(data: JavaRDD[Sample],
memoryType: String,
sequentialOrder: Boolean,
shuffle: Boolean)
: FeatureSet[JSample[T]] = {
FeatureSet.rdd(toJSample(data), MemoryType.fromString(memoryType))
FeatureSet.rdd(toJSample(data),
MemoryType.fromString(memoryType),
sequentialOrder = sequentialOrder,
shuffle = shuffle)
}

def transformFeatureSet(featureSet: FeatureSet[Any],
Expand Down

0 comments on commit bd63fa1

Please sign in to comment.