[BUG] cache of struct does not work on databricks 8.2ML #2856

Closed
viadea opened this issue Jul 1, 2021 · 2 comments · Fixed by #2880

viadea commented Jul 1, 2021

Describe the bug

Cache of struct does not work on Databricks 8.2ML:

  1. When setting spark.sql.cache.serializer to com.nvidia.spark.rapids.shims.spark311.ParquetCachedBatchSerializer, the query falls back to the CPU (see the conf sketch after the grep output below).

  2. When setting spark.sql.cache.serializer to com.nvidia.spark.rapids.shims.spark311db.ParquetCachedBatchSerializer, it fails with:
    ClassNotFoundException: com.nvidia.spark.rapids.shims.spark311db.ParquetCachedBatchSerializer
    I also checked the shim layers and could not find ParquetCachedBatchSerializer in the spark311db shim:

$ grep -r ParquetCachedBatchSerializer *
spark311/src/main/scala/org/apache/spark/sql/rapids/shims/spark311/GpuInMemoryTableScanExec.scala:import com.nvidia.spark.rapids.shims.spark311.ParquetCachedBatchSerializer
spark311/src/main/scala/org/apache/spark/sql/rapids/shims/spark311/GpuInMemoryTableScanExec.scala:    relation.cacheBuilder.serializer.asInstanceOf[ParquetCachedBatchSerializer]
spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala:            if (!scan.relation.cacheBuilder.serializer.isInstanceOf[ParquetCachedBatchSerializer]) {
spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala:              willNotWorkOnGpu("ParquetCachedBatchSerializer is not being used")
spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala:    if (serClass == classOf[ParquetCachedBatchSerializer]) {
spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/ParquetCachedBatchSerializer.scala:class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
spark311cdh/src/main/scala/com/nvidia/spark/rapids/shims/spark311cdh/Spark311CDHShims.scala:            if (!scan.relation.cacheBuilder.serializer.isInstanceOf[ParquetCachedBatchSerializer]) {
spark311cdh/src/main/scala/com/nvidia/spark/rapids/shims/spark311cdh/Spark311CDHShims.scala:              willNotWorkOnGpu("ParquetCachedBatchSerializer is not being used")
spark311cdh/src/main/scala/com/nvidia/spark/rapids/shims/spark311cdh/Spark311CDHShims.scala:    if (serClass == classOf[ParquetCachedBatchSerializer]) {
spark311cdh/src/main/scala/com/nvidia/spark/rapids/shims/spark311cdh/ParquetCachedBatchSerializer.scala:class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
spark312/src/main/scala/com/nvidia/spark/rapids/shims/spark312/ParquetCachedBatchSerializer.scala:class ParquetCachedBatchSerializer extends shims.spark311.ParquetCachedBatchSerializer {
spark313/src/main/scala/com/nvidia/spark/rapids/shims/spark313/ParquetCachedBatchSerializer.scala:class ParquetCachedBatchSerializer extends shims.spark312.ParquetCachedBatchSerializer {
spark320/src/main/scala/com/nvidia/spark/rapids/shims/spark320/ParquetCachedBatchSerializer.scala:class ParquetCachedBatchSerializer extends shims.spark311.ParquetCachedBatchSerializer {
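
For context, spark.sql.cache.serializer is a static conf, so it has to be in place before the SparkSession starts (on Databricks, via the cluster's Spark config). A minimal sketch of the equivalent in code, using the spark311 shim class from case 1:

import org.apache.spark.sql.SparkSession

// Static conf: must be set before the session is created; setting it on an
// already-running session has no effect.
val spark = SparkSession.builder()
  .config("spark.sql.cache.serializer",
    "com.nvidia.spark.rapids.shims.spark311.ParquetCachedBatchSerializer")
  .getOrCreate()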

Steps/Code to reproduce bug

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val data = Seq(
    Row(Row("Adam ","","Green"),"1","M",1000),
    Row(Row("Bob ","Middle","Green"),"2","M",2000),
    Row(Row("Cathy ","","Green"),"3","F",3000)
)

val schema = (new StructType()
  .add("name",new StructType()
    .add("firstname",StringType)
    .add("middlename",StringType)
    .add("lastname",StringType)) 
  .add("id",StringType)
  .add("gender",StringType)
  .add("salary",IntegerType))

val df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema)
df.write.format("parquet").mode("overwrite").save("/tmp/testparquet")
val df2 = spark.read.parquet("/tmp/testparquet")
df2.createOrReplaceTempView("df2")
// Cache a query that nests the struct inside another struct
val df3 = spark.sql("select struct(name, struct(name.firstname, name.lastname) as newname) as col from df2").cache
df3.createOrReplaceTempView("df3")

spark.sql("select count(distinct col.name.firstname) from df3").show
spark.sql("select count(distinct col.name.firstname) from df3").explain

Below is the plan that is shown; note the CPU InMemoryTableScan under a GpuRowToColumnar transition instead of a GpuInMemoryTableScan:

== Physical Plan ==
GpuColumnarToRowTransition false
+- GpuHashAggregate(keys=[], functions=[gpucount(distinct _gen_alias_173#173)]), filters=List(None))
   +- GpuShuffleCoalesce 2147483647
      +- GpuColumnarExchange gpusinglepartitioning$(), ENSURE_REQUIREMENTS, [id=#579]
         +- GpuHashAggregate(keys=[], functions=[partial_gpucount(distinct _gen_alias_173#173)]), filters=List(None))
            +- GpuHashAggregate(keys=[_gen_alias_173#173], functions=[]), filters=List())
               +- GpuShuffleCoalesce 2147483647
                  +- GpuColumnarExchange gpuhashpartitioning(_gen_alias_173#173, 200), ENSURE_REQUIREMENTS, [id=#575]
                     +- GpuHashAggregate(keys=[_gen_alias_173#173], functions=[]), filters=List())
                        +- GpuProject [col#98.name.firstname AS _gen_alias_173#173]
                           +- GpuRowToColumnar TargetSize(2147483647)
                              +- InMemoryTableScan [col#98]
                                    +- InMemoryRelation [col#98], StorageLevel(disk, memory, deserialized, 1 replicas)
                                          +- GpuProject [named_struct(name, name#57, newname, named_struct(firstname, name#57.firstname, lastname, name#57.lastname)) AS col#98]
                                             +- GpuFileGpuScan parquet [name#57] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/tmp/testparquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<name:struct<firstname:string,middlename:string,lastname:string>>

Expected behavior

The correct plan should be:

== Physical Plan ==
GpuColumnarToRowTransition false
+- GpuHashAggregate(keys=[], functions=[gpucount(distinct _gen_alias_80#80)]), filters=List(None))
   +- GpuShuffleCoalesce 2147483647
      +- GpuColumnarExchange gpusinglepartitioning$(), ENSURE_REQUIREMENTS, [id=#121]
         +- GpuHashAggregate(keys=[], functions=[partial_gpucount(distinct _gen_alias_80#80)]), filters=List(None))
            +- GpuHashAggregate(keys=[_gen_alias_80#80], functions=[]), filters=List())
               +- GpuShuffleCoalesce 2147483647
                  +- GpuColumnarExchange gpuhashpartitioning(_gen_alias_80#80, 200), ENSURE_REQUIREMENTS, [id=#110]
                     +- GpuHashAggregate(keys=[_gen_alias_80#80], functions=[]), filters=List())
                        +- GpuProject [col#25.name.firstname AS _gen_alias_80#80]
                           +- GpuInMemoryTableScan [col#25]
                                 +- InMemoryRelation [col#25], StorageLevel(disk, memory, deserialized, 1 replicas)
                                       +- GpuProject [named_struct(name, name#16, newname, named_struct(firstname, name#16.firstname, lastname, name#16.lastname)) AS col#25]
                                          +- GpuFileGpuScan parquet [name#16] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/testparquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<name:struct<firstname:string,middlename:string,lastname:string>>
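
A quick way to check which path a build takes is to assert on the executed plan string (a sketch; df3 is the cached DataFrame from the repro above):

// On the GPU path the scan node is GpuInMemoryTableScan; the buggy fallback
// shows InMemoryTableScan under a GpuRowToColumnar transition instead.
val plan = spark.sql("select count(distinct col.name.firstname) from df3")
  .queryExecution.executedPlan.toString
assert(plan.contains("GpuInMemoryTableScan"), s"cache fell back to CPU:\n$plan")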

Environment details

  • Environment location: Cloud (Databricks)
  • Spark configuration settings related to the issue: spark.sql.cache.serializer (see above)

Databricks 8.2ML GPU with Spark 3.1.1


viadea added the bug and ? - Needs Triage labels on Jul 1, 2021

viadea commented Jul 1, 2021

Here are the classes in the 21.06 rapids jar:

$ jar tf rapids-4-spark_2.12-21.06.0.jar | grep ParquetCachedBatchSerializer.class
com/nvidia/spark/rapids/shims/spark311/ParquetCachedBatchSerializer.class
com/nvidia/spark/rapids/shims/spark312/ParquetCachedBatchSerializer.class
com/nvidia/spark/rapids/shims/spark311cdh/ParquetCachedBatchSerializer.class
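
The ClassNotFoundException in case 2 can be reproduced without running a query by probing the classpath directly (a sketch):

// Probe which shim serializer classes the driver can actually load.
// Per the jar listing above, the spark311db variant is missing.
Seq(
  "com.nvidia.spark.rapids.shims.spark311.ParquetCachedBatchSerializer",
  "com.nvidia.spark.rapids.shims.spark311db.ParquetCachedBatchSerializer"
).foreach { name =>
  val found = scala.util.Try(Class.forName(name)).isSuccess
  println(s"$name -> ${if (found) "found" else "NOT FOUND"}")
}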

razajafri self-assigned this on Jul 2, 2021
Salonijain27 removed the ? - Needs Triage label on Jul 6, 2021
tgravescs commented

Please make sure we have a test added for this on Databricks as well.
