From da72e97d1f9c61bc9e9ac360bb24c2bff544cc9e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 19 Nov 2020 05:21:25 -0700 Subject: [PATCH] fix bug where 'now' had same value as 'today' for timestamps (#1159) * fix bug where 'now' had same value as 'today' for timestamps Signed-off-by: Andy Grove * fix regressions Signed-off-by: Andy Grove * Remove hard-coded UTC timezone when parsing special dates Signed-off-by: Andy Grove --- .../com/nvidia/spark/rapids/DateUtils.scala | 48 +++++++++++++++++++ .../com/nvidia/spark/rapids/GpuCast.scala | 39 ++------------- .../sql/rapids/datetimeExpressions.scala | 37 +++++--------- .../spark/rapids/ParseDateTimeSuite.scala | 26 ++++++++-- 4 files changed, 87 insertions(+), 63 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala index a452dce42b7..270a109dfe6 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala @@ -16,8 +16,13 @@ package com.nvidia.spark.rapids +import java.time.LocalDate + import scala.collection.mutable.ListBuffer +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.catalyst.util.DateTimeUtils.{localDateToDays, SQLDate} + /** * Class for helper functions for Date */ @@ -42,6 +47,49 @@ object DateUtils { val ONE_DAY_MICROSECONDS = 86400000000L + val EPOCH = "epoch" + val NOW = "now" + val TODAY = "today" + val YESTERDAY = "yesterday" + val TOMORROW = "tomorrow" + + def specialDatesDays: Map[String, Int] = { + val today = currentDate() + Map( + EPOCH -> 0, + NOW -> today, + TODAY -> today, + YESTERDAY -> (today - 1), + TOMORROW -> (today + 1) + ) + } + + def specialDatesSeconds: Map[String, Long] = { + val today = currentDate() + val now = DateTimeUtils.currentTimestamp() + Map( + EPOCH -> 0, + NOW -> now / 1000000L, + TODAY -> today * ONE_DAY_SECONDS, + YESTERDAY -> (today - 1) * ONE_DAY_SECONDS, + TOMORROW -> (today + 1) * ONE_DAY_SECONDS + ) + } + + def specialDatesMicros: Map[String, Long] = { + val today = currentDate() + val now = DateTimeUtils.currentTimestamp() + Map( + EPOCH -> 0, + NOW -> now, + TODAY -> today * ONE_DAY_MICROSECONDS, + YESTERDAY -> (today - 1) * ONE_DAY_MICROSECONDS, + TOMORROW -> (today + 1) * ONE_DAY_MICROSECONDS + ) + } + + def currentDate(): SQLDate = localDateToDays(LocalDate.now()) + case class FormatKeywordToReplace(word: String, startIndex: Int, endIndex: Int) /** diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index c1fd0dd382b..d4cdf7e5bba 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -17,14 +17,11 @@ package com.nvidia.spark.rapids import java.text.SimpleDateFormat -import java.time.ZoneId -import java.util.{Calendar, TimeZone} import ai.rapids.cudf.{ColumnVector, DType, Scalar} import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.{Cast, CastBase, Expression, NullIntolerant, TimeZoneAwareExpression} -import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ /** Meta-data for cast and ansi_cast. 
*/ @@ -120,11 +117,6 @@ object GpuCast { val INVALID_FLOAT_CAST_MSG = "At least one value is either null or is an invalid number" - val EPOCH = "epoch" - val NOW = "now" - val TODAY = "today" - val YESTERDAY = "yesterday" - val TOMORROW = "tomorrow" /** * Returns true iff we can cast `from` to `to` using the GPU. @@ -186,17 +178,6 @@ object GpuCast { case _ => false } } - - def calculateSpecialDates: Map[String, Int] = { - val now = DateTimeUtils.currentDate(ZoneId.of("UTC")) - Map( - EPOCH -> 0, - NOW -> now, - TODAY -> now, - YESTERDAY -> (now - 1), - TOMORROW -> (now + 1) - ) - } } /** @@ -682,7 +663,7 @@ case class GpuCast( cv.stringReplaceWithBackrefs("-([0-9])([ T](:?[\\r\\n]|.)*)?\\Z", "-0\\1") } - val specialDates = calculateSpecialDates + val specialDates = DateUtils.specialDatesDays withResource(sanitizedInput) { sanitizedInput => @@ -755,20 +736,10 @@ case class GpuCast( } // special timestamps - val cal = Calendar.getInstance(TimeZone.getTimeZone(ZoneId.of("UTC"))) - cal.set(Calendar.HOUR_OF_DAY, 0) - cal.set(Calendar.MINUTE, 0) - cal.set(Calendar.SECOND, 0) - cal.set(Calendar.MILLISECOND, 0) - val today: Long = cal.getTimeInMillis * 1000 - val todayStr = new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime) - val specialDates: Map[String, Long] = Map( - GpuCast.EPOCH -> 0, - GpuCast.NOW -> today, - GpuCast.TODAY -> today, - GpuCast.YESTERDAY -> (today - DateUtils.ONE_DAY_MICROSECONDS), - GpuCast.TOMORROW -> (today + DateUtils.ONE_DAY_MICROSECONDS) - ) + val today = DateUtils.currentDate() + val todayStr = new SimpleDateFormat("yyyy-MM-dd") + .format(today * DateUtils.ONE_DAY_SECONDS * 1000L) + val specialDates = DateUtils.specialDatesMicros var sanitizedInput = input.incRefCount() diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala index 0db90cca7af..bf8d640d42d 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala @@ -19,12 +19,11 @@ package org.apache.spark.sql.rapids import java.time.ZoneId import ai.rapids.cudf.{BinaryOp, ColumnVector, DType, Scalar} -import com.nvidia.spark.rapids.{Arm, BinaryExprMeta, ConfKeysAndIncompat, DateUtils, GpuBinaryExpression, GpuCast, GpuColumnVector, GpuExpression, GpuOverrides, GpuScalar, GpuUnaryExpression, RapidsConf, RapidsMeta} +import com.nvidia.spark.rapids.{Arm, BinaryExprMeta, ConfKeysAndIncompat, DateUtils, GpuBinaryExpression, GpuColumnVector, GpuExpression, GpuOverrides, GpuScalar, GpuUnaryExpression, RapidsConf, RapidsMeta} import com.nvidia.spark.rapids.DateUtils.TimestampFormatConversionException import com.nvidia.spark.rapids.GpuOverrides.{extractStringLit, getTimeParserPolicy} import com.nvidia.spark.rapids.RapidsPluginImplicits._ -import org.apache.spark.{SPARK_VERSION, SparkUpgradeException} import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, ExpectsInputTypes, Expression, ImplicitCastInputTypes, NullIntolerant, TimeZoneAwareExpression} import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch @@ -334,22 +333,12 @@ object GpuToTimestamp extends Arm { "yyyy-MM-dd HH:mm:ss" ) - val specialDatesSeconds = GpuCast.calculateSpecialDates - .map { - case (name, days) => (name, days * DateUtils.ONE_DAY_SECONDS) - } - - val specialDatesMicros = GpuCast.calculateSpecialDates - .map { - case (name, days) => (name, days * 
DateUtils.ONE_DAY_MICROSECONDS) - } - def daysScalarSeconds(name: String): Scalar = { - Scalar.timestampFromLong(DType.TIMESTAMP_SECONDS, specialDatesSeconds(name)) + Scalar.timestampFromLong(DType.TIMESTAMP_SECONDS, DateUtils.specialDatesSeconds(name)) } def daysScalarMicros(name: String): Scalar = { - Scalar.timestampFromLong(DType.TIMESTAMP_MICROSECONDS, specialDatesMicros(name)) + Scalar.timestampFromLong(DType.TIMESTAMP_MICROSECONDS, DateUtils.specialDatesMicros(name)) } def daysEqual(col: ColumnVector, name: String): ColumnVector = { @@ -396,19 +385,19 @@ object GpuToTimestamp extends Arm { // values, since anything else is invalid and should throw an error or be converted to null // depending on the policy withResource(isTimestamp) { isTimestamp => - withResource(daysEqual(lhs.getBase, GpuCast.EPOCH)) { isEpoch => - withResource(daysEqual(lhs.getBase, GpuCast.NOW)) { isNow => - withResource(daysEqual(lhs.getBase, GpuCast.TODAY)) { isToday => - withResource(daysEqual(lhs.getBase, GpuCast.YESTERDAY)) { isYesterday => - withResource(daysEqual(lhs.getBase, GpuCast.TOMORROW)) { isTomorrow => + withResource(daysEqual(lhs.getBase, DateUtils.EPOCH)) { isEpoch => + withResource(daysEqual(lhs.getBase, DateUtils.NOW)) { isNow => + withResource(daysEqual(lhs.getBase, DateUtils.TODAY)) { isToday => + withResource(daysEqual(lhs.getBase, DateUtils.YESTERDAY)) { isYesterday => + withResource(daysEqual(lhs.getBase, DateUtils.TOMORROW)) { isTomorrow => withResource(lhs.getBase.isNull) { isNull => withResource(Scalar.fromNull(dtype)) { nullValue => withResource(asTimestamp(lhs.getBase, strfFormat)) { converted => - withResource(daysScalar(GpuCast.EPOCH)) { epoch => - withResource(daysScalar(GpuCast.NOW)) { now => - withResource(daysScalar(GpuCast.TODAY)) { today => - withResource(daysScalar(GpuCast.YESTERDAY)) { yesterday => - withResource(daysScalar(GpuCast.TOMORROW)) { tomorrow => + withResource(daysScalar(DateUtils.EPOCH)) { epoch => + withResource(daysScalar(DateUtils.NOW)) { now => + withResource(daysScalar(DateUtils.TODAY)) { today => + withResource(daysScalar(DateUtils.YESTERDAY)) { yesterday => + withResource(daysScalar(DateUtils.TOMORROW)) { tomorrow => withResource(isTomorrow.ifElse(tomorrow, nullValue)) { a => withResource(isYesterday.ifElse(yesterday, a)) { b => withResource(isToday.ifElse(today, b)) { c => diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala index 558270de290..4cbe3503a92 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala @@ -117,6 +117,22 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite { "Part of the plan is not columnar class org.apache.spark.sql.execution.ProjectExec")) } + test("parse now") { + def now(spark: SparkSession) = { + import spark.implicits._ + Seq("now").toDF("c0") + .repartition(2) + .withColumn("c1", unix_timestamp(col("c0"), "yyyy-MM-dd HH:mm:ss")) + } + val startTimeSeconds = System.currentTimeMillis()/1000L + val cpuNowSeconds = withCpuSparkSession(now).collect().head.toSeq(1).asInstanceOf[Long] + val gpuNowSeconds = withGpuSparkSession(now).collect().head.toSeq(1).asInstanceOf[Long] + assert(cpuNowSeconds >= startTimeSeconds) + assert(gpuNowSeconds >= startTimeSeconds) + // CPU ran first so cannot have a greater value than the GPU run (but could be the same second) + assert(cpuNowSeconds <= gpuNowSeconds) + } + private def 
timestampsAsStrings(spark: SparkSession) = { import spark.implicits._ timestampValues.toDF("c0") @@ -125,11 +141,11 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite { private def datesAsStrings(spark: SparkSession) = { import spark.implicits._ val values = Seq( - GpuCast.EPOCH, - GpuCast.NOW, - GpuCast.TODAY, - GpuCast.YESTERDAY, - GpuCast.TOMORROW + DateUtils.EPOCH, + DateUtils.NOW, + DateUtils.TODAY, + DateUtils.YESTERDAY, + DateUtils.TOMORROW ) ++ timestampValues values.toDF("c0") }
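
The heart of the fix is in the new DateUtils.specialDatesSeconds/specialDatesMicros above: 'now' is derived from DateTimeUtils.currentTimestamp() while 'today' stays pinned to midnight, whereas the removed GpuCast.calculateSpecialDates gave both keywords the same day-resolution value. The following is a minimal standalone sketch of that distinction, not the plugin code: it uses plain JDK time APIs, the object name is hypothetical, and UTC is assumed only to keep the sketch deterministic, where the plugin itself goes through Spark's DateTimeUtils and (after this patch) the JVM default zone.

    import java.time.{Instant, LocalDate, ZoneOffset}

    object SpecialDatesSketch {
      private val OneDayMicros = 86400000000L

      // Days since the Unix epoch; UTC is assumed here for determinism.
      private def currentDateDays: Long = LocalDate.now(ZoneOffset.UTC).toEpochDay

      // Pre-fix shape: every special keyword resolved through the day count,
      // so 'now' and 'today' collapsed to the same midnight timestamp.
      def specialMicrosBuggy: Map[String, Long] = {
        val today = currentDateDays
        Map("now" -> today * OneDayMicros, "today" -> today * OneDayMicros)
      }

      // Post-fix shape: 'now' carries the full current timestamp in
      // microseconds while 'today' remains midnight.
      def specialMicrosFixed: Map[String, Long] = {
        val today = currentDateDays
        val nowMicros = Instant.now().toEpochMilli * 1000L
        Map("now" -> nowMicros, "today" -> today * OneDayMicros)
      }

      def main(args: Array[String]): Unit = {
        val buggy = specialMicrosBuggy
        val fixed = specialMicrosFixed
        assert(buggy("now") == buggy("today")) // the reported bug
        assert(fixed("now") >= fixed("today")) // equal only at the exact midnight instant
        println(s"buggy now==today: ${buggy("now") == buggy("today")}, " +
          s"fixed now-today micros: ${fixed("now") - fixed("today")}")
      }
    }

The third commit in the series also drops the hard-coded ZoneId.of("UTC") that calculateSpecialDates used: DateUtils.currentDate() now goes through localDateToDays(LocalDate.now()), so the special values follow the JVM default zone, matching the CPU's DateTimeUtils-based path.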
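A second detail worth noting for the "fix regressions" commit: the removed specialDatesSeconds/specialDatesMicros in GpuToTimestamp were vals on the object, so they were computed once when the object was first loaded and 'now' stayed frozen for the life of the session; their DateUtils replacements are defs and are re-evaluated on every call, which is what the new "parse now" test relies on. A hedged sketch of that Scala evaluation-timing distinction (the names here are illustrative, not from the plugin):

    import java.time.Instant

    object EvalTimingSketch {
      // Computed once, when the JVM initializes this object: every later
      // read returns the same frozen instant.
      val nowAtInit: Long = Instant.now().toEpochMilli

      // Re-evaluated on each call, so successive queries observe a moving clock.
      def nowPerCall: Long = Instant.now().toEpochMilli

      def main(args: Array[String]): Unit = {
        val first = nowPerCall
        Thread.sleep(10)
        val second = nowPerCall
        assert(nowAtInit <= first && first <= second)
        println(s"val stayed at $nowAtInit; def moved from $first to $second")
      }
    }

The test's ordering assertion (startTimeSeconds <= cpuNowSeconds <= gpuNowSeconds) only holds because each run re-evaluates these defs: the CPU session executes first, so its 'now' can never be later than the GPU session's.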