
[SPARK-44705][PYTHON] Make PythonRunner single-threaded #42385

Closed
wants to merge 7 commits

Conversation

utkarsh39
Contributor

### What changes were proposed in this pull request?

PythonRunner, a utility that executes Python UDFs in Spark, today uses two threads in a producer-consumer model. This multi-threading model is problematic and confusing, as Spark's execution model within a task is commonly understood to be single-threaded.
More importantly, this departure into double-threaded execution resulted in a series of customer issues involving [race conditions](https://issues.apache.org/jira/browse/SPARK-33277) and [deadlocks](https://issues.apache.org/jira/browse/SPARK-38677) between threads, as the code was hard to reason about. There have been multiple attempts to rein in these issues, viz., [fix 1](https://issues.apache.org/jira/browse/SPARK-22535), [fix 2](apache#30177), [fix 3](apache@243c321). Moreover, the fixes have made the code base somewhat abstruse by introducing multiple daemon [monitor threads](https://github.com/apache/spark/blob/a3a32912be04d3760cb34eb4b79d6d481bbec502/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L579) to detect deadlocks. This PR makes PythonRunner single-threaded, making it easier to reason about and improving code health.

#### Current Execution Model in Spark for Python UDFs

For queries containing Python UDFs, the main Java task thread spins up a new writer thread to pipe data from the child Spark plan into the Python worker evaluating the UDF. The writer thread runs in a tight loop: it evaluates the child Spark plan and feeds the resulting output to the Python worker. The main task thread simultaneously consumes the Python UDF’s output and evaluates the parent Spark plan to produce the final result.
The I/O to/from the Python worker uses blocking Java Sockets, necessitating two threads: one responsible for input to the Python worker and the other for output. Without two threads, it is easy to run into a deadlock. For example, the task can block forever waiting for output from the Python worker. The output will never arrive until the input is supplied to the Python worker, which is impossible because the task thread is blocked waiting on that output.
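
To illustrate, here is a minimal sketch of that deadlock on a single thread over blocking sockets; the function and the length-prefixed framing are hypothetical stand-ins, not the actual PythonRunner code:

```
import java.io.{DataInputStream, DataOutputStream}
import java.net.Socket

// Hypothetical single-threaded exchange over blocking socket I/O;
// `payloads` stands in for serialized rows from the child plan.
def evaluateUdfBlocking(socket: Socket, payloads: Iterator[Array[Byte]]): Unit = {
  val out = new DataOutputStream(socket.getOutputStream) // blocking writes
  val in = new DataInputStream(socket.getInputStream)    // blocking reads
  payloads.foreach { bytes =>
    // Once the OS send buffer fills up, this write blocks. The Python worker in
    // turn stops reading, because its own output buffer is full and this thread
    // is not draining it -- a classic deadlock.
    out.writeInt(bytes.length)
    out.write(bytes)
  }
  val firstResultLength = in.readInt() // never reached if a write above blocked
}
```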

#### Proposed Fix

The proposed fix is to move to the standard single-threaded execution model within a task, i.e., to do away with the writer thread. In addition to mitigating the crashes, the fix reduces the complexity of the existing code by removing the many safety checks that were in place to track deadlocks in the double-threaded execution model.

In the new model, the main task thread alternates between consuming data from and feeding data to the Python worker, using asynchronous I/O through Java’s [SocketChannel](https://docs.oracle.com/javase/7/docs/api/java/nio/channels/SocketChannel.html). See the `read()` method in the code below for approximately how this is achieved.

```
class PythonUDFRunner {

  private var nextRow: Row = _
  private var endOfStream = false
  private var childHasNext = true
  private var buffer: ByteBuffer = _

  def hasNext(): Boolean = nextRow != null || {
    if (!endOfStream) {
      read(buffer)
      nextRow = deserialize(buffer)
      hasNext()
    } else {
      false
    }
  }

  def next(): Row = {
    if (hasNext()) {
      val outputRow = nextRow
      nextRow = null
      outputRow
    } else {
      null
    }
  }

  def read(buf: ByteBuffer): Unit = {
    var n = 0
    while (n == 0) {
      // Alternate between reading from and writing to the Python worker using async I/O.
      if (pythonWorker.isReadable) {
        n = pythonWorker.read(buf)
      }
      if (pythonWorker.isWritable) {
        consumeChildPlanAndWriteDataToPythonWorker()
      }
    }
  }

  def consumeChildPlanAndWriteDataToPythonWorker(): Unit = {
    // Tracks whether the connection to the Python worker can accept more data.
    var socketAcceptsInput = true
    while (socketAcceptsInput && (childHasNext || buffer.hasRemaining)) {
      if (!buffer.hasRemaining && childHasNext) {
        // Consume data from the child plan and buffer it.
        writeToBuffer(childPlan.next(), buffer)
        childHasNext = childPlan.hasNext()
        if (!childHasNext) {
          // Exhausted the child plan’s output. Write a marker to the Python worker
          // signaling the end of data input.
          writeToBuffer(endOfStream)
        }
      }
      // Try to write as much buffered data as possible to the Python worker.
      while (buffer.hasRemaining && socketAcceptsInput) {
        val n = writeToPythonWorker(buffer)
        // `writeToPythonWorker()` returns 0 when the socket cannot accept more data right now.
        socketAcceptsInput = n > 0
      }
    }
  }
}
```
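
For context, here is a minimal sketch of the non-blocking `SocketChannel` setup that readiness checks like `isReadable`/`isWritable` above assume; the port number and buffer size are illustrative, not the actual PythonRunner wiring:

```
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, SocketChannel}

// Connect to the (hypothetical) Python worker port, then switch the channel
// to non-blocking mode and register it with a Selector.
val selector = Selector.open()
val channel = SocketChannel.open(new InetSocketAddress("localhost", 40123))
channel.configureBlocking(false)
val key = channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE)

val buf = ByteBuffer.allocate(64 * 1024)
selector.select() // blocks only until the channel is readable and/or writable
if (key.isReadable) {
  // Non-blocking read: returns immediately; 0 means no bytes are available yet,
  // which is why the read() loop above retries while n == 0.
  val n = channel.read(buf)
}
if (key.isWritable) {
  // A channel.write(buffer) here would likewise return 0 when the socket's send
  // buffer is full; that return value drives `socketAcceptsInput` above.
}
```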

### Why are the changes needed?

This PR makes PythonRunner single-threaded, making it easier to reason about and improving code health.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

@zhengruifeng
Contributor

zhengruifeng commented Aug 8, 2023

cc @HyukjinKwon @ueshin

@HyukjinKwon HyukjinKwon changed the title from [SPARK-44705] Make PythonRunner single-threaded to [SPARK-44705][PYTHON] Make PythonRunner single-threaded on Aug 8, 2023
Member

@HyukjinKwon HyukjinKwon left a comment

Already reviewed this actually. LGTM if tests pass. This is a nice fix to have.

Member

@dongjoon-hyun dongjoon-hyun left a comment

@HyukjinKwon
Member

Merged to master.

hvanhovell pushed a commit to hvanhovell/spark that referenced this pull request Aug 13, 2023
@LuciferYang
Contributor

This PR caused the failure of the Scala 2.13 MiMa check. #42479

@LuciferYang
Contributor

LuciferYang commented Aug 14, 2023

@utkarsh39

I found that this PR may have caused some PySpark test cases to fail in the Java 17 daily tests (pyspark-sql and pyspark-connect modules).

To verify this, I conducted some local testing using Java 17:

```
java -version
openjdk version "17.0.8" 2023-07-18 LTS
OpenJDK Runtime Environment Zulu17.44+15-CA (build 17.0.8+7-LTS)
OpenJDK 64-Bit Server VM Zulu17.44+15-CA (build 17.0.8+7-LTS, mixed mode, sharing)
```

1. Reset to the commit before SPARK-44705 and run the following commands:

```
// [SPARK-44765][CONNECT] Simplify retries of ReleaseExecute
git reset --hard 9bde882fcb39e9fedced0df9702df2a36c1a84e6
export SKIP_UNIDOC=true
export SKIP_MIMA=true
export SKIP_PACKAGING=true
./dev/run-tests --parallelism 1 --modules "pyspark-sql"

Finished test(python3.9): pyspark.sql.tests.test_udtf (57s) ... 2 tests were skipped
```

The tests in pyspark.sql.tests.test_udtf passed.

2. Reset to SPARK-44705 and run the following commands:

```
// [SPARK-44705][PYTHON] Make PythonRunner single-threaded
git reset --hard 8aaff55839493e80e3ce376f928c04aa8f31d18c
export SKIP_UNIDOC=true
export SKIP_MIMA=true
export SKIP_PACKAGING=true
./dev/run-tests --parallelism 1 --modules "pyspark-sql"
```
```
======================================================================
FAIL: test_udtf_with_analyze_table_argument_adding_columns (pyspark.sql.tests.test_udtf.UDTFTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/tests/test_udtf.py", line 1340, in test_udtf_with_analyze_table_argument_adding_columns
    assertSchemaEqual(
  File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/testing/utils.py", line 356, in assertSchemaEqual
    raise PySparkAssertionError(
pyspark.errors.exceptions.base.PySparkAssertionError: [DIFFERENT_SCHEMA] Schemas do not match.
--- actual
+++ expected
- StructType([StructField('a', LongType(), True)])
+ StructType([StructField('id', LongType(), False), StructField('is_even', BooleanType(), True)])

======================================================================
FAIL: test_udtf_with_analyze_table_argument_repeating_rows (pyspark.sql.tests.test_udtf.UDTFTests) (query_no=0)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/tests/test_udtf.py", line 1394, in test_udtf_with_analyze_table_argument_repeating_rows
    assertSchemaEqual(df.schema, expected_schema)
  File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/testing/utils.py", line 356, in assertSchemaEqual
    raise PySparkAssertionError(
pyspark.errors.exceptions.base.PySparkAssertionError: [DIFFERENT_SCHEMA] Schemas do not match.
--- actual
+++ expected
- StructType([StructField('id', LongType(), False), StructField('is_even', BooleanType(), True)])
+ StructType([StructField('id', LongType(), False)])

======================================================================
FAIL: test_udtf_with_analyze_table_argument_repeating_rows (pyspark.sql.tests.test_udtf.UDTFTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/tests/test_udtf.py", line 1400, in test_udtf_with_analyze_table_argument_repeating_rows
    self.spark.sql(
AssertionError: AnalysisException not raised

======================================================================
FAIL: test_udtf_with_analyze_using_accumulator (pyspark.sql.tests.test_udtf.UDTFTests) (query_no=0)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/tests/test_udtf.py", line 1625, in test_udtf_with_analyze_using_accumulator
    assertSchemaEqual(df.schema, StructType().add("col1", IntegerType()))
  File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/testing/utils.py", line 356, in assertSchemaEqual
    raise PySparkAssertionError(
pyspark.errors.exceptions.base.PySparkAssertionError: [DIFFERENT_SCHEMA] Schemas do not match.
--- actual
+++ expected
- StructType([StructField('a', IntegerType(), True), StructField('b', IntegerType(), True)])
+ StructType([StructField('col1', IntegerType(), True)])

======================================================================
FAIL: test_udtf_with_analyze_using_accumulator (pyspark.sql.tests.test_udtf.UDTFTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/tests/test_udtf.py", line 1628, in test_udtf_with_analyze_using_accumulator
    self.assertEqual(test_accum.value, 222)
AssertionError: 111 != 222

----------------------------------------------------------------------
Ran 174 tests in 54.619s

FAILED (failures=34, errors=6, skipped=2)
```

There are 34 test failures after this one was merged.

@utkarsh39 Do you have time to fix these test cases? For this, I have created SPARK-44797.

Or should we revert this PR to restore the Java 17 daily tests first? @HyukjinKwon @ueshin @dongjoon-hyun

@utkarsh39
Contributor Author

> I found that this PR may have caused some PySpark test cases to fail in the Java 17 daily tests (pyspark-sql and pyspark-connect modules) [...] Do you have time to fix these test cases? Or should we revert this PR to restore the Java 17 daily tests first?

I will try to get these tests fixed ASAP.

@ueshin
Member

ueshin commented Aug 14, 2023

I think #42422 includes the fix. Could you take a look?

@ueshin
Member

ueshin commented Aug 14, 2023

I merged #42422. Let's see the next daily tests. Thanks.

@LuciferYang
Contributor

> I merged #42422. Let's see the next daily tests. Thanks.

Thanks ~

```
*/
@DeveloperApi
@deprecated("Only usage for Python evaluation is now extinct", "3.5.0")
```

Member

@utkarsh39 This should be 4.0.0.
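
That is, presumably (as later fixed in #42494):

```
@deprecated("Only usage for Python evaluation is now extinct", "4.0.0")
```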

Contributor Author

PR to fix it: #42494

@LuciferYang
Contributor

> I merged #42422. Let's see the next daily tests. Thanks.

https://github.com/apache/spark/actions/runs/5861115482/job/15890643041

Some tests still failed; I can't determine the reason for now, further investigation is needed.

@ueshin
Member

ueshin commented Aug 15, 2023

@LuciferYang The error is different from the previous one that seems to be fixed.
Is it possible to rerun the test? I guess it's just flaky.

@LuciferYang
Contributor

> @LuciferYang The error is different from the previous one that seems to be fixed. Is it possible to rerun the test? I guess it's just flaky.

I re-ran the failed ones; there are 26 `[Errno 111] Connection refused` errors. Not sure this is flaky.

@LuciferYang
Contributor

> @LuciferYang The error is different from the previous one that seems to be fixed. Is it possible to rerun the test? I guess it's just flaky.

The tests passed after retrying. Thanks for your work ~ @ueshin

ueshin pushed a commit that referenced this pull request Aug 15, 2023
### What changes were proposed in this pull request?
#42385 deprecated `ContextAwareIterator` but the deprecation version was incorrectly set to 3.5. This PR fixes it to be 4.0.

### Why are the changes needed?
Fix deprecation version.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Not needed.

Closes #42494 from utkarsh39/SPARK-44705-fix-deprecation-version.

Authored-by: Utkarsh <utkarsh.agarwal@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
Comment on lines +74 to +76

```
val workerFactory =
  new PythonWorkerFactory(pythonExec, workerModule, envVars.asScala.toMap)
val (worker: PythonWorker, _) = workerFactory.createSimpleWorker(blockingMode = true)
```
Contributor

What is this change about?
It broke the stop() method below.
cc: @WweiL, @HyukjinKwon


Contributor

@utkarsh39 we will create a followup ticket to fix this

Contributor Author

Thanks @WweiL

Contributor

Thanks guys.

valentinp17 pushed a commit to valentinp17/spark that referenced this pull request Aug 24, 2023
valentinp17 pushed a commit to valentinp17/spark that referenced this pull request Aug 24, 2023
ragnarok56 pushed a commit to ragnarok56/spark that referenced this pull request Mar 2, 2024
ragnarok56 pushed a commit to ragnarok56/spark that referenced this pull request Mar 2, 2024