diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 68f49f9442579..8e2cdcb1de826 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -270,11 +270,21 @@ class ParquetFileFormat
       lazy val footerFileMetaData =
         ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+        footerFileMetaData.getKeyValueMetaData.get,
+        SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
       // Try to push down filters when filter push-down is enabled.
       val pushed = if (enableParquetFilterPushDown) {
         val parquetSchema = footerFileMetaData.getSchema
-        val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
-          pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+        val parquetFilters = new ParquetFilters(
+          parquetSchema,
+          pushDownDate,
+          pushDownTimestamp,
+          pushDownDecimal,
+          pushDownStringStartWith,
+          pushDownInFilterThreshold,
+          isCaseSensitive,
+          datetimeRebaseMode)
         filters
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -300,10 +310,6 @@ class ParquetFileFormat
         None
       }

-      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
-        footerFileMetaData.getKeyValueMetaData.get,
-        SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
-
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
       val hadoopAttemptContext =
         new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
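In this read path the rebase mode is now resolved before the push-down branch, because the resolved mode is an input to `ParquetFilters`. Roughly, `DataSourceUtils.datetimeRebaseMode` prefers what the file footer says about its writer and only falls back to the `LEGACY_PARQUET_REBASE_MODE_IN_READ` setting. The sketch below is a simplified illustration of that idea, not the actual `DataSourceUtils` implementation, and the footer metadata key names are assumptions:

```scala
// Simplified sketch of per-file rebase-mode resolution (illustrative, not Spark's code).
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy

def resolveRebaseMode(
    lookupFileMeta: String => String,  // e.g. footerFileMetaData.getKeyValueMetaData.get
    modeByConfig: String): LegacyBehaviorPolicy.Value = {
  // Assumed footer keys: a writer version plus an optional legacy-datetime marker.
  Option(lookupFileMeta("org.apache.spark.version")) match {
    case Some(version) if version < "3.0.0" => LegacyBehaviorPolicy.LEGACY
    case Some(_) if lookupFileMeta("org.apache.spark.legacyDateTime") != null =>
      LegacyBehaviorPolicy.LEGACY
    case Some(_) => LegacyBehaviorPolicy.CORRECTED
    // No Spark writer info in the footer: fall back to the session configuration.
    case None => LegacyBehaviorPolicy.withName(modeByConfig)
  }
}
```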
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 13dee484bd98a..7a117948d58ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -35,6 +35,8 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._

 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianDays, rebaseGregorianToJulianMicros}
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.sources
 import org.apache.spark.unsafe.types.UTF8String

@@ -48,7 +50,8 @@ class ParquetFilters(
     pushDownDecimal: Boolean,
     pushDownStartWith: Boolean,
     pushDownInFilterThreshold: Int,
-    caseSensitive: Boolean) {
+    caseSensitive: Boolean,
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value) {
   // A map which contains parquet field name and data type, if predicate push down applies.
   //
   // Each key in `nameToParquetField` represents a column; `dots` are used as separators for
@@ -124,14 +127,26 @@ class ParquetFilters(
   private val ParquetTimestampMicrosType = ParquetSchemaType(TIMESTAMP_MICROS, INT64, 0, null)
   private val ParquetTimestampMillisType = ParquetSchemaType(TIMESTAMP_MILLIS, INT64, 0, null)

-  private def dateToDays(date: Any): SQLDate = date match {
-    case d: Date => DateTimeUtils.fromJavaDate(d)
-    case ld: LocalDate => DateTimeUtils.localDateToDays(ld)
+  private def dateToDays(date: Any): SQLDate = {
+    val gregorianDays = date match {
+      case d: Date => DateTimeUtils.fromJavaDate(d)
+      case ld: LocalDate => DateTimeUtils.localDateToDays(ld)
+    }
+    datetimeRebaseMode match {
+      case LegacyBehaviorPolicy.LEGACY => rebaseGregorianToJulianDays(gregorianDays)
+      case _ => gregorianDays
+    }
   }

-  private def timestampToMicros(v: Any): JLong = v match {
-    case i: Instant => DateTimeUtils.instantToMicros(i)
-    case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
+  private def timestampToMicros(v: Any): JLong = {
+    val gregorianMicros = v match {
+      case i: Instant => DateTimeUtils.instantToMicros(i)
+      case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
+    }
+    datetimeRebaseMode match {
+      case LegacyBehaviorPolicy.LEGACY => rebaseGregorianToJulianMicros(gregorianMicros)
+      case _ => gregorianMicros
+    }
   }

   private def decimalToInt32(decimal: JBigDecimal): Integer = decimal.unscaledValue().intValue()
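The `dateToDays` and `timestampToMicros` changes above are the core of the fix: filter values are first normalized to proleptic Gregorian days/microseconds as before, and are additionally rebased to the hybrid Julian calendar when the file being read was written in LEGACY mode. A small illustration of that branch, using only helpers that this file already imports (the literal date is borrowed from the tests):

```scala
// Illustration only: what the LEGACY branch of dateToDays does to a pushed-down literal.
import java.sql.Date
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.RebaseDateTime.rebaseGregorianToJulianDays

// Days since the epoch in the proleptic Gregorian calendar, Spark 3.x's internal form.
val gregorianDays = DateTimeUtils.fromJavaDate(Date.valueOf("1000-01-01"))
// A LEGACY-mode file stores day counts in the hybrid Julian calendar, so the predicate value
// must be shifted the same way; comparing an unrebased value against that file's row-group
// statistics is what previously let matching row groups be skipped.
val julianDays = rebaseGregorianToJulianDays(gregorianDays)
```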
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 3b482b0c8ab62..30902444bce08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -134,11 +134,21 @@ case class ParquetPartitionReaderFactory(
     lazy val footerFileMetaData =
       ParquetFileReader.readFooter(conf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+      footerFileMetaData.getKeyValueMetaData.get,
+      SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
     // Try to push down filters when filter push-down is enabled.
     val pushed = if (enableParquetFilterPushDown) {
       val parquetSchema = footerFileMetaData.getSchema
-      val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
-        pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+      val parquetFilters = new ParquetFilters(
+        parquetSchema,
+        pushDownDate,
+        pushDownTimestamp,
+        pushDownDecimal,
+        pushDownStringStartWith,
+        pushDownInFilterThreshold,
+        isCaseSensitive,
+        datetimeRebaseMode)
       filters
         // Collects all converted Parquet filter predicates. Notice that not all predicates can be
         // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -171,9 +181,6 @@ case class ParquetPartitionReaderFactory(
     if (pushed.isDefined) {
       ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
     }
-    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
-      footerFileMetaData.getKeyValueMetaData.get,
-      SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
     val reader = buildReaderFunc(
       split, file.partitionValues, hadoopAttemptContext, pushed, convertTz, datetimeRebaseMode)
     reader.initialize(split, hadoopAttemptContext)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
index 2f861356e9499..2bf6d798358de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters}
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, SparkToParquetSchemaConverter}
 import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -51,8 +52,17 @@ case class ParquetScanBuilder(
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val parquetSchema =
       new SparkToParquetSchemaConverter(sparkSession.sessionState.conf).convert(schema)
-    val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
-      pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+    val parquetFilters = new ParquetFilters(
+      parquetSchema,
+      pushDownDate,
+      pushDownTimestamp,
+      pushDownDecimal,
+      pushDownStringStartWith,
+      pushDownInFilterThreshold,
+      isCaseSensitive,
+      // The rebase mode doesn't matter here because the filters are used to determine
+      // whether they are convertible.
+      LegacyBehaviorPolicy.CORRECTED)
     parquetFilters.convertibleFilters(this.filters).toArray
   }
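Hard-coding `CORRECTED` above is safe because `convertibleFilters` only answers which source filters can be mapped to Parquet predicates at all; the literal values, which are what the rebase mode affects, are rebuilt per file in `ParquetPartitionReaderFactory` with that file's real mode. A hypothetical sketch of that property (the schema string and filter are made up, and it assumes access to the internal `ParquetFilters` class, e.g. from a test in the same package):

```scala
// Sketch: convertibility depends on the schema and filter shape, not on the rebase mode.
import org.apache.parquet.schema.MessageTypeParser
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.sources.GreaterThan

val parquetSchema = MessageTypeParser.parseMessageType(
  "message spark_schema { optional int32 d (DATE); }")
val parquetFilters = new ParquetFilters(
  parquetSchema,
  true,   // pushDownDate
  true,   // pushDownTimestamp
  true,   // pushDownDecimal
  true,   // pushDownStartWith
  10,     // pushDownInFilterThreshold
  false,  // caseSensitive
  LegacyBehaviorPolicy.CORRECTED)
// The same filters are reported convertible whichever mode is passed above; only
// createFilter, invoked later per file, embeds the (possibly rebased) literal values.
val convertible = parquetFilters.convertibleFilters(
  Seq(GreaterThan("d", java.sql.Date.valueOf("1000-01-01"))))
```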
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 307234dcb86e9..30aedc5bc17cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -42,7 +42,9 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, LEGACY}
+import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.{INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.ExtendedSQLTest
@@ -70,11 +72,14 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared

   protected def createParquetFilters(
       schema: MessageType,
-      caseSensitive: Option[Boolean] = None): ParquetFilters =
+      caseSensitive: Option[Boolean] = None,
+      datetimeRebaseMode: LegacyBehaviorPolicy.Value = LegacyBehaviorPolicy.CORRECTED
+    ): ParquetFilters =
     new ParquetFilters(schema, conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
       conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
       conf.parquetFilterPushDownInFilterThreshold,
-      caseSensitive.getOrElse(conf.caseSensitiveAnalysis))
+      caseSensitive.getOrElse(conf.caseSensitiveAnalysis),
+      datetimeRebaseMode)

   override def beforeEach(): Unit = {
     super.beforeEach()
@@ -548,62 +553,66 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
       def date: Date = Date.valueOf(s)
     }

-    val data = Seq("2018-03-18", "2018-03-19", "2018-03-20", "2018-03-21")
+    val data = Seq("1000-01-01", "2018-03-19", "2018-03-20", "2018-03-21")
     import testImplicits._

     Seq(false, true).foreach { java8Api =>
-      withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
-        val dates = data.map(i => Tuple1(Date.valueOf(i))).toDF()
-        withNestedParquetDataFrame(dates) { case (inputDF, colName, fun) =>
-          implicit val df: DataFrame = inputDF
+      Seq(CORRECTED, LEGACY).foreach { rebaseMode =>
+        withSQLConf(
+          SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
+          SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString) {
+          val dates = data.map(i => Tuple1(Date.valueOf(i))).toDF()
+          withNestedParquetDataFrame(dates) { case (inputDF, colName, fun) =>
+            implicit val df: DataFrame = inputDF
+
+            def resultFun(dateStr: String): Any = {
+              val parsed = if (java8Api) LocalDate.parse(dateStr) else Date.valueOf(dateStr)
+              fun(parsed)
+            }

-          def resultFun(dateStr: String): Any = {
-            val parsed = if (java8Api) LocalDate.parse(dateStr) else Date.valueOf(dateStr)
-            fun(parsed)
+            val dateAttr: Expression = df(colName).expr
+            assert(df(colName).expr.dataType === DateType)
+
+            checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+            checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]],
+              data.map(i => Row.apply(resultFun(i))))
+
+            checkFilterPredicate(dateAttr === "1000-01-01".date, classOf[Eq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(dateAttr <=> "1000-01-01".date, classOf[Eq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(dateAttr =!= "1000-01-01".date, classOf[NotEq[_]],
+              Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i))))
+
+            checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]],
+              resultFun("2018-03-21"))
+            checkFilterPredicate(dateAttr <= "1000-01-01".date, classOf[LtEq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]],
+              resultFun("2018-03-21"))
+
+            checkFilterPredicate(Literal("1000-01-01".date) === dateAttr, classOf[Eq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(Literal("1000-01-01".date) <=> dateAttr, classOf[Eq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]],
+              resultFun("2018-03-21"))
+            checkFilterPredicate(Literal("1000-01-01".date) >= dateAttr, classOf[LtEq[_]],
+              resultFun("1000-01-01"))
+            checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]],
+              resultFun("2018-03-21"))
+
+            checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]],
+              resultFun("2018-03-21"))
+            checkFilterPredicate(
+              dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date,
+              classOf[Operators.Or],
+              Seq(Row(resultFun("1000-01-01")), Row(resultFun("2018-03-21"))))
           }
-
-          val dateAttr: Expression = df(colName).expr
-          assert(df(colName).expr.dataType === DateType)
-
-          checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
-          checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]],
-            data.map(i => Row.apply(resultFun(i))))
-
-          checkFilterPredicate(dateAttr === "2018-03-18".date, classOf[Eq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(dateAttr <=> "2018-03-18".date, classOf[Eq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(dateAttr =!= "2018-03-18".date, classOf[NotEq[_]],
-            Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i))))
-
-          checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]],
-            resultFun("2018-03-21"))
-          checkFilterPredicate(dateAttr <= "2018-03-18".date, classOf[LtEq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]],
-            resultFun("2018-03-21"))
-
-          checkFilterPredicate(Literal("2018-03-18".date) === dateAttr, classOf[Eq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(Literal("2018-03-18".date) <=> dateAttr, classOf[Eq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]],
-            resultFun("2018-03-21"))
-          checkFilterPredicate(Literal("2018-03-18".date) >= dateAttr, classOf[LtEq[_]],
-            resultFun("2018-03-18"))
-          checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]],
-            resultFun("2018-03-21"))
-
-          checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]],
-            resultFun("2018-03-21"))
-          checkFilterPredicate(
-            dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date,
-            classOf[Operators.Or],
-            Seq(Row(resultFun("2018-03-18")), Row(resultFun("2018-03-21"))))
         }
       }
     }
@@ -611,34 +620,35 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
   test("filter pushdown - timestamp") {
     Seq(true, false).foreach { java8Api =>
-      withSQLConf(
-        SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
-        SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED") {
-        // spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS
+      Seq(CORRECTED, LEGACY).foreach { rebaseMode =>
         val millisData = Seq(
           "1000-06-14 08:28:53.123",
           "1582-06-15 08:28:53.001",
           "1900-06-16 08:28:53.0",
           "2018-06-17 08:28:53.999")
-        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
-          ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) {
+        withSQLConf(
+          SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
+          SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString,
+          SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> TIMESTAMP_MILLIS.toString) {
           testTimestampPushdown(millisData, java8Api)
         }

-        // spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS
         val microsData = Seq(
           "1000-06-14 08:28:53.123456",
           "1582-06-15 08:28:53.123456",
           "1900-06-16 08:28:53.123456",
           "2018-06-17 08:28:53.123456")
-        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
-          ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) {
+        withSQLConf(
+          SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
+          SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString,
+          SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> TIMESTAMP_MICROS.toString) {
           testTimestampPushdown(microsData, java8Api)
         }

-        // spark.sql.parquet.outputTimestampType = INT96 doesn't support pushdown
-        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
-          ParquetOutputTimestampType.INT96.toString) {
+        // INT96 doesn't support pushdown
+        withSQLConf(
+          SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
+          SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> INT96.toString) {
           import testImplicits._
           withTempPath { file =>
             millisData.map(i => Tuple1(Timestamp.valueOf(i))).toDF
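Taken together, the changes make predicates pushed into Parquet agree with how each file encoded its dates and timestamps. A hypothetical end-to-end check of the user-visible behaviour, assuming a running `SparkSession` named `spark`; the path and column name are illustrative and not part of the patch:

```scala
// Illustrative reproduction: write an ancient date in LEGACY mode, then filter on it.
import org.apache.spark.sql.internal.SQLConf
import spark.implicits._

spark.conf.set(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key, "LEGACY")
Seq("1000-01-01").toDF("s").selectExpr("CAST(s AS DATE) AS d")
  .write.mode("overwrite").parquet("/tmp/legacy_dates")

// The file stores a hybrid Julian day count. Without rebasing the pushed-down literal, the
// equality predicate compares a different day number against the row-group statistics and
// the only matching row group can be skipped; with this change the row comes back.
spark.read.parquet("/tmp/legacy_dates").where("d = DATE '1000-01-01'").show()
```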