fix bug where 'now' had same value as 'today' for timestamps #1159

Merged · 4 commits · Nov 19, 2020
45 changes: 45 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala
@@ -16,8 +16,12 @@
 
 package com.nvidia.spark.rapids
 
+import java.time.ZoneId
+
 import scala.collection.mutable.ListBuffer
 
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+
 /**
  * Class for helper functions for Date
  */
@@ -42,6 +46,47 @@ object DateUtils {
 
   val ONE_DAY_MICROSECONDS = 86400000000L
 
+  val EPOCH = "epoch"
+  val NOW = "now"
+  val TODAY = "today"
+  val YESTERDAY = "yesterday"
+  val TOMORROW = "tomorrow"
+
+  def specialDatesDays: Map[String, Int] = {
+    val today = DateTimeUtils.currentDate(ZoneId.of("UTC"))
+    Map(
+      EPOCH -> 0,
+      NOW -> today,
+      TODAY -> today,
+      YESTERDAY -> (today - 1),
+      TOMORROW -> (today + 1)
+    )
+  }
+
+  def specialDatesSeconds: Map[String, Long] = {
+    val today = DateTimeUtils.currentDate(ZoneId.of("UTC"))
+    val now = DateTimeUtils.currentTimestamp()
+    Map(
+      EPOCH -> 0,
+      NOW -> now / 1000000L,
+      TODAY -> today * ONE_DAY_SECONDS,
+      YESTERDAY -> (today - 1) * ONE_DAY_SECONDS,
+      TOMORROW -> (today + 1) * ONE_DAY_SECONDS
+    )
+  }
+
+  def specialDatesMicros: Map[String, Long] = {
+    val today = DateTimeUtils.currentDate(ZoneId.of("UTC"))
+    val now = DateTimeUtils.currentTimestamp()
+    Map(
+      EPOCH -> 0,
+      NOW -> now,
+      TODAY -> today * ONE_DAY_MICROSECONDS,
+      YESTERDAY -> (today - 1) * ONE_DAY_MICROSECONDS,
+      TOMORROW -> (today + 1) * ONE_DAY_MICROSECONDS
+    )
+  }
+
   case class FormatKeywordToReplace(word: String, startIndex: Int, endIndex: Int)
 
   /**
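
The three specialDates maps above are the core of the fix: the seconds and micros variants derive "now" from DateTimeUtils.currentTimestamp(), while "today" stays truncated to midnight UTC. They are defs rather than vals, so every call re-reads the clock; a val would freeze "now" at object-initialization time. A minimal sketch (not part of the diff, and ignoring a midnight rollover between the map's two clock reads) of the invariant this buys:

import com.nvidia.spark.rapids.DateUtils

val micros = DateUtils.specialDatesMicros
// "now" falls inside the current UTC day: at or after midnight ("today")...
assert(micros(DateUtils.NOW) >= micros(DateUtils.TODAY))
// ...and strictly before the next midnight ("tomorrow").
assert(micros(DateUtils.NOW) < micros(DateUtils.TOMORROW))
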
37 changes: 6 additions & 31 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
@@ -25,6 +25,7 @@ import ai.rapids.cudf.{ColumnVector, DType, Scalar}
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.{Cast, CastBase, Expression, NullIntolerant, TimeZoneAwareExpression}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.types._
 
 /** Meta-data for cast and ansi_cast. */
@@ -120,11 +121,6 @@ object GpuCast {
 
   val INVALID_FLOAT_CAST_MSG = "At least one value is either null or is an invalid number"
 
-  val EPOCH = "epoch"
-  val NOW = "now"
-  val TODAY = "today"
-  val YESTERDAY = "yesterday"
-  val TOMORROW = "tomorrow"
 
   /**
    * Returns true iff we can cast `from` to `to` using the GPU.
@@ -186,17 +182,6 @@ object GpuCast {
       case _ => false
     }
   }
-
-  def calculateSpecialDates: Map[String, Int] = {
-    val now = DateTimeUtils.currentDate(ZoneId.of("UTC"))
-    Map(
-      EPOCH -> 0,
-      NOW -> now,
-      TODAY -> now,
-      YESTERDAY -> (now - 1),
-      TOMORROW -> (now + 1)
-    )
-  }
 }
 
/**
Expand Down Expand Up @@ -682,7 +667,7 @@ case class GpuCast(
cv.stringReplaceWithBackrefs("-([0-9])([ T](:?[\\r\\n]|.)*)?\\Z", "-0\\1")
}

val specialDates = calculateSpecialDates
val specialDates = DateUtils.specialDatesDays

withResource(sanitizedInput) { sanitizedInput =>

@@ -755,20 +740,10 @@ case class GpuCast(
     }
 
     // special timestamps
-    val cal = Calendar.getInstance(TimeZone.getTimeZone(ZoneId.of("UTC")))
-    cal.set(Calendar.HOUR_OF_DAY, 0)
-    cal.set(Calendar.MINUTE, 0)
-    cal.set(Calendar.SECOND, 0)
-    cal.set(Calendar.MILLISECOND, 0)
-    val today: Long = cal.getTimeInMillis * 1000
-    val todayStr = new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime)
-    val specialDates: Map[String, Long] = Map(
-      GpuCast.EPOCH -> 0,
-      GpuCast.NOW -> today,
-      GpuCast.TODAY -> today,
-      GpuCast.YESTERDAY -> (today - DateUtils.ONE_DAY_MICROSECONDS),
-      GpuCast.TOMORROW -> (today + DateUtils.ONE_DAY_MICROSECONDS)
-    )
+    val today = DateTimeUtils.currentDate(ZoneId.of("UTC"))
+    val todayStr = new SimpleDateFormat("yyyy-MM-dd")
+      .format(today * DateUtils.ONE_DAY_SECONDS * 1000L)
+    val specialDates = DateUtils.specialDatesMicros
 
     var sanitizedInput = input.incRefCount()
 
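
The removed block above is where the bug lived: GpuCast.NOW and GpuCast.TODAY both mapped to the same midnight value, so casting the string "now" to a timestamp lost the time of day. A worked example with a hypothetical day number (18585 days since the epoch is 2020-11-19, this PR's merge date):

// Old behavior (sketch): the special-timestamp map was built from midnight only.
val todayMicros = 18585L * 86400000000L // 2020-11-19T00:00:00Z in microseconds
val oldNowMicros = todayMicros          // "now" collapsed to midnight, same as "today"
// New behavior: "now" comes from DateTimeUtils.currentTimestamp(), so at any
// wall-clock time after midnight, specialDatesMicros(NOW) > specialDatesMicros(TODAY).
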
@@ -334,22 +334,12 @@ object GpuToTimestamp extends Arm {
     "yyyy-MM-dd HH:mm:ss"
   )
 
-  val specialDatesSeconds = GpuCast.calculateSpecialDates
-    .map {
-      case (name, days) => (name, days * DateUtils.ONE_DAY_SECONDS)
-    }
-
-  val specialDatesMicros = GpuCast.calculateSpecialDates
-    .map {
-      case (name, days) => (name, days * DateUtils.ONE_DAY_MICROSECONDS)
-    }
-
   def daysScalarSeconds(name: String): Scalar = {
-    Scalar.timestampFromLong(DType.TIMESTAMP_SECONDS, specialDatesSeconds(name))
+    Scalar.timestampFromLong(DType.TIMESTAMP_SECONDS, DateUtils.specialDatesSeconds(name))
   }
 
   def daysScalarMicros(name: String): Scalar = {
-    Scalar.timestampFromLong(DType.TIMESTAMP_MICROSECONDS, specialDatesMicros(name))
+    Scalar.timestampFromLong(DType.TIMESTAMP_MICROSECONDS, DateUtils.specialDatesMicros(name))
   }
 
   def daysEqual(col: ColumnVector, name: String): ColumnVector = {
@@ -396,19 +386,19 @@ object GpuToTimestamp extends Arm {
     // values, since anything else is invalid and should throw an error or be converted to null
     // depending on the policy
     withResource(isTimestamp) { isTimestamp =>
-      withResource(daysEqual(lhs.getBase, GpuCast.EPOCH)) { isEpoch =>
-        withResource(daysEqual(lhs.getBase, GpuCast.NOW)) { isNow =>
-          withResource(daysEqual(lhs.getBase, GpuCast.TODAY)) { isToday =>
-            withResource(daysEqual(lhs.getBase, GpuCast.YESTERDAY)) { isYesterday =>
-              withResource(daysEqual(lhs.getBase, GpuCast.TOMORROW)) { isTomorrow =>
+      withResource(daysEqual(lhs.getBase, DateUtils.EPOCH)) { isEpoch =>
+        withResource(daysEqual(lhs.getBase, DateUtils.NOW)) { isNow =>
+          withResource(daysEqual(lhs.getBase, DateUtils.TODAY)) { isToday =>
+            withResource(daysEqual(lhs.getBase, DateUtils.YESTERDAY)) { isYesterday =>
+              withResource(daysEqual(lhs.getBase, DateUtils.TOMORROW)) { isTomorrow =>
                 withResource(lhs.getBase.isNull) { isNull =>
                   withResource(Scalar.fromNull(dtype)) { nullValue =>
                     withResource(asTimestamp(lhs.getBase, strfFormat)) { converted =>
-                      withResource(daysScalar(GpuCast.EPOCH)) { epoch =>
-                        withResource(daysScalar(GpuCast.NOW)) { now =>
-                          withResource(daysScalar(GpuCast.TODAY)) { today =>
-                            withResource(daysScalar(GpuCast.YESTERDAY)) { yesterday =>
-                              withResource(daysScalar(GpuCast.TOMORROW)) { tomorrow =>
+                      withResource(daysScalar(DateUtils.EPOCH)) { epoch =>
+                        withResource(daysScalar(DateUtils.NOW)) { now =>
+                          withResource(daysScalar(DateUtils.TODAY)) { today =>
+                            withResource(daysScalar(DateUtils.YESTERDAY)) { yesterday =>
+                              withResource(daysScalar(DateUtils.TOMORROW)) { tomorrow =>
                                 withResource(isTomorrow.ifElse(tomorrow, nullValue)) { a =>
                                   withResource(isYesterday.ifElse(yesterday, a)) { b =>
                                     withResource(isToday.ifElse(today, b)) { c =>
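
The deeply nested withResource calls above are the Arm resource-scoping pattern used throughout this plugin: each GPU Scalar or ColumnVector is closed when its block exits, even if an exception is thrown mid-chain. A minimal sketch of the pattern, assuming AutoCloseable resources (the repo's actual Arm trait carries more overloads):

trait Arm {
  // Run `block` with `r`, guaranteeing r.close() on both normal and exceptional exit.
  def withResource[R <: AutoCloseable, V](r: R)(block: R => V): V = {
    try {
      block(r)
    } finally {
      r.close()
    }
  }
}

Nesting rather than materializing every scalar up front means each intermediate ifElse result (a, b, c, ...) is guaranteed to be closed when its enclosing block finishes.
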
@@ -117,6 +117,22 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite {
       "Part of the plan is not columnar class org.apache.spark.sql.execution.ProjectExec"))
   }
 
+  test("parse now") {
+    def now(spark: SparkSession) = {
+      import spark.implicits._
+      Seq("now").toDF("c0")
+        .repartition(2)
+        .withColumn("c1", unix_timestamp(col("c0"), "yyyy-MM-dd HH:mm:ss"))
+    }
+    val startTimeSeconds = System.currentTimeMillis()/1000L
+    val cpuNowSeconds = withCpuSparkSession(now).collect().head.toSeq(1).asInstanceOf[Long]
+    val gpuNowSeconds = withGpuSparkSession(now).collect().head.toSeq(1).asInstanceOf[Long]
+    assert(cpuNowSeconds >= startTimeSeconds)
+    assert(gpuNowSeconds >= startTimeSeconds)
+    // CPU ran first so cannot have a greater value than the GPU run (but could be the same second)
+    assert(cpuNowSeconds <= gpuNowSeconds)
+  }
+
   private def timestampsAsStrings(spark: SparkSession) = {
     import spark.implicits._
     timestampValues.toDF("c0")
@@ -125,11 +141,11 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite {
   private def datesAsStrings(spark: SparkSession) = {
     import spark.implicits._
     val values = Seq(
-      GpuCast.EPOCH,
-      GpuCast.NOW,
-      GpuCast.TODAY,
-      GpuCast.YESTERDAY,
-      GpuCast.TOMORROW
+      DateUtils.EPOCH,
+      DateUtils.NOW,
+      DateUtils.TODAY,
+      DateUtils.YESTERDAY,
+      DateUtils.TOMORROW
     ) ++ timestampValues
     values.toDF("c0")
   }
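
One closing note on the new "parse now" test above: CPU and GPU results cannot be compared for equality because "now" is nondeterministic by design, so the test checks ordering instead. Both values must be at or after the recorded start time, and since the CPU session runs first, its value can never exceed the GPU's. A stripped-down sketch of the same invariant (assumes an active SparkSession named spark):

import org.apache.spark.sql.functions.{col, unix_timestamp}
import spark.implicits._

val startTimeSeconds = System.currentTimeMillis() / 1000L
val nowSeconds = Seq("now").toDF("c0")
  .withColumn("c1", unix_timestamp(col("c0"), "yyyy-MM-dd HH:mm:ss"))
  .collect().head.getLong(1)
// "now" must resolve to the wall clock at evaluation time, never to midnight.
assert(nowSeconds >= startTimeSeconds)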