
[FEA] Support org.apache.spark.sql.catalyst.expressions.ReplicateRows #4104

Closed
viadea opened this issue Nov 15, 2021 · 1 comment · Fixed by #4388
Labels: feature request (New feature or request), P1 (Nice to have for release)

Comments

viadea (Collaborator) commented Nov 15, 2021

Is your feature request related to a problem? Please describe.

This is a feature request to support org.apache.spark.sql.catalyst.expressions.ReplicateRows.
ReplicateRows is an internal expression that the optimizer uses to rewrite EXCEPT ALL and INTERSECT ALL queries, per the Spark source code.
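
For context, a rough plain-Scala illustration of the semantics (not Spark code; the values are made up): the expression takes a row count followed by the output columns and emits each input row that many times.

// Illustration only: ReplicateRows(count, cols...) emits each row `count` times.
// In the EXCEPT ALL rewrite, `count` is the per-group surplus of left-side rows.
val grouped = Seq((2L, "M"), (1L, "F"))  // (sum, gender) after the rewrite's aggregation
val replicated = grouped.flatMap { case (n, g) => Seq.fill(n.toInt)(g) }
// replicated == Seq("M", "M", "F")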

Below is a mini repro:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val data = Seq(
    Row(Row("Adam ","","Green"),"1","M",1000.1, "2019-01-01",List("Java","Scala")),
    Row(Row("Bob ","Middle","Green"),"2","M",2000.2, "2019-01-02",List("Java","Python")),
    Row(Row("Cathy ","","Green"),"3","F",3000.3, "2019-01-03",List())
)

val schema = (new StructType()
  .add("name",new StructType()
    .add("firstname",StringType)
    .add("middlename",StringType)
    .add("lastname",StringType)) 
  .add("id",StringType)
  .add("gender",StringType)
  .add("salary",DoubleType)
  .add("birthdayStr",StringType)
  .add("language",ArrayType(StringType))
             )

// Write the data as Parquet and read it back so the query runs against a file-based table.
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
df.withColumn("birthday", to_date(col("birthdayStr"))).write.format("parquet").mode("overwrite").save("/tmp/testparquet")
val df2 = spark.read.parquet("/tmp/testparquet")
df2.createOrReplaceTempView("df2")
df2.printSchema

// EXCEPT ALL causes the optimizer to introduce ReplicateRows in the plan.
val querytext = """SELECT gender FROM df2 EXCEPT ALL (SELECT gender FROM df2 WHERE salary <> 10)"""
spark.sql(querytext).explain

The relevant Spark driver log messages:

  !Exec <GenerateExec> cannot run on GPU because not all expressions can be replaced
    !NOT_FOUND <ReplicateRows> replicaterows(sum#99L, gender#76) cannot run on GPU because no GPU enabled version of expression class org.apache.spark.sql.catalyst.expressions.ReplicateRows could be found
viadea added the feature request and ? - Needs Triage labels on Nov 15, 2021
Salonijain27 added the P1 (Nice to have for release) label and removed ? - Needs Triage on Nov 16, 2021
revans2 (Collaborator) commented Nov 16, 2021

The APIs already exist in cuDF (TableView.repeat). The hardest part will be memory management and combining it with the existing code, which is rather specific to explode and pos_explode.
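
As a rough sketch of the direction (not the plugin's actual implementation, and assuming the cuDF Java bindings expose Table.repeat(ColumnView), mirroring cudf::repeat on a table_view):

import ai.rapids.cudf.{ColumnVector, Table}

// Sketch only: replicate row i of `input` counts(i) times on the GPU.
// Assumes Table.repeat(ColumnView) exists in the Java bindings; closing the
// returned Table and the count column is the memory-management part the
// plugin has to get right.
def replicateRowsSketch(input: Table, counts: ColumnVector): Table =
  input.repeat(counts)

The count column would be built from the first child of replicaterows (the sum produced by the EXCEPT ALL rewrite), and the remaining children become the columns of the repeated table.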
