Remove spark.rapids.sql.nonUTC.enabled configuration option #10038

Merged: 4 commits, Dec 18, 2023
45 changes: 15 additions & 30 deletions integration_tests/src/main/python/date_time_test.py
@@ -140,20 +140,17 @@ def test_datediff(data_gen):
 @allow_non_gpu(*hms_fallback)
 def test_hour():
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : unary_op_df(spark, timestamp_gen).selectExpr('hour(a)'),
-        conf = {'spark.rapids.sql.nonUTC.enabled': True})
+        lambda spark : unary_op_df(spark, timestamp_gen).selectExpr('hour(a)'))

 @allow_non_gpu(*hms_fallback)
 def test_minute():
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : unary_op_df(spark, timestamp_gen).selectExpr('minute(a)'),
-        conf = {'spark.rapids.sql.nonUTC.enabled': True})
+        lambda spark : unary_op_df(spark, timestamp_gen).selectExpr('minute(a)'))

 @allow_non_gpu(*hms_fallback)
 def test_second():
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : unary_op_df(spark, timestamp_gen).selectExpr('second(a)'),
-        conf = {'spark.rapids.sql.nonUTC.enabled': True})
+        lambda spark : unary_op_df(spark, timestamp_gen).selectExpr('second(a)'))

 def test_quarter():
     assert_gpu_and_cpu_are_equal_collect(

@@ -260,8 +257,7 @@ def test_dayofyear(data_gen):
 @allow_non_gpu(*non_supported_tz_allow)
 def test_unix_timestamp(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col('a'))),
-        {"spark.rapids.sql.nonUTC.enabled": "true"})
+        lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col('a'))))

 @pytest.mark.skipif(is_supported_time_zone(), reason='fallback only happens on unsupported timezones')
@@ -271,17 +267,15 @@ def test_unsupported_fallback_unix_timestamp(data_gen):
     assert_gpu_fallback_collect(lambda spark: gen_df(
         spark, [("a", data_gen), ("b", string_gen)], length=10).selectExpr(
             "unix_timestamp(a, b)"),
-        "UnixTimestamp",
-        conf = {'spark.rapids.sql.nonUTC.enabled': True})
+        "UnixTimestamp")

 @pytest.mark.parametrize('ansi_enabled', [True, False], ids=['ANSI_ON', 'ANSI_OFF'])
 @pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
 @allow_non_gpu(*non_utc_allow)
 def test_to_unix_timestamp(data_gen, ansi_enabled):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : unary_op_df(spark, data_gen).selectExpr("to_unix_timestamp(a)"),
-        conf = {'spark.sql.ansi.enabled': ansi_enabled,
-                'spark.rapids.sql.nonUTC.enabled': True})
+        conf = {'spark.sql.ansi.enabled': ansi_enabled})

 @pytest.mark.skipif(is_supported_time_zone(), reason='fallback only happens on unsupported timezones')
 @allow_non_gpu('ProjectExec')
@@ -290,34 +284,29 @@ def test_unsupported_fallback_to_unix_timestamp(data_gen):
     assert_gpu_fallback_collect(lambda spark: gen_df(
         spark, [("a", data_gen), ("b", string_gen)], length=10).selectExpr(
             "to_unix_timestamp(a, b)"),
-        "ToUnixTimestamp",
-        conf = {'spark.rapids.sql.nonUTC.enabled': True})
+        "ToUnixTimestamp")

 @pytest.mark.parametrize('time_zone', ["Asia/Shanghai", "Iran", "UTC", "UTC+0", "UTC-0", "GMT", "GMT+0", "GMT-0"], ids=idfn)
 @pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
 @allow_non_gpu(*non_utc_allow)
 def test_from_utc_timestamp(data_gen, time_zone):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)),
-        conf = {'spark.rapids.sql.nonUTC.enabled': True})
+        lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)))

 @allow_non_gpu('ProjectExec')
 @pytest.mark.parametrize('time_zone', ["PST", "NST", "AST", "America/Los_Angeles", "America/New_York", "America/Chicago"], ids=idfn)
 @pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
 def test_from_utc_timestamp_unsupported_timezone_fallback(data_gen, time_zone):
     assert_gpu_fallback_collect(
         lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)),
-        'FromUTCTimestamp',
-        conf = {"spark.rapids.sql.nonUTC.enabled": "true"})
+        'FromUTCTimestamp')

 @pytest.mark.parametrize('time_zone', ["UTC", "Asia/Shanghai", "EST", "MST", "VST"], ids=idfn)
 @pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
 @allow_non_gpu(*non_utc_allow)
 def test_from_utc_timestamp_supported_timezones(data_gen, time_zone):
-    # TODO: Remove spark.rapids.sql.nonUTC.enabled configuration
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)),
-        conf = {"spark.rapids.sql.nonUTC.enabled": "true"})
+        lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)))

 @allow_non_gpu('ProjectExec')
 @pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
@@ -392,7 +381,7 @@ def fun(spark):
 def test_unix_timestamp(data_gen, ansi_enabled):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col("a"))),
-        {'spark.sql.ansi.enabled': ansi_enabled, "spark.rapids.sql.nonUTC.enabled": "true"})
+        {'spark.sql.ansi.enabled': ansi_enabled})

 str_date_and_format_gen = [pytest.param(StringGen('[0-9]{4}/[01][0-9]'),'yyyy/MM', marks=pytest.mark.xfail(reason="cudf does no checks")),
@@ -411,7 +400,7 @@ def invalid_date_string_df(spark):
 def test_string_to_unix_timestamp(data_gen, date_form, ansi_enabled):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : unary_op_df(spark, data_gen, seed=1).selectExpr("to_unix_timestamp(a, '{}')".format(date_form)),
-        {'spark.sql.ansi.enabled': ansi_enabled, "spark.rapids.sql.nonUTC.enabled": "true"})
+        {'spark.sql.ansi.enabled': ansi_enabled})

 def test_string_to_unix_timestamp_ansi_exception():
     assert_gpu_and_cpu_error(
@@ -425,7 +414,7 @@ def test_string_to_unix_timestamp_ansi_exception():
 def test_string_unix_timestamp(data_gen, date_form, ansi_enabled):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : unary_op_df(spark, data_gen, seed=1).select(f.unix_timestamp(f.col('a'), date_form)),
-        {'spark.sql.ansi.enabled': ansi_enabled, "spark.rapids.sql.nonUTC.enabled": True})
+        {'spark.sql.ansi.enabled': ansi_enabled})

 def test_string_unix_timestamp_ansi_exception():
     assert_gpu_and_cpu_error(
@@ -488,21 +477,17 @@ def test_date_format_for_time_fall_back(data_gen, date_format):
 @pytest.mark.parametrize('data_gen', [LongGen(min_val=int(datetime(1, 2, 1).timestamp()), max_val=int(datetime(9999, 12, 30).timestamp()))], ids=idfn)
 @pytest.mark.skipif(not is_supported_time_zone(), reason="not all time zones are supported now, refer to https://github.com/NVIDIA/spark-rapids/issues/6839, please update after all time zones are supported")
 def test_from_unixtime(data_gen, date_format):
-    conf = {'spark.rapids.sql.nonUTC.enabled': True}
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : unary_op_df(spark, data_gen, length=5).selectExpr("from_unixtime(a, '{}')".format(date_format)),
-        conf)
+        lambda spark : unary_op_df(spark, data_gen, length=5).selectExpr("from_unixtime(a, '{}')".format(date_format)))

 @allow_non_gpu('ProjectExec')
 @pytest.mark.parametrize('date_format', supported_date_formats, ids=idfn)
 # from 0001-02-01 to 9999-12-30 to avoid 'year 0 is out of range'
 @pytest.mark.parametrize('data_gen', [LongGen(min_val=int(datetime(1, 2, 1).timestamp()), max_val=int(datetime(9999, 12, 30).timestamp()))], ids=idfn)
 @pytest.mark.skipif(is_supported_time_zone(), reason="not all time zones are supported now, refer to https://github.com/NVIDIA/spark-rapids/issues/6839, please update after all time zones are supported")
 def test_from_unixtime_fall_back(data_gen, date_format):
-    conf = {'spark.rapids.sql.nonUTC.enabled': True}
     assert_gpu_fallback_collect(lambda spark : unary_op_df(spark, data_gen, length=5).selectExpr("from_unixtime(a, '{}')".format(date_format)),
-        'ProjectExec',
-        conf)
+        'ProjectExec')

 unsupported_date_formats = ['F']
 @pytest.mark.parametrize('date_format', unsupported_date_formats, ids=idfn)
RapidsConf.scala

@@ -2067,13 +2067,6 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
       "The gpu to disk spill bounce buffer must have a positive size")
     .createWithDefault(128L * 1024 * 1024)

-  val NON_UTC_TIME_ZONE_ENABLED =
-    conf("spark.rapids.sql.nonUTC.enabled")
-      .doc("An option to enable/disable non-UTC time zone support.")
-      .internal()
-      .booleanConf
-      .createWithDefault(false)
-
   val SPLIT_UNTIL_SIZE_OVERRIDE = conf("spark.rapids.sql.test.overrides.splitUntilSize")
     .doc("Only for tests: override the value of GpuDeviceManager.splitUntilSize")
     .internal()

@@ -2835,8 +2828,6 @@ class RapidsConf(conf: Map[String, String]) extends Logging {
   lazy val splitUntilSizeOverride: Option[Long] = get(SPLIT_UNTIL_SIZE_OVERRIDE)

-  lazy val nonUTCTimeZoneEnabled: Boolean = get(NON_UTC_TIME_ZONE_ENABLED)
-
   private val optimizerDefaults = Map(
     // this is not accurate because CPU projections do have a cost due to appending values
    // to each row that is produced, but this needs to be a really small number because
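For readers unfamiliar with the plugin's config registry, the two deleted snippets above instantiate one pattern: declare a typed entry once, then read it through an accessor on RapidsConf. The following is a distilled, hypothetical Scala sketch of that pattern; the builder names are simplified stand-ins, not the plugin's actual RapidsConf API surface.

case class ConfEntry(key: String, doc: String, default: Boolean)

class ConfBuilder(key: String) {
  private var docStr: String = ""
  def doc(d: String): ConfBuilder = { docStr = d; this }
  def internal(): ConfBuilder = this  // would hide the key from user-facing docs
  def booleanConf: ConfBuilder = this // would fix the value type to Boolean
  def createWithDefault(v: Boolean): ConfEntry = ConfEntry(key, docStr, v)
}

object ConfDemo extends App {
  def conf(key: String): ConfBuilder = new ConfBuilder(key)

  // Declaration side, mirroring the deleted entry:
  val NON_UTC_TIME_ZONE_ENABLED: ConfEntry =
    conf("spark.rapids.sql.nonUTC.enabled")
      .doc("An option to enable/disable non-UTC time zone support.")
      .internal()
      .booleanConf
      .createWithDefault(false)

  // Read side, mirroring the deleted `lazy val nonUTCTimeZoneEnabled`:
  def get(e: ConfEntry, settings: Map[String, String]): Boolean =
    settings.get(e.key).map(_.toBoolean).getOrElse(e.default)

  println(get(NON_UTC_TIME_ZONE_ENABLED, Map.empty)) // false: the default
  println(get(NON_UTC_TIME_ZONE_ENABLED,
    Map("spark.rapids.sql.nonUTC.enabled" -> "true"))) // true: explicit override
}

Once the entry and its accessor are deleted, setting the key has no effect anywhere, which is exactly why the remaining diffs below remove every read of `nonUTCTimeZoneEnabled`.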
RapidsMeta.scala

@@ -1113,24 +1113,19 @@ abstract class BaseExprMeta[INPUT <: Expression](
-  // There are 4 levels of timezone check in GPU plan tag phase:
+  // There are 3 levels of timezone check in the GPU plan tag phase:
   // Level 1: Check whether an expression is related to timezone. This is achieved by
   // [[needTimeZoneCheck]] below.
-  // Level 2: Check on golden configuration 'spark.rapids.sql.nonUTC.enabled'. If
-  // yes, we pass to next level timezone check. If not, we only pass UTC case as before.
-  // Level 3: Check related expression has been implemented with timezone. There is a
+  // Level 2: Check that the related expression has been implemented with timezone support. There is a
   // toggle flag [[isTimeZoneSupported]] for this. If false, fall back to the UTC-only check as
   // before. If true, move to the next level of check. When we add timezone support for a related
   // function, [[isTimeZoneSupported]] should be overridden to true.
-  // Level 4: Check whether the desired timezone is supported by Gpu kernel.
+  // Level 3: Check whether the desired timezone is supported by the GPU kernel.
   def checkExprForTimezone(): Unit = {
     // Level 1 check
     if (!needTimeZoneCheck) return

-    // Level 2 check
-    if (!conf.nonUTCTimeZoneEnabled) return checkUTCTimezone(this)
-
-    // Level 3 check
+    // Level 2 check
     if (!isTimeZoneSupported) return checkUTCTimezone(this)

-    // Level 4 check
+    // Level 3 check
     if (!GpuTimeZoneDB.isSupportedTimeZone(getZoneId())) {
       willNotWorkOnGpu(TimeZoneDB.timezoneNotSupportedStr(this.wrapped.getClass.toString))
     }

@@ -1200,7 +1195,7 @@ abstract class BaseExprMeta[INPUT <: Expression](
     case _ => Cast.needsTimeZone(from, to)
   }

-  // Level 3 timezone checking flag, need to override to true when supports timezone in functions
+  // Level 2 timezone checking flag; override to true when a function supports timezones.
   // Has no effect if the expression is not timezone related, as defined in [[needTimeZoneCheck]].
   def isTimeZoneSupported: Boolean = false
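To make the three-level flow described in the comment block above concrete, here is a minimal, self-contained Scala sketch. Everything in it (ExprInfo, kernelSupports, the fixed-offset heuristic) is a hypothetical stand-in for illustration only; the real logic is BaseExprMeta.checkExprForTimezone as shown in the diff, backed by GpuTimeZoneDB.

import java.time.{ZoneId, ZoneOffset}

// Hypothetical stand-in for the expression metadata consulted at tag time.
case class ExprInfo(
    needTimeZoneCheck: Boolean,   // Level 1: is the expression timezone-sensitive at all?
    isTimeZoneSupported: Boolean, // Level 2: does it have a GPU timezone implementation?
    zoneId: ZoneId)               // the timezone the expression would evaluate under

object TimezoneTagging {
  // Stand-in for GpuTimeZoneDB.isSupportedTimeZone; this sketch pretends the
  // GPU kernel only handles fixed-offset zones (UTC, GMT+8, ...).
  private def kernelSupports(zone: ZoneId): Boolean =
    zone.normalized().getRules.isFixedOffset

  private def isUTC(zone: ZoneId): Boolean =
    zone.normalized() == ZoneOffset.UTC

  // Returns None when the expression may stay on the GPU, or Some(reason)
  // when the planner should fall back to the CPU.
  def checkTimezone(e: ExprInfo): Option[String] = {
    if (!e.needTimeZoneCheck) return None             // Level 1
    if (!e.isTimeZoneSupported) {                     // Level 2: UTC-only, as before
      return if (isUTC(e.zoneId)) None
             else Some(s"expression lacks timezone support and zone is ${e.zoneId}")
    }
    if (!kernelSupports(e.zoneId)) {                  // Level 3: kernel capability
      return Some(s"timezone ${e.zoneId} not supported by the GPU kernel")
    }
    None
  }
}

Under these stand-ins, a timezone-sensitive expression with ZoneId.of("GMT+8") passes all three levels, while ZoneId.of("America/New_York") fails Level 3 and produces a fallback reason. Note how the PR's removal of the old config level means the decision now rests entirely on per-expression support plus kernel capability.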
datetimeExpressions.scala

@@ -1081,7 +1081,6 @@ class FromUTCTimestampExprMeta(
   extends BinaryExprMeta[FromUTCTimestamp](expr, conf, parent, rule) {

   private[this] var timezoneId: ZoneId = null
-  private[this] val nonUTCEnabled: Boolean = conf.nonUTCTimeZoneEnabled

   override def tagExprForGpu(): Unit = {
     extractStringLit(expr.right) match {
@@ -1090,27 +1089,19 @@ class FromUTCTimestampExprMeta(
       case Some(timezoneShortID) =>
         if (timezoneShortID != null) {
           timezoneId = GpuTimeZoneDB.getZoneId(timezoneShortID)
           // Always pass for the UTC timezone since it's a no-op.
           if (!GpuOverrides.isUTCTimezone(timezoneId)) {
-            if (nonUTCEnabled) {
-              if (!GpuTimeZoneDB.isSupportedTimeZone(timezoneShortID)) {
-                willNotWorkOnGpu(s"Not supported timezone type $timezoneShortID.")
-              }
-            } else {
-              // TODO: remove this once GPU backend was supported.
-              willNotWorkOnGpu(s"Not supported timezone type $timezoneShortID.")
-            }
+            if (!GpuTimeZoneDB.isSupportedTimeZone(timezoneId)) {
+              willNotWorkOnGpu(s"Not supported timezone type $timezoneShortID.")
+            }
           }
         }
     }
   }

   override def convertToGpu(timestamp: Expression, timezone: Expression): GpuExpression =
-    GpuFromUTCTimestamp(timestamp, timezone, timezoneId, nonUTCEnabled)
+    GpuFromUTCTimestamp(timestamp, timezone, timezoneId)
 }

 case class GpuFromUTCTimestamp(
-    timestamp: Expression, timezone: Expression, zoneId: ZoneId, nonUTCEnabled: Boolean)
+    timestamp: Expression, timezone: Expression, zoneId: ZoneId)
   extends GpuBinaryExpressionArgsAnyScalar
   with ImplicitCastInputTypes
   with NullIntolerant {
@@ -1126,13 +1117,7 @@ case class GpuFromUTCTimestamp(
       // For the UTC timezone, just a no-op bypassing GPU computation.
       lhs.getBase.incRefCount()
     } else {
-      if (nonUTCEnabled) {
-        GpuTimeZoneDB.fromUtcTimestampToTimestamp(lhs.getBase, zoneId)
-      } else {
-        // TODO: remove this until GPU backend supported.
-        throw new UnsupportedOperationException(
-          s"Not supported timezone type ${zoneId.normalized()}")
-      }
+      GpuTimeZoneDB.fromUtcTimestampToTimestamp(lhs.getBase, zoneId)
     }
   } else {
     // All-null output column.
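The collapsed else-branch above removes the config gate around the kernel call while keeping the UTC fast path (a bare refcount bump on the input column). As a way to see what shape from_utc_timestamp has, here is a hedged, CPU-side Scala analogue of that branch structure; it illustrates the semantics only and is not the plugin's columnar kernel (GpuTimeZoneDB.fromUtcTimestampToTimestamp).

import java.time.{Instant, ZoneId, ZoneOffset}

object FromUtcDemo extends App {
  // Spark timestamps are microseconds since the epoch. from_utc_timestamp
  // renders a UTC instant as wall-clock time in `zone`, then re-encodes that
  // wall clock as if it were UTC.
  def fromUtcMicros(micros: Long, zone: ZoneId): Long = {
    if (zone.normalized() == ZoneOffset.UTC) {
      micros // fast path: for UTC the conversion is the identity, matching
             // the GPU version's refcount-only bypass
    } else {
      val instant = Instant.ofEpochSecond(
        Math.floorDiv(micros, 1000000L), Math.floorMod(micros, 1000000L) * 1000L)
      val local = instant.atZone(zone).toLocalDateTime // wall clock in `zone`
      val shifted = local.toInstant(ZoneOffset.UTC)    // re-encode as if UTC
      shifted.getEpochSecond * 1000000L + shifted.getNano / 1000L
    }
  }

  // 1970-01-01T00:00:00Z shown in Asia/Shanghai (UTC+8) is 08:00 local,
  // so the encoded value moves forward by 8 hours.
  println(fromUtcMicros(0L, ZoneId.of("Asia/Shanghai"))) // 28800000000
  println(fromUtcMicros(0L, ZoneId.of("UTC")))           // 0
}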