
Commit

fix bug where 'now' had same value as 'today' for timestamps (NVIDIA#1159)

* fix bug where 'now' had same value as 'today' for timestamps

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* fix regressions

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Remove hard-coded UTC timezone when parsing special dates

Signed-off-by: Andy Grove <andygrove@nvidia.com>
andygrove authored Nov 19, 2020
1 parent 426677f commit da72e97
Showing 4 changed files with 87 additions and 63 deletions.
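The root cause is easiest to see in the special-date maps below: the old GpuCast.calculateSpecialDates returned whole-day counts for every keyword, so after scaling up to a timestamp, 'now' and 'today' both landed on midnight of the current day. A minimal sketch of the before/after difference, assuming Spark's catalyst utilities on the classpath (the object name and assertions are illustrative only):

import java.time.LocalDate
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils.localDateToDays

object NowVsTodaySketch {
  val ONE_DAY_MICROSECONDS = 86400000000L

  def main(args: Array[String]): Unit = {
    val today = localDateToDays(LocalDate.now())
    // Old behavior: both keywords were derived from the same day count,
    // so as timestamps they were identical (midnight of the current day).
    val oldNow = today * ONE_DAY_MICROSECONDS
    val oldToday = today * ONE_DAY_MICROSECONDS
    assert(oldNow == oldToday)
    // New behavior: 'now' keeps the time-of-day component.
    val newNow = DateTimeUtils.currentTimestamp() // microseconds since epoch
    assert(newNow >= oldNow) // equal only at the exact midnight instant (UTC clock assumed)
  }
}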
48 changes: 48 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala
@@ -16,8 +16,13 @@

package com.nvidia.spark.rapids

+import java.time.LocalDate

import scala.collection.mutable.ListBuffer

+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{localDateToDays, SQLDate}

/**
* Class for helper functions for Date
*/
@@ -42,6 +47,49 @@ object DateUtils {

val ONE_DAY_MICROSECONDS = 86400000000L

val EPOCH = "epoch"
val NOW = "now"
val TODAY = "today"
val YESTERDAY = "yesterday"
val TOMORROW = "tomorrow"

def specialDatesDays: Map[String, Int] = {
val today = currentDate()
Map(
EPOCH -> 0,
NOW -> today,
TODAY -> today,
YESTERDAY -> (today - 1),
TOMORROW -> (today + 1)
)
}

def specialDatesSeconds: Map[String, Long] = {
val today = currentDate()
val now = DateTimeUtils.currentTimestamp()
Map(
EPOCH -> 0,
NOW -> now / 1000000L,
TODAY -> today * ONE_DAY_SECONDS,
YESTERDAY -> (today - 1) * ONE_DAY_SECONDS,
TOMORROW -> (today + 1) * ONE_DAY_SECONDS
)
}

def specialDatesMicros: Map[String, Long] = {
val today = currentDate()
val now = DateTimeUtils.currentTimestamp()
Map(
EPOCH -> 0,
NOW -> now,
TODAY -> today * ONE_DAY_MICROSECONDS,
YESTERDAY -> (today - 1) * ONE_DAY_MICROSECONDS,
TOMORROW -> (today + 1) * ONE_DAY_MICROSECONDS
)
}

def currentDate(): SQLDate = localDateToDays(LocalDate.now())

case class FormatKeywordToReplace(word: String, startIndex: Int, endIndex: Int)

/**
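A quick usage sketch of the new helpers, assuming a JVM clock on 2020-11-19 (the day number and the REPL-style session are illustrative):

import com.nvidia.spark.rapids.DateUtils

// The maps are defs, not vals: every lookup re-reads the clock, so a
// long-running session keeps 'now' and 'today' current.
val days = DateUtils.specialDatesDays     // e.g. TODAY -> 18585 on 2020-11-19
val micros = DateUtils.specialDatesMicros

// As DATE values the keywords still agree...
assert(days(DateUtils.NOW) == days(DateUtils.TODAY))
// ...but as TIMESTAMP values 'now' is at or after midnight, and equal
// to 'today' only at the exact midnight instant.
assert(micros(DateUtils.NOW) >= micros(DateUtils.TODAY))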
39 changes: 5 additions & 34 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
@@ -17,14 +17,11 @@
package com.nvidia.spark.rapids

import java.text.SimpleDateFormat
-import java.time.ZoneId
-import java.util.{Calendar, TimeZone}

import ai.rapids.cudf.{ColumnVector, DType, Scalar}

import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.{Cast, CastBase, Expression, NullIntolerant, TimeZoneAwareExpression}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._

/** Meta-data for cast and ansi_cast. */
@@ -120,11 +117,6 @@ object GpuCast {

val INVALID_FLOAT_CAST_MSG = "At least one value is either null or is an invalid number"

val EPOCH = "epoch"
val NOW = "now"
val TODAY = "today"
val YESTERDAY = "yesterday"
val TOMORROW = "tomorrow"

/**
* Returns true iff we can cast `from` to `to` using the GPU.
@@ -186,17 +178,6 @@ object GpuCast {
case _ => false
}
}

-def calculateSpecialDates: Map[String, Int] = {
-  val now = DateTimeUtils.currentDate(ZoneId.of("UTC"))
-  Map(
-    EPOCH -> 0,
-    NOW -> now,
-    TODAY -> now,
-    YESTERDAY -> (now - 1),
-    TOMORROW -> (now + 1)
-  )
-}
}

/**
@@ -682,7 +663,7 @@ case class GpuCast(
cv.stringReplaceWithBackrefs("-([0-9])([ T](:?[\\r\\n]|.)*)?\\Z", "-0\\1")
}

-val specialDates = calculateSpecialDates
+val specialDates = DateUtils.specialDatesDays

withResource(sanitizedInput) { sanitizedInput =>

@@ -755,20 +736,10 @@
}

// special timestamps
-val cal = Calendar.getInstance(TimeZone.getTimeZone(ZoneId.of("UTC")))
-cal.set(Calendar.HOUR_OF_DAY, 0)
-cal.set(Calendar.MINUTE, 0)
-cal.set(Calendar.SECOND, 0)
-cal.set(Calendar.MILLISECOND, 0)
-val today: Long = cal.getTimeInMillis * 1000
-val todayStr = new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime)
-val specialDates: Map[String, Long] = Map(
-  GpuCast.EPOCH -> 0,
-  GpuCast.NOW -> today,
-  GpuCast.TODAY -> today,
-  GpuCast.YESTERDAY -> (today - DateUtils.ONE_DAY_MICROSECONDS),
-  GpuCast.TOMORROW -> (today + DateUtils.ONE_DAY_MICROSECONDS)
-)
+val today = DateUtils.currentDate()
+val todayStr = new SimpleDateFormat("yyyy-MM-dd")
+  .format(today * DateUtils.ONE_DAY_SECONDS * 1000L)
+val specialDates = DateUtils.specialDatesMicros

var sanitizedInput = input.incRefCount()

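The replacement above also rebuilds the yyyy-MM-dd string for 'today' from the epoch-day count instead of a Calendar pinned to UTC, in line with the third commit message bullet. A standalone sketch of that conversion (the day value is illustrative):

import java.text.SimpleDateFormat

val ONE_DAY_SECONDS = 86400L
val today = 18585 // days since the epoch, i.e. 2020-11-19
// DateFormat.format(Object) accepts a Number holding milliseconds since
// the epoch, so days * seconds-per-day * 1000 yields a printable date.
val todayStr = new SimpleDateFormat("yyyy-MM-dd").format(today * ONE_DAY_SECONDS * 1000L)
// todayStr == "2020-11-19" for a UTC default zone; other zones can shift
// the rendered day, which is the point of no longer pinning UTC here.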
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala
@@ -19,12 +19,11 @@ package org.apache.spark.sql.rapids
import java.time.ZoneId

import ai.rapids.cudf.{BinaryOp, ColumnVector, DType, Scalar}
-import com.nvidia.spark.rapids.{Arm, BinaryExprMeta, ConfKeysAndIncompat, DateUtils, GpuBinaryExpression, GpuCast, GpuColumnVector, GpuExpression, GpuOverrides, GpuScalar, GpuUnaryExpression, RapidsConf, RapidsMeta}
+import com.nvidia.spark.rapids.{Arm, BinaryExprMeta, ConfKeysAndIncompat, DateUtils, GpuBinaryExpression, GpuColumnVector, GpuExpression, GpuOverrides, GpuScalar, GpuUnaryExpression, RapidsConf, RapidsMeta}
import com.nvidia.spark.rapids.DateUtils.TimestampFormatConversionException
import com.nvidia.spark.rapids.GpuOverrides.{extractStringLit, getTimeParserPolicy}
import com.nvidia.spark.rapids.RapidsPluginImplicits._

import org.apache.spark.{SPARK_VERSION, SparkUpgradeException}
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, ExpectsInputTypes, Expression, ImplicitCastInputTypes, NullIntolerant, TimeZoneAwareExpression}
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -334,22 +333,12 @@ object GpuToTimestamp extends Arm {
"yyyy-MM-dd HH:mm:ss"
)

-val specialDatesSeconds = GpuCast.calculateSpecialDates
-  .map {
-    case (name, days) => (name, days * DateUtils.ONE_DAY_SECONDS)
-  }
-
-val specialDatesMicros = GpuCast.calculateSpecialDates
-  .map {
-    case (name, days) => (name, days * DateUtils.ONE_DAY_MICROSECONDS)
-  }

def daysScalarSeconds(name: String): Scalar = {
-Scalar.timestampFromLong(DType.TIMESTAMP_SECONDS, specialDatesSeconds(name))
+Scalar.timestampFromLong(DType.TIMESTAMP_SECONDS, DateUtils.specialDatesSeconds(name))
}

def daysScalarMicros(name: String): Scalar = {
-Scalar.timestampFromLong(DType.TIMESTAMP_MICROSECONDS, specialDatesMicros(name))
+Scalar.timestampFromLong(DType.TIMESTAMP_MICROSECONDS, DateUtils.specialDatesMicros(name))
}

def daysEqual(col: ColumnVector, name: String): ColumnVector = {
@@ -396,19 +385,19 @@
// values, since anything else is invalid and should throw an error or be converted to null
// depending on the policy
withResource(isTimestamp) { isTimestamp =>
-withResource(daysEqual(lhs.getBase, GpuCast.EPOCH)) { isEpoch =>
-withResource(daysEqual(lhs.getBase, GpuCast.NOW)) { isNow =>
-withResource(daysEqual(lhs.getBase, GpuCast.TODAY)) { isToday =>
-withResource(daysEqual(lhs.getBase, GpuCast.YESTERDAY)) { isYesterday =>
-withResource(daysEqual(lhs.getBase, GpuCast.TOMORROW)) { isTomorrow =>
+withResource(daysEqual(lhs.getBase, DateUtils.EPOCH)) { isEpoch =>
+withResource(daysEqual(lhs.getBase, DateUtils.NOW)) { isNow =>
+withResource(daysEqual(lhs.getBase, DateUtils.TODAY)) { isToday =>
+withResource(daysEqual(lhs.getBase, DateUtils.YESTERDAY)) { isYesterday =>
+withResource(daysEqual(lhs.getBase, DateUtils.TOMORROW)) { isTomorrow =>
withResource(lhs.getBase.isNull) { isNull =>
withResource(Scalar.fromNull(dtype)) { nullValue =>
withResource(asTimestamp(lhs.getBase, strfFormat)) { converted =>
-withResource(daysScalar(GpuCast.EPOCH)) { epoch =>
-withResource(daysScalar(GpuCast.NOW)) { now =>
-withResource(daysScalar(GpuCast.TODAY)) { today =>
-withResource(daysScalar(GpuCast.YESTERDAY)) { yesterday =>
-withResource(daysScalar(GpuCast.TOMORROW)) { tomorrow =>
+withResource(daysScalar(DateUtils.EPOCH)) { epoch =>
+withResource(daysScalar(DateUtils.NOW)) { now =>
+withResource(daysScalar(DateUtils.TODAY)) { today =>
+withResource(daysScalar(DateUtils.YESTERDAY)) { yesterday =>
+withResource(daysScalar(DateUtils.TOMORROW)) { tomorrow =>
withResource(isTomorrow.ifElse(tomorrow, nullValue)) { a =>
withResource(isYesterday.ifElse(yesterday, a)) { b =>
withResource(isToday.ifElse(today, b)) { c =>
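The deeply nested withResource calls above are the plugin's pattern for deterministic cleanup of device-backed ColumnVector and Scalar handles: each one is closed as soon as its block exits, even on exception. A self-contained sketch of the idiom against plain AutoCloseable (the real helper lives on the com.nvidia.spark.rapids.Arm trait; Buf is a stand-in):

// Minimal stand-in for Arm.withResource: run the body, then always close.
def withResource[R <: AutoCloseable, T](r: R)(body: R => T): T =
  try body(r) finally r.close()

final class Buf(name: String) extends AutoCloseable {
  override def toString: String = name
  override def close(): Unit = println(s"closed $name")
}

// Nesting mirrors the cascade in the diff: inner resources close first,
// so this prints "closed b" then "closed a" and yields "ab".
val result = withResource(new Buf("a")) { a =>
  withResource(new Buf("b")) { b =>
    s"$a$b"
  }
}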
tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala
@@ -117,6 +117,22 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite {
"Part of the plan is not columnar class org.apache.spark.sql.execution.ProjectExec"))
}

test("parse now") {
def now(spark: SparkSession) = {
import spark.implicits._
Seq("now").toDF("c0")
.repartition(2)
.withColumn("c1", unix_timestamp(col("c0"), "yyyy-MM-dd HH:mm:ss"))
}
val startTimeSeconds = System.currentTimeMillis()/1000L
val cpuNowSeconds = withCpuSparkSession(now).collect().head.toSeq(1).asInstanceOf[Long]
val gpuNowSeconds = withGpuSparkSession(now).collect().head.toSeq(1).asInstanceOf[Long]
assert(cpuNowSeconds >= startTimeSeconds)
assert(gpuNowSeconds >= startTimeSeconds)
// CPU ran first so cannot have a greater value than the GPU run (but could be the same second)
assert(cpuNowSeconds <= gpuNowSeconds)
}

private def timestampsAsStrings(spark: SparkSession) = {
import spark.implicits._
timestampValues.toDF("c0")
@@ -125,11 +141,11 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite {
private def datesAsStrings(spark: SparkSession) = {
import spark.implicits._
val values = Seq(
-GpuCast.EPOCH,
-GpuCast.NOW,
-GpuCast.TODAY,
-GpuCast.YESTERDAY,
-GpuCast.TOMORROW
+DateUtils.EPOCH,
+DateUtils.NOW,
+DateUtils.TODAY,
+DateUtils.YESTERDAY,
+DateUtils.TOMORROW
) ++ timestampValues
values.toDF("c0")
}
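For reference, the behavior the new test pins down can be reproduced in a plain spark-shell session (illustrative snippet, not part of the commit; assumes a Spark version whose parser accepts the special strings, as the CPU run in the test above does):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, unix_timestamp}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val rows = Seq("now", "today").toDF("c0")
  .withColumn("c1", unix_timestamp(col("c0"), "yyyy-MM-dd HH:mm:ss"))
  .collect()
val Array(nowSecs, todaySecs) = rows.map(_.getLong(1))
// Before this fix the GPU path returned nowSecs == todaySecs (both midnight);
// with it, 'now' carries the time of day while 'today' stays at midnight.
assert(nowSecs >= todaySecs)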
