[BUG] Reading from iceberg table will fail. #5189

Closed
viadea opened this issue Apr 8, 2022 · 2 comments · Fixed by #5274
Labels: bug (Something isn't working)

Comments

viadea (Collaborator) commented Apr 8, 2022

Currently our plugin does not support Iceberg, so we want Iceberg reads to fall back to the CPU gracefully instead of failing.

Env:
Spark 3.1.1 standalone cluster with Hive integration
22.04 snapshot jars

Below is a simple read test that fails with our plugin:

spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 \
    --conf spark.sql.session.timeZone=UTC \
    --conf "spark.driver.extraJavaOptions=-Duser.timezone=UTC" \
    --conf "spark.executor.extraJavaOptions=-Duser.timezone=UTC" \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hive

After that, create a demo table and run a query:

spark.sql("""CREATE TABLE spark_catalog.default.demo (id bigint, data string) USING iceberg;""")
spark.sql("""INSERT INTO spark_catalog.default.demo VALUES (1, 'a'), (2, 'b'), (3, 'c');""")
spark.sql("""select count(distinct id) from spark_catalog.default.demo""").show

Error:

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeRowToColumnarBatchIterator.fillBatch(Unknown Source)
	at com.nvidia.spark.rapids.UnsafeRowToColumnarBatchIterator.next(UnsafeRowToColumnarBatchIterator.java:122)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeRowToColumnarBatchIterator.next(Unknown Source)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.aggregateInputBatches(aggregate.scala:283)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.$anonfun$next$2(aggregate.scala:238)
	at scala.Option.getOrElse(Option.scala:189)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:235)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:181)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.aggregateInputBatches(aggregate.scala:283)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.$anonfun$next$2(aggregate.scala:238)
	at scala.Option.getOrElse(Option.scala:189)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:235)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:181)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.aggregateInputBatches(aggregate.scala:283)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.$anonfun$next$2(aggregate.scala:238)
	at scala.Option.getOrElse(Option.scala:189)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:235)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:181)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.aggregateInputBatches(aggregate.scala:283)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.$anonfun$next$2(aggregate.scala:238)
	at scala.Option.getOrElse(Option.scala:189)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:235)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:181)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.$anonfun$fetchNextBatch$2(GpuColumnarToRowExec.scala:242)
	at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
	at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.withResource(GpuColumnarToRowExec.scala:188)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.fetchNextBatch(GpuColumnarToRowExec.scala:239)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.loadNextBatch(GpuColumnarToRowExec.scala:216)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.hasNext(GpuColumnarToRowExec.scala:256)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

CPU Spark:

scala> spark.conf.set("spark.rapids.sql.enabled", false)

scala>  spark.sql("""select count(distinct id) from spark_catalog.default.demo""").show
+------------------+
|count(DISTINCT id)|
+------------------+
|                 3|
+------------------+
viadea added the bug (Something isn't working) and "? - Needs Triage" (Need team to review and classify) labels on Apr 8, 2022
viadea (Collaborator, Author) commented Apr 8, 2022

Also tested Spark 3.2 and got the same error as above.

revans2 (Collaborator) commented Apr 11, 2022

We made the assumption that the data we get would always be an UnsafeRow, even though the type passed in is InternalRow. Spark makes that assumption too in many cases, such as with collect(). Can you read data from a very small Iceberg table and just do a collect on it? From reading the Spark code, you should get the exact same error, just at a different location in the code.
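
For reference, a minimal sketch of the suggested collect() check, reusing the demo table from the repro above (whether this reproduces the same cast error is exactly what needs verifying):

// Sketch of the suggested check: read the small Iceberg table and collect it
// on the driver. collect() also assumes UnsafeRow internally, so the expectation
// (to be confirmed) is the same ClassCastException from a different code path.
val rows = spark.sql("""SELECT * FROM spark_catalog.default.demo""").collect()
rows.foreach(println)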

We can fix it, but the simplest fix is going to slow down processing in general, so we are going to have to think of a way to dynamically decide whether we need to add in the UnsafeProjection or not.
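
For illustration only (this is not the plugin's actual fix), one hedged way to make that decision dynamically is to convert a row to UnsafeRow only when it is not already one, so the common all-UnsafeRow path pays essentially nothing. toUnsafe below is a hypothetical helper, not existing plugin code:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.StructType

// Hypothetical helper: pass UnsafeRows through untouched and only run an
// UnsafeProjection for other InternalRow implementations (for example the
// GenericInternalRow coming out of the Iceberg reader).
def toUnsafe(rows: Iterator[InternalRow], schema: StructType): Iterator[UnsafeRow] = {
  lazy val proj = UnsafeProjection.create(schema) // only built if a non-UnsafeRow appears
  rows.map {
    case u: UnsafeRow => u
    case other        => proj(other)
  }
}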
