Change the number of partitions to zero when a range is empty #1542

Merged · merged 1 commit on Jan 15, 2021
Changes from all commits
@@ -193,6 +193,7 @@ case class GpuRangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range
val step: Long = range.step
val numSlices: Int = range.numSlices.getOrElse(sparkContext.defaultParallelism)
val numElements: BigInt = range.numElements
val isEmptyRange: Boolean = start == end || (start < end ^ 0 < step)

override val output: Seq[Attribute] = range.output

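The flag added in this hunk, isEmptyRange, is true exactly when the range can never produce a row: start == end, or the step points away from end (start < end with a negative step, or start > end with a positive step). A standalone sketch of the predicate, for illustration only and not part of the patch:

  def isEmpty(start: Long, end: Long, step: Long): Boolean =
    start == end || (start < end ^ 0 < step)

  isEmpty(0L, 10L, 2L)   // false: counting up toward end
  isEmpty(0L, 10L, -1L)  // true: stepping away from end
  isEmpty(10L, 0L, -1L)  // false: counting down toward end
  isEmpty(5L, 5L, 1L)    // true: start == end
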
@@ -224,76 +225,82 @@ case class GpuRangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range
val totalTime = longMetric(TOTAL_TIME)
val maxRowCountPerBatch = Math.min(targetSizeBytes/8, Int.MaxValue)
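// Editorial note, not in the patch: the range emits a single LongType column (8 bytes per row),
// so targetSizeBytes / 8 caps the rows per batch at roughly the target batch size in bytes.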

sqlContext
.sparkContext
.parallelize(0 until numSlices, numSlices)
.mapPartitionsWithIndex { (i, _) =>
val partitionStart = (i * numElements) / numSlices * step + start
val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
def getSafeMargin(bi: BigInt): Long =
if (bi.isValidLong) {
bi.toLong
} else if (bi > 0) {
Long.MaxValue
} else {
Long.MinValue
}
val safePartitionStart = getSafeMargin(partitionStart) // inclusive
val safePartitionEnd = getSafeMargin(partitionEnd) // exclusive, unless start == this
val taskContext = TaskContext.get()

val iter: Iterator[ColumnarBatch] = new Iterator[ColumnarBatch] {
private[this] var number: Long = safePartitionStart
private[this] var done: Boolean = false
private[this] val inputMetrics = taskContext.taskMetrics().inputMetrics

override def hasNext: Boolean =
if (!done) {
if (step > 0) {
number < safePartitionEnd
} else {
number > safePartitionEnd
}
} else false

override def next(): ColumnarBatch =
withResource(new NvtxWithMetrics("GpuRange", NvtxColor.DARK_GREEN, totalTime)){
_ =>
GpuSemaphore.acquireIfNecessary(taskContext)
val start = number
val remainingSteps = (safePartitionEnd - start) / step
// Start is inclusive so we need to produce at least one row
val rowsThisBatch = Math.max(1, Math.min(remainingSteps, maxRowCountPerBatch))
val endInclusive = start + ((rowsThisBatch - 1) * step)
number = endInclusive + step
if (number < endInclusive ^ step < 0) {
// we have Long.MaxValue + Long.MaxValue < Long.MaxValue
// and Long.MinValue + Long.MinValue > Long.MinValue, so iff the step causes a
// step back, we are pretty sure that we have an overflow.
done = true
if (isEmptyRange) {
sparkContext.emptyRDD[ColumnarBatch]
} else {
sqlContext
.sparkContext
.parallelize(0 until numSlices, numSlices)
.mapPartitionsWithIndex { (i, _) =>
val partitionStart = (i * numElements) / numSlices * step + start
val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
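// Worked example, not part of the patch: range(start = 0, end = 10, step = 2) has
// numElements = 5; with numSlices = 2 the arithmetic above gives partition 0 the
// bounds [0, 4) -> rows 0, 2 and partition 1 the bounds [4, 10) -> rows 4, 6, 8.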

def getSafeMargin(bi: BigInt): Long =
if (bi.isValidLong) {
bi.toLong
} else if (bi > 0) {
Long.MaxValue
} else {
Long.MinValue
}
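// Note, not part of the patch: the partition bounds are computed as BigInt and clamped
// into the Long domain here; the overflow check in next() below guards the remaining
// per-batch Long arithmetic.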

val safePartitionStart = getSafeMargin(partitionStart) // inclusive
val safePartitionEnd = getSafeMargin(partitionEnd) // exclusive, unless start == this
val taskContext = TaskContext.get()

val iter: Iterator[ColumnarBatch] = new Iterator[ColumnarBatch] {
private[this] var number: Long = safePartitionStart
private[this] var done: Boolean = false
private[this] val inputMetrics = taskContext.taskMetrics().inputMetrics

override def hasNext: Boolean =
if (!done) {
if (step > 0) {
number < safePartitionEnd
} else {
number > safePartitionEnd
}
} else false

override def next(): ColumnarBatch =
withResource(new NvtxWithMetrics("GpuRange", NvtxColor.DARK_GREEN, totalTime)) {
_ =>
GpuSemaphore.acquireIfNecessary(taskContext)
val start = number
val remainingSteps = (safePartitionEnd - start) / step
// Start is inclusive so we need to produce at least one row
val rowsThisBatch = Math.max(1, Math.min(remainingSteps, maxRowCountPerBatch))
val endInclusive = start + ((rowsThisBatch - 1) * step)
number = endInclusive + step
if (number < endInclusive ^ step < 0) {
// we have Long.MaxValue + Long.MaxValue < Long.MaxValue
// and Long.MinValue + Long.MinValue > Long.MinValue, so iff the step causes a
// step back, we are pretty sure that we have an overflow.
done = true
}

val ret = withResource(Scalar.fromLong(start)) { startScalar =>
withResource(Scalar.fromLong(step)) { stepScalar =>
withResource(
ai.rapids.cudf.ColumnVector.sequence(
startScalar, stepScalar, rowsThisBatch.toInt)) { vec =>
withResource(new Table(vec)) { tab =>
GpuColumnVector.from(tab, Array[DataType](LongType))
}
}
}
}

assert(rowsThisBatch == ret.numRows())
numOutputRows += rowsThisBatch
TrampolineUtil.incInputRecordsRows(inputMetrics, rowsThisBatch)
numOutputBatches += 1
ret
}
}
new InterruptibleIterator(taskContext, iter)
}
}
}

override def simpleString(maxFields: Int): String = {
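
What the change buys, sketched with plain Spark outside the plugin (a standalone illustration; the names below are not from this PR): an emptyRDD reports zero partitions, while parallelize(0 until numSlices, numSlices) always reports numSlices, so an empty range no longer schedules one no-op task per slice.

  import org.apache.spark.sql.SparkSession

  object EmptyRangePartitionsDemo {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().master("local[*]").appName("empty-range-demo").getOrCreate()
      val sc = spark.sparkContext
      // Shape used by GpuRangeExec after this patch when the range is empty: zero partitions.
      println(sc.emptyRDD[Long].getNumPartitions)             // 0
      // Shape used before the patch: numSlices partitions, each producing nothing.
      println(sc.parallelize(0 until 4, 4).getNumPartitions)  // 4
      spark.stop()
    }
  }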