
Verify Parquet columnar encryption is handled safely #5246

Closed
jlowe opened this issue Apr 13, 2022 · 14 comments · Fixed by #5761
Labels: bug (Something isn't working), task (Work required that improves the product but is not user facing)

Comments

@jlowe (Member) commented Apr 13, 2022

Spark 3.2 added support for columnar encryption in Parquet. We probably cannot detect this at query planning time, but at a minimum we need to make sure we are not corrupting the data when reading encrypted Parquet files and are instead throwing an exception. Long-term we either need to directly support encrypted data or add dynamic, post-planning fallback to the CPU on an encrypted Parquet read.

jlowe added the "? - Needs Triage" (Need team to review and classify) and "task" (Work required that improves the product but is not user facing) labels on Apr 13, 2022
sameerz removed the "? - Needs Triage" (Need team to review and classify) label on Apr 19, 2022
HaoYang670 self-assigned this on Apr 27, 2022
@HaoYang670 (Collaborator)

I'd like to have a try!

@HaoYang670 (Collaborator)

Verified on Spark 3.3. It seems our plugin can decrypt correctly:

scala> val df = Seq((1, 2, 3)).toDF("one", "tWo", "THREE")
df: org.apache.spark.sql.DataFrame = [one: int, tWo: int ... 1 more field]

scala> sc.hadoopConfiguration.set("parquet.encryption.kms.client.class" ,
     |                            "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS")

scala> sc.hadoopConfiguration.set("parquet.encryption.key.list" ,
     |                    "keyA:AAECAwQFBgcICQoLDA0ODw== ,  keyB:AAECAAECAAECAAECAAECAA==")

scala> sc.hadoopConfiguration.set("parquet.crypto.factory.class" ,
     |                    "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")

scala> df.write.
     |    option("parquet.encryption.column.keys" , "keyA:one").
     |    option("parquet.encryption.footer.key" , "keyB").
     | parquet("./tmp/parquet")
22/05/07 20:08:10 WARN GpuOverrides: 
*Exec <DataWritingCommandExec> will run on GPU
  *Output <InsertIntoHadoopFsRelationCommand> will run on GPU
  ! <LocalTableScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.LocalTableScanExec
    @Expression <AttributeReference> one#10 could run on GPU
    @Expression <AttributeReference> tWo#11 could run on GPU
    @Expression <AttributeReference> THREE#12 could run on GPU


scala> spark.read.parquet("./tmp/parquet").show
22/05/07 20:08:27 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> cast(one#19 as string) AS one#28 will run on GPU
      *Expression <Cast> cast(one#19 as string) will run on GPU
    *Expression <Alias> cast(tWo#20 as string) AS tWo#29 will run on GPU
      *Expression <Cast> cast(tWo#20 as string) will run on GPU
    *Expression <Alias> cast(THREE#21 as string) AS THREE#30 will run on GPU
      *Expression <Cast> cast(THREE#21 as string) will run on GPU
    *Exec <FileSourceScanExec> will run on GPU

+---+---+-----+
|one|tWo|THREE|
+---+---+-----+
|  1|  2|    3|
+---+---+-----+

@jlowe (Member, Author) commented May 9, 2022

I do not think this has been validated. It looks like this passed because the GPU was allowed to perform the Parquet write despite encryption being requested. The Parquet file was then written without encryption, which explains why the read passed.

To test this properly, the file must be written by the CPU (i.e., either run without the RAPIDS Accelerator or disable it before performing the write). I'll file an issue to document that we're not properly preventing Parquet writes from being replaced when encryption is requested.
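
For reference, a minimal sketch of that CPU-write / GPU-read setup from the same spark-shell session (assuming the encryption configs from the earlier comment are still set on the Hadoop conf; spark.rapids.sql.enabled is the plugin's overall on/off switch, used the same way later in this thread):

// Disable the RAPIDS Accelerator so the CPU performs the encrypted write.
spark.conf.set("spark.rapids.sql.enabled", "false")
df.write.
  option("parquet.encryption.column.keys", "keyA:one").
  option("parquet.encryption.footer.key", "keyB").
  parquet("./tmp/parquet/encrypted")

// Re-enable the plugin and attempt the read on the GPU.
spark.conf.set("spark.rapids.sql.enabled", "true")
spark.read.parquet("./tmp/parquet/encrypted").show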

@HaoYang670 (Collaborator)

Thank you for your review @jlowe. I also found some mistakes in my verification.
Based on your suggestion, I tried a CPU write and a GPU read, and there is a runtime error: Caused by: org.apache.parquet.crypto.ParquetCryptoRuntimeException: Trying to read file with encrypted footer. No keys available

scala> spark.conf.set("spark.rapids.sql.format.parquet.read.enabled", "true")

scala> spark.conf.set("spark.rapids.sql.format.parquet.enabled", "true")

scala> spark.read.parquet("./tmp/parquet").show
22/05/11 09:23:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:375) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:477) ~[spark-sql_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1(ParquetFileFormat.scala:523) ~[spark-sql_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1$adapted(ParquetFileFormat.scala:517) ~[spark-sql_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$2(SchemaMergeUtils.scala:76) ~[spark-sql_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.spark.scheduler.Task.run(Task.scala:136) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_292]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_292]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
Caused by: java.io.IOException: Could not read footer for file: FileStatus{path=file:/home/remziy/working/rapids/tmp/parquet/part-00000-2aaef34d-78ac-49ab-b8b7-7afda73b5669-c000.snappy.parquet; isDirectory=false; length=2575; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false}
	at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFooterForFileError(QueryExecutionErrors.scala:814) ~[spark-catalyst_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$readParquetFootersInParallel$1(ParquetFileFormat.scala:490) ~[spark-sql_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659) ~[scala-library-2.12.15.jar:?]
	at scala.util.Success.$anonfun$map$1(Try.scala:255) ~[scala-library-2.12.15.jar:?]
	at scala.util.Success.map(Try.scala:213) ~[scala-library-2.12.15.jar:?]
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292) ~[scala-library-2.12.15.jar:?]
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) ~[scala-library-2.12.15.jar:?]
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) ~[scala-library-2.12.15.jar:?]
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) ~[scala-library-2.12.15.jar:?]
	at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402) ~[?:1.8.0_292]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_292]
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_292]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_292]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) ~[?:1.8.0_292]
Caused by: org.apache.parquet.crypto.ParquetCryptoRuntimeException: Trying to read file with encrypted footer. No keys available
	at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:588) ~[parquet-hadoop-1.12.2.jar:1.12.2]
	at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:776) ~[parquet-hadoop-1.12.2.jar:1.12.2]
	at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657) ~[parquet-hadoop-1.12.2.jar:1.12.2]
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:53) ~[spark-sql_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:44) ~[spark-sql_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$readParquetFootersInParallel$1(ParquetFileFormat.scala:484) ~[spark-sql_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659) ~[scala-library-2.12.15.jar:?]
	at scala.util.Success.$anonfun$map$1(Try.scala:255) ~[scala-library-2.12.15.jar:?]
	at scala.util.Success.map(Try.scala:213) ~[scala-library-2.12.15.jar:?]
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292) ~[scala-library-2.12.15.jar:?]
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) ~[scala-library-2.12.15.jar:?]
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) ~[scala-library-2.12.15.jar:?]
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) ~[scala-library-2.12.15.jar:?]
	at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402) ~[?:1.8.0_292]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_292]
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_292]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_292]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) ~[?:1.8.0_292]
22/05/11 09:23:30 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (remzi-desktop executor driver): org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:375)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:477)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1(ParquetFileFormat.scala:523)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1$adapted(ParquetFileFormat.scala:517)
	at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$2(SchemaMergeUtils.scala:76)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Could not read footer for file: FileStatus{path=file:/home/remziy/working/rapids/tmp/parquet/part-00000-2aaef34d-78ac-49ab-b8b7-7afda73b5669-c000.snappy.parquet; isDirectory=false; length=2575; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false}
	at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFooterForFileError(QueryExecutionErrors.scala:814)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$readParquetFootersInParallel$1(ParquetFileFormat.scala:490)
	at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.parquet.crypto.ParquetCryptoRuntimeException: Trying to read file with encrypted footer. No keys available
	at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:588)
	at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:776)
	at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:53)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:44)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$readParquetFootersInParallel$1(ParquetFileFormat.scala:484)
	... 13 more

22/05/11 09:23:30 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (remzi-desktop executor driver): org.apache.spark.SparkException: Exception thrown in awaitResult:
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:375)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:477)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1(ParquetFileFormat.scala:523)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1$adapted(ParquetFileFormat.scala:517)
	at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$2(SchemaMergeUtils.scala:76)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Could not read footer for file: FileStatus{path=file:/home/remziy/working/rapids/tmp/parquet/part-00000-2aaef34d-78ac-49ab-b8b7-7afda73b5669-c000.snappy.parquet; isDirectory=false; length=2575; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false}
	at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFooterForFileError(QueryExecutionErrors.scala:814)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$readParquetFootersInParallel$1(ParquetFileFormat.scala:490)
	at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.parquet.crypto.ParquetCryptoRuntimeException: Trying to read file with encrypted footer. No keys available
	at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:588)
	at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:776)
	at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:53)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:44)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$readParquetFootersInParallel$1(ParquetFileFormat.scala:484)
	... 13 more

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
  at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
  at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:70)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:527)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:125)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.inferSchema(ParquetFileFormat.scala:168)
  at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:210)
  at scala.Option.orElse(Option.scala:447)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:207)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:411)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:227)
  at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:209)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:209)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:553)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:538)
  ... 47 elided
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
  at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
  at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:375)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:477)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1(ParquetFileFormat.scala:523)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1$adapted(ParquetFileFormat.scala:517)
  at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$2(SchemaMergeUtils.scala:76)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:136)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Could not read footer for file: FileStatus{path=file:/home/remziy/working/rapids/tmp/parquet/part-00000-2aaef34d-78ac-49ab-b8b7-7afda73b5669-c000.snappy.parquet; isDirectory=false; length=2575; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false}
  at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFooterForFileError(QueryExecutionErrors.scala:814)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$readParquetFootersInParallel$1(ParquetFileFormat.scala:490)
  at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
  at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
  at scala.util.Success.$anonfun$map$1(Try.scala:255)
  at scala.util.Success.map(Try.scala:213)
  at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
  at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
  at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
  at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
  at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
  at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
  at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
  at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
  at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.parquet.crypto.ParquetCryptoRuntimeException: Trying to read file with encrypted footer. No keys available
  at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:588)
  at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:776)
  at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:53)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:44)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$readParquetFootersInParallel$1(ParquetFileFormat.scala:484)
  ... 13 more

@HaoYang670 (Collaborator)

And just as you said, for GPU write + GPU read, the encryption is bypassed:

spark.conf.set("spark.rapids.sql.format.parquet.write.enabled", "true")

scala> sc.hadoopConfiguration.set("parquet.encryption.kms.client.class" ,
     | "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS")

scala> sc.hadoopConfiguration.set("parquet.encryption.key.list" ,
     | "keyA:AAECAwQFBgcICQoLDA0ODw== ,  keyB:AAECAAECAAECAAECAAECAA==")

scala> sc.hadoopConfiguration.set("parquet.crypto.factory.class" ,
     | "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")

scala> df.write.
     | option("parquet.encryption.column.keys" , "keyA:one").
     | option("parquet.encryption.footer.key" , "keyB").
     | parquet("./tmp/parquet")
22/05/11 09:30:02 WARN GpuOverrides: 
*Exec <DataWritingCommandExec> will run on GPU
  *Output <InsertIntoHadoopFsRelationCommand> will run on GPU
  ! <LocalTableScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.LocalTableScanExec
    @Expression <AttributeReference> one#10 could run on GPU
    @Expression <AttributeReference> tWo#11 could run on GPU
    @Expression <AttributeReference> THREE#12 could run on GPU


scala> spark.read.parquet("./tmp/parquet").show
22/05/11 09:30:18 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> cast(one#19 as string) AS one#28 will run on GPU
      *Expression <Cast> cast(one#19 as string) will run on GPU
    *Expression <Alias> cast(tWo#20 as string) AS tWo#29 will run on GPU
      *Expression <Cast> cast(tWo#20 as string) will run on GPU
    *Expression <Alias> cast(THREE#21 as string) AS THREE#30 will run on GPU
      *Expression <Cast> cast(THREE#21 as string) will run on GPU
    *Exec <FileSourceScanExec> will run on GPU

+---+---+-----+
|one|tWo|THREE|
+---+---+-----+
|  1|  2|    3|
+---+---+-----+

@GaryShen2008 (Collaborator)

@jlowe
It seems we can't support reading encrypted Parquet files for now.
If we can't detect it at query planning time, shall we have a short-term solution to avoid it?
E.g., we could provide a conf like spark.rapids.parquet.hasEncryption=false as the default; if it's true, we tag ParquetScan to fall back to the CPU. And if we can detect the encryption at runtime, we can throw an exception telling the user to set it to true.
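
A rough sketch of what that proposal could look like in the plugin's planning code; the config key and the scanMeta tagging call below are purely illustrative and do not exist in the plugin:

// Hypothetical sketch only: neither this config key nor this exact hook exists today.
val assumeEncrypted =
  spark.conf.get("spark.rapids.parquet.hasEncryption", "false").toBoolean
if (assumeEncrypted) {
  // Tag the scan during planning so it falls back to the CPU ParquetScan.
  scanMeta.willNotWorkOnGpu("encrypted Parquet must be read on the CPU " +
    "(spark.rapids.parquet.hasEncryption is set)")
}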

@jlowe (Member, Author) commented May 16, 2022

The first step is to verify that all the different ways Parquet could be encrypted are detected, with an exception thrown if we try to read them on the GPU, to avoid the possibility of silent data corruption. I believe this is already covered, but we should verify there are no other ways the file could be encrypted beyond what is already tested.

"we provide a conf like spark.rapids.parquet.hasEncryption=false as default, if it's true, we tag ParquetScan to fallback CPU."

I don't see a need for another config. If the user doesn't want Parquet to be read by the GPU, there's already the existing config, spark.rapids.sql.format.parquet.read.enabled.

It might make sense to automatically fall back if certain Parquet encryption settings are present (e.g., when the user provides a secret key via a config), but I could see cases where only a few Parquet files (or maybe none!) require the secret key, and we would end up falling back for all Parquet reads. But maybe this is the best default setup for now until we have improved support for encrypted Parquet files.

"And if we can detect the encryption in Runtime, we can throw an exception to tell the user to set it to true."

Agree it would be good to refine any exception messages being thrown to inform the user how to avoid this via config settings.
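
A sketch of that heuristic, assuming the decision is driven purely by the presence of the encryption-related Hadoop settings used earlier in this thread (this is not what the plugin does today):

// If the session carries Parquet encryption settings, assume encrypted files may be read
// and disable the GPU Parquet reader so every Parquet scan falls back to the CPU.
val hadoopConf = spark.sparkContext.hadoopConfiguration
val encryptionConfigured =
  hadoopConf.get("parquet.crypto.factory.class") != null ||
  hadoopConf.get("parquet.encryption.kms.client.class") != null ||
  hadoopConf.get("parquet.encryption.key.list") != null
if (encryptionConfigured) {
  // Same effect a user can get by hand with the existing config mentioned above.
  spark.conf.set("spark.rapids.sql.format.parquet.read.enabled", "false")
}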

@HaoYang670 (Collaborator)

@jlowe @GaryShen2008
Sorry, I forgot to configure the keys when reading on the GPU, so please ignore the event log I pasted last time.
After double-checking, the GPU can read the encrypted Parquet file:

scala> sc.hadoopConfiguration.set("parquet.encryption.kms.client.class" ,
     | "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS")

scala> sc.hadoopConfiguration.set("parquet.encryption.key.list" ,
     | "keyA:AAECAwQFBgcICQoLDA0ODw== ,  keyB:AAECAAECAAECAAECAAECAA==")

scala> sc.hadoopConfiguration.set("parquet.crypto.factory.class" ,
     | "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")

scala> spark.conf.set("spark.rapids.sql.format.parquet.read.enabled", "true")

scala> spark.conf.set("spark.rapids.sql.format.parquet.enabled", "true")

scala> spark.read.
     | option("parquet.encryption.column.keys" , "keyA:one").
     | option("parquet.encryption.footer.key" , "keyB").
     | parquet("./tmp/parquet/encrypted")
res5: org.apache.spark.sql.DataFrame = [one: int, tWo: int ... 1 more field]

scala> val df = spark.read.
     | option("parquet.encryption.column.keys" , "keyA:one").
     | option("parquet.encryption.footer.key" , "keyB").
     | parquet("./tmp/parquet/encrypted")
df: org.apache.spark.sql.DataFrame = [one: int, tWo: int ... 1 more field]

scala> df.show
22/05/24 13:37:52 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> cast(one#6 as string) AS one#19 will run on GPU
      *Expression <Cast> cast(one#6 as string) will run on GPU
    *Expression <Alias> cast(tWo#7 as string) AS tWo#20 will run on GPU
      *Expression <Cast> cast(tWo#7 as string) will run on GPU
    *Expression <Alias> cast(THREE#8 as string) AS THREE#21 will run on GPU
      *Expression <Cast> cast(THREE#8 as string) will run on GPU
    *Exec <FileSourceScanExec> will run on GPU

+---+---+-----+
|one|tWo|THREE|
+---+---+-----+
|  0|  2|    3|
+---+---+-----+

@jlowe (Member, Author) commented May 24, 2022

Since the footer is read on the CPU by the same Parquet code, I'm not surprised that the column names are retrieved properly. However, I do not see any way we can reliably produce the proper output if the columns are encrypted. I suspect your finding is an anomaly caused by the tiny amount of data and only one of the columns being encrypted. For example, here's what I get for the same example, which is not quite correct:

scala> sc.hadoopConfiguration.set("parquet.encryption.kms.client.class" , "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS")

scala> sc.hadoopConfiguration.set("parquet.encryption.key.list" , "keyA:AAECAwQFBgcICQoLDA0ODw== ,  keyB:AAECAAECAAECAAECAAECAA==")

scala> sc.hadoopConfiguration.set("parquet.crypto.factory.class" , "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")

scala> spark.read.parquet("/tmp/encrypted").show()
+---+---+-----+                                                                 
|one|tWo|THREE|
+---+---+-----+
|  0|  2|    3|
+---+---+-----+


scala> spark.conf.set("spark.rapids.sql.enabled", "false")

scala> spark.read.parquet("/tmp/encrypted").show()
+---+---+-----+
|one|tWo|THREE|
+---+---+-----+
|  1|  2|    3|
+---+---+-----+

This is even more apparent when using a file with more data. For example, I created an encrypted version of tests/src/test/resources/strings.csv with Spark on the CPU, encrypting the strings column, and here are the results of trying to load that encrypted file on the GPU and CPU:

scala> spark.read.parquet("/tmp/encryptedstrings").show()
+-------+----+                                                                  
|strings|ints|
+-------+----+
|   null|   1|
|   null|   2|
|   null|   3|
|   null|   4|
|   null|   5|
|   null|   6|
|   null|   7|
|   null|   8|
+-------+----+


scala> spark.conf.set("spark.rapids.sql.enabled", "false")

scala> spark.read.parquet("/tmp/encryptedstrings").show()
+-------+----+
|strings|ints|
+-------+----+
|  First|   1|
|    Foo|   2|
|   Bar |   3|
|   null|   4|
|    Baz|   5|
|  Bat  |   6|
|    Bar|   7|
|   Bay |   8|
+-------+----+

@HaoYang670 (Collaborator)

Yes. The GPU can run successfully but gives the wrong answer, so it might be a little difficult to fall back to the CPU because no runtime error is thrown.

@jlowe (Member, Author) commented May 24, 2022

The footer is decryptable because we read it on the CPU. If we had dynamic, late fallback to a CPU reader, we should be able to detect from the footer information that there's an encrypted column and fall back in that case (either just for the encrypted columns being read or for the entire read of that file).

@revans2 (Collaborator) commented May 24, 2022

If the footer is encrypted then it will have a magic number of PARE instead of PAR1. We might be able to detect that early on and skip those files.
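
A minimal sketch of that early check, assuming a local file for brevity (a real check would go through the Hadoop FileSystem API); the trailing four bytes of a Parquet file are the magic, PAR1 for a plaintext footer and PARE for an encrypted footer:

import java.io.RandomAccessFile
import java.nio.charset.StandardCharsets

// Returns true if the file's trailing magic marks the footer as encrypted.
def hasEncryptedFooter(path: String): Boolean = {
  val raf = new RandomAccessFile(path, "r")
  try {
    val magic = new Array[Byte](4)
    raf.seek(raf.length() - 4)
    raf.readFully(magic)
    new String(magic, StandardCharsets.US_ASCII) == "PARE"
  } finally {
    raf.close()
  }
}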

For individual columns I think what you want to look for is

https://github.com/apache/arrow/blob/c5a175dd61b3fa94a38b918c1cc3d7b4ffaefba7/cpp/src/parquet/parquet.thrift#L801

Sadly there appears to be no good way to get access to this from the parsed parquet-mr API, especially because we have to interact with several different versions of it.

I think the right thing to do is to remove all of the encryption configs from the Hadoop conf that is passed to our Parquet reader and then catch any exceptions that it throws. We can then wrap them in more informative exceptions explaining what is happening.
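
A rough sketch of that approach; readFooter, hadoopConf, and filePath below stand in for whatever the plugin's actual footer-reading path is and are hypothetical names:

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.crypto.ParquetCryptoRuntimeException

// Copy the conf and drop the encryption-related keys so nothing is silently decrypted.
def withoutEncryptionConfs(conf: Configuration): Configuration = {
  val copy = new Configuration(conf)
  Seq("parquet.crypto.factory.class",
      "parquet.encryption.kms.client.class",
      "parquet.encryption.key.list").foreach(k => copy.unset(k))
  copy
}

try {
  readFooter(withoutEncryptionConfs(hadoopConf), filePath) // hypothetical reader entry point
} catch {
  case e: ParquetCryptoRuntimeException =>
    throw new RuntimeException("The GPU Parquet reader does not support encrypted files; " +
      "set spark.rapids.sql.format.parquet.read.enabled=false to read them on the CPU", e)
}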

@tgravescs (Collaborator)

So I think we do want the check for encryption in 22.06.

Bobby recommended: "I think the right thing to do is to remove all of the encryption configs from the Hadoop conf that is passed to our Parquet reader and then catch any exceptions that it throws. We can then wrap them in more informative exceptions explaining what is happening."

That way we don't read bogus data and the user gets a useful error message.

@tgravescs (Collaborator)

The PR (#5761) is merged.
