diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 48e2e6e57d838..ee229a334fdba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -266,11 +266,21 @@ class ParquetFileFormat lazy val footerFileMetaData = ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData + val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( + footerFileMetaData.getKeyValueMetaData.get, + datetimeRebaseModeInRead) // Try to push down filters when filter push-down is enabled. val pushed = if (enableParquetFilterPushDown) { val parquetSchema = footerFileMetaData.getSchema - val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp, - pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive) + val parquetFilters = new ParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringStartWith, + pushDownInFilterThreshold, + isCaseSensitive, + datetimeRebaseMode) filters // Collects all converted Parquet filter predicates. Notice that not all predicates can be // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` @@ -296,9 +306,6 @@ class ParquetFileFormat None } - val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( - footerFileMetaData.getKeyValueMetaData.get, - datetimeRebaseModeInRead) val int96RebaseMode = DataSourceUtils.int96RebaseMode( footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 9999df72b3c64..5afb73657cd35 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -35,6 +35,8 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} +import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianDays, rebaseGregorianToJulianMicros} +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.sources import org.apache.spark.unsafe.types.UTF8String @@ -48,7 +50,8 @@ class ParquetFilters( pushDownDecimal: Boolean, pushDownStartWith: Boolean, pushDownInFilterThreshold: Int, - caseSensitive: Boolean) { + caseSensitive: Boolean, + datetimeRebaseMode: LegacyBehaviorPolicy.Value) { // A map which contains parquet field name and data type, if predicate push down applies. 
// // Each key in `nameToParquetField` represents a column; `dots` are used as separators for @@ -129,14 +132,26 @@ class ParquetFilters( private val ParquetTimestampMillisType = ParquetSchemaType(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS), INT64, 0) - private def dateToDays(date: Any): Int = date match { - case d: Date => DateTimeUtils.fromJavaDate(d) - case ld: LocalDate => DateTimeUtils.localDateToDays(ld) + private def dateToDays(date: Any): Int = { + val gregorianDays = date match { + case d: Date => DateTimeUtils.fromJavaDate(d) + case ld: LocalDate => DateTimeUtils.localDateToDays(ld) + } + datetimeRebaseMode match { + case LegacyBehaviorPolicy.LEGACY => rebaseGregorianToJulianDays(gregorianDays) + case _ => gregorianDays + } } - private def timestampToMicros(v: Any): JLong = v match { - case i: Instant => DateTimeUtils.instantToMicros(i) - case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t) + private def timestampToMicros(v: Any): JLong = { + val gregorianMicros = v match { + case i: Instant => DateTimeUtils.instantToMicros(i) + case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t) + } + datetimeRebaseMode match { + case LegacyBehaviorPolicy.LEGACY => rebaseGregorianToJulianMicros(gregorianMicros) + case _ => gregorianMicros + } } private def decimalToInt32(decimal: JBigDecimal): Integer = decimal.unscaledValue().intValue() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index 78076040e7cf5..058669b0937fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -133,11 +133,21 @@ case class ParquetPartitionReaderFactory( lazy val footerFileMetaData = ParquetFooterReader.readFooter(conf, filePath, SKIP_ROW_GROUPS).getFileMetaData + val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( + footerFileMetaData.getKeyValueMetaData.get, + datetimeRebaseModeInRead) // Try to push down filters when filter push-down is enabled. val pushed = if (enableParquetFilterPushDown) { val parquetSchema = footerFileMetaData.getSchema - val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp, - pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive) + val parquetFilters = new ParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringStartWith, + pushDownInFilterThreshold, + isCaseSensitive, + datetimeRebaseMode) filters // Collects all converted Parquet filter predicates. Notice that not all predicates can be // converted (`ParquetFilters.createFilter` returns an `Option`). 
That's why a `flatMap` @@ -170,9 +180,6 @@ case class ParquetPartitionReaderFactory( if (pushed.isDefined) { ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) } - val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( - footerFileMetaData.getKeyValueMetaData.get, - datetimeRebaseModeInRead) val int96RebaseMode = DataSourceUtils.int96RebaseMode( footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala index 44053830defe5..4b3f4e7edca6c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters} import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, SparkToParquetSchemaConverter} import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -51,8 +52,17 @@ case class ParquetScanBuilder( val isCaseSensitive = sqlConf.caseSensitiveAnalysis val parquetSchema = new SparkToParquetSchemaConverter(sparkSession.sessionState.conf).convert(readDataSchema()) - val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp, - pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive) + val parquetFilters = new ParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringStartWith, + pushDownInFilterThreshold, + isCaseSensitive, + // The rebase mode doesn't matter here because the filters are only used to determine + // whether they are convertible. 
+ LegacyBehaviorPolicy.CORRECTED) parquetFilters.convertibleFilters(this.filters).toArray } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 230d547f40586..8354158533ee5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -45,7 +45,9 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, LEGACY} +import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.{INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ import org.apache.spark.tags.ExtendedSQLTest @@ -73,11 +75,14 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared protected def createParquetFilters( schema: MessageType, - caseSensitive: Option[Boolean] = None): ParquetFilters = + caseSensitive: Option[Boolean] = None, + datetimeRebaseMode: LegacyBehaviorPolicy.Value = LegacyBehaviorPolicy.CORRECTED + ): ParquetFilters = new ParquetFilters(schema, conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp, conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith, conf.parquetFilterPushDownInFilterThreshold, - caseSensitive.getOrElse(conf.caseSensitiveAnalysis)) + caseSensitive.getOrElse(conf.caseSensitiveAnalysis), + datetimeRebaseMode) override def beforeEach(): Unit = { super.beforeEach() @@ -587,71 +592,75 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared def date: Date = Date.valueOf(s) } - val data = Seq("2018-03-18", "2018-03-19", "2018-03-20", "2018-03-21") + val data = Seq("1000-01-01", "2018-03-19", "2018-03-20", "2018-03-21") import testImplicits._ Seq(false, true).foreach { java8Api => - withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) { - val dates = data.map(i => Tuple1(Date.valueOf(i))).toDF() - withNestedParquetDataFrame(dates) { case (inputDF, colName, fun) => - implicit val df: DataFrame = inputDF - - def resultFun(dateStr: String): Any = { - val parsed = if (java8Api) LocalDate.parse(dateStr) else Date.valueOf(dateStr) - fun(parsed) - } - - val dateAttr: Expression = df(colName).expr - assert(df(colName).expr.dataType === DateType) - - checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) - checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]], - data.map(i => Row.apply(resultFun(i)))) - - checkFilterPredicate(dateAttr === "2018-03-18".date, classOf[Eq[_]], - resultFun("2018-03-18")) - checkFilterPredicate(dateAttr <=> "2018-03-18".date, classOf[Eq[_]], - resultFun("2018-03-18")) - checkFilterPredicate(dateAttr =!= "2018-03-18".date, classOf[NotEq[_]], - Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i)))) - - checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]], - resultFun("2018-03-18")) - checkFilterPredicate(dateAttr 
> "2018-03-20".date, classOf[Gt[_]], - resultFun("2018-03-21")) - checkFilterPredicate(dateAttr <= "2018-03-18".date, classOf[LtEq[_]], - resultFun("2018-03-18")) - checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]], - resultFun("2018-03-21")) - - checkFilterPredicate(Literal("2018-03-18".date) === dateAttr, classOf[Eq[_]], - resultFun("2018-03-18")) - checkFilterPredicate(Literal("2018-03-18".date) <=> dateAttr, classOf[Eq[_]], - resultFun("2018-03-18")) - checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]], - resultFun("2018-03-18")) - checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]], - resultFun("2018-03-21")) - checkFilterPredicate(Literal("2018-03-18".date) >= dateAttr, classOf[LtEq[_]], - resultFun("2018-03-18")) - checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]], - resultFun("2018-03-21")) - - checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]], - resultFun("2018-03-21")) - checkFilterPredicate( - dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date, - classOf[Operators.Or], - Seq(Row(resultFun("2018-03-18")), Row(resultFun("2018-03-21")))) + Seq(CORRECTED, LEGACY).foreach { rebaseMode => + withSQLConf( + SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString, + SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString) { + val dates = data.map(i => Tuple1(Date.valueOf(i))).toDF() + withNestedParquetDataFrame(dates) { case (inputDF, colName, fun) => + implicit val df: DataFrame = inputDF + + def resultFun(dateStr: String): Any = { + val parsed = if (java8Api) LocalDate.parse(dateStr) else Date.valueOf(dateStr) + fun(parsed) + } - Seq(3, 20).foreach { threshold => - withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD.key -> s"$threshold") { - checkFilterPredicate( - In(dateAttr, Array("2018-03-19".date, "2018-03-20".date, "2018-03-21".date, - "2018-03-22".date).map(Literal.apply)), - if (threshold == 3) classOf[Operators.And] else classOf[Operators.Or], - Seq(Row(resultFun("2018-03-19")), Row(resultFun("2018-03-20")), - Row(resultFun("2018-03-21")))) + val dateAttr: Expression = df(colName).expr + assert(df(colName).expr.dataType === DateType) + + checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) + checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]], + data.map(i => Row.apply(resultFun(i)))) + + checkFilterPredicate(dateAttr === "1000-01-01".date, classOf[Eq[_]], + resultFun("1000-01-01")) + checkFilterPredicate(dateAttr <=> "1000-01-01".date, classOf[Eq[_]], + resultFun("1000-01-01")) + checkFilterPredicate(dateAttr =!= "1000-01-01".date, classOf[NotEq[_]], + Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i)))) + + checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]], + resultFun("1000-01-01")) + checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]], + resultFun("2018-03-21")) + checkFilterPredicate(dateAttr <= "1000-01-01".date, classOf[LtEq[_]], + resultFun("1000-01-01")) + checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]], + resultFun("2018-03-21")) + + checkFilterPredicate(Literal("1000-01-01".date) === dateAttr, classOf[Eq[_]], + resultFun("1000-01-01")) + checkFilterPredicate(Literal("1000-01-01".date) <=> dateAttr, classOf[Eq[_]], + resultFun("1000-01-01")) + checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]], + resultFun("1000-01-01")) + checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]], + 
resultFun("2018-03-21")) + checkFilterPredicate(Literal("1000-01-01".date) >= dateAttr, classOf[LtEq[_]], + resultFun("1000-01-01")) + checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]], + resultFun("2018-03-21")) + + checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]], + resultFun("2018-03-21")) + checkFilterPredicate( + dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date, + classOf[Operators.Or], + Seq(Row(resultFun("1000-01-01")), Row(resultFun("2018-03-21")))) + + Seq(3, 20).foreach { threshold => + withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD.key -> s"$threshold") { + checkFilterPredicate( + In(dateAttr, Array("2018-03-19".date, "2018-03-20".date, "2018-03-21".date, + "2018-03-22".date).map(Literal.apply)), + if (threshold == 3) classOf[Operators.And] else classOf[Operators.Or], + Seq(Row(resultFun("2018-03-19")), Row(resultFun("2018-03-20")), + Row(resultFun("2018-03-21")))) + } } } } @@ -661,35 +670,36 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared test("filter pushdown - timestamp") { Seq(true, false).foreach { java8Api => - withSQLConf( - SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString, - SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED", - SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> "CORRECTED") { - // spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS + Seq(CORRECTED, LEGACY).foreach { rebaseMode => val millisData = Seq( "1000-06-14 08:28:53.123", "1582-06-15 08:28:53.001", "1900-06-16 08:28:53.0", "2018-06-17 08:28:53.999") - withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> - ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) { + withSQLConf( + SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString, + SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString, + SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> TIMESTAMP_MILLIS.toString) { testTimestampPushdown(millisData, java8Api) } - // spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS val microsData = Seq( "1000-06-14 08:28:53.123456", "1582-06-15 08:28:53.123456", "1900-06-16 08:28:53.123456", "2018-06-17 08:28:53.123456") - withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> - ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) { + withSQLConf( + SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString, + SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString, + SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> TIMESTAMP_MICROS.toString) { testTimestampPushdown(microsData, java8Api) } - // spark.sql.parquet.outputTimestampType = INT96 doesn't support pushdown - withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> - ParquetOutputTimestampType.INT96.toString) { + // INT96 doesn't support pushdown + withSQLConf( + SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString, + SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString, + SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> INT96.toString) { import testImplicits._ withTempPath { file => millisData.map(i => Tuple1(Timestamp.valueOf(i))).toDF