Support ANSI mode for ToUnixTimestamp, UnixTimestamp, GetTimestamp, DateAddInterval #5316

Merged (3 commits) on Apr 28, 2022
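
At a glance, the behavior under test: with spark.sql.ansi.enabled=true, the timestamp-parsing expressions (ToUnixTimestamp, UnixTimestamp, GetTimestamp) raise an error on unparseable input instead of returning null, and DateAddInterval rejects intervals that carry sub-day components. A minimal PySpark sketch of that behavior, assuming an active SparkSession named spark (illustrative literals, not code from this PR):

spark.conf.set("spark.sql.ansi.enabled", "true")
# Parsing an invalid string raises an error instead of returning null:
spark.sql("SELECT to_unix_timestamp('invalid_date_string', 'yyyy/MM/dd')").collect()
# With the legacy interval type enabled, adding a sub-day interval to a date is rejected:
spark.conf.set("spark.sql.legacy.interval.enabled", "true")
spark.sql("SELECT DATE'2022-01-01' + (interval 1 days 5 seconds)").collect()
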
1 change: 1 addition & 0 deletions integration_tests/src/main/python/data_gen.py
@@ -975,6 +975,7 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):

ansi_enabled_conf = {'spark.sql.ansi.enabled': 'true'}
no_nans_conf = {'spark.rapids.sql.hasNans': 'false'}
legacy_interval_enabled_conf = {'spark.sql.legacy.interval.enabled': 'true'}
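# The legacy interval config keeps DateAddInterval testable on Spark 3.2.0+ by forcing
# CalendarIntervalType arithmetic; see the notes in date_time_test.py and SPARK-34896.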

def copy_and_update(conf, *more_confs):
local_conf = conf.copy()
110 changes: 78 additions & 32 deletions integration_tests/src/main/python/date_time_test.py
@@ -13,7 +13,7 @@
# limitations under the License.

import pytest
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, assert_gpu_and_cpu_error
from data_gen import *
from datetime import date, datetime, timezone
from marks import incompat, allow_non_gpu
@@ -51,13 +51,37 @@ def test_timeadd_daytime_column():
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, gen_list).selectExpr("t + d", "t + INTERVAL '1 02:03:04' DAY TO SECOND"))

# `spark.sql.legacy.interval.enabled` must be set to test `DateAddInterval` on Spark 3.2.0+,
# see https://issues.apache.org/jira/browse/SPARK-34896
# [SPARK-34896][SQL] Return day-time interval from dates subtraction
# It added the SQL config `spark.sql.legacy.interval.enabled`, which controls whether Spark SQL uses `CalendarIntervalType` instead of ANSI intervals.
@pytest.mark.parametrize('data_gen', vals, ids=idfn)
def test_dateaddinterval(data_gen):
days, seconds = data_gen
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, DateGen(start=date(200, 1, 1), end=date(800, 1, 1)), seed=1)
.selectExpr('a + (interval {} days {} seconds)'.format(days, seconds),
'a - (interval {} days {} seconds)'.format(days, seconds)))
'a - (interval {} days {} seconds)'.format(days, seconds)),
legacy_interval_enabled_conf)
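
# Illustrative sketch (not part of the suite): the ANSI variants below merge the config dicts
# from data_gen.py with copy_and_update, which copies the first dict and overlays the rest, e.g.
#   merged = copy_and_update(ansi_enabled_conf, legacy_interval_enabled_conf)
#   assert merged == {'spark.sql.ansi.enabled': 'true', 'spark.sql.legacy.interval.enabled': 'true'}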

# Test adding only days (without hours, minutes, seconds, milliseconds, or microseconds) in ANSI mode.
@pytest.mark.parametrize('data_gen', vals, ids=idfn)
def test_dateaddinterval_ansi(data_gen):
days, _ = data_gen
# only specify the `days`
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, DateGen(start=date(200, 1, 1), end=date(800, 1, 1)), seed=1)
.selectExpr('a + (interval {} days)'.format(days)),
conf=copy_and_update(ansi_enabled_conf, legacy_interval_enabled_conf))

# Throws when adding hours, minutes, seconds, milliseconds, or microseconds to a date in ANSI mode
def test_dateaddinterval_ansi_exception():
assert_gpu_and_cpu_error(
# specify the `seconds`
lambda spark : unary_op_df(spark, DateGen(start=date(200, 1, 1), end=date(800, 1, 1)), seed=1)
.selectExpr('a + (interval {} days {} seconds)'.format(1, 5)).collect(),
conf=copy_and_update(ansi_enabled_conf, legacy_interval_enabled_conf),
error_message="IllegalArgumentException")
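
# For reference: the expression above corresponds to the SQL `a + (interval 1 days 5 seconds)`,
# which ANSI mode rejects for dates because the interval carries a sub-day component;
# the GpuDateAddInterval change in this PR raises a matching IllegalArgumentException.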

@pytest.mark.parametrize('data_gen', date_gens, ids=idfn)
def test_datediff(data_gen):
@@ -183,63 +207,85 @@ def test_unix_timestamp(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col('a'))))

@pytest.mark.parametrize('ansi_enabled', [True, False], ids=['ANSI_ON', 'ANSI_OFF'])
@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
def test_to_unix_timestamp(data_gen):
def test_to_unix_timestamp(data_gen, ansi_enabled):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).selectExpr("to_unix_timestamp(a)"))

@allow_non_gpu('ProjectExec,Alias,ToUnixTimestamp,Literal')
@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
def test_to_unix_timestamp_fallback(data_gen):
assert_gpu_fallback_collect(
lambda spark : unary_op_df(spark, data_gen).selectExpr("to_unix_timestamp(a)"),
'ToUnixTimestamp',
conf={'spark.sql.ansi.enabled': 'true'})
lambda spark : unary_op_df(spark, data_gen).selectExpr("to_unix_timestamp(a)"),
{'spark.sql.ansi.enabled': ansi_enabled})

@pytest.mark.parametrize('ansi_enabled', [True, False], ids=['ANSI_ON', 'ANSI_OFF'])
@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
def test_unix_timestamp_improved(data_gen):
def test_unix_timestamp_improved(data_gen, ansi_enabled):
conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true",
"spark.sql.legacy.timeParserPolicy": "CORRECTED"}
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col('a'))), conf)
lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col('a'))),
copy_and_update({'spark.sql.ansi.enabled': ansi_enabled}, conf))

@allow_non_gpu('ProjectExec,Alias,UnixTimestamp,Literal')
@pytest.mark.parametrize('ansi_enabled', [True, False], ids=['ANSI_ON', 'ANSI_OFF'])
@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
def test_unix_timestamp_fallback(data_gen):
assert_gpu_fallback_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col("a"))),
'UnixTimestamp',
conf={'spark.sql.ansi.enabled': 'true'})
def test_unix_timestamp(data_gen, ansi_enabled):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col("a"))),
{'spark.sql.ansi.enabled': ansi_enabled})

@pytest.mark.parametrize('ansi_enabled', [True, False], ids=['ANSI_ON', 'ANSI_OFF'])
@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
def test_to_unix_timestamp_improved(data_gen):
def test_to_unix_timestamp_improved(data_gen, ansi_enabled):
conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true"}
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).selectExpr("to_unix_timestamp(a)"), conf)
lambda spark : unary_op_df(spark, data_gen).selectExpr("to_unix_timestamp(a)"),
copy_and_update({'spark.sql.ansi.enabled': ansi_enabled}, conf))

str_date_and_format_gen = [pytest.param(StringGen('[0-9]{4}/[01][0-9]'),'yyyy/MM', marks=pytest.mark.xfail(reason="cudf does no checks")),
(StringGen('[0-9]{4}/[01][12]/[0-2][1-8]'),'yyyy/MM/dd'),
(StringGen('[01][12]/[0-2][1-8]'), 'MM/dd'),
(StringGen('[0-2][1-8]/[01][12]'), 'dd/MM'),
(ConvertGen(DateGen(nullable=False), lambda d: d.strftime('%Y/%m').zfill(7), data_type=StringType()), 'yyyy/MM')]

# Returns a DataFrame containing a single invalid date string
def invalid_date_string_df(spark):
return spark.createDataFrame([['invalid_date_string']], "a string")

@pytest.mark.parametrize('ansi_enabled', [True, False], ids=['ANSI_ON', 'ANSI_OFF'])
@pytest.mark.parametrize('data_gen,date_form', str_date_and_format_gen, ids=idfn)
def test_string_to_unix_timestamp(data_gen, date_form):
def test_string_to_unix_timestamp(data_gen, date_form, ansi_enabled):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen, seed=1).selectExpr("to_unix_timestamp(a, '{}')".format(date_form)))
lambda spark : unary_op_df(spark, data_gen, seed=1).selectExpr("to_unix_timestamp(a, '{}')".format(date_form)),
{'spark.sql.ansi.enabled': ansi_enabled})

def test_string_to_unix_timestamp_ansi_exception():
assert_gpu_and_cpu_error(
lambda spark : invalid_date_string_df(spark).selectExpr("to_unix_timestamp(a, '{}')".format('yyyy/MM/dd')).collect(),
error_message="Exception",
conf=ansi_enabled_conf)

@pytest.mark.parametrize('ansi_enabled', [True, False], ids=['ANSI_ON', 'ANSI_OFF'])
@pytest.mark.parametrize('data_gen,date_form', str_date_and_format_gen, ids=idfn)
def test_string_unix_timestamp(data_gen, date_form):
def test_string_unix_timestamp(data_gen, date_form, ansi_enabled):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen, seed=1).select(f.unix_timestamp(f.col('a'), date_form)))
lambda spark : unary_op_df(spark, data_gen, seed=1).select(f.unix_timestamp(f.col('a'), date_form)),
{'spark.sql.ansi.enabled': ansi_enabled})

def test_string_unix_timestamp_ansi_exception():
assert_gpu_and_cpu_error(
lambda spark : invalid_date_string_df(spark).select(f.unix_timestamp(f.col('a'), 'yyyy/MM/dd')).collect(),
error_message="Exception",
conf=ansi_enabled_conf)

@allow_non_gpu('ProjectExec,Alias,GetTimestamp,Literal,Cast')
@pytest.mark.parametrize('data_gen', [StringGen('200[0-9]-0[1-9]-[0-2][1-8]')], ids=idfn)
def test_gettimestamp_fallback(data_gen):
assert_gpu_fallback_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.to_date(f.col("a"), "yyyy-MM-dd")),
'GetTimestamp',
conf={'spark.sql.ansi.enabled': 'true'})
@pytest.mark.parametrize('ansi_enabled', [True, False], ids=['ANSI_ON', 'ANSI_OFF'])
def test_gettimestamp(data_gen, ansi_enabled):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.to_date(f.col("a"), "yyyy-MM-dd")),
{'spark.sql.ansi.enabled': ansi_enabled})

def test_gettimestamp_ansi_exception():
assert_gpu_and_cpu_error(
lambda spark : invalid_date_string_df(spark).select(f.to_date(f.col("a"), "yyyy-MM-dd")).collect(),
error_message="Exception",
conf=ansi_enabled_conf)

supported_date_formats = ['yyyy-MM-dd', 'yyyy-MM', 'yyyy/MM/dd', 'yyyy/MM', 'dd/MM/yyyy',
'MM-dd', 'MM/dd', 'dd-MM', 'dd/MM']
@@ -1732,8 +1732,6 @@ object GpuOverrides extends Logging {
.withPsNote(TypeEnum.STRING, "A limited number of formats are supported"),
TypeSig.STRING)),
(a, conf, p, r) => new UnixTimeExprMeta[DateFormatClass](a, conf, p, r) {
override def shouldFallbackOnAnsiTimestamp: Boolean = false

override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
GpuDateFormatClass(lhs, rhs, strfFormat)
}
@@ -1748,8 +1746,6 @@ object GpuOverrides extends Logging {
.withPsNote(TypeEnum.STRING, "A limited number of formats are supported"),
TypeSig.STRING)),
(a, conf, p, r) => new UnixTimeExprMeta[ToUnixTimestamp](a, conf, p, r) {
override def shouldFallbackOnAnsiTimestamp: Boolean = SQLConf.get.ansiEnabled

override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = {
if (conf.isImprovedTimestampOpsEnabled) {
// passing the already converted strf string for a little optimization
@@ -1769,8 +1765,6 @@ object GpuOverrides extends Logging {
.withPsNote(TypeEnum.STRING, "A limited number of formats are supported"),
TypeSig.STRING)),
(a, conf, p, r) => new UnixTimeExprMeta[UnixTimestamp](a, conf, p, r) {
override def shouldFallbackOnAnsiTimestamp: Boolean = SQLConf.get.ansiEnabled

override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = {
if (conf.isImprovedTimestampOpsEnabled) {
// passing the already converted strf string for a little optimization
@@ -1846,8 +1840,6 @@ object GpuOverrides extends Logging {
.withPsNote(TypeEnum.STRING, "Only a limited number of formats are supported"),
TypeSig.STRING)),
(a, conf, p, r) => new UnixTimeExprMeta[FromUnixTime](a, conf, p, r) {
override def shouldFallbackOnAnsiTimestamp: Boolean = false

override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
// passing the already converted strf string for a little optimization
GpuFromUnixTime(lhs, rhs, strfFormat)
@@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.expressions.rapids
import com.nvidia.spark.rapids.{ExprChecks, ExprRule, GpuExpression, GpuOverrides, TypeEnum, TypeSig}

import org.apache.spark.sql.catalyst.expressions.{Expression, GetTimestamp}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.{GpuGetTimestamp, UnixTimeExprMeta}

/**
@@ -39,8 +38,6 @@ object TimeStamp {
.withPsNote(TypeEnum.STRING, "A limited number of formats are supported"),
TypeSig.STRING)),
(a, conf, p, r) => new UnixTimeExprMeta[GetTimestamp](a, conf, p, r) {
override def shouldFallbackOnAnsiTimestamp: Boolean = SQLConf.get.ansiEnabled

override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = {
GpuGetTimestamp(lhs, rhs, sparkFormat, strfFormat)
}
@@ -19,12 +19,13 @@ package org.apache.spark.sql.rapids
import java.util.concurrent.TimeUnit

import ai.rapids.cudf.{BinaryOp, ColumnVector, ColumnView, DType, Scalar}
import com.nvidia.spark.rapids.{Arm, BinaryExprMeta, DataFromReplacementRule, DateUtils, GpuBinaryExpression, GpuColumnVector, GpuExpression, GpuScalar, GpuUnaryExpression, RapidsConf, RapidsMeta}
import com.nvidia.spark.rapids.{Arm, BinaryExprMeta, BoolUtils, DataFromReplacementRule, DateUtils, GpuBinaryExpression, GpuColumnVector, GpuExpression, GpuScalar, GpuUnaryExpression, RapidsConf, RapidsMeta}
import com.nvidia.spark.rapids.GpuOverrides.{extractStringLit, getTimeParserPolicy}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.shims.ShimBinaryExpression

import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, ExpectsInputTypes, Expression, ImplicitCastInputTypes, NullIntolerant, TimeZoneAwareExpression}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.unsafe.types.CalendarInterval
@@ -177,7 +178,8 @@ abstract class GpuTimeMath(

case class GpuDateAddInterval(start: Expression,
interval: Expression,
timeZoneId: Option[String] = None)
timeZoneId: Option[String] = None,
ansiEnabled: Boolean = SQLConf.get.ansiEnabled)
extends GpuTimeMath(start, interval, timeZoneId) {

override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = {
Expand All @@ -203,6 +205,15 @@ case class GpuDateAddInterval(start: Expression,
// the Scala value instead.
// Skip the null check because it will be detected by the following calls.
val intvl = intvlS.getValue.asInstanceOf[CalendarInterval]

// ANSI mode check: adding sub-day interval components to a date is not allowed
if (ansiEnabled && intvl.microseconds != 0) {
val msg = "IllegalArgumentException: Cannot add hours, minutes or seconds" +
", milliseconds, microseconds to a date. " +
"If necessary set spark.sql.ansi.enabled to false to bypass this error."
throw new IllegalArgumentException(msg)
}

if (intvl.months != 0) {
throw new UnsupportedOperationException("Months aren't supported at the moment")
}
@@ -363,17 +374,11 @@ abstract class UnixTimeExprMeta[A <: BinaryExpression with TimeZoneAwareExpressi
rule: DataFromReplacementRule)
extends BinaryExprMeta[A](expr, conf, parent, rule) {

def shouldFallbackOnAnsiTimestamp: Boolean

var sparkFormat: String = _
var strfFormat: String = _
override def tagExprForGpu(): Unit = {
checkTimeZoneId(expr.timeZoneId)

if (shouldFallbackOnAnsiTimestamp) {
willNotWorkOnGpu("ANSI mode is not supported")
}

// Date and Timestamp work too
if (expr.right.dataType == StringType) {
extractStringLit(expr.right) match {
@@ -499,10 +504,17 @@ object GpuToTimestamp extends Arm {
lhs: GpuColumnVector,
sparkFormat: String,
strfFormat: String,
dtype: DType): ColumnVector = {
dtype: DType,
failOnError: Boolean): ColumnVector = {

// `tsVector` will be closed in replaceSpecialDates
val tsVector = withResource(isTimestamp(lhs.getBase, sparkFormat, strfFormat)) { isTs =>
if (failOnError && !BoolUtils.isAllValidTrue(isTs)) {
// ANSI mode and the input contains at least one invalid value.
// CPU may throw `DateTimeParseException`, `DateTimeException` or `ParseException`
throw new IllegalArgumentException("Exception occurred when parsing timestamp in ANSI mode")
}

withResource(Scalar.fromNull(dtype)) { nullValue =>
withResource(lhs.getBase.asTimestamp(dtype, strfFormat)) { tsVec =>
isTs.ifElse(tsVec, nullValue)
@@ -634,6 +646,8 @@ abstract class GpuToTimestamp

val timeParserPolicy = getTimeParserPolicy

val failOnError: Boolean = SQLConf.get.ansiEnabled
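// Note: `failOnError` is read from SQLConf.get.ansiEnabled when the expression is constructed;
// parseStringAsTimestamp uses it to throw on unparseable input instead of producing nulls.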

override def doColumnar(lhs: GpuColumnVector, rhs: GpuColumnVector): ColumnVector = {
throw new IllegalArgumentException("rhs has to be a scalar for the unixtimestamp to work")
}
@@ -658,7 +672,8 @@
lhs,
sparkFormat,
strfFormat,
DType.TIMESTAMP_MICROSECONDS)
DType.TIMESTAMP_MICROSECONDS,
failOnError)
}
} else { // Timestamp or DateType
lhs.getBase.asTimestampMicroseconds()
@@ -707,7 +722,8 @@ abstract class GpuToTimestampImproved extends GpuToTimestamp {
lhs,
sparkFormat,
strfFormat,
DType.TIMESTAMP_SECONDS)
DType.TIMESTAMP_SECONDS,
failOnError)
}
} else if (lhs.dataType() == DateType){
lhs.getBase.asTimestampSeconds()