Support iterating a dataset in sequential order when training #1743

Merged: 3 commits, Nov 8, 2019
42 changes: 34 additions & 8 deletions pyzoo/zoo/feature/common.py
@@ -224,7 +224,9 @@ def __init__(self, jvalue=None, bigdl_type="float"):
self.value = jvalue

@classmethod
def image_frame(cls, image_frame, memory_type="DRAM", bigdl_type="float"):
def image_frame(cls, image_frame, memory_type="DRAM",
sequential_order=False,
shuffle=True, bigdl_type="float"):
"""
Create FeatureSet from ImageFrame.
:param image_frame: ImageFrame
@@ -235,15 +237,21 @@ def image_frame(cls, image_frame, memory_type="DRAM", bigdl_type="float"):
of the data into memory during the training. After going through the
1/n, we will release the current cache, and load another 1/n into
memory.
:param sequential_order: whether to iterate the elements in the feature set
in sequential order when training.
:param shuffle: whether to shuffle the elements in each partition before each epoch
when training.
:param bigdl_type: numeric type
:return: A feature set
"""
jvalue = callBigDlFunc(bigdl_type, "createFeatureSetFromImageFrame",
image_frame, memory_type)
image_frame, memory_type, sequential_order, shuffle)
return cls(jvalue=jvalue)
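For reference, a minimal usage sketch of the extended API (a hedged illustration: `img_frame` is a hypothetical distributed ImageFrame, and an initialized Spark/Analytics Zoo context is assumed):

```python
from zoo.feature.common import FeatureSet

# Iterate the cached images in a fixed, repeatable order during training.
ordered_set = FeatureSet.image_frame(img_frame,
                                     memory_type="DRAM",
                                     sequential_order=True,
                                     shuffle=False)

# Defaults are unchanged: a random start offset per partition and a
# re-shuffle of the cached indexes before each epoch.
shuffled_set = FeatureSet.image_frame(img_frame)
```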

@classmethod
def image_set(cls, imageset, memory_type="DRAM", bigdl_type="float"):
def image_set(cls, imageset, memory_type="DRAM",
sequential_order=False,
shuffle=True, bigdl_type="float"):
"""
Create FeatureSet from ImageSet.
:param imageset: ImageSet
@@ -254,15 +262,22 @@ def image_set(cls, imageset, memory_type="DRAM", bigdl_type="float"):
of the data into memory during the training. After going through the
1/n, we will release the current cache, and load another 1/n into
memory.
:param sequential_order: whether to iterate the elements in the feature set
in sequential order when training.
:param shuffle: whether to shuffle the elements in each partition before each epoch
when training.
:param bigdl_type: numeric type
:return: A feature set
"""
jvalue = callBigDlFunc(bigdl_type, "createFeatureSetFromImageFrame",
imageset.to_image_frame(), memory_type)
imageset.to_image_frame(), memory_type,
sequential_order, shuffle)
return cls(jvalue=jvalue)

@classmethod
def sample_rdd(cls, rdd, memory_type="DRAM", bigdl_type="float"):
def sample_rdd(cls, rdd, memory_type="DRAM",
sequential_order=False,
shuffle=True, bigdl_type="float"):
"""
Create FeatureSet from RDD[Sample].
:param rdd: A RDD[Sample]
@@ -273,14 +288,20 @@ def sample_rdd(cls, rdd, memory_type="DRAM", bigdl_type="float"):
of the data into memory during the training. After going through the
1/n, we will release the current cache, and load another 1/n into
memory.
:param sequential_order: whether to iterate the elements in the feature set
in sequential order when training.
:param shuffle: whether to shuffle the elements in each partition before each epoch
when training.
:param bigdl_type: numeric type
:return: A feature set
"""
jvalue = callBigDlFunc(bigdl_type, "createSampleFeatureSetFromRDD", rdd, memory_type)
jvalue = callBigDlFunc(bigdl_type, "createSampleFeatureSetFromRDD", rdd,
memory_type, sequential_order, shuffle)
return cls(jvalue=jvalue)
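A similar sketch for RDD[Sample] input (again, `sample_rdd` is a hypothetical RDD of BigDL Sample objects). Combining sequential_order=True with shuffle=False gives a deterministic pass over each partition, which is what the PR title refers to:

```python
from zoo.feature.common import FeatureSet

deterministic_set = FeatureSet.sample_rdd(sample_rdd,
                                          sequential_order=True,
                                          shuffle=False)
```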

@classmethod
def rdd(cls, rdd, memory_type="DRAM", bigdl_type="float"):
def rdd(cls, rdd, memory_type="DRAM", sequential_order=False,
shuffle=True, bigdl_type="float"):
"""
Create FeatureSet from RDD.
:param rdd: A RDD
@@ -291,10 +312,15 @@ def rdd(cls, rdd, memory_type="DRAM", bigdl_type="float"):
of the data into memory during the training. After going through the
1/n, we will release the current cache, and load another 1/n into
memory.
:param sequential_order: whether to iterate the elements in the feature set
in sequential order when training.
:param shuffle: whether to shuffle the elements in each partition before each epoch
when training.
:param bigdl_type: numeric type
:return: A feature set
"""
jvalue = callBigDlFunc(bigdl_type, "createFeatureSetFromRDD", rdd, memory_type)
jvalue = callBigDlFunc(bigdl_type, "createFeatureSetFromRDD", rdd,
memory_type, sequential_order, shuffle)
return cls(jvalue=jvalue)

def transform(self, transformer):
80 changes: 61 additions & 19 deletions zoo/src/main/scala/com/intel/analytics/zoo/feature/FeatureSet.scala
@@ -220,7 +220,9 @@ private[zoo] class DistributedDataSetWrapper[T: ClassTag](featureSet: Distribute
*/
// T is the type of the returned values, e.g. ByteRecord
class CachedDistributedFeatureSet[T: ClassTag]
(buffer: RDD[ArrayLike[T]])
(buffer: RDD[ArrayLike[T]],
sequentialOrder: Boolean = false,
shouldShuffle: Boolean = true)
extends DistributedFeatureSet[T]{

protected lazy val count: Long = buffer.mapPartitions(iter => {
@@ -230,31 +232,52 @@ class CachedDistributedFeatureSet[T: ClassTag]
Iterator.single(array.length)
}).reduce(_ + _)

protected var indexes: RDD[Array[Int]] = buffer.mapPartitions(iter => {
Iterator.single[Array[Int]]((0 until iter.next().length).toArray[Int])
protected var indexes: RDD[(Array[Int], AtomicInteger)] = buffer.mapPartitions(iter => {
Iterator.single[(Array[Int], AtomicInteger)](((0 until iter.next().length).toArray[Int],
new AtomicInteger(0)))
}).setName(s"origin index of ${buffer.name}").cache()


protected var offsets: RDD[AtomicInteger] = buffer.mapPartitions(iter => {
Iterator.single[AtomicInteger](new AtomicInteger(0))
}).setName(s"offsets of ${buffer.name}")

Contributor: looks like useless?
Contributor (author): good catch! I'll delete it.


override def data(train: Boolean): RDD[T] = {
val _train = train
val _seq = sequentialOrder
buffer.zipPartitions(indexes)((dataIter, indexIter) => {
val indexes = indexIter.next()
val indexOffset = math.max(1, indexes.length)
val (indexes, seqOffset) = indexIter.next()


val maxOffset = math.max(1, indexes.length)
val localData = dataIter.next()
val offset = if (_train) {
RandomGenerator.RNG.uniform(0, indexOffset).toInt
val offset = if (_train && !_seq) {
RandomGenerator.RNG.uniform(0, maxOffset).toInt
} else if (_train && _seq) {
seqOffset.get()
} else {
0
}
seqOffset.set(offset)

new Iterator[T] {
private val _offset = new AtomicInteger(offset)
private val _offset = seqOffset

override def hasNext: Boolean = {
if (_train) true else _offset.get() < localData.length
}

override def next(): T = {
val i = _offset.getAndIncrement()
if (_train && i >= localData.length) {
this.synchronized {
val value = _offset.get()
if (value >= localData.length) {
_offset.set(value % localData.length)
}
}
}
if (_train) {
// indexes is an Array, so its maximum length is limited by Int.MaxValue;
// we should improve this
@@ -274,10 +297,13 @@ class CachedDistributedFeatureSet[T: ClassTag]
override def size(): Long = count

override def shuffle(): Unit = {
indexes.unpersist()
indexes = buffer.mapPartitions(iter => {
Iterator.single(RandomGenerator.shuffle((0 until iter.next().length).toArray))
}).setName(s"shuffled index of ${buffer.name}").cache()
if (shouldShuffle) {
indexes.unpersist()
indexes = buffer.mapPartitions(iter => {
Iterator.single((RandomGenerator.shuffle((0 until iter.next().length).toArray),
new AtomicInteger(0)))
}).setName(s"shuffled index of ${buffer.name}").cache()
}
}
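The per-partition iteration above is easier to follow outside diff form. Below is a simplified, single-threaded sketch re-expressed in Python (an illustration only: the real code shares a Scala AtomicInteger through the cached indexes RDD and guards the wrap-around with synchronized; the index-array indirection is also omitted here):

```python
import random

def partition_data(local_data, seq_offset, train, sequential_order):
    """Yield elements of one cached partition.

    seq_offset is a one-element list standing in for the AtomicInteger:
    with sequential_order the position persists across epochs, so each
    pass resumes where the previous one stopped.
    """
    n = len(local_data)
    if train and not sequential_order:
        i = random.randrange(max(1, n))  # random start offset, as before
    elif train and sequential_order:
        i = seq_offset[0]                # resume from the persisted offset
    else:
        i = 0                            # evaluation always starts at 0
    seq_offset[0] = i
    while True:
        if train and i >= n:
            i %= n                       # training wraps around indefinitely
        elif not train and i >= n:
            return                       # evaluation stops after one pass
        yield local_data[i]
        i += 1
        seq_offset[0] = i

buf, off = list("abcde"), [0]
it = partition_data(buf, off, train=True, sequential_order=True)
print([next(it) for _ in range(7)])  # ['a', 'b', 'c', 'd', 'e', 'a', 'b']
```

Note also that shuffle() above becomes a no-op when shouldShuffle is false, so the original in-partition order survives across epochs.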

override def originRDD(): RDD[_] = buffer
@@ -389,12 +415,14 @@ class DiskFeatureSet[T: ClassTag]
}

object DRAMFeatureSet {
def rdd[T: ClassTag](data: RDD[T]): DistributedFeatureSet[T] = {
def rdd[T: ClassTag](data: RDD[T],
sequentialOrder: Boolean = false,
shuffle: Boolean = true): DistributedFeatureSet[T] = {
val arrayLikeRDD = data.mapPartitions(iter => {
Iterator.single(new ArrayLikeWrapper(iter.toArray))
}).setName(s"cached feature set: ${data.name} in DRAM" )
.cache().asInstanceOf[RDD[ArrayLike[T]]]
new CachedDistributedFeatureSet[T](arrayLikeRDD)
new CachedDistributedFeatureSet[T](arrayLikeRDD, sequentialOrder, shuffle)
}
}

@@ -403,22 +431,36 @@ object FeatureSet {
def rdd[T: ClassTag](
data: RDD[T],
memoryType: MemoryType = DRAM,
dataStrategy: DataStrategy = PARTITIONED): DistributedFeatureSet[T] = {
dataStrategy: DataStrategy = PARTITIONED,
sequentialOrder: Boolean = false,
shuffle: Boolean = true): DistributedFeatureSet[T] = {
dataStrategy match {
case PARTITIONED =>
val nodeNumber = EngineRef.getNodeNumber()
val repartitionedData = data.coalesce(nodeNumber, true).setName(data.name)
val repartitionedData = if (sequentialOrder && data.partitions.length == nodeNumber) {
data
} else {
data.coalesce(nodeNumber, true).setName(data.name)
}
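Why the conditional above skips the repartition: a shuffling coalesce gives no guarantee about where elements land, which would silently defeat sequentialOrder. A quick PySpark illustration (hedged: exact placement varies by run; `sc` is an existing SparkContext):

```python
rdd = sc.parallelize(range(10), 2)
print(rdd.glom().collect())  # [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]] - order kept
print(rdd.coalesce(2, shuffle=True).glom().collect())  # placement now arbitrary
```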
memoryType match {
case DRAM =>
DRAMFeatureSet.rdd(repartitionedData)
DRAMFeatureSet.rdd(repartitionedData, sequentialOrder, shuffle)
case PMEM =>
logger.info("~~~~~~~ Caching with AEP ~~~~~~~")
PmemFeatureSet.rdd(repartitionedData, PMEM)
PmemFeatureSet.rdd(repartitionedData, PMEM, sequentialOrder, shuffle)
case DIRECT =>
logger.info("~~~~~~~ Caching with DIRECT ~~~~~~~")
PmemFeatureSet.rdd[T](repartitionedData, DIRECT)
PmemFeatureSet.rdd[T](repartitionedData, DIRECT, sequentialOrder, shuffle)
case diskM: DISK_AND_DRAM =>
logger.info(s"~~~~~~~ Caching with DISK_AND_DRAM(${diskM.numSlice}) ~~~~~~~")
if (sequentialOrder) {
throw new IllegalArgumentException("DiskFeatureSet does not support" +
" sequentialOrder.")
}

if (!shuffle) {
throw new IllegalArgumentException("DiskFeatureSet must use shuffle.")
}
new DiskFeatureSet[T](data, diskM.numSlice)
case _ =>
throw new IllegalArgumentException(
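From Python, the two new guards surface as a Py4J error wrapping the IllegalArgumentException. A hedged sketch (`raw_rdd` is hypothetical, the "DISK_AND_DRAM(5)" string follows the memory_type docstring convention, and the exact wrapper type depends on the Py4J version):

```python
from zoo.feature.common import FeatureSet

try:
    # DISK_AND_DRAM re-loads 1/n slices from disk each epoch, so a fixed
    # sequential order cannot be honoured (and shuffle=False is rejected too).
    fs = FeatureSet.rdd(raw_rdd, memory_type="DISK_AND_DRAM(5)",
                        sequential_order=True)
except Exception as e:
    print(e)  # ...DiskFeatureSet does not support sequentialOrder.
```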
@@ -171,7 +171,9 @@ private[zoo] case class ImageFeatureArray(
object PmemFeatureSet {

private def rdd[T: ClassTag](data: RDD[T],
nativeArrayConverter: NativeArrayConverter[T]):
nativeArrayConverter: NativeArrayConverter[T],
sequentialOrder: Boolean,
shuffle: Boolean):
DistributedFeatureSet[T] = {
val countPerPartition = data.mapPartitions { iter =>
require(iter.hasNext)
@@ -192,22 +194,28 @@ object PmemFeatureSet {
nativeArrayConverter.toArray(dataIter, countIter)
}.setName(s"FeatureSet: ${data.name} cached in PMEM")
.cache()
new CachedDistributedFeatureSet[T](arrayRDD.asInstanceOf[RDD[ArrayLike[T]]])
new CachedDistributedFeatureSet[T](arrayRDD.asInstanceOf[RDD[ArrayLike[T]]],
sequentialOrder, shuffle)
}

def rdd[T: ClassTag](data: RDD[T],
memoryType: MemoryType = PMEM): DistributedFeatureSet[T] = {
memoryType: MemoryType = PMEM,
sequentialOrder: Boolean = false,
shuffle: Boolean = true): DistributedFeatureSet[T] = {
var clazz: ClassTag[T] = implicitly[ClassTag[T]]
implicitly[ClassTag[T]].runtimeClass match {
case t if t == classOf[ByteRecord] =>
rdd[ByteRecord](data.asInstanceOf[RDD[ByteRecord]],
new ByteRecordConverter(memoryType)).asInstanceOf[DistributedFeatureSet[T]]
new ByteRecordConverter(memoryType),
sequentialOrder, shuffle).asInstanceOf[DistributedFeatureSet[T]]
case t if t == classOf[Sample[Float]] =>
rdd[Sample[Float]](data.asInstanceOf[RDD[Sample[Float]]],
new SampleConverter(memoryType)).asInstanceOf[DistributedFeatureSet[T]]
new SampleConverter(memoryType),
sequentialOrder, shuffle).asInstanceOf[DistributedFeatureSet[T]]
case t if t == classOf[ImageFeature] =>
rdd[ImageFeature](data.asInstanceOf[RDD[ImageFeature]],
new ImageFeatureConverter(memoryType)).asInstanceOf[DistributedFeatureSet[T]]
new ImageFeatureConverter(memoryType),
sequentialOrder, shuffle).asInstanceOf[DistributedFeatureSet[T]]
case _ =>
throw new IllegalArgumentException(
s"${implicitly[ClassTag[T]].runtimeClass} is not supported for now")
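The PMEM path threads the same two flags through to CachedDistributedFeatureSet, so persistent-memory caching can now also keep a deterministic order. From Python this is only a different memory_type string (a sketch, assuming a PMEM-equipped cluster and that MemoryType.fromString accepts "PMEM", as the Scala match above suggests):

```python
from zoo.feature.common import FeatureSet

pmem_set = FeatureSet.sample_rdd(sample_rdd,
                                 memory_type="PMEM",
                                 sequential_order=True,
                                 shuffle=False)
```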
@@ -38,20 +38,31 @@ object PythonFeatureSet {
class PythonFeatureSet[T: ClassTag](implicit ev: TensorNumeric[T]) extends PythonZoo[T] {
def createFeatureSetFromImageFrame(
imageFrame: ImageFrame,
memoryType: String): FeatureSet[ImageFeature] = {
memoryType: String,
sequentialOrder: Boolean, shuffle: Boolean): FeatureSet[ImageFeature] = {
require(imageFrame.isDistributed(), "Only support distributed ImageFrame")
FeatureSet.rdd(imageFrame.toDistributed().rdd, MemoryType.fromString(memoryType))
FeatureSet.rdd(imageFrame.toDistributed().rdd, MemoryType.fromString(memoryType),
sequentialOrder = sequentialOrder, shuffle = shuffle)
}

def createFeatureSetFromRDD(
data: JavaRDD[Any],
memoryType: String): FeatureSet[Any] = {
FeatureSet.rdd(data, MemoryType.fromString(memoryType))
memoryType: String,
sequentialOrder: Boolean,
shuffle: Boolean): FeatureSet[Any] = {
FeatureSet.rdd(data, MemoryType.fromString(memoryType),
sequentialOrder = sequentialOrder, shuffle = shuffle)
}

def createSampleFeatureSetFromRDD(data: JavaRDD[Sample], memoryType: String)
def createSampleFeatureSetFromRDD(data: JavaRDD[Sample],
memoryType: String,
sequentialOrder: Boolean,
shuffle: Boolean)
: FeatureSet[JSample[T]] = {
FeatureSet.rdd(toJSample(data), MemoryType.fromString(memoryType))
FeatureSet.rdd(toJSample(data),
MemoryType.fromString(memoryType),
sequentialOrder = sequentialOrder,
shuffle = shuffle)
}

def transformFeatureSet(featureSet: FeatureSet[Any],