[BUG] Cascaded Pandas UDFs not working as expected on Databricks when plugin is enabled #10751

Closed
eordentlich opened this issue Apr 29, 2024 · 3 comments · Fixed by #10767
Labels: bug (Something isn't working)

@eordentlich
Contributor

Describe the bug
Successively applied Pandas UDFs and mapInPandas calls either raise exceptions or make no progress on Databricks when the plugin is enabled.

Steps/Code to reproduce bug

import pyspark.sql.functions as F
import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf

transformed_df = spark.range(1000000)

@pandas_udf("int")
def rand_label(col: pd.Series) -> pd.Series:
    import logging
    logger = logging.getLogger('rand_label')
    logger.info("in rand label")
    print("in rand label 1")
    return pd.Series(np.random.randint(0, 999, size=col.shape[0]))

@pandas_udf("int")
def rand_label2(col: pd.Series) -> pd.Series:
    import logging
    logger = logging.getLogger('rand_label')
    logger.info("in rand label")
    print("in rand label 2")
    return pd.Series(np.random.randint(0, 999, size=col.shape[0]))

# The first pandas UDF adds a label column; the second consumes it after a filter.
transformed_df_w_label_2 = transformed_df.withColumn("label", rand_label(F.lit(0)))
transformed_df_w_label_3 = transformed_df_w_label_2.filter(F.col('label') != 99).withColumn("label2", rand_label2(F.col('label')))
transformed_df_w_label_3.select(F.sum("label2").alias('sum2'), F.sum("label").alias('sum1')).collect()

results in an error:

java.lang.ArrayIndexOutOfBoundsException: 0
	at ai.rapids.cudf.Table.<init>(Table.java:58)
	at com.nvidia.spark.rapids.GpuColumnVector.from(GpuColumnVector.java:524)
	at org.apache.spark.sql.rapids.execution.python.RebatchingRoundoffIterator.$anonfun$next$3(GpuArrowEvalPythonExec.scala:158)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at org.apache.spark.sql.rapids.execution.python.RebatchingRoundoffIterator.next(GpuArrowEvalPythonExec.scala:157)
	at org.apache.spark.sql.rapids.execution.python.RebatchingRoundoffIterator.next(GpuArrowEvalPythonExec.scala:51)
	at org.apache.spark.sql.rapids.execution.python.BatchProducer$$anon$1.next(GpuArrowEvalPythonExec.scala:248)
	at org.apache.spark.sql.rapids.execution.python.BatchProducer$$anon$1.next(GpuArrowEvalPythonExec.scala:233)
        ...

Also, based on the print statement output in the logs, the first UDF appears to run to completion before the second one starts. Batches should instead flow through both Python UDFs incrementally, as they do with baseline Spark.
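
For comparison, one way to reproduce the baseline (CPU Spark) behavior in the same session is to disable the plugin's SQL processing and rerun the query; a minimal sketch, assuming spark.rapids.sql.enabled can be toggled at runtime on this cluster:

# Sketch: rerun the cascaded-UDF query with the plugin's SQL processing disabled.
spark.conf.set("spark.rapids.sql.enabled", "false")   # fall back to CPU Spark
transformed_df_w_label_3.select(
    F.sum("label2").alias("sum2"), F.sum("label").alias("sum1")
).collect()
# On the CPU path the "in rand label 1" / "in rand label 2" prints interleave
# per batch; with the plugin enabled, all of the first UDF's prints appear
# before any of the second's.
spark.conf.set("spark.rapids.sql.enabled", "true")    # restore GPU execution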

A different behavior is observed with the following (though I think the two cases may be related):

transformed_df = spark.read.parquet("s3a://spark-rapids-ml-bm-datasets-public/pca/1m_3k_singlecol_float32_50_files.parquet")
features_col = 'feature_array'
prediction_col = 'label'
centers = np.random.rand(1000,3000)
from pyspark.sql.types import StructType, StructField, DoubleType

sc = transformed_df.rdd.context
centers_bc = sc.broadcast(centers)

def partition_score_udf(pdf_iter):
    local_centers = centers_bc.value.astype(np.float64)
    partition_score = 0.0
    import logging
    logger = logging.getLogger('partition_score_udf')
    logger.info("in partition score udf")
    for pdf in pdf_iter:
        print("in partition score udf")
        input_vecs = np.array(list(pdf[features_col]), dtype=np.float64)
        predictions = list(pdf[prediction_col])
        center_vecs = local_centers[predictions, :]
        partition_score += np.sum((input_vecs - center_vecs) ** 2)
    yield pd.DataFrame({"partition_score": [partition_score]})

total_score = (
  # the below is extremely slow
  # if this is applied to transformed_df_w_label instead of transformed_df_w_label_2, it runs fine
  # one difference is that transformed_df_w_label_2 is itself the output of another pandas udf,
  # so data for this case passes back and forth between the jvm and python workers multiple times
    transformed_df_w_label_2.mapInPandas(
        partition_score_udf,  # type: ignore
        StructType([StructField("partition_score", DoubleType(), True)]),
    )
    .agg(F.sum("partition_score").alias("total_score"))
    .toPandas()
)  # type: ignore
total_score = total_score["total_score"][0]  # type: ignore

In this case, at least on Databricks 13.3 ML, the computation slows dramatically and may be deadlocked.
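
When investigating this, it can help to confirm how the query was planned; a minimal sketch, assuming the same session and variables defined above:

# Sketch: inspect the physical plan of the mapInPandas + aggregation query.
scored = transformed_df_w_label_2.mapInPandas(
    partition_score_udf,
    StructType([StructField("partition_score", DoubleType(), True)]),
).agg(F.sum("partition_score").alias("total_score"))
scored.explain()
# With the plugin enabled, the cascaded Python stages show up as GPU Python exec
# nodes (e.g. GpuArrowEvalPython feeding the mapInPandas stage); with
# spark.rapids.sql.explain=ALL the plugin also logs which operators were or were
# not placed on the GPU and why.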

Expected behavior
No exceptions and no slowdowns, as with baseline Spark without the plugin.

Environment details (please complete the following information)

  • Environment location: First example reproduces on Databricks 12.2 ML and 13.3 ML with spark-rapids 24.04; the second example is slow only on Databricks 13.3 ML
  • Spark configuration settings related to the issue:
spark.task.resource.gpu.amount 1
spark.task.cpus 1
spark.databricks.delta.preview.enabled true
spark.python.worker.reuse true
spark.executorEnv.PYTHONPATH /databricks/jars/rapids-4-spark_2.12-24.04.0.jar:/databricks/spark/python
spark.sql.files.minPartitionNum 2
spark.sql.execution.arrow.maxRecordsPerBatch 10000
spark.executor.cores 8
spark.rapids.memory.gpu.minAllocFraction 0.0001
spark.plugins com.nvidia.spark.SQLPlugin
spark.locality.wait 0s
spark.sql.cache.serializer com.nvidia.spark.ParquetCachedBatchSerializer
spark.rapids.memory.gpu.pooling.enabled false
spark.rapids.sql.explain ALL
spark.sql.execution.sortBeforeRepartition false
spark.rapids.sql.python.gpu.enabled true
spark.rapids.memory.pinnedPool.size 2G
spark.python.daemon.module rapids.daemon_databricks
spark.rapids.sql.batchSizeBytes 512m
spark.sql.adaptive.enabled false
spark.sql.execution.arrow.pyspark.enabled true
spark.sql.files.maxPartitionBytes 2000000000000
spark.databricks.delta.optimizeWrite.enabled false
spark.rapids.sql.concurrentGpuTasks 2

Cluster shape: 2 g5.2xlarge workers and a g4dn.xlarge driver

@eordentlich added the "? - Needs Triage" and "bug" labels on Apr 29, 2024
@mattahrens removed the "? - Needs Triage" label on Apr 30, 2024
@firestarman
Collaborator

@eordentlich Thanks for finding these. Could you file another issue to track the slow-running case?

@firestarman
Collaborator

The linked PR only fixes the ArrayIndexOutOfBoundsException.

@eordentlich
Contributor Author

@eordentlich Thanks for finding these. Could you file another issue to track the slow-running case?

Done. #10770

firestarman added a commit that referenced this issue May 7, 2024
fix #10751

A cuDF Table requires non-empty columns, so the number of columns needs to be checked when converting a batch to a cuDF table. This PR adds support for rows-only batches in RebatchingRoundoffIterator.

---------

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
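
To illustrate the guard described in the commit message, here is a schematic sketch in Python; the actual change is in the plugin's Scala RebatchingRoundoffIterator, and the Batch type and build_cudf_table helper below are hypothetical stand-ins:

from dataclasses import dataclass, field
from typing import List

@dataclass
class Batch:
    # Hypothetical stand-in for a columnar batch: a row count plus zero or more columns.
    num_rows: int
    columns: List[list] = field(default_factory=list)

def build_cudf_table(batch: Batch) -> str:
    # Stand-in for the cuDF Table construction that fails on zero columns.
    if not batch.columns:
        raise IndexError("a cuDF Table requires at least one column")
    return f"Table({len(batch.columns)} cols x {batch.num_rows} rows)"

def convert(batch: Batch):
    # The guard: rows-only batches (rows but no columns) are passed through
    # instead of being converted to a cuDF table.
    if not batch.columns:
        return batch            # keep it as a rows-only batch
    return build_cudf_table(batch)

print(convert(Batch(num_rows=10000)))                    # rows-only: passed through
print(convert(Batch(num_rows=3, columns=[[1, 2, 3]])))   # normal batch: converted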