From 929405ab753239e55bd46e9cdb31bb9f8c63b86d Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Fri, 27 Oct 2023 10:15:12 +0800
Subject: [PATCH] [SPARK-45575][SQL] Support time travel options for df read API

### What changes were proposed in this pull request?

We've added a time travel API in DS v2 and a dedicated SQL syntax for it. However, there was no way to do it with the DataFrame read API. This PR adds time travel options (`timestampAsOf` and `versionAsOf`) to support time travel with the DataFrame read API.

### Why are the changes needed?

Feature parity: time travel was previously available only through the DS v2 API and the SQL syntax, not the DataFrame read API.

### Does this PR introduce _any_ user-facing change?

Yes, now people can specify time travel in read options (see the usage sketch at the end of this patch).

### How was this patch tested?

New tests in `DataSourceV2SQLSuite`.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43403 from cloud-fan/time-option.

Lead-authored-by: Wenchen Fan
Co-authored-by: Wenchen Fan
Signed-off-by: Kent Yao
---
 .../main/resources/error/error-classes.json   | 22 ++++--
 ...-time-travel-timestamp-expr-error-class.md |  4 +
 docs/sql-error-conditions.md                  | 12 +++
 .../sql/catalyst/analysis/Analyzer.scala      | 19 ++++-
 .../catalyst/analysis/TimeTravelSpec.scala    | 40 +++++++++-
 .../sql/errors/QueryCompilationErrors.scala   |  2 +-
 .../apache/spark/sql/internal/SQLConf.scala   | 14 ++++
 .../datasources/v2/DataSourceV2Utils.scala    |  3 +-
 .../sql/connector/DataSourceV2SQLSuite.scala  | 77 +++++++++++++------
 9 files changed, 153 insertions(+), 40 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 2ab4af73a8e79..5b756ad107720 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -2192,6 +2192,12 @@
     ],
     "sqlState" : "42K0F"
   },
+  "INVALID_TIME_TRAVEL_SPEC" : {
+    "message" : [
+      "Cannot specify both version and timestamp when time travelling the table."
+    ],
+    "sqlState" : "42K0E"
+  },
   "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR" : {
     "message" : [
       "The time travel timestamp expression is invalid."
@@ -2207,6 +2213,11 @@
           "Must be deterministic."
         ]
       },
+      "OPTION" : {
+        "message" : [
+          "Timestamp string in the options must be able to be cast to the TIMESTAMP type."
+        ]
+      },
       "UNEVALUABLE" : {
         "message" : [
           "Must be evaluable."
@@ -2386,6 +2397,12 @@
     ],
     "sqlState" : "42803"
   },
+  "MULTIPLE_TIME_TRAVEL_SPEC" : {
+    "message" : [
+      "Cannot specify time travel in both the time travel clause and options."
+    ],
+    "sqlState" : "42K0E"
+  },
   "MULTI_SOURCES_UNSUPPORTED_FOR_EXPRESSION" : {
     "message" : [
       "The expression does not support more than one source."
@@ -5132,11 +5149,6 @@
       ""
     ]
   },
-  "_LEGACY_ERROR_TEMP_1334" : {
-    "message" : [
-      "Cannot specify both version and timestamp when time travelling the table."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1338" : {
     "message" : [
       "Sinks cannot request distribution and ordering in continuous execution mode."
diff --git a/docs/sql-error-conditions-invalid-time-travel-timestamp-expr-error-class.md b/docs/sql-error-conditions-invalid-time-travel-timestamp-expr-error-class.md
index 42513bf989bcf..80344662d0909 100644
--- a/docs/sql-error-conditions-invalid-time-travel-timestamp-expr-error-class.md
+++ b/docs/sql-error-conditions-invalid-time-travel-timestamp-expr-error-class.md
@@ -33,6 +33,10 @@ Cannot be casted to the "TIMESTAMP" type.
 
 Must be deterministic.
 
+## OPTION
+
+Timestamp string in the options must be able to be cast to the TIMESTAMP type.
+
 ## UNEVALUABLE
 
 Must be evaluable.
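For context, the new error conditions above surface through the DataFrame read API as follows (a minimal sketch mirroring the tests at the end of this patch; `t` stands for a catalog table whose connector supports time travel):

```scala
// INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.OPTION: the timestamp option value
// cannot be cast to TIMESTAMP, so analysis fails.
spark.read.option("timestampAsOf", "abc").table("t").collect()

// INVALID_TIME_TRAVEL_SPEC: a version and a timestamp must not be given together.
spark.read
  .option("timestampAsOf", "abc")
  .option("versionAsOf", "1")
  .table("t")
  .collect()
```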
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 007c95297f7ce..7c537f6fe20e5 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -1222,6 +1222,12 @@ For more details see [INVALID_SUBQUERY_EXPRESSION](sql-error-conditions-invalid-
 
 Cannot create the persistent object `<objName>` of the type `<obj>` because it references to the temporary object `<tempObjName>` of the type `<tempObj>`. Please make the temporary object `<tempObjName>` persistent, or make the persistent object `<objName>` temporary.
 
+### INVALID_TIME_TRAVEL_SPEC
+
+[SQLSTATE: 42K0E](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Cannot specify both version and timestamp when time travelling the table.
+
 ### [INVALID_TIME_TRAVEL_TIMESTAMP_EXPR](sql-error-conditions-invalid-time-travel-timestamp-expr-error-class.html)
 
 [SQLSTATE: 42K0E](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
@@ -1360,6 +1366,12 @@ For more details see [MISSING_ATTRIBUTES](sql-error-conditions-missing-attribute
 
 The query does not include a GROUP BY clause. Add GROUP BY or turn it into the window functions using OVER clauses.
 
+### MULTIPLE_TIME_TRAVEL_SPEC
+
+[SQLSTATE: 42K0E](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Cannot specify time travel in both the time travel clause and options.
+
 ### MULTI_SOURCES_UNSUPPORTED_FOR_EXPRESSION
 
 [SQLSTATE: 42K0E](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 06d949ece2626..65338f9917bc4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1122,7 +1122,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     case r @ RelationTimeTravel(u: UnresolvedRelation, timestamp, version)
         if timestamp.forall(ts => ts.resolved && !SubqueryExpression.hasSubquery(ts)) =>
-      resolveRelation(u, TimeTravelSpec.create(timestamp, version, conf)).getOrElse(r)
+      val timeTravelSpec = TimeTravelSpec.create(timestamp, version, conf.sessionLocalTimeZone)
+      resolveRelation(u, timeTravelSpec).getOrElse(r)
 
     case u @ UnresolvedTable(identifier, cmd, suggestAlternative) =>
       lookupTableOrView(identifier).map {
@@ -1255,17 +1256,27 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     private def resolveRelation(
         u: UnresolvedRelation,
         timeTravelSpec: Option[TimeTravelSpec] = None): Option[LogicalPlan] = {
-      resolveTempView(u.multipartIdentifier, u.isStreaming, timeTravelSpec.isDefined).orElse {
+      val timeTravelSpecFromOptions = TimeTravelSpec.fromOptions(
+        u.options,
+        conf.getConf(SQLConf.TIME_TRAVEL_TIMESTAMP_KEY),
+        conf.getConf(SQLConf.TIME_TRAVEL_VERSION_KEY),
+        conf.sessionLocalTimeZone
+      )
+      if (timeTravelSpec.nonEmpty && timeTravelSpecFromOptions.nonEmpty) {
+        throw new AnalysisException("MULTIPLE_TIME_TRAVEL_SPEC", Map.empty[String, String])
+      }
+      val finalTimeTravelSpec = timeTravelSpec.orElse(timeTravelSpecFromOptions)
+      resolveTempView(u.multipartIdentifier, u.isStreaming, finalTimeTravelSpec.isDefined).orElse {
         expandIdentifier(u.multipartIdentifier) match {
           case CatalogAndIdentifier(catalog, ident) =>
-            val key = ((catalog.name +: ident.namespace :+ ident.name).toSeq, timeTravelSpec)
+            val key = ((catalog.name +: ident.namespace :+ ident.name).toSeq, finalTimeTravelSpec)
             AnalysisContext.get.relationCache.get(key).map(_.transform {
               case multi: MultiInstanceRelation =>
                 val newRelation = multi.newInstance()
                 newRelation.copyTagsFrom(multi)
                 newRelation
             }).orElse {
-              val table = CatalogV2Util.loadTable(catalog, ident, timeTravelSpec)
+              val table = CatalogV2Util.loadTable(catalog, ident, finalTimeTravelSpec)
               val loaded = createRelation(catalog, ident, table, u.options, u.isStreaming)
               loaded.foreach(AnalysisContext.get.relationCache.update(key, _))
               loaded
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala
index 26856d9a5e089..8bfcd955497b9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, RuntimeReplaceable, SubqueryExpression, Unevaluable}
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, Literal, RuntimeReplaceable, SubqueryExpression, Unevaluable}
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.TimestampType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 sealed trait TimeTravelSpec
 
@@ -31,7 +32,7 @@ object TimeTravelSpec {
   def create(
       timestamp: Option[Expression],
       version: Option[String],
-      conf: SQLConf) : Option[TimeTravelSpec] = {
+      sessionLocalTimeZone: String) : Option[TimeTravelSpec] = {
     if (timestamp.nonEmpty && version.nonEmpty) {
       throw QueryCompilationErrors.invalidTimeTravelSpecError()
     } else if (timestamp.nonEmpty) {
@@ -50,7 +51,7 @@ object TimeTravelSpec {
         throw QueryCompilationErrors.invalidTimestampExprForTimeTravel(
           "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.NON_DETERMINISTIC", ts)
       }
-      val tz = Some(conf.sessionLocalTimeZone)
+      val tz = Some(sessionLocalTimeZone)
       // Set `ansiEnabled` to false, so that it can return null for invalid input and we can provide
       // better error message.
       val value = Cast(tsToEval, TimestampType, tz, ansiEnabled = false).eval()
@@ -65,4 +66,35 @@ object TimeTravelSpec {
       None
     }
   }
+
+  def fromOptions(
+      options: CaseInsensitiveStringMap,
+      timestampKey: String,
+      versionKey: String,
+      sessionLocalTimeZone: String): Option[TimeTravelSpec] = {
+    (Option(options.get(timestampKey)), Option(options.get(versionKey))) match {
+      case (Some(_), Some(_)) =>
+        throw QueryCompilationErrors.invalidTimeTravelSpecError()
+
+      case (Some(timestampStr), None) =>
+        val timestampValue = Cast(
+          Literal(timestampStr),
+          TimestampType,
+          Some(sessionLocalTimeZone),
+          ansiEnabled = false
+        ).eval()
+        if (timestampValue == null) {
+          throw new AnalysisException(
+            "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.OPTION",
+            Map("expr" -> s"'$timestampStr'")
+          )
+        }
+        Some(AsOfTimestamp(timestampValue.asInstanceOf[Long]))
+
+      case (None, Some(versionStr)) =>
+        Some(AsOfVersion(versionStr))
+
+      case _ => None
+    }
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 6d1aa076eb081..ae5ebd6a97489 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3343,7 +3343,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
 
   def invalidTimeTravelSpecError(): Throwable = {
     new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1334",
+      errorClass = "INVALID_TIME_TRAVEL_SPEC",
       messageParameters = Map.empty)
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1e759b6266c61..43dc541fbb9fc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4474,6 +4474,20 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val TIME_TRAVEL_TIMESTAMP_KEY =
+    buildConf("spark.sql.timeTravelTimestampKey")
+      .doc("The option name to specify the time travel timestamp when reading a table.")
+      .version("4.0.0")
+      .stringConf
+      .createWithDefault("timestampAsOf")
+
+  val TIME_TRAVEL_VERSION_KEY =
+    buildConf("spark.sql.timeTravelVersionKey")
+      .doc("The option name to specify the time travel table version when reading a table.")
+      .version("4.0.0")
+      .stringConf
+      .createWithDefault("versionAsOf")
+
   val LEGACY_PERCENTILE_DISC_CALCULATION = buildConf("spark.sql.legacy.percentileDiscCalculation")
     .internal()
     .doc("If true, the old bogus percentile_disc calculation is used. The old calculation " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
index e485300615780..c4e7bf23cace7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -133,7 +133,8 @@ private[sql] object DataSourceV2Utils extends Logging {
       } else {
         None
       }
-      val timeTravel = TimeTravelSpec.create(timeTravelTimestamp, timeTravelVersion, conf)
+      val timeTravel = TimeTravelSpec.create(
+        timeTravelTimestamp, timeTravelVersion, conf.sessionLocalTimeZone)
       (CatalogV2Util.getTable(catalog, ident, timeTravel), Some(catalog), Some(ident))
     case _ =>
       // TODO: Non-catalog paths for DSV2 are currently not well defined.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 3cbbc3786fcda..c2e759efe4026 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2902,10 +2902,14 @@ class DataSourceV2SQLSuiteV1Filter
       sql(s"INSERT INTO $t2 VALUES (3)")
       sql(s"INSERT INTO $t2 VALUES (4)")
 
-      assert(sql("SELECT * FROM t VERSION AS OF 'Snapshot123456789'").collect()
-        === Array(Row(1), Row(2)))
-      assert(sql("SELECT * FROM t VERSION AS OF 2345678910").collect()
-        === Array(Row(3), Row(4)))
+      val res1_sql = sql("SELECT * FROM t VERSION AS OF 'Snapshot123456789'").collect()
+      assert(res1_sql === Array(Row(1), Row(2)))
+      val res1_df = spark.read.option("versionAsOf", "Snapshot123456789").table("t").collect()
+      assert(res1_df === Array(Row(1), Row(2)))
+      val res2_sql = sql("SELECT * FROM t VERSION AS OF 2345678910").collect()
+      assert(res2_sql === Array(Row(3), Row(4)))
+      val res2_df = spark.read.option("versionAsOf", "2345678910").table("t").collect()
+      assert(res2_df === Array(Row(3), Row(4)))
     }
 
     val ts1 = DateTimeUtils.stringToTimestampAnsi(
@@ -2928,29 +2932,35 @@ class DataSourceV2SQLSuiteV1Filter
     sql(s"INSERT INTO $t4 VALUES (7)")
     sql(s"INSERT INTO $t4 VALUES (8)")
 
-    assert(sql("SELECT * FROM t TIMESTAMP AS OF '2019-01-29 00:37:58'").collect()
-      === Array(Row(5), Row(6)))
-    assert(sql("SELECT * FROM t TIMESTAMP AS OF '2021-01-29 00:00:00'").collect()
-      === Array(Row(7), Row(8)))
-    assert(sql(s"SELECT * FROM t TIMESTAMP AS OF $ts1InSeconds").collect()
-      === Array(Row(5), Row(6)))
-    assert(sql(s"SELECT * FROM t TIMESTAMP AS OF $ts2InSeconds").collect()
-      === Array(Row(7), Row(8)))
-    assert(sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF $ts1InSeconds").collect()
-      === Array(Row(5), Row(6)))
-    assert(sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF $ts2InSeconds").collect()
-      === Array(Row(7), Row(8)))
-    assert(sql("SELECT * FROM t TIMESTAMP AS OF make_date(2021, 1, 29)").collect()
-      === Array(Row(7), Row(8)))
-    assert(sql("SELECT * FROM t TIMESTAMP AS OF to_timestamp('2021-01-29 00:00:00')").collect()
-      === Array(Row(7), Row(8)))
+    val res1_sql = sql("SELECT * FROM t TIMESTAMP AS OF '2019-01-29 00:37:58'").collect()
+    assert(res1_sql === Array(Row(5), Row(6)))
+    val res1_df = spark.read.option("timestampAsOf", "2019-01-29 00:37:58").table("t").collect()
+    assert(res1_df === Array(Row(5), Row(6)))
+    val res2_sql = sql("SELECT * FROM t TIMESTAMP AS OF '2021-01-29 00:00:00'").collect()
+    assert(res2_sql === Array(Row(7), Row(8)))
+    val res2_df = spark.read.option("timestampAsOf", "2021-01-29 00:00:00").table("t").collect()
+    assert(res2_df === Array(Row(7), Row(8)))
+
+    val res3 = sql(s"SELECT * FROM t TIMESTAMP AS OF $ts1InSeconds").collect()
+    assert(res3 === Array(Row(5), Row(6)))
+    val res4 = sql(s"SELECT * FROM t TIMESTAMP AS OF $ts2InSeconds").collect()
+    assert(res4 === Array(Row(7), Row(8)))
+    val res5 = sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF $ts1InSeconds").collect()
+    assert(res5 === Array(Row(5), Row(6)))
+    val res6 = sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF $ts2InSeconds").collect()
+    assert(res6 === Array(Row(7), Row(8)))
+    val res7 = sql("SELECT * FROM t TIMESTAMP AS OF make_date(2021, 1, 29)").collect()
+    assert(res7 === Array(Row(7), Row(8)))
+    val res8 = sql("SELECT * FROM t TIMESTAMP AS OF to_timestamp('2021-01-29 00:00:00')")
+      .collect()
+    assert(res8 === Array(Row(7), Row(8)))
     // Scalar subquery is also supported.
-    assert(sql("SELECT * FROM t TIMESTAMP AS OF (SELECT make_date(2021, 1, 29))").collect()
-      === Array(Row(7), Row(8)))
+    val res9 = sql("SELECT * FROM t TIMESTAMP AS OF (SELECT make_date(2021, 1, 29))").collect()
+    assert(res9 === Array(Row(7), Row(8)))
     // Nested subquery also works
-    assert(
-      sql("SELECT * FROM t TIMESTAMP AS OF (SELECT (SELECT make_date(2021, 1, 29)))").collect()
-        === Array(Row(7), Row(8)))
+    val res10 = sql("SELECT * FROM t TIMESTAMP AS OF (SELECT (SELECT make_date(2021, 1, 29)))")
+      .collect()
+    assert(res10 === Array(Row(7), Row(8)))
 
     checkError(
       exception = intercept[AnalysisException] {
@@ -2967,6 +2977,23 @@ class DataSourceV2SQLSuiteV1Filter
       errorClass = "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.INPUT",
       parameters = Map("expr" -> "\"abc\""))
 
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.read.option("timestampAsOf", "abc").table("t").collect()
+      },
+      errorClass = "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.OPTION",
+      parameters = Map("expr" -> "'abc'"))
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.read
+          .option("timestampAsOf", "abc")
+          .option("versionAsOf", "1")
+          .table("t")
+          .collect()
+      },
+      errorClass = "INVALID_TIME_TRAVEL_SPEC")
+
     checkError(
       exception = intercept[AnalysisException] {
         sql("SELECT * FROM t TIMESTAMP AS OF current_user()").collect()
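And the user-facing happy path promised in the description, as a usage sketch (assuming a v2 catalog table `t` whose connector understands version and timestamp snapshots, as in the test suite above; the snapshot identifier and timestamp are illustrative):

```scala
// Equivalent to: SELECT * FROM t VERSION AS OF 'Snapshot123456789'
val byVersion = spark.read.option("versionAsOf", "Snapshot123456789").table("t")

// Equivalent to: SELECT * FROM t TIMESTAMP AS OF '2019-01-29 00:37:58'
val byTimestamp = spark.read.option("timestampAsOf", "2019-01-29 00:37:58").table("t")
```

Note that the option names themselves are configurable: `timestampAsOf` and `versionAsOf` are only the defaults of the new `spark.sql.timeTravelTimestampKey` and `spark.sql.timeTravelVersionKey` confs.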