Fix a hanging issue when processing empty data. (NVIDIA#841)
* Fix a hanging issue when processing empty data.

The output iterator waits on the batch queue when `hasNext` is called, and is
supposed to be woken up when the Python runner inserts something into the
batch queue. But that insertion never happens if the input data is empty, so
the iterator hangs forever.

The solution is to have the Python runner always wake up the output iterator
after it finishes writing the data, by calling the newly added API `finish()`.
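
To make the wake-up mechanism concrete, here is a minimal sketch of the wait/notify
pattern described above, written as a standalone class. It is an illustration, not the
actual `BatchQueue`: the names `isSet` and `finish()` mirror the real code, while
`SimpleBatchQueue`, its `add` method, and the `hasNext` body (which is not part of this
diff) are assumptions.

    // Minimal sketch of the producer/consumer hand-off described above.
    // The consumer blocks until the producer adds a batch or calls finish().
    class SimpleBatchQueue[T] {
      private val queue = scala.collection.mutable.Queue[T]()
      private var isSet = false  // becomes true on the first add() or on finish()

      def add(batch: T): Unit = synchronized {
        queue.enqueue(batch)
        isSet = true
        notifyAll()  // wake up a consumer blocked in hasNext
      }

      // The fix: the producer calls this once it has finished writing, even when
      // it produced no batches, so the consumer is never left waiting forever.
      def finish(): Unit = synchronized {
        if (!isSet) {
          isSet = true
          notifyAll()
        }
      }

      // Illustrative consumer side: without finish(), an empty input means isSet
      // stays false and this wait() never returns.
      def hasNext: Boolean = synchronized {
        while (!isSet) wait()
        queue.nonEmpty
      }
    }

In the actual change below, `GpuArrowEvalPythonExec` passes `() => queue.finish()` to
`GpuArrowPythonRunner`, which invokes it right after flushing the Arrow data.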

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Add tests for processing empty data.

The 'small_data' set is small enough that some tasks get no data when
running.

For now this is only tested for the Scalar type, which is the type that
currently implements the columnar pipeline.

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
firestarman authored Sep 25, 2020
1 parent 36d478c commit 851af53
Showing 2 changed files with 23 additions and 8 deletions.
16 changes: 10 additions & 6 deletions integration_tests/src/main/python/udf_cudf_test.py
@@ -36,10 +36,13 @@
     'spark.rapids.sql.python.gpu.enabled': 'true'
 }
 
+small_data = [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)]
+
-def _create_df(spark):
-    elements = list(map(lambda i: (i, i/1.0), range(1, 5000)))
-    return spark.createDataFrame(elements * 2, ("id", "v"))
+large_data = list(map(lambda i: (i, i/1.0), range(1, 5000))) * 2
+
 
+def _create_df(spark, data=large_data):
+    return spark.createDataFrame(data, ("id", "v"))
 
 
 # since this test requires to run different functions on CPU and GPU(need cudf),
@@ -76,13 +79,14 @@ def _plus_one_gpu_func(v: pd.Series) -> pd.Series:
 
 
 @cudf_udf
-def test_with_column(enable_cudf_udf):
+@pytest.mark.parametrize('data', [small_data, large_data], ids=['small data', 'large data'])
+def test_with_column(enable_cudf_udf, data):
     def cpu_run(spark):
-        df = _create_df(spark)
+        df = _create_df(spark, data)
         return df.withColumn("v1", _plus_one_cpu_func(df.v)).collect()
 
     def gpu_run(spark):
-        df = _create_df(spark)
+        df = _create_df(spark, data)
         return df.withColumn("v1", _plus_one_gpu_func(df.v)).collect()
 
     _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf)
@@ -187,6 +187,14 @@ class BatchQueue extends AutoCloseable with Arm {
     }
   }
 
+  def finish(): Unit = synchronized {
+    if (!isSet) {
+      // Wake up anyone waiting for the first batch.
+      isSet = true
+      notifyAll()
+    }
+  }
+
   def remove(): ColumnarBatch = synchronized {
     if (queue.isEmpty) {
       null
@@ -369,7 +377,8 @@ class GpuArrowPythonRunner(
     schema: StructType,
     timeZoneId: String,
     conf: Map[String, String],
-    batchSize: Long)
+    batchSize: Long,
+    onDataWriteFinished: () => Unit)
   extends BasePythonRunner[ColumnarBatch, ColumnarBatch](funcs, evalType, argOffsets)
     with GpuPythonArrowOutput {
 
@@ -431,6 +440,7 @@ class GpuArrowPythonRunner(
       } {
         writer.close()
         dataOut.flush()
+        if (onDataWriteFinished != null) onDataWriteFinished()
       }
     }
   }
@@ -587,7 +597,8 @@ case class GpuArrowEvalPythonExec(
       schema,
       sessionLocalTimeZone,
       pythonRunnerConf,
-      batchSize){
+      batchSize,
+      () => queue.finish()){
       override def minReadTargetBatchSize: Int = targetReadBatchSize
     }.compute(projectedIterator,
       context.partitionId(),
