[BUG] java.lang.NullPointerException when exporting delta table #1197

Closed
martinstuder opened this issue Nov 24, 2020 · 17 comments · Fixed by #1307
Labels
bug Something isn't working P0 Must have for release

Comments

@martinstuder

Describe the bug

When saving a data frame using the delta format, I get the following exception:

Py4JJavaError: An error occurred while calling o431.save.
: java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:201)
	at org.apache.spark.sql.execution.collect.DatasetSparkResult.toArray(DatasetSparkResult.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3714)
	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2954)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3702)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:116)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:249)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:101)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:836)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:199)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3700)
	at org.apache.spark.sql.Dataset.collect(Dataset.scala:2954)
	at com.databricks.sql.transaction.tahoe.SnapshotEdge.$anonfun$x$4$1(SnapshotEdge.scala:102)
	at com.databricks.sql.acl.CheckPermissions$.trusted(CheckPermissions.scala:1116)
	at com.databricks.sql.transaction.tahoe.SnapshotEdge.x$4$lzycompute(SnapshotEdge.scala:90)
	at com.databricks.sql.transaction.tahoe.SnapshotEdge.x$4(SnapshotEdge.scala:90)
	at com.databricks.sql.transaction.tahoe.SnapshotEdge._metadata$lzycompute(SnapshotEdge.scala:90)
	at com.databricks.sql.transaction.tahoe.SnapshotEdge._metadata(SnapshotEdge.scala:90)
	at com.databricks.sql.transaction.tahoe.SnapshotEdge.metadata(SnapshotEdge.scala:143)
	at com.databricks.sql.transaction.tahoe.SnapshotEdge.toString(SnapshotEdge.scala:233)
	at java.lang.String.valueOf(String.java:2994)
	at java.lang.StringBuilder.append(StringBuilder.java:131)
	at com.databricks.sql.transaction.tahoe.Snapshot.$anonfun$new$1(Snapshot.scala:297)
	at com.databricks.sql.transaction.tahoe.Snapshot.$anonfun$logInfo$1(Snapshot.scala:274)
	at org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
	at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56)
	at com.databricks.sql.transaction.tahoe.Snapshot.logInfo(Snapshot.scala:274)
	at com.databricks.sql.transaction.tahoe.Snapshot.<init>(Snapshot.scala:297)
	at com.databricks.sql.transaction.tahoe.SnapshotEdge.<init>(SnapshotEdge.scala:67)
	at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.createSnapshot(SnapshotManagementEdge.scala:85)
	at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.$anonfun$update$3(SnapshotManagementEdge.scala:111)
	at com.databricks.backend.daemon.driver.ProgressReporter$.withStatusCode(ProgressReporter.scala:377)
	at com.databricks.backend.daemon.driver.ProgressReporter$.withStatusCode(ProgressReporter.scala:363)
	at com.databricks.spark.util.SparkDatabricksProgressReporter$.withStatusCode(ProgressReporter.scala:34)
	at com.databricks.sql.transaction.tahoe.util.DeltaProgressReporterEdge.withStatusCode(DeltaProgressReporterEdge.scala:30)
	at com.databricks.sql.transaction.tahoe.util.DeltaProgressReporterEdge.withStatusCode$(DeltaProgressReporterEdge.scala:25)
	at com.databricks.sql.transaction.tahoe.DeltaLog.withStatusCode(DeltaLog.scala:65)
	at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.$anonfun$update$2(SnapshotManagementEdge.scala:92)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$4(UsageLogging.scala:431)
	at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:239)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:234)
	at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:231)
	at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:18)
	at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:276)
	at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:269)
	at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:18)
	at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:412)
	at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:338)
	at com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:18)
	at com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:55)
	at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:98)
	at com.databricks.spark.util.UsageLogger.recordOperation(UsageLogger.scala:71)
	at com.databricks.spark.util.UsageLogger.recordOperation$(UsageLogger.scala:58)
	at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:67)
	at com.databricks.spark.util.UsageLogging.recordOperation(UsageLogger.scala:346)
	at com.databricks.spark.util.UsageLogging.recordOperation$(UsageLogger.scala:325)
	at com.databricks.sql.transaction.tahoe.DeltaLog.recordOperation(DeltaLog.scala:65)
	at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:108)
	at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:94)
	at com.databricks.sql.transaction.tahoe.DeltaLog.recordDeltaOperation(DeltaLog.scala:65)
	at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.$anonfun$update$1(SnapshotManagementEdge.scala:92)
	at com.databricks.sql.transaction.tahoe.DeltaLog.lockInterruptibly(DeltaLog.scala:167)
	at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.update(SnapshotManagementEdge.scala:91)
	at com.databricks.sql.transaction.tahoe.SnapshotManagementEdge.update$(SnapshotManagementEdge.scala:90)
	at com.databricks.sql.transaction.tahoe.DeltaLog.update(DeltaLog.scala:65)
	at com.databricks.sql.transaction.tahoe.OptimisticTransactionImplEdge.$anonfun$doCommit$1(OptimisticTransactionImplEdge.scala:304)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at com.databricks.sql.transaction.tahoe.DeltaLog.lockInterruptibly(DeltaLog.scala:167)
	at com.databricks.sql.transaction.tahoe.OptimisticTransactionImplEdge.doCommit(OptimisticTransactionImplEdge.scala:273)
	at com.databricks.sql.transaction.tahoe.OptimisticTransactionImplEdge.doCommit$(OptimisticTransactionImplEdge.scala:269)
	at com.databricks.sql.transaction.tahoe.OptimisticTransaction.doCommit(OptimisticTransaction.scala:84)
	at com.databricks.sql.transaction.tahoe.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$2(OptimisticTransaction.scala:536)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$4(UsageLogging.scala:431)
	at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:239)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:234)
	at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:231)
	at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:18)
	at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:276)
	at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:269)
	at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:18)
	at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:412)
	at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:338)
	at com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:18)
	at com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:55)
	at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:98)
	at com.databricks.spark.util.UsageLogger.recordOperation(UsageLogger.scala:71)
	at com.databricks.spark.util.UsageLogger.recordOperation$(UsageLogger.scala:58)
	at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:67)
	at com.databricks.spark.util.UsageLogging.recordOperation(UsageLogger.scala:346)
	at com.databricks.spark.util.UsageLogging.recordOperation$(UsageLogger.scala:325)
	at com.databricks.sql.transaction.tahoe.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:84)
	at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:108)
	at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:94)
	at com.databricks.sql.transaction.tahoe.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:84)
	at com.databricks.sql.transaction.tahoe.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$1(OptimisticTransaction.scala:532)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at com.databricks.sql.transaction.tahoe.DeltaLog.lockInterruptibly(DeltaLog.scala:167)
	at com.databricks.sql.transaction.tahoe.OptimisticTransactionImpl.doCommitRetryIteratively(OptimisticTransaction.scala:527)
	at com.databricks.sql.transaction.tahoe.OptimisticTransactionImpl.doCommitRetryIteratively$(OptimisticTransaction.scala:524)
	at com.databricks.sql.transaction.tahoe.OptimisticTransaction.doCommitRetryIteratively(OptimisticTransaction.scala:84)
	at com.databricks.sql.transaction.tahoe.OptimisticTransactionImplEdge.$anonfun$commit$1(OptimisticTransactionImplEdge.scala:222)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$4(UsageLogging.scala:431)
	at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:239)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:234)
	at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:231)
	at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:18)
	at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:276)
	at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:269)
	at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:18)
	at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:412)
	at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:338)
	at com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:18)
	at com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:55)
	at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:98)
	at com.databricks.spark.util.UsageLogger.recordOperation(UsageLogger.scala:71)
	at com.databricks.spark.util.UsageLogger.recordOperation$(UsageLogger.scala:58)
	at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:67)
	at com.databricks.spark.util.UsageLogging.recordOperation(UsageLogger.scala:346)
	at com.databricks.spark.util.UsageLogging.recordOperation$(UsageLogger.scala:325)
	at com.databricks.sql.transaction.tahoe.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:84)
	at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:108)
	at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:94)
	at com.databricks.sql.transaction.tahoe.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:84)
	at com.databricks.sql.transaction.tahoe.OptimisticTransactionImplEdge.commit(OptimisticTransactionImplEdge.scala:168)
	at com.databricks.sql.transaction.tahoe.OptimisticTransactionImplEdge.commit$(OptimisticTransactionImplEdge.scala:164)
	at com.databricks.sql.transaction.tahoe.OptimisticTransaction.commit(OptimisticTransaction.scala:84)
	at com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.$anonfun$run$2(WriteIntoDelta.scala:74)
	at com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.$anonfun$run$2$adapted(WriteIntoDelta.scala:70)
	at com.databricks.sql.transaction.tahoe.DeltaLog.withNewTransaction(DeltaLog.scala:203)
	at com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:70)
	at com.databricks.sql.acl.CheckPermissions$.trusted(CheckPermissions.scala:1116)
	at com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.run(WriteIntoDelta.scala:69)
	at com.databricks.sql.transaction.tahoe.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:155)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:91)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$3(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:187)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:158)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:157)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:999)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:116)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:249)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:101)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:836)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:199)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:999)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:437)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:294)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:295)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:251)
	at java.lang.Thread.run(Thread.java:748)

Steps/Code to reproduce bug
I haven't been able to reduce the issue to a small reproducible example yet. The Python stacktrace highlights the following location:

df.write.format('delta').partitionBy('aaa').save(path)

Interestingly, the Spark UI does not show any failed jobs or stages, and the actual data write seems to have happened. Judging from the above stack trace, though, the delta log/metadata may be corrupt, since the issue seems to happen when writing the delta log.

Expected behavior
Data frame export to delta succeeds without exception.

Environment details

  • Environment location: Azure Databricks 7.3 LTS using Databricks Container Services docker image with CUDA 10.2
  • Recent rapids-4-spark 0.3.0-SNAPSHOT
$ git rev-parse HEAD
b2f03e0eae642c7999122c00127cd045627520dc
  • Spark configuration settings
"spark.plugins": "com.nvidia.spark.SQLPlugin",
"spark.sql.parquet.filterPushdown": "false",
"spark.rapids.sql.incompatibleOps.enabled": "true",
"spark.rapids.memory.pinnedPool.size": "2G",
"spark.task.resource.gpu.amount": 0.1,
"spark.rapids.sql.concurrentGpuTasks": 2,
"spark.locality.wait": "0s",
"spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": 1,
"spark.kryo.registrator": "com.nvidia.spark.rapids.GpuKryoRegistrator"
"spark.databricks.delta.optimizeWrite.enabled": "false",
"spark.databricks.delta.autoCompact.enabled": "false",

Additional context
There seems to be no issue with rapids-4-spark 0.2.0.

@martinstuder martinstuder added ? - Needs Triage Need team to review and classify bug Something isn't working labels Nov 24, 2020
@sameerz sameerz removed the ? - Needs Triage Need team to review and classify label Nov 24, 2020
@tgravescs tgravescs added the P0 Must have for release label Nov 25, 2020
@tgravescs
Collaborator

Thanks @martinstuder for reporting. We haven't seen this before, so I'll investigate.

@tgravescs
Collaborator

tgravescs commented Dec 2, 2020

@martinstuder I'm trying to reproduce this, but haven't been able to yet.

df.write.format('delta').partitionBy('aaa').save(path)

Was this on the first write to this path? The table didn't exist previously? Can you tell me the schema of the partitionBy column - is it just a timestamp?

Based on your comment that no jobs failed, this exception happened on the driver side then.

Can you reproduce this, or was it just a one-time thing?

@tgravescs
Collaborator

at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:201)
at org.apache.spark.sql.execution.collect.DatasetSparkResult.toArray(DatasetSparkResult.scala:61)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3714)

Looking at the last few lines here, it seems it's just trying to collect the result back to the driver, but the deserializer doesn't like what it got back. DatasetSparkResult is a Databricks-specific class which we don't have the source for. Would you perhaps be able to tell me the last operation before the write? I'm wondering if that operation produced results that don't match what Databricks expected.

@martinstuder
Author

@tgravescs Reconstructing the code path it basically comes down to the following:

df1 = spark_session.read.format('parquet').schema(schema).load(path1)
df2 = spark_session.read.format('parquet').schema(schema).load(path2)
df = df1 \
    .withColumn('extra', F.lit(0).cast('byte')) \
    .unionByName(df2.withColumn('extra', F.lit(1).cast('byte')))
df.write.format('delta').partitionBy('aaa').save(path)

It's basically unioning two source data frames from parquet, adding an extra discriminator column to both, and then writing to delta with a different partitioning scheme. The issue happened on the first write to the path, with no table existing there. The partition-by column (aaa) is an integer.

We've recently changed things to have the source data frames in delta as well where we read them without specifying a schema explicitly. I'll check whether this has any impact.
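
For context, the delta-sourced variant looks roughly like this (a sketch only; the paths are placeholders, and on Databricks the spark session already exists):

from pyspark.sql import SparkSession, functions as F

# Placeholder paths for the two delta sources and the output table.
spark = SparkSession.builder.getOrCreate()
path1 = "dbfs:/mnt/input/table1"
path2 = "dbfs:/mnt/input/table2"
out_path = "dbfs:/mnt/output/aaa"

# Read both sources from delta, taking the schema from the delta log instead
# of passing it explicitly as in the parquet variant above.
df1 = spark.read.format('delta').load(path1)
df2 = spark.read.format('delta').load(path2)

# Add the discriminator column, union, and write partitioned by the integer
# column 'aaa', mirroring the failing pipeline.
df = df1 \
    .withColumn('extra', F.lit(0).cast('byte')) \
    .unionByName(df2.withColumn('extra', F.lit(1).cast('byte')))
df.write.format('delta').partitionBy('aaa').save(out_path)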

@martinstuder
Author

I was just able to reproduce this with delta as the source for input data frames:

Py4JJavaError: An error occurred while calling o409.save.
: java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:201)
	at org.apache.spark.sql.execution.collect.DatasetSparkResult.toArray(DatasetSparkResult.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3714)
	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2954)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3702)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:116)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:249)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:101)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:836)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:199)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3700)
	at org.apache.spark.sql.Dataset.collect(Dataset.scala:2954)
	at com.databricks.sql.transaction.tahoe.SnapshotEdge.$anonfun$x$4$1(SnapshotEdge.scala:102)
	at com.databricks.sql.acl.CheckPermissions$.trusted(CheckPermissions.scala:1116)
	at com.databricks.sql.transaction.tahoe.SnapshotEdge.x$4$lzycompute(SnapshotEdge.scala:90)
	at com.databricks.sql.transaction.tahoe.SnapshotEdge.x$4(SnapshotEdge.scala:90)
	at com.databricks.sql.transaction.tahoe.SnapshotEdge._metadata$lzycompute(SnapshotEdge.scala:90)
	at com.databricks.sql.transaction.tahoe.SnapshotEdge._metadata(SnapshotEdge.scala:90)
	at com.databricks.sql.transaction.tahoe.SnapshotEdge.metadata(SnapshotEdge.scala:143)
	[...]

Below is the last portion of the driver log4j output. The first three "Finished task" lines are from the write stage:

20/12/02 19:35:24 INFO TaskSetManager: Finished task 245.0 in stage 2.0 (TID 322) in 241069 ms on 10.50.24.11 (executor 0) (301/303)
20/12/02 19:35:27 INFO TaskSetManager: Finished task 261.0 in stage 2.0 (TID 338) in 243903 ms on 10.50.24.7 (executor 6) (302/303)
20/12/02 19:35:37 INFO TaskSetManager: Finished task 279.0 in stage 2.0 (TID 356) in 253963 ms on 10.50.24.6 (executor 7) (303/303)
20/12/02 19:35:37 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 3591389086745350677
20/12/02 19:35:37 INFO DAGScheduler: ResultStage 2 (write at TransactionalWriteEdge.scala:183) finished in 461.573 s
20/12/02 19:35:37 INFO DAGScheduler: Job 2 is finished. Cancelling potential speculative or zombie tasks for this job
20/12/02 19:35:37 INFO TaskSchedulerImpl: Killing all running tasks in stage 2: Stage finished
20/12/02 19:35:37 INFO DAGScheduler: Job 2 finished: write at TransactionalWriteEdge.scala:183, took 461.591995 s
20/12/02 19:35:37 INFO FileFormatWriter: Write Job cf9aeeb1-fdd0-49d3-94c9-9c9bf4a39dc5 committed.
20/12/02 19:35:37 INFO FileFormatWriter: Finished processing stats for write job cf9aeeb1-fdd0-49d3-94c9-9c9bf4a39dc5.
20/12/02 19:35:37 INFO OptimisticTransaction: [tableId=2c4f3ede-e0b7-4eb3-afc6-c1e1389d121c] Attempting to commit version 0 with 158343 actions with WriteSerializable isolation level
20/12/02 19:35:38 INFO AzureBlobFileSystem: FS_OP_CREATE FILE[/aaa/_delta_log/__tmp_path_dir/.00000000000000000000.json.5daef307-543a-4a02-8426-a852a621641a.tmp] Creating output stream; permission: rw-r--r--, overwrite: false, bufferSize: 33554432
20/12/02 19:35:38 INFO RetryTolerableRenameFSDataOutputStream: Writing atomically to dbfs:/mnt/output/aaa/_delta_log/00000000000000000000.json using temp file dbfs:/mnt/output/aaa/_delta_log/__tmp_path_dir/.00000000000000000000.json.5daef307-543a-4a02-8426-a852a621641a.tmp
20/12/02 19:35:38 INFO AbfsOutputStream: FS_OP_CREATE FILE[/aaa/_delta_log/__tmp_path_dir/.00000000000000000000.json.5daef307-543a-4a02-8426-a852a621641a.tmp] Closing stream; size: 95766506
20/12/02 19:35:39 INFO AbfsOutputStream: FS_OP_CREATE FILE[/aaa/_delta_log/__tmp_path_dir/.00000000000000000000.json.5daef307-543a-4a02-8426-a852a621641a.tmp] Upload complete; size: 95766506
20/12/02 19:35:39 INFO AzureBlobFileSystem: FS_OP_RENAME [/aaa/_delta_log/__tmp_path_dir/.00000000000000000000.json.5daef307-543a-4a02-8426-a852a621641a.tmp] to [/aaa/_delta_log/00000000000000000000.json] Starting rename. Issuing rename operation.
20/12/02 19:35:39 INFO AzureBlobFileSystem: FS_OP_RENAME [/aaa/_delta_log/__tmp_path_dir/.00000000000000000000.json.5daef307-543a-4a02-8426-a852a621641a.tmp] to [/aaa/_delta_log/00000000000000000000.json] Rename successful.
20/12/02 19:35:39 INFO RetryTolerableRenameFSDataOutputStream: Renamed temp file dbfs:/mnt/output/aaa/_delta_log/__tmp_path_dir/.00000000000000000000.json.5daef307-543a-4a02-8426-a852a621641a.tmp to dbfs:/mnt/output/aaa/_delta_log/00000000000000000000.json
20/12/02 19:35:39 INFO DeltaLog: Loading version 0.
20/12/02 19:35:39 INFO DeltaLogFileIndex: Created DeltaLogFileIndex(JSON, numFilesInSegment: 1, totalFileSize: 95766506)
20/12/02 19:35:39 INFO FileSourceStrategy: Pruning directories with: 
20/12/02 19:35:39 INFO FileSourceStrategy: Pushed Filters: Or(IsNotNull(protocol),IsNotNull(metaData))
20/12/02 19:35:39 INFO FileSourceStrategy: Post-Scan Filters: (isnotnull(protocol#417) OR isnotnull(metaData#416))
20/12/02 19:35:39 INFO FileSourceStrategy: Output Data Schema: struct<metaData: struct<id: string, name: string, description: string, format: struct<provider: string, options: map<string,string>>, schemaString: string ... 6 more fields>, protocol: struct<minReaderVersion: int, minWriterVersion: int>>
20/12/02 19:35:39 INFO CodeGenerator: Code generated in 29.995099 ms
20/12/02 19:35:39 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 351.8 KiB, free 15.2 GiB)
20/12/02 19:35:39 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 19.6 KiB, free 15.2 GiB)
20/12/02 19:35:39 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 10.50.24.8:41649 (size: 19.6 KiB, free: 15.2 GiB)
20/12/02 19:35:39 INFO SparkContext: Created broadcast 6 from execute at GpuRowToColumnarExec.scala:811
20/12/02 19:35:39 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
20/12/02 19:35:39 INFO SparkContext: Starting job: collect at SnapshotEdge.scala:102
20/12/02 19:35:39 INFO DAGScheduler: Got job 3 (collect at SnapshotEdge.scala:102) with 23 output partitions
20/12/02 19:35:39 INFO DAGScheduler: Final stage: ResultStage 3 (collect at SnapshotEdge.scala:102)
20/12/02 19:35:39 INFO DAGScheduler: Parents of final stage: List()
20/12/02 19:35:39 INFO DAGScheduler: Missing parents: List()
20/12/02 19:35:39 INFO DAGScheduler: Submitting ResultStage 3 (MapPartitionsRDD[28] at mapPartitions at GpuColumnarToRowExec.scala:294), which has no missing parents
20/12/02 19:35:39 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 33.3 KiB, free 15.2 GiB)
20/12/02 19:35:39 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 9.9 KiB, free 15.2 GiB)
20/12/02 19:35:39 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on 10.50.24.8:41649 (size: 9.9 KiB, free: 15.2 GiB)
20/12/02 19:35:39 INFO SparkContext: Created broadcast 7 from broadcast at DAGScheduler.scala:1547
20/12/02 19:35:39 INFO DAGScheduler: Submitting 23 missing tasks from ResultStage 3 (MapPartitionsRDD[28] at mapPartitions at GpuColumnarToRowExec.scala:294) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
20/12/02 19:35:39 INFO TaskSchedulerImpl: Adding task set 3.0 with 23 tasks
20/12/02 19:35:39 INFO TaskSetManager: Jars for session None: Map()
20/12/02 19:35:39 INFO TaskSetManager: Files for session None: Map()
20/12/02 19:35:39 INFO FairSchedulableBuilder: Added task set TaskSet_3.0 tasks to pool 3591389086745350677
20/12/02 19:35:39 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 380, 10.50.24.5, executor 8, partition 0, PROCESS_LOCAL)
20/12/02 19:35:39 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID 381, 10.50.24.14, executor 1, partition 1, PROCESS_LOCAL)
20/12/02 19:35:39 INFO TaskSetManager: Starting task 2.0 in stage 3.0 (TID 382, 10.50.24.7, executor 6, partition 2, PROCESS_LOCAL)
20/12/02 19:35:39 INFO TaskSetManager: Starting task 3.0 in stage 3.0 (TID 383, 10.50.24.12, executor 9, partition 3, PROCESS_LOCAL)
20/12/02 19:35:39 INFO TaskSetManager: Starting task 4.0 in stage 3.0 (TID 384, 10.50.24.9, executor 4, partition 4, PROCESS_LOCAL)
20/12/02 19:35:39 INFO TaskSetManager: Starting task 5.0 in stage 3.0 (TID 385, 10.50.24.6, executor 7, partition 5, PROCESS_LOCAL)
20/12/02 19:35:39 INFO TaskSetManager: Starting task 6.0 in stage 3.0 (TID 386, 10.50.24.4, executor 3, partition 6, PROCESS_LOCAL)
20/12/02 19:35:39 INFO TaskSetManager: Starting task 7.0 in stage 3.0 (TID 387, 10.50.24.10, executor 2, partition 7, PROCESS_LOCAL)
20/12/02 19:35:39 INFO TaskSetManager: Starting task 8.0 in stage 3.0 (TID 388, 10.50.24.13, executor 5, partition 8, PROCESS_LOCAL)
20/12/02 19:35:39 INFO TaskSetManager: Starting task 9.0 in stage 3.0 (TID 389, 10.50.24.11, executor 0, partition 9, PROCESS_LOCAL)
20/12/02 19:35:39 INFO TaskSetManager: Starting task 10.0 in stage 3.0 (TID 390, 10.50.24.5, executor 8, partition 10, PROCESS_LOCAL)
20/12/02 19:35:39 INFO TaskSetManager: Starting task 11.0 in stage 3.0 (TID 391, 10.50.24.14, executor 1, partition 11, PROCESS_LOCAL)
20/12/02 19:35:39 INFO TaskSetManager: Starting task 12.0 in stage 3.0 (TID 392, 10.50.24.7, executor 6, partition 12, PROCESS_LOCAL)
20/12/02 19:35:39 INFO TaskSetManager: Starting task 13.0 in stage 3.0 (TID 393, 10.50.24.12, executor 9, partition 13, PROCESS_LOCAL)
20/12/02 19:35:39 INFO TaskSetManager: Starting task 14.0 in stage 3.0 (TID 394, 10.50.24.9, executor 4, partition 14, PROCESS_LOCAL)
20/12/02 19:35:39 INFO TaskSetManager: Starting task 15.0 in stage 3.0 (TID 395, 10.50.24.6, executor 7, partition 15, PROCESS_LOCAL)
20/12/02 19:35:39 INFO TaskSetManager: Starting task 16.0 in stage 3.0 (TID 396, 10.50.24.4, executor 3, partition 16, PROCESS_LOCAL)
20/12/02 19:35:39 INFO TaskSetManager: Starting task 17.0 in stage 3.0 (TID 397, 10.50.24.10, executor 2, partition 17, PROCESS_LOCAL)
20/12/02 19:35:39 INFO TaskSetManager: Starting task 18.0 in stage 3.0 (TID 398, 10.50.24.13, executor 5, partition 18, PROCESS_LOCAL)
20/12/02 19:35:39 INFO TaskSetManager: Starting task 19.0 in stage 3.0 (TID 399, 10.50.24.11, executor 0, partition 19, PROCESS_LOCAL)
20/12/02 19:35:39 INFO TaskSetManager: Starting task 20.0 in stage 3.0 (TID 400, 10.50.24.5, executor 8, partition 20, PROCESS_LOCAL)
20/12/02 19:35:39 INFO TaskSetManager: Starting task 21.0 in stage 3.0 (TID 401, 10.50.24.14, executor 1, partition 21, PROCESS_LOCAL)
20/12/02 19:35:39 INFO TaskSetManager: Starting task 22.0 in stage 3.0 (TID 402, 10.50.24.7, executor 6, partition 22, PROCESS_LOCAL)
20/12/02 19:35:39 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on 10.50.24.14:41817 (size: 9.9 KiB, free: 66.3 GiB)
20/12/02 19:35:39 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on 10.50.24.5:37389 (size: 9.9 KiB, free: 66.3 GiB)
20/12/02 19:35:39 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on 10.50.24.9:37481 (size: 9.9 KiB, free: 66.3 GiB)
20/12/02 19:35:39 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on 10.50.24.7:41215 (size: 9.9 KiB, free: 66.3 GiB)
20/12/02 19:35:39 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on 10.50.24.12:35131 (size: 9.9 KiB, free: 66.3 GiB)
20/12/02 19:35:39 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on 10.50.24.11:41067 (size: 9.9 KiB, free: 66.3 GiB)
20/12/02 19:35:39 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on 10.50.24.13:40297 (size: 9.9 KiB, free: 66.3 GiB)
20/12/02 19:35:39 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on 10.50.24.6:42369 (size: 9.9 KiB, free: 66.3 GiB)
20/12/02 19:35:39 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on 10.50.24.10:36759 (size: 9.9 KiB, free: 66.3 GiB)
20/12/02 19:35:39 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on 10.50.24.4:45565 (size: 9.9 KiB, free: 66.3 GiB)
20/12/02 19:35:39 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 10.50.24.13:40297 (size: 19.6 KiB, free: 66.3 GiB)
20/12/02 19:35:39 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 10.50.24.14:41817 (size: 19.6 KiB, free: 66.3 GiB)
20/12/02 19:35:39 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 10.50.24.7:41215 (size: 19.6 KiB, free: 66.3 GiB)
20/12/02 19:35:39 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 10.50.24.11:41067 (size: 19.6 KiB, free: 66.3 GiB)
20/12/02 19:35:39 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 10.50.24.6:42369 (size: 19.6 KiB, free: 66.3 GiB)
20/12/02 19:35:39 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 10.50.24.10:36759 (size: 19.6 KiB, free: 66.3 GiB)
20/12/02 19:35:39 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 10.50.24.9:37481 (size: 19.6 KiB, free: 66.3 GiB)
20/12/02 19:35:39 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 10.50.24.4:45565 (size: 19.6 KiB, free: 66.3 GiB)
20/12/02 19:35:39 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 10.50.24.5:37389 (size: 19.6 KiB, free: 66.3 GiB)
20/12/02 19:35:40 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 10.50.24.12:35131 (size: 19.6 KiB, free: 66.3 GiB)
20/12/02 19:35:44 INFO TaskSetManager: Finished task 18.0 in stage 3.0 (TID 398) in 4693 ms on 10.50.24.13 (executor 5) (1/23)
20/12/02 19:35:44 INFO TaskSetManager: Finished task 8.0 in stage 3.0 (TID 388) in 4695 ms on 10.50.24.13 (executor 5) (2/23)
20/12/02 19:35:44 INFO TaskSetManager: Finished task 17.0 in stage 3.0 (TID 397) in 4742 ms on 10.50.24.10 (executor 2) (3/23)
20/12/02 19:35:44 INFO TaskSetManager: Finished task 7.0 in stage 3.0 (TID 387) in 4743 ms on 10.50.24.10 (executor 2) (4/23)
20/12/02 19:35:44 INFO TaskSetManager: Finished task 12.0 in stage 3.0 (TID 392) in 4769 ms on 10.50.24.7 (executor 6) (5/23)
20/12/02 19:35:44 INFO TaskSetManager: Finished task 22.0 in stage 3.0 (TID 402) in 4768 ms on 10.50.24.7 (executor 6) (6/23)
20/12/02 19:35:44 INFO TaskSetManager: Finished task 2.0 in stage 3.0 (TID 382) in 4769 ms on 10.50.24.7 (executor 6) (7/23)
20/12/02 19:35:44 INFO TaskSetManager: Finished task 9.0 in stage 3.0 (TID 389) in 4817 ms on 10.50.24.11 (executor 0) (8/23)
20/12/02 19:35:44 INFO TaskSetManager: Finished task 19.0 in stage 3.0 (TID 399) in 4816 ms on 10.50.24.11 (executor 0) (9/23)
20/12/02 19:35:44 INFO TaskSetManager: Finished task 4.0 in stage 3.0 (TID 384) in 4821 ms on 10.50.24.9 (executor 4) (10/23)
20/12/02 19:35:44 INFO TaskSetManager: Finished task 14.0 in stage 3.0 (TID 394) in 4821 ms on 10.50.24.9 (executor 4) (11/23)
20/12/02 19:35:44 INFO TaskSetManager: Finished task 16.0 in stage 3.0 (TID 396) in 4862 ms on 10.50.24.4 (executor 3) (12/23)
20/12/02 19:35:44 INFO TaskSetManager: Finished task 6.0 in stage 3.0 (TID 386) in 4862 ms on 10.50.24.4 (executor 3) (13/23)
20/12/02 19:35:44 INFO TaskSetManager: Finished task 10.0 in stage 3.0 (TID 390) in 4942 ms on 10.50.24.5 (executor 8) (14/23)
20/12/02 19:35:44 INFO TaskSetManager: Finished task 20.0 in stage 3.0 (TID 400) in 4941 ms on 10.50.24.5 (executor 8) (15/23)
20/12/02 19:35:44 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 380) in 4947 ms on 10.50.24.5 (executor 8) (16/23)
20/12/02 19:35:44 INFO TaskSetManager: Finished task 3.0 in stage 3.0 (TID 383) in 4999 ms on 10.50.24.12 (executor 9) (17/23)
20/12/02 19:35:44 INFO TaskSetManager: Finished task 13.0 in stage 3.0 (TID 393) in 4999 ms on 10.50.24.12 (executor 9) (18/23)
20/12/02 19:35:44 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID 381) in 5035 ms on 10.50.24.14 (executor 1) (19/23)
20/12/02 19:35:44 INFO TaskSetManager: Finished task 21.0 in stage 3.0 (TID 401) in 5034 ms on 10.50.24.14 (executor 1) (20/23)
20/12/02 19:35:44 INFO TaskSetManager: Finished task 11.0 in stage 3.0 (TID 391) in 5035 ms on 10.50.24.14 (executor 1) (21/23)
20/12/02 19:35:44 INFO TaskSetManager: Finished task 5.0 in stage 3.0 (TID 385) in 5296 ms on 10.50.24.6 (executor 7) (22/23)
20/12/02 19:35:44 INFO TaskSetManager: Finished task 15.0 in stage 3.0 (TID 395) in 5296 ms on 10.50.24.6 (executor 7) (23/23)
20/12/02 19:35:44 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 3591389086745350677
20/12/02 19:35:44 INFO DAGScheduler: ResultStage 3 (collect at SnapshotEdge.scala:102) finished in 5.319 s
20/12/02 19:35:44 INFO DAGScheduler: Job 3 is finished. Cancelling potential speculative or zombie tasks for this job
20/12/02 19:35:44 INFO TaskSchedulerImpl: Killing all running tasks in stage 3: Stage finished
20/12/02 19:35:44 INFO DAGScheduler: Job 3 finished: collect at SnapshotEdge.scala:102, took 5.328319 s
20/12/02 19:35:45 INFO AzureBlobFileSystem: FS_OP_DELETE  [/checkpoints_d398a706784a44b7b08488ec65e0f2ed] Starting delete - recursive true
20/12/02 19:35:45 INFO AzureBlobFileSystem: FS_OP_DELETE  [/checkpoints_d398a706784a44b7b08488ec65e0f2ed] Delete successful.
20/12/02 19:35:45 INFO ProgressReporter$: Removed result fetcher for 3591389086745350677_8580777726221396950_job-23307-run-1-action-23295
20/12/02 19:35:46 INFO DriverCorral$: Cleaning the wrapper ReplId-31d72-d58ad-abd21-5 (currently in status Idle(ReplId-31d72-d58ad-abd21-5))
20/12/02 19:35:46 INFO DriverCorral$: sending shutdown signal for REPL ReplId-31d72-d58ad-abd21-5
20/12/02 19:35:46 INFO PythonDriverLocal$RedirectThread: Python RedirectThread exit
20/12/02 19:35:46 INFO PythonDriverLocal$RedirectThread: Python RedirectThread exit
20/12/02 19:35:46 INFO PythonDriverLocal$Watchdog: Python shell exit code: 143
20/12/02 19:35:46 INFO PythonDriverLocal$Watchdog: Strace is not enabled. To turn it on please set the spark conf spark.databricks.daemon.driver.python.withStrace to be true.
20/12/02 19:35:46 INFO DriverCorral$: sending the interrupt signal for REPL ReplId-31d72-d58ad-abd21-5
20/12/02 19:35:46 INFO DriverCorral$: waiting for localThread to stop for REPL ReplId-31d72-d58ad-abd21-5
20/12/02 19:35:46 INFO DriverCorral$: ReplId-31d72-d58ad-abd21-5 successfully discarded


When checking the delta table output, I can see the data files are there. What is strange though is the _delta_log directory:
[screenshot: _delta_log directory listing of the written table]

The metadata file (*.json) is fairly large and the *.crc file is missing. Usually, I would expect the metadata file to be in the low MB range, together with a *.crc file:
[screenshot: _delta_log directory of a previously successful export, with *.json and *.crc files]

The size of the metadata file may be explained by the fact that many more, smaller parquet files are being written, together with the corresponding column stats.
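
A quick way to check that theory would be to count the actions recorded in the commit file (a minimal sketch, assuming the JSON has been copied somewhere readable; each line is one JSON-encoded action, and the driver log above reports 158343 actions for this commit):

import json

# Placeholder local copy of <table>/_delta_log/00000000000000000000.json
log_file = "00000000000000000000.json"

counts = {}
with open(log_file) as f:
    for line in f:
        action = json.loads(line)
        for key in action:  # e.g. 'add', 'metaData', 'protocol', 'commitInfo'
            counts[key] = counts.get(key, 0) + 1

print(counts)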

@martinstuder
Author

Just a guess: is it trying to read the metadata (*.json) back in to generate some log/diagnostic message but failing with the generated metadata?

@tgravescs
Collaborator

Thanks for the update, I'll give it another try to reproduce using delta file input and see what is going on. I suspect you are right about the metadata, or it's recording something like the input version, since it knows it's the delta format.

@martinstuder
Author

Disabling parquet input/output acceleration also doesn't help ("spark.rapids.sql.format.parquet.enabled": "false")

@tgravescs
Collaborator

Yeah, it's likely your write wasn't happening on the GPU anyway; at least in my testing, writing to delta uses SaveIntoDataSourceCommand with com.databricks.sql.transaction.tahoe.sources.DeltaDataSource, which the GPU doesn't know how to handle for the delta format. I'm still trying to reproduce.

Are any of the column types complex types, like array? Or is it all Long, Int, String types? We are adding some support for complex types in 0.3, and since you said this worked with 0.2 I'm wondering if something there is not complete yet or is causing incompatibilities with Databricks.
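
A quick way to check for complex types on the frame being written (assuming the df from the snippet above) is to list its column types:

# Complex columns show up as array<...>, struct<...> or map<...> here.
for name, dtype in df.dtypes:
    print(name, dtype)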

@tgravescs
Collaborator

Sorry, missed it earlier. I think it's actually in your log, and it looks like you are using structs:

20/12/02 19:35:39 INFO FileSourceStrategy: Output Data Schema: struct<metaData: struct<id: string, name: string, description: string, format: struct<provider: string, options: map<string,string>>, schemaString: string ... 6 more fields>, protocol: struct<minReaderVersion: int, minWriterVersion: int>>

@martinstuder
Author

The actual data frames are all just int, long, double and byte. The schema you are referring to seems to be the schema that delta uses for its internal metadata.

@tgravescs
Collaborator

@martinstuder unfortunately I still can't reproduce; would you be able to try to get a small reproducible case? I'm not sure if it's the data or perhaps something in the delta configuration or setup. I tried both our latest 0.3.0 and went back to the commit you had.

I've tried a bunch of different things. Perhaps you could try this sample I found on Databricks to see if it reproduces for you?

I tried with both saveAsTable and just save/load so feel free to change those below and update the paths as needed.

from pyspark.sql.functions import *

events = spark.read.json("/databricks-datasets/structured-streaming/events/")
eventsWithDate = events.filter(col("time") < 1469504216).withColumn("date_col", from_unixtime(col("time")))
spark.conf.set("spark.rapids.sql.format.parquet.enabled", "false")
eventsWithDate.write.format("delta").saveAsTable("deltaeventsPartitioned2Table")
eventsWithDate.write.format("delta").saveAsTable("deltaeventsPartitioned3Table")

from pyspark.sql import functions as F
df1 = spark.read.format('delta').load("/user/hive/warehouse/deltaeventspartitioned2table")
df2 = spark.read.format('delta').load("/user/hive/warehouse/deltaeventspartitioned3table")

df = df1 \
    .withColumn('extra', F.lit(0).cast('byte')) \
    .unionByName(df2.withColumn('extra', F.lit(1).cast('byte')))
df.write.format("delta").partitionBy("time").saveAsTable("deltaeventsPartitioned2TableWriteAgain3")

Also, just to verify: if you use the 0.2.0 build with the exact same setup, you don't see the error?
Do you see this error every run?

@tgravescs
Collaborator

Also, if it's possible, could you send me the explain output for the failure? You can go to the SQL tab in the UI, and at the bottom is the text representation.
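
If grabbing it from the UI is awkward, something like this prints a comparable text plan for the frame being written (a sketch assuming the same df; the write command itself only shows up in the SQL tab):

# 'formatted' mode is available from Spark 3.0, which DBR 7.3 is based on.
df.explain(mode="formatted")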

@tgravescs
Collaborator

@martinstuder Sorry, you can ignore the above questions - I was finally able to reproduce it and will investigate more tomorrow.

@tgravescs
Collaborator

I'm working on a workaround for this where we have the delta log queries use the CPU.
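
Until that lands, a possible user-level stopgap (not the plugin fix itself, just a sketch using the plugin's existing on/off switch, with df and out_path as in the earlier snippets) is to take the whole write off the GPU:

# Temporarily disable RAPIDS SQL acceleration so the delta commit and its
# log queries run on the CPU, then re-enable it afterwards.
spark.conf.set("spark.rapids.sql.enabled", "false")
df.write.format('delta').partitionBy('aaa').save(out_path)
spark.conf.set("spark.rapids.sql.enabled", "true")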

@tgravescs
Collaborator

@martinstuder I checked in a fix for this - would you be able to try it out? You would have to build with the latest branch.

@martinstuder
Author

Hi @tgravescs, I can confirm that I can successfully export delta tables using a current build of the 0.3 branch. Thanks a lot for your efforts!

tgravescs pushed a commit to tgravescs/spark-rapids that referenced this issue Nov 30, 2023
…IDIA#1197)

Signed-off-by: spark-rapids automation <70000568+nvauto@users.noreply.github.com>