Fix a potential data corruption for Pandas UDF #9942

Merged: 1 commit, Dec 5, 2023
@@ -20,7 +20,7 @@ import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf
import com.nvidia.spark.rapids._
-import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
+import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
import com.nvidia.spark.rapids.shims.ShimUnaryExecNode
@@ -194,23 +194,14 @@ case class GpuAggregateInPandasExec(
}

val batchProducer = new BatchProducer(
-  BatchGroupedIterator(miniIter, miniAttrs, groupingRefs.indices))
-val queue = new BatchQueue(batchProducer, Some(keyConverter))
-val pyInputIter = batchProducer.asIterator.map { case (batch, isForPeek) =>
-  val inputBatch = closeOnExcept(batch) { _ =>
+  BatchGroupedIterator(miniIter, miniAttrs, groupingRefs.indices), Some(keyConverter))
+val pyInputIter = batchProducer.asIterator.map { batch =>
+  withResource(batch) { _ =>
val pyInputColumns = pyInputRefs.indices.safeMap { idx =>
batch.column(idx + groupingRefs.size).asInstanceOf[GpuColumnVector].incRefCount()
}
new ColumnarBatch(pyInputColumns.toArray, batch.numRows())
}
-  if (isForPeek) {
-    batch.close()
-  } else {
-    // When adding batch to the queue, queue will convert it to a key batch because this
-    // queue is constructed with the key converter.
-    queue.add(batch)
-  }
-  inputBatch
}

// Third, sends to Python to execute the aggregate and returns the result.
@@ -232,8 +223,8 @@ case class GpuAggregateInPandasExec(
val combinedAttrs = gpuGroupingExpressions.map(_.toAttribute) ++ pyOutAttributes
val resultRefs = GpuBindReferences.bindGpuReferences(resultExprs, combinedAttrs)
// Gets the combined batch for each group and projects for the output.
-new CombiningIterator(queue, pyOutputIterator, pyRunner, mNumOutputRows,
-  mNumOutputBatches).map { combinedBatch =>
+new CombiningIterator(batchProducer.getBatchQueue, pyOutputIterator, pyRunner,
+  mNumOutputRows, mNumOutputBatches).map { combinedBatch =>
withResource(combinedBatch) { batch =>
GpuProjectExec.project(batch, resultRefs)
}
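All three exec nodes in this PR get the same rewiring: the standalone BatchQueue class is gone, the producer owns the cache queue, and pulling a batch from the input and caching it for the later combination happen under a single lock. A minimal, self-contained sketch of that pattern in plain Scala (a type parameter stands in for ColumnarBatch; every name here is illustrative, not the plugin's API):

import scala.collection.mutable

// Stand-in for the new BatchProducer: the producer owns the cache queue, and
// an optional converter (the aggregate exec passes its key projection here)
// decides what gets cached for the later combination with the Python output.
class ProducerSketch[T](
    input: Iterator[T],
    converter: Option[T => T] = None) {

  private val queue = mutable.Queue[T]()

  // Pulling a batch for Python and caching its (converted) copy is a single
  // synchronized step, so the queue order always matches the send order.
  def asIterator: Iterator[T] = new Iterator[T] {
    override def hasNext: Boolean = ProducerSketch.this.synchronized(input.hasNext)
    override def next(): T = ProducerSketch.this.synchronized {
      val batch = input.next()
      queue.enqueue(converter.map(_(batch)).getOrElse(batch))
      batch
    }
  }

  // The view the combination stage reads, like getBatchQueue in the diff.
  def remove(): Option[T] = synchronized {
    if (queue.isEmpty) None else Some(queue.dequeue())
  }
}

Of the three nodes, only the aggregate exec passes a converter (the keyConverter above), so its queue caches key batches instead of full input batches.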
@@ -171,143 +171,151 @@ class RebatchingRoundoffIterator(
}

/**
- * Work with BatchQueue to support BatchQueue's peek operation by pulling
- * in a batch from the input iterator on demand.
+ * A trait that provides dedicated APIs for the GPU reader of the batches
+ * coming back from Python. It also keeps the type declarations simple, since
+ * it is implemented by an inner class of BatchProducer.
*/
+trait BatchQueue {
+  /** Return and remove the first batch in the cache. Caller should close it. */
+  def remove(): SpillableColumnarBatch
+
+  /** Get the number of rows in the next batch, without actually getting the batch. */
+  def peekBatchNumRows(): Int
+}
+
+/**
+ * It accepts an iterator as input and caches the batches pulled in from it,
+ * for later combination by the reader with the batches coming back from Python.
+ * It also supports an optional converter that transforms an input batch
+ * before it is put into the cache queue. GpuAggregateInPandasExec uses this
+ * to build and cache key batches.
*
- * It also supports accessing batches from the input by an iterator. Call
- * "asIterator" to get the iterator. This iterator will return a tuple of
- * ColumnarBatch and Boolean. And the boolean indicates whether the batch
- * is pulled in for peak.
+ * Call "getBatchQueue" to get the internal cache queue and pass it to the
+ * output combination iterator.
+ * To access the batches from the input, call "asIterator" to get the output iterator.
*/
-class BatchProducer(input: Iterator[ColumnarBatch]) extends AutoCloseable { producer =>
+class BatchProducer(
+    input: Iterator[ColumnarBatch],
+    converter: Option[ColumnarBatch => ColumnarBatch] = None
+) extends AutoCloseable { producer =>

Option(TaskContext.get()).foreach(onTaskCompletion(_)(close()))

-// Cache for batches pulled in by the "produce" call for the peek operation.
-// In fact, there is usually only one batch. But using a queue here is because in
+// A queue that holds the pending batches that need to line up with, and be
+// combined with, the batches coming back from Python.
+private[this] val batchQueue = new BatchQueueImpl
+
+/** Get the internal BatchQueue */
+def getBatchQueue: BatchQueue = batchQueue
+
+// The cache that holds the pending batches pulled in by the "produce" call
+// when the reader peeks the next rows number while the "batchQueue" is empty.
+// These batches are consumed by the iterator returned from "asIterator".
+// (In fact, there is usually only ONE batch. But using a queue here is because in
// theory "produce" can be called multiple times, then more than one batch can be
-// pulled in.
-private val pending = mutable.Queue[SpillableColumnarBatch]()
+// pulled in.)
+private[this] val pendingOutput = mutable.Queue[SpillableColumnarBatch]()

-private[rapids] def produce(): ColumnarBatch = producer.synchronized {
+private def produce(): ColumnarBatch = {
if (input.hasNext) {
val cb = input.next()
// Need to duplicate this batch for "next"
-pending.enqueue(SpillableColumnarBatch(GpuColumnVector.incRefCounts(cb),
+pendingOutput.enqueue(SpillableColumnarBatch(GpuColumnVector.incRefCounts(cb),
SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
cb
} else {
null
}
}

-def asIterator: Iterator[(ColumnarBatch, Boolean)] = {
-  new Iterator[(ColumnarBatch, Boolean)] {
+/** Return an iterator to access the batches from the input */
+def asIterator: Iterator[ColumnarBatch] = {
+  new Iterator[ColumnarBatch] {

override def hasNext: Boolean = producer.synchronized {
-pending.nonEmpty || input.hasNext
+pendingOutput.nonEmpty || input.hasNext
}

-override def next(): (ColumnarBatch, Boolean) = producer.synchronized {
+override def next(): ColumnarBatch = producer.synchronized {
if (!hasNext) {
throw new NoSuchElementException()
}
-if (pending.nonEmpty) {
-  withResource(pending.dequeue()) { scb =>
-    (scb.getColumnarBatch(), true)
+if (pendingOutput.nonEmpty) {
+  withResource(pendingOutput.dequeue()) { scb =>
+    scb.getColumnarBatch()
}
} else {
-(input.next(), false)
+closeOnExcept(input.next()) { cb =>
+  // Need to duplicate it for later combination with Python output
+  batchQueue.add(GpuColumnVector.incRefCounts(cb))
+  cb
+}
}
}
}
}

-override def close(): Unit = synchronized {
-  while(pending.nonEmpty) {
-    pending.dequeue().close()
+override def close(): Unit = producer.synchronized {
+  batchQueue.close()
+  while (pendingOutput.nonEmpty) {
+    pendingOutput.dequeue().close()
}
}
-}
-
-/**
- * A simple queue that holds the pending batches that need to line up with
- * and combined with batches coming back from python.
- *
- * It will ask for a batch from "batchProducer" when peeking the rows number
- * and the queue is empty.
- * It also supports an optional converter to convert the input batch and save
- * the converted batch. This is design for the GpuAggregateInPandasExec to save
- * the group key instead of the original input batch.
- */
-class BatchQueue(
-    batchProducer: BatchProducer,
-    converter: Option[ColumnarBatch => ColumnarBatch] = None
-) extends AutoCloseable {
-
-  assert(batchProducer != null, "BatchQueue requires a BatchProducer")
-  Option(TaskContext.get()).foreach(onTaskCompletion(_)(close()))
-
-  private val queue = mutable.ArrayBuffer[SpillableColumnarBatch]()
-
-  private[this] def convertIfAny(batch: ColumnarBatch): ColumnarBatch = {
-    converter.map { convert =>
-      withResource(batch)(convert)
-    }.getOrElse(batch)
-  }
-
-  /** Add a batch to the queue, the input batch will be taken over, do not use it anymore */
-  def add(batch: ColumnarBatch): Unit = {
-    val cb = convertIfAny(batch)
-    this.synchronized {
-      queue.append(SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
+// Put this batch queue inside the BatchProducer to share the same lock with the
+// output iterator returned by "asIterator", and to make sure that moving a batch
+// from the input iterator to this queue is an atomic operation.
+// In a two-threaded Python runner, using two locks to protect the batch pulling
+// from the input and the batch queue separately cannot ensure that batches in the
+// queue have the same order as they were pulled in from the input, because there
+// is a race when the reader and the writer append batches to the queue.
+// One possible case is:
+//   1) the writer thread gets batch A, but then it pauses;
+//   2) the reader thread gets the next batch B, and appends it to the queue;
+//   3) the writer thread resumes and appends batch A to the queue.
+// Batches A and B now sit in the queue in reversed order, leading to data
+// corruption when doing the combination.
+private class BatchQueueImpl extends BatchQueue with AutoCloseable {
+  private val queue = mutable.Queue[SpillableColumnarBatch]()
+
+  /** Add a batch to the queue. The input batch will be taken over, do not use it anymore. */
+  private[python] def add(batch: ColumnarBatch): Unit = {
+    val cb = converter.map { convert =>
+      withResource(batch)(convert)
+    }.getOrElse(batch)
+    queue.enqueue(SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
}
}

-/** Return and remove the first batch in the cache. */
-def remove(): SpillableColumnarBatch = synchronized {
-  if (queue.isEmpty) {
-    null
-  } else {
-    queue.remove(0)
+/** Return and remove the first batch in the cache. Caller should close it */
+override def remove(): SpillableColumnarBatch = producer.synchronized {
+  if (queue.isEmpty) {
+    null
+  } else {
+    queue.dequeue()
}
}
}

-/** Get the number of rows in the next batch, without actually getting the batch. */
-def peekBatchNumRows(): Int = {
-  val isEmpty = this.synchronized {
-    queue.isEmpty
-  }
-  if (isEmpty) {
-    // Try to ask for the next batch instead of waiting for inserting a
-    // batch by the python runner's writing. Because the writing may
-    // happen after this peak in the single threaded python runner, leading
-    // to a hang.
-    // Do not call it inside a lock to avoid any dead lock.
-    val nextBatch = batchProducer.produce()
-    if (nextBatch != null) {
-      val cb = convertIfAny(nextBatch)
-      this.synchronized {
-        // Since we release the lock for some time, it is possible some batches
-        // have been added into the queue. Then we need to make sure this batch
-        // is the first one.
-        queue.insert(0, SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
+/** Get the number of rows in the next batch, without actually getting the batch. */
+override def peekBatchNumRows(): Int = producer.synchronized {
+  // Try to pull in the next batch for the peek
+  if (queue.isEmpty) {
+    val cb = produce()
+    if (cb != null) {
+      add(cb)
+    }
+  }
-      }

-  this.synchronized {
if (queue.nonEmpty) {
queue.head.numRows()
} else {
0 // Should not go here but just in case.
}
-  }
}

-override def close(): Unit = synchronized {
-  while (queue.nonEmpty) {
-    queue.remove(0).close()
+override def close(): Unit = producer.synchronized {
+  while (queue.nonEmpty) {
+    queue.dequeue().close()
}
}
}
}
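The two-lock race described in the comment above is easy to force with plain threads. Below is a self-contained sketch (all names hypothetical; strings stand in for batches) that pins the writer/reader schedule to steps 1)-3) with latches and prints the reversed queue:

import java.util.concurrent.CountDownLatch
import scala.collection.mutable

object BatchOrderRace {
  def main(args: Array[String]): Unit = {
    val inputLock = new Object          // old design: one lock for the input
    val queueLock = new Object          // old design: a second lock for the queue
    val input = Iterator("A", "B")      // batches arrive in the order A, B
    val queue = mutable.Queue[String]() // must line up with the Python output

    val writerGotA = new CountDownLatch(1)
    val readerDone = new CountDownLatch(1)

    val writer = new Thread(() => {
      val a = inputLock.synchronized(input.next()) // 1) writer pulls batch A...
      writerGotA.countDown()
      readerDone.await()                           //    ...then pauses
      queueLock.synchronized(queue.enqueue(a))     // 3) writer appends A last
    })
    val reader = new Thread(() => {
      writerGotA.await()
      val b = inputLock.synchronized(input.next()) // 2) reader pulls batch B
      queueLock.synchronized(queue.enqueue(b))     //    and appends it first
      readerDone.countDown()
    })
    writer.start(); reader.start()
    writer.join(); reader.join()
    println(queue) // Queue(B, A) -- reversed relative to the send order
  }
}

Under the patched design, steps 1) and 3) run inside one synchronized block on the producer, so step 2) can no longer slip between them.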
@@ -399,19 +407,8 @@ case class GpuArrowEvalPythonExec(
val batchProducer = new BatchProducer(
new RebatchingRoundoffIterator(iter, inputSchema, targetBatchSize, numInputRows,
numInputBatches))
-val queue = new BatchQueue(batchProducer)
-val pyInputIterator = batchProducer.asIterator.map { case (batch, isForPeek) =>
-  // We have to do the project before we add the batch because the batch might be closed
-  // when it is added
-  val ret = closeOnExcept(batch)(GpuProjectExec.project(_, boundReferences))
-  if (isForPeek) {
-    batch.close()
-  } else {
-    // We only add the batch that is not for peek, because the batch for peek is already
-    // added by the reader when peeking the next rows number.
-    queue.add(batch)
-  }
-  ret
+val pyInputIterator = batchProducer.asIterator.map { batch =>
+  withResource(batch)(GpuProjectExec.project(_, boundReferences))
}

if (isPythonOnGpuEnabled) {
@@ -431,7 +428,7 @@
pythonOutputSchema)

val outputIterator = pyRunner.compute(pyInputIterator, context.partitionId(), context)
-new CombiningIterator(queue, outputIterator, pyRunner, numOutputRows,
+new CombiningIterator(batchProducer.getBatchQueue, outputIterator, pyRunner, numOutputRows,
numOutputBatches)
} else {
// Empty partition, return it directly
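The other half of the design is the peek path: when the cache queue is empty, peekBatchNumRows pulls the next batch itself rather than waiting for the writer (which, in a single-threaded runner, may only run after the peek and would therefore hang), caches it for the combination, and parks a duplicate for the writer-side iterator to hand to Python. A simplified sketch, again with stand-in types and hypothetical names:

import scala.collection.mutable

// Sketch of the peek path of the patched producer.
class PeekSketch[T](input: Iterator[T]) {
  private val combineQueue = mutable.Queue[T]()  // lines up with Python output
  private val pendingOutput = mutable.Queue[T]() // awaiting the writer side

  def peekBatchSize(sizeOf: T => Int): Int = synchronized {
    if (combineQueue.isEmpty && input.hasNext) {
      val batch = input.next()
      pendingOutput.enqueue(batch) // the writer will still send this batch
      combineQueue.enqueue(batch)  // and the reader can already see its size
    }
    combineQueue.headOption.map(sizeOf).getOrElse(0)
  }

  // The writer-side iterator: drain parked batches first, then the input.
  def nextForPython(): T = synchronized {
    if (pendingOutput.nonEmpty) pendingOutput.dequeue()
    else {
      val batch = input.next()
      combineQueue.enqueue(batch)
      batch
    }
  }
}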
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import ai.rapids.cudf
import ai.rapids.cudf.{GroupByAggregation, NullPolicy, OrderByArg}
import com.nvidia.spark.rapids._
-import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
+import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
@@ -505,24 +505,13 @@ trait GpuWindowInPandasExecBase extends ShimUnaryExecNode with GpuPythonExecBase
val boundPartitionRefs = GpuBindReferences.bindGpuReferences(gpuPartitionSpec, childOutput)
val batchProducer = new BatchProducer(
new GroupingIterator(inputIter, boundPartitionRefs, numInputRows, numInputBatches))
-val queue = new BatchQueue(batchProducer)
-val pyInputIterator = batchProducer.asIterator.map { case (batch, isForPeek) =>
-  // We have to do the project before we add the batch because the batch might be closed
-  // when it is added
-  val inputBatch = closeOnExcept(batch) { _ =>
+val pyInputIterator = batchProducer.asIterator.map { batch =>
+  withResource(batch) { _ =>
withResource(GpuProjectExec.project(batch, boundDataRefs)) { projectedCb =>
// Compute the window bounds and insert to the head of each row for one batch
insertWindowBounds(projectedCb)
}
}
-  if (isForPeek) {
-    batch.close()
-  } else {
-    // We only add the batch that is not for peek, because the batch for peek is already
-    // added by the reader when peeking the next rows number.
-    queue.add(batch)
-  }
-  inputBatch
}

if (isPythonOnGpuEnabled) {
@@ -543,8 +532,8 @@ trait GpuWindowInPandasExecBase extends ShimUnaryExecNode with GpuPythonExecBase
pythonOutputSchema)

val outputIterator = pyRunner.compute(pyInputIterator, context.partitionId(), context)
-new CombiningIterator(queue, outputIterator, pyRunner, numOutputRows,
-  numOutputBatches).map(projectResult)
+new CombiningIterator(batchProducer.getBatchQueue, outputIterator, pyRunner,
+  numOutputRows, numOutputBatches).map(projectResult)
} else {
// Empty partition, return the input iterator directly
inputIter
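Finally, the consumer side shared by all three nodes: CombiningIterator pairs each batch coming back from Python with the cached batch at the head of the queue, which is only correct if the queue preserves the send order. A hedged sketch of that pairing (hypothetical names and types, not the plugin's CombiningIterator):

import scala.collection.mutable

// Zips Python results with the cached originals, head to head.
class CombiningSketch[T, U](
    cached: mutable.Queue[T],  // filled by the producer, in send order
    fromPython: Iterator[U])   // one result per batch sent
  extends Iterator[(T, U)] {

  override def hasNext: Boolean = fromPython.hasNext

  override def next(): (T, U) = {
    val result = fromPython.next()
    // Ordering is what makes or breaks this step: the head of the cache must
    // be the original of exactly this result, which the single-lock producer
    // now guarantees.
    (cached.dequeue(), result)
  }
}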