From 77694b4673dd2efb5b79d596fbd647af3db5f8a0 Mon Sep 17 00:00:00 2001
From: Juliusz Sompolski
Date: Fri, 28 Oct 2022 20:59:13 +0800
Subject: [PATCH] [SPARK-40918][SQL] Mismatch between FileSourceScanExec and
 Orc and ParquetFileFormat on producing columnar output

### What changes were proposed in this pull request?

We move the decision about supporting columnar output based on WSCG one level up, from `ParquetFileFormat` / `OrcFileFormat` to `FileSourceScanExec`, and pass it back down as a new required option for `ParquetFileFormat` / `OrcFileFormat`. The semantics are now as follows:

* `ParquetFileFormat.supportBatch` and `OrcFileFormat.supportBatch` return whether the format **can**, not necessarily **will**, return columnar output.
* To return columnar output, the option `FileFormat.OPTION_RETURNING_BATCH` needs to be passed to `buildReaderWithPartitionValues` in these two file formats. It should only be set to `true` if `supportBatch` is also `true`, but it can be set to `false` if we don't want columnar output regardless. This way, `FileSourceScanExec` can set it to `false` when there are more than 100 columns for WSCG, and `ParquetFileFormat` / `OrcFileFormat` don't have to concern themselves with WSCG limits.
* To avoid the option not being passed by accident, it is made required. Making it required means updating a few places that call this API, but the error resulting from not passing it is very obscure; it is better to fail early and explicitly here.

### Why are the changes needed?

The following explains the issue for `ParquetFileFormat`; `OrcFileFormat` had exactly the same issue.

`java.lang.ClassCastException: org.apache.spark.sql.vectorized.ColumnarBatch cannot be cast to org.apache.spark.sql.catalyst.InternalRow` was being thrown because the Parquet reader was producing columnar batches while `FileSourceScanExec` expected row output.

The mismatch comes from the fact that `ParquetFileFormat.supportBatch` depends on `WholeStageCodegenExec.isTooManyFields(conf, schema)`, where the threshold is 100 fields. When this is used in `FileSourceScanExec`:
```
override lazy val supportsColumnar: Boolean = {
  relation.fileFormat.supportBatch(relation.sparkSession, schema)
}
```
the `schema` comes from the output attributes, which include extra metadata attributes. However, inside `ParquetFileFormat.buildReaderWithPartitionValues` the decision was recomputed as
```
relation.fileFormat.buildReaderWithPartitionValues(
  sparkSession = relation.sparkSession,
  dataSchema = relation.dataSchema,
  partitionSchema = relation.partitionSchema,
  requiredSchema = requiredSchema,
  filters = pushedDownFilters,
  options = options,
  hadoopConf = hadoopConf
...
val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)
...
val returningBatch = supportBatch(sparkSession, resultSchema)
```
where `requiredSchema` and `partitionSchema` do not include the metadata columns:
```
FileSourceScanExec: output: List(c1#4608L, c2#4609L, ..., c100#4707L, file_path#6388)
FileSourceScanExec: dataSchema: StructType(StructField(c1,LongType,true),StructField(c2,LongType,true),...,StructField(c100,LongType,true))
FileSourceScanExec: partitionSchema: StructType()
FileSourceScanExec: requiredSchema: StructType(StructField(c1,LongType,true),StructField(c2,LongType,true),...,StructField(c100,LongType,true))
```
Columns like `file_path#6388` are added by the scan and contain metadata produced by the scan itself, not by the file reader, which concerns itself only with what is inside the file.
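To make the failure mode concrete, here is a spark-shell style reproduction sketch mirroring the regression test added at the end of this patch; the output path and the explicit conf setting are illustrative (100 is also the default value of `spark.sql.codegen.maxFields`):
```
// Reproduction sketch (spark-shell); path and conf value are illustrative.
// 100 data columns, plus _metadata.file_path selected below, makes the scan
// output 101 attributes while the reader's resultSchema stays at 100.
val dir = "/tmp/spark-40918-repro"
spark.sql(s"SELECT ${(1 to 100).map(i => s"id + $i AS c$i").mkString(", ")} FROM RANGE(100)")
  .write.format("parquet").save(dir)

spark.conf.set("spark.sql.codegen.maxFields", "100")

// Before this fix: FileSourceScanExec plans row output (101 > 100 fields),
// but ParquetFileFormat still returns ColumnarBatch (100 <= 100 fields),
// failing with the ClassCastException quoted above.
spark.sql(
  s"""SELECT ${(1 to 100).map(i => s"sum(c$i)").mkString(", ")},
     |       max(_metadata.file_path)
     |FROM parquet.`$dir`""".stripMargin
).collect()
```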
### Does this PR introduce _any_ user-facing change?

Not a public API change, but it is now required to pass `FileFormat.OPTION_RETURNING_BATCH` in `options` to `ParquetFileFormat.buildReaderWithPartitionValues`. The only user of this API in Apache Spark is `FileSourceScanExec`. A caller-side sketch of the new contract is included after the diff below.

### How was this patch tested?

Tests added.

Closes #38397 from juliuszsompolski/SPARK-40918.

Authored-by: Juliusz Sompolski
Signed-off-by: Wenchen Fan
---
 .../sql/execution/DataSourceScanExec.scala    | 11 ++++--
 .../execution/datasources/FileFormat.scala    | 13 +++++++
 .../datasources/orc/OrcFileFormat.scala       | 33 ++++++++++++++---
 .../parquet/ParquetFileFormat.scala           | 36 +++++++++++++++----
 .../datasources/FileMetadataStructSuite.scala | 26 ++++++++++++++
 5 files changed, 107 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index fdb49bd76746d..d9e3db6903cb1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -523,7 +523,12 @@ case class FileSourceScanExec(
   // Note that some vals referring the file-based relation are lazy intentionally
   // so that this plan can be canonicalized on executor side too. See SPARK-23731.
   override lazy val supportsColumnar: Boolean = {
-    relation.fileFormat.supportBatch(relation.sparkSession, schema)
+    val conf = relation.sparkSession.sessionState.conf
+    // Only output columnar if there is WSCG to read it.
+    val requiredWholeStageCodegenSettings =
+      conf.wholeStageEnabled && !WholeStageCodegenExec.isTooManyFields(conf, schema)
+    requiredWholeStageCodegenSettings &&
+      relation.fileFormat.supportBatch(relation.sparkSession, schema)
   }
 
   private lazy val needsUnsafeRowConversion: Boolean = {
@@ -535,6 +540,8 @@ case class FileSourceScanExec(
   }
 
   lazy val inputRDD: RDD[InternalRow] = {
+    val options = relation.options +
+      (FileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString)
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
@@ -542,7 +549,7 @@ case class FileSourceScanExec(
         partitionSchema = relation.partitionSchema,
         requiredSchema = requiredSchema,
         filters = pushedDownFilters,
-        options = relation.options,
+        options = options,
         hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
 
     val readRDD = if (bucketedScan) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index f7f917d894779..7e920773c048e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -61,6 +61,11 @@ trait FileFormat {
 
   /**
    * Returns whether this format supports returning columnar batch or not.
+   * If columnar batch output is requested, users shall supply
+   *   FileFormat.OPTION_RETURNING_BATCH -> true
+   * in relation options when calling buildReaderWithPartitionValues.
+   * This should only be passed as true if it can actually be supported.
+   * For ParquetFileFormat and OrcFileFormat, passing this option is required.
    *
    * TODO: we should just have different traits for the different formats.
   */
@@ -191,6 +196,14 @@ object FileFormat {
 
   val METADATA_NAME = "_metadata"
 
+  /**
+   * Option to pass to buildReaderWithPartitionValues to return columnar batch output or not.
+   * For ParquetFileFormat and OrcFileFormat, passing this option is required.
+   * This should only be passed as true if it can actually be supported, which can be checked
+   * by calling supportBatch.
+   */
+  val OPTION_RETURNING_BATCH = "returning_batch"
+
   /** Schema of metadata struct that can be produced by every file format. */
   val BASE_METADATA_STRUCT: StructType = new StructType()
     .add(StructField(FileFormat.FILE_PATH, StringType))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 024c458feaff6..6a58513c346da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -36,7 +36,6 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -102,8 +101,7 @@ class OrcFileFormat
 
   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
     val conf = sparkSession.sessionState.conf
-    conf.orcVectorizedReaderEnabled && conf.wholeStageEnabled &&
-      !WholeStageCodegenExec.isTooManyFields(conf, schema) &&
+    conf.orcVectorizedReaderEnabled &&
       schema.forall(s => OrcUtils.supportColumnarReads(
         s.dataType, sparkSession.sessionState.conf.orcVectorizedReaderNestedColumnEnabled))
   }
@@ -115,6 +113,18 @@ class OrcFileFormat
     true
   }
 
+  /**
+   * Build the reader.
+   *
+   * @note It is required to pass FileFormat.OPTION_RETURNING_BATCH in options, to indicate whether
+   *       the reader should return row or columnar output.
+   *       If the caller can handle both, pass
+   *         FileFormat.OPTION_RETURNING_BATCH ->
+   *           supportBatch(sparkSession,
+   *             StructType(requiredSchema.fields ++ partitionSchema.fields))
+   *       as the option.
+   *       It should be set to "true" only if this reader can support it.
+   */
   override def buildReaderWithPartitionValues(
       sparkSession: SparkSession,
       dataSchema: StructType,
@@ -126,9 +136,24 @@ class OrcFileFormat
     val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)
     val sqlConf = sparkSession.sessionState.conf
-    val enableVectorizedReader = supportBatch(sparkSession, resultSchema)
     val capacity = sqlConf.orcVectorizedReaderBatchSize
 
+    // Should always be set by FileSourceScanExec creating this.
+    // Check conf before checking option, to allow working around an issue by changing conf.
+    val enableVectorizedReader = sqlConf.orcVectorizedReaderEnabled &&
+      options.get(FileFormat.OPTION_RETURNING_BATCH)
+        .getOrElse {
+          throw new IllegalArgumentException(
+            "OPTION_RETURNING_BATCH should always be set for OrcFileFormat. " +
+              "To workaround this issue, set spark.sql.orc.enableVectorizedReader=false.")
+        }
+        .equals("true")
+    if (enableVectorizedReader) {
+      // If the passed option said that we are to return batches, we need to also be able to
+      // do this based on config and resultSchema.
+      assert(supportBatch(sparkSession, resultSchema))
+    }
+
     OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis)
 
     val broadcastedConf =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 5116a6bdb90ce..80b6791d8fae7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -42,7 +42,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, OffHeapColumnVector, OnHeapColumnVector}
 import org.apache.spark.sql.internal.SQLConf
@@ -82,12 +81,11 @@ class ParquetFileFormat
   }
 
   /**
-   * Returns whether the reader will return the rows as batch or not.
+   * Returns whether the reader can return the rows as batch or not.
    */
   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
     val conf = sparkSession.sessionState.conf
-    ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && conf.wholeStageEnabled &&
-      !WholeStageCodegenExec.isTooManyFields(conf, schema)
+    ParquetUtils.isBatchReadSupportedForSchema(conf, schema)
   }
 
   override def vectorTypes(
@@ -110,6 +108,18 @@ class ParquetFileFormat
     true
   }
 
+  /**
+   * Build the reader.
+   *
+   * @note It is required to pass FileFormat.OPTION_RETURNING_BATCH in options, to indicate whether
+   *       the reader should return row or columnar output.
+   *       If the caller can handle both, pass
+   *         FileFormat.OPTION_RETURNING_BATCH ->
+   *           supportBatch(sparkSession,
+   *             StructType(requiredSchema.fields ++ partitionSchema.fields))
+   *       as the option.
+   *       It should be set to "true" only if this reader can support it.
+   */
   override def buildReaderWithPartitionValues(
       sparkSession: SparkSession,
       dataSchema: StructType,
@@ -161,8 +171,6 @@ class ParquetFileFormat
     val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
     val capacity = sqlConf.parquetVectorizedReaderBatchSize
     val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
-    // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
-    val returningBatch = supportBatch(sparkSession, resultSchema)
     val pushDownDate = sqlConf.parquetFilterPushDownDate
     val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
     val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
@@ -173,6 +181,22 @@ class ParquetFileFormat
     val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
     val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
 
+    // Should always be set by FileSourceScanExec creating this.
+    // Check conf before checking option, to allow working around an issue by changing conf.
+    val returningBatch = sparkSession.sessionState.conf.parquetVectorizedReaderEnabled &&
+      options.get(FileFormat.OPTION_RETURNING_BATCH)
+        .getOrElse {
+          throw new IllegalArgumentException(
+            "OPTION_RETURNING_BATCH should always be set for ParquetFileFormat. " +
" + + "To workaround this issue, set spark.sql.parquet.enableVectorizedReader=false.") + } + .equals("true") + if (returningBatch) { + // If the passed option said that we are to return batches, we need to also be able to + // do this based on config and resultSchema. + assert(supportBatch(sparkSession, resultSchema)) + } + (file: PartitionedFile) => { assert(file.partitionValues.numFields == partitionSchema.size) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala index 8909fe49aac1c..2c8d72ec60934 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala @@ -622,4 +622,30 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession { } } } + + Seq("parquet", "orc").foreach { format => + test(s"SPARK-40918: Output cols around WSCG.isTooManyFields limit in $format") { + // The issue was that ParquetFileFormat would not count the _metadata columns towards + // the WholeStageCodegenExec.isTooManyFields limit, while FileSourceScanExec would, + // resulting in Parquet reader returning columnar output, while scan expected row. + withTempPath { dir => + sql(s"SELECT ${(1 to 100).map(i => s"id+$i as c$i").mkString(", ")} FROM RANGE(100)") + .write.format(format).save(dir.getAbsolutePath) + (98 to 102).foreach { wscgCols => + withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> wscgCols.toString) { + // Would fail with + // java.lang.ClassCastException: org.apache.spark.sql.vectorized.ColumnarBatch + // cannot be cast to org.apache.spark.sql.catalyst.InternalRow + sql( + s""" + |SELECT + | ${(1 to 100).map(i => s"sum(c$i)").mkString(", ")}, + | max(_metadata.file_path) + |FROM $format.`$dir`""".stripMargin + ).collect() + } + } + } + } + } }