[SPARK-40918][SQL] Mismatch between FileSourceScanExec and Orc and ParquetFileFormat on producing columnar output

### What changes were proposed in this pull request?

We move the decision about producing columnar output based on WSCG support one level up, from `ParquetFileFormat` / `OrcFileFormat` to `FileSourceScanExec`, and pass it down as a new required option for `ParquetFileFormat` / `OrcFileFormat`. The semantics are now as follows:
* `ParquetFileFormat.supportBatch` and `OrcFileFormat.supportBatch` return whether the format **can** produce columnar output, not necessarily whether it **will**.
* To get columnar output, the option `FileFormat.OPTION_RETURNING_BATCH` needs to be passed to `buildReaderWithPartitionValues` in these two file formats. It should only be set to `true` if `supportBatch` is also `true`, but it can be set to `false` if we do not want columnar output regardless. This way, `FileSourceScanExec` can set it to `false` when there are more than 100 columns for WSCG, and `ParquetFileFormat` / `OrcFileFormat` do not have to concern themselves with WSCG limits (see the sketch after this list).
* To avoid forgetting to pass the option by accident, it is made required. Making it required means updating the few places that call this API, but the error that results from getting the decision wrong is very obscure; it is better to fail early and explicitly here.
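
For illustration, a minimal sketch of the intended call pattern, mirroring the `FileSourceScanExec` hunk later in this diff (excerpt-style, not standalone: `relation`, `requiredSchema`, `pushedDownFilters`, and `supportsColumnar` come from the scan node):

```
// Sketch: the scan decides whether it wants columnar output (supportsColumnar)
// and forwards that decision to the file format via the required option.
val options = relation.options +
  (FileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString)

val readFile: (PartitionedFile) => Iterator[InternalRow] =
  relation.fileFormat.buildReaderWithPartitionValues(
    sparkSession = relation.sparkSession,
    dataSchema = relation.dataSchema,
    partitionSchema = relation.partitionSchema,
    requiredSchema = requiredSchema,
    filters = pushedDownFilters,
    options = options,
    hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
```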

### Why are the changes needed?

The following explains the issue for `ParquetFileFormat`; `OrcFileFormat` had exactly the same issue.

A `java.lang.ClassCastException: org.apache.spark.sql.vectorized.ColumnarBatch cannot be cast to org.apache.spark.sql.catalyst.InternalRow` was thrown because the Parquet reader was producing columnar batches while `FileSourceScanExec` expected row output.

The mismatch comes from the fact that `ParquetFileFormat.supportBatch` depends on `WholeStageCodegenExec.isTooManyFields(conf, schema)`, where the threshold is 100 fields.
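
As a rough, self-contained illustration of the arithmetic (an assumption-laden sketch, not Spark code: it treats the limit as a plain field count against the 100-field `spark.sql.codegen.maxFields` default):

```
// Hypothetical illustration of the mismatch around the 100-field limit:
// the scan counted its full output (data + metadata columns), while the reader
// only counted requiredSchema ++ partitionSchema.
object FieldLimitSketch extends App {
  val maxFields = 100      // spark.sql.codegen.maxFields (WSCG field limit)
  val dataCols = 100       // c1 ... c100 read from the file
  val metadataCols = 1     // file_path added by the scan
  val partitionCols = 0

  // Scan side: 101 fields > 100, so WSCG (and columnar output) is off -> rows expected.
  val scanExpectsColumnar = (dataCols + metadataCols) <= maxFields
  // Reader side (before this fix): 100 fields <= 100 -> columnar batches produced.
  val readerReturnsColumnar = (dataCols + partitionCols) <= maxFields

  println(s"scan expects columnar: $scanExpectsColumnar")      // false
  println(s"reader returns columnar: $readerReturnsColumnar")  // true -> ClassCastException
}
```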

When this is used in `FileSourceScanExec`:
```
override lazy val supportsColumnar: Boolean = {
  relation.fileFormat.supportBatch(relation.sparkSession, schema)
}
```
the `schema` comes from the output attributes, which include extra metadata attributes.

However, the same decision was recomputed inside `ParquetFileFormat.buildReaderWithPartitionValues`, which `FileSourceScanExec` calls as
```
      relation.fileFormat.buildReaderWithPartitionValues(
        sparkSession = relation.sparkSession,
        dataSchema = relation.dataSchema,
        partitionSchema = relation.partitionSchema,
        requiredSchema = requiredSchema,
        filters = pushedDownFilters,
        options = options,
        hadoopConf = hadoopConf
...
val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)
...
val returningBatch = supportBatch(sparkSession, resultSchema)
```

Here, `requiredSchema` and `partitionSchema` do not include the metadata columns:
```
FileSourceScanExec: output: List(c1#4608L, c2#4609L, ..., c100#4707L, file_path#6388)
FileSourceScanExec: dataSchema: StructType(StructField(c1,LongType,true),StructField(c2,LongType,true),...,StructField(c100,LongType,true))
FileSourceScanExec: partitionSchema: StructType()
FileSourceScanExec: requiredSchema: StructType(StructField(c1,LongType,true),StructField(c2,LongType,true),...,StructField(c100,LongType,true))
```

Columns like `file_path#6388` are added by the scan and contain metadata produced by the scan itself, not by the file reader, which only concerns itself with what is inside the file.
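
For example, a query shaped like the regression test added below hits the mismatch. This sketch assumes an active `SparkSession` named `spark` and a hypothetical Parquet path:

```
// 100 data columns from the files plus one scan-generated metadata column:
// the scan output has 101 fields, but requiredSchema ++ partitionSchema has 100.
val cols = (1 to 100).map(i => s"c$i").mkString(", ")
spark.sql(
  s"""SELECT $cols, _metadata.file_path
     |FROM parquet.`/tmp/wide_table`""".stripMargin  // hypothetical path
).collect()
```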

### Does this PR introduce _any_ user-facing change?

There is no public API change, but it is now required to pass `FileFormat.OPTION_RETURNING_BATCH` in `options` to `ParquetFileFormat.buildReaderWithPartitionValues` and `OrcFileFormat.buildReaderWithPartitionValues`. The only caller of this API in Apache Spark is `FileSourceScanExec`.
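
If a caller omits the option, the readers now fail fast with an `IllegalArgumentException` whose message points at a configuration-level workaround. A sketch of that workaround, with the conf keys taken from the error messages added in this diff (assumes an active `SparkSession` named `spark`):

```
// With the vectorized readers disabled, row output is the only possibility,
// so the columnar/row mismatch cannot occur and the option check is skipped.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "false")
```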

### How was this patch tested?

Tests added

Closes apache#38397 from juliuszsompolski/SPARK-40918.

Authored-by: Juliusz Sompolski <julek@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
juliuszsompolski authored and cloud-fan committed Oct 28, 2022
1 parent 7b4dfa3 commit 77694b4
Showing 5 changed files with 107 additions and 12 deletions.

@@ -523,7 +523,12 @@ case class FileSourceScanExec(
// Note that some vals referring the file-based relation are lazy intentionally
// so that this plan can be canonicalized on executor side too. See SPARK-23731.
override lazy val supportsColumnar: Boolean = {
relation.fileFormat.supportBatch(relation.sparkSession, schema)
val conf = relation.sparkSession.sessionState.conf
// Only output columnar if there is WSCG to read it.
val requiredWholeStageCodegenSettings =
conf.wholeStageEnabled && !WholeStageCodegenExec.isTooManyFields(conf, schema)
requiredWholeStageCodegenSettings &&
relation.fileFormat.supportBatch(relation.sparkSession, schema)
}

private lazy val needsUnsafeRowConversion: Boolean = {
@@ -535,14 +540,16 @@
}

lazy val inputRDD: RDD[InternalRow] = {
val options = relation.options +
(FileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString)
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
dataSchema = relation.dataSchema,
partitionSchema = relation.partitionSchema,
requiredSchema = requiredSchema,
filters = pushedDownFilters,
options = relation.options,
options = options,
hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

val readRDD = if (bucketedScan) {

@@ -61,6 +61,11 @@ trait FileFormat {

/**
* Returns whether this format supports returning columnar batch or not.
* If columnar batch output is requested, users shall supply
* FileFormat.OPTION_RETURNING_BATCH -> true
* in relation options when calling buildReaderWithPartitionValues.
* This should only be passed as true if it can actually be supported.
* For ParquetFileFormat and OrcFileFormat, passing this option is required.
*
* TODO: we should just have different traits for the different formats.
*/
@@ -191,6 +196,14 @@ object FileFormat {

val METADATA_NAME = "_metadata"

/**
* Option to pass to buildReaderWithPartitionValues to return columnar batch output or not.
* For ParquetFileFormat and OrcFileFormat, passing this option is required.
* This should only be passed as true if it can actually be supported, which can be checked
* by calling supportBatch.
*/
val OPTION_RETURNING_BATCH = "returning_batch"

/** Schema of metadata struct that can be produced by every file format. */
val BASE_METADATA_STRUCT: StructType = new StructType()
.add(StructField(FileFormat.FILE_PATH, StringType))

@@ -36,7 +36,6 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -102,8 +101,7 @@ class OrcFileFormat

override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
val conf = sparkSession.sessionState.conf
conf.orcVectorizedReaderEnabled && conf.wholeStageEnabled &&
!WholeStageCodegenExec.isTooManyFields(conf, schema) &&
conf.orcVectorizedReaderEnabled &&
schema.forall(s => OrcUtils.supportColumnarReads(
s.dataType, sparkSession.sessionState.conf.orcVectorizedReaderNestedColumnEnabled))
}
@@ -115,6 +113,18 @@
true
}

/**
* Build the reader.
*
* @note It is required to pass FileFormat.OPTION_RETURNING_BATCH in options, to indicate whether
* the reader should return row or columnar output.
* If the caller can handle both, pass
* FileFormat.OPTION_RETURNING_BATCH ->
* supportBatch(sparkSession,
* StructType(requiredSchema.fields ++ partitionSchema.fields))
* as the option.
* It should be set to "true" only if this reader can support it.
*/
override def buildReaderWithPartitionValues(
sparkSession: SparkSession,
dataSchema: StructType,
@@ -126,9 +136,24 @@

val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)
val sqlConf = sparkSession.sessionState.conf
val enableVectorizedReader = supportBatch(sparkSession, resultSchema)
val capacity = sqlConf.orcVectorizedReaderBatchSize

// Should always be set by FileSourceScanExec creating this.
// Check conf before checking option, to allow working around an issue by changing conf.
val enableVectorizedReader = sqlConf.orcVectorizedReaderEnabled &&
options.get(FileFormat.OPTION_RETURNING_BATCH)
.getOrElse {
throw new IllegalArgumentException(
"OPTION_RETURNING_BATCH should always be set for OrcFileFormat. " +
"To workaround this issue, set spark.sql.orc.enableVectorizedReader=false.")
}
.equals("true")
if (enableVectorizedReader) {
// If the passed option said that we are to return batches, we need to also be able to
// do this based on config and resultSchema.
assert(supportBatch(sparkSession, resultSchema))
}

OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis)

val broadcastedConf =

@@ -42,7 +42,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, OffHeapColumnVector, OnHeapColumnVector}
import org.apache.spark.sql.internal.SQLConf
@@ -82,12 +81,11 @@ class ParquetFileFormat
}

/**
* Returns whether the reader will return the rows as batch or not.
* Returns whether the reader can return the rows as batch or not.
*/
override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
val conf = sparkSession.sessionState.conf
ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && conf.wholeStageEnabled &&
!WholeStageCodegenExec.isTooManyFields(conf, schema)
ParquetUtils.isBatchReadSupportedForSchema(conf, schema)
}

override def vectorTypes(
@@ -110,6 +108,18 @@
true
}

/**
* Build the reader.
*
* @note It is required to pass FileFormat.OPTION_RETURNING_BATCH in options, to indicate whether
* the reader should return row or columnar output.
* If the caller can handle both, pass
* FileFormat.OPTION_RETURNING_BATCH ->
* supportBatch(sparkSession,
* StructType(requiredSchema.fields ++ partitionSchema.fields))
* as the option.
* It should be set to "true" only if this reader can support it.
*/
override def buildReaderWithPartitionValues(
sparkSession: SparkSession,
dataSchema: StructType,
@@ -161,8 +171,6 @@ class ParquetFileFormat
val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
val capacity = sqlConf.parquetVectorizedReaderBatchSize
val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
// Whole stage codegen (PhysicalRDD) is able to deal with batches directly
val returningBatch = supportBatch(sparkSession, resultSchema)
val pushDownDate = sqlConf.parquetFilterPushDownDate
val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
Expand All @@ -173,6 +181,22 @@ class ParquetFileFormat
val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead

// Should always be set by FileSourceScanExec creating this.
// Check conf before checking option, to allow working around an issue by changing conf.
val returningBatch = sparkSession.sessionState.conf.parquetVectorizedReaderEnabled &&
options.get(FileFormat.OPTION_RETURNING_BATCH)
.getOrElse {
throw new IllegalArgumentException(
"OPTION_RETURNING_BATCH should always be set for ParquetFileFormat. " +
"To workaround this issue, set spark.sql.parquet.enableVectorizedReader=false.")
}
.equals("true")
if (returningBatch) {
// If the passed option said that we are to return batches, we need to also be able to
// do this based on config and resultSchema.
assert(supportBatch(sparkSession, resultSchema))
}

(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)


@@ -622,4 +622,30 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
}
}
}

Seq("parquet", "orc").foreach { format =>
test(s"SPARK-40918: Output cols around WSCG.isTooManyFields limit in $format") {
// The issue was that ParquetFileFormat would not count the _metadata columns towards
// the WholeStageCodegenExec.isTooManyFields limit, while FileSourceScanExec would,
// resulting in Parquet reader returning columnar output, while scan expected row.
withTempPath { dir =>
sql(s"SELECT ${(1 to 100).map(i => s"id+$i as c$i").mkString(", ")} FROM RANGE(100)")
.write.format(format).save(dir.getAbsolutePath)
(98 to 102).foreach { wscgCols =>
withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> wscgCols.toString) {
// Would fail with
// java.lang.ClassCastException: org.apache.spark.sql.vectorized.ColumnarBatch
// cannot be cast to org.apache.spark.sql.catalyst.InternalRow
sql(
s"""
|SELECT
| ${(1 to 100).map(i => s"sum(c$i)").mkString(", ")},
| max(_metadata.file_path)
|FROM $format.`$dir`""".stripMargin
).collect()
}
}
}
}
}
}
