[SPARK-45575][SQL] Support time travel options for df read API
### What changes were proposed in this pull request?

We've added a time travel API in DS v2 and a dedicated SQL syntax for it. However, there is no way to use it through the DataFrame read API. This PR adds time travel options (`timestampAsOf` and `versionAsOf`) so that time travel also works with the DataFrame read API.
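
A minimal sketch of how the new options might be used from the DataFrame read API. The object and helper names (`TimeTravelReadExample`, `timeTravelOptions`, `readAsOf`) are hypothetical and not part of this patch; only the option names and the `spark.read...table(...)` call chain come from the change and its tests. It assumes the table lives in a DS v2 catalog that supports time travel.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object TimeTravelReadExample {
  // Hypothetical helper: builds the read options for a time travel query.
  // At most one of `version` / `timestamp` may be set; Spark itself rejects
  // the combination with INVALID_TIME_TRAVEL_SPEC.
  def timeTravelOptions(
      version: Option[String],
      timestamp: Option[String]): Map[String, String] = {
    require(version.isEmpty || timestamp.isEmpty,
      "Cannot specify both version and timestamp")
    (version.map(v => "versionAsOf" -> v).toList ++
      timestamp.map(t => "timestampAsOf" -> t).toList).toMap
  }

  // Hypothetical helper: reads `table` as of the given version or timestamp.
  def readAsOf(
      spark: SparkSession,
      table: String,
      version: Option[String] = None,
      timestamp: Option[String] = None): DataFrame =
    spark.read.options(timeTravelOptions(version, timestamp)).table(table)
}
```

For example, `TimeTravelReadExample.readAsOf(spark, "t", timestamp = Some("2019-01-29 00:37:58"))` would correspond to `SELECT * FROM t TIMESTAMP AS OF '2019-01-29 00:37:58'`.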

### Why are the changes needed?

Feature parity: the SQL syntax already supports time travel, and the DataFrame read API should as well.

### Does this PR introduce _any_ user-facing change?

Yes. Users can now specify time travel through read options.

### How was this patch tested?

new tests

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #43403 from cloud-fan/time-option.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
2 people authored and yaooqinn committed Oct 27, 2023
1 parent 519b582 commit 929405a
Showing 9 changed files with 153 additions and 40 deletions.
22 changes: 17 additions & 5 deletions common/utils/src/main/resources/error/error-classes.json
@@ -2192,6 +2192,12 @@
],
"sqlState" : "42K0F"
},
+"INVALID_TIME_TRAVEL_SPEC" : {
+"message" : [
+"Cannot specify both version and timestamp when time travelling the table."
+],
+"sqlState" : "42K0E"
+},
"INVALID_TIME_TRAVEL_TIMESTAMP_EXPR" : {
"message" : [
"The time travel timestamp expression <expr> is invalid."
@@ -2207,6 +2213,11 @@
"Must be deterministic."
]
},
+"OPTION" : {
+"message" : [
+"Timestamp string in the options must be able to cast to TIMESTAMP type."
+]
+},
"UNEVALUABLE" : {
"message" : [
"Must be evaluable."
@@ -2386,6 +2397,12 @@
],
"sqlState" : "42803"
},
+"MULTIPLE_TIME_TRAVEL_SPEC" : {
+"message" : [
+"Cannot specify time travel in both the time travel clause and options."
+],
+"sqlState" : "42K0E"
+},
"MULTI_SOURCES_UNSUPPORTED_FOR_EXPRESSION" : {
"message" : [
"The expression <expr> does not support more than one source."
@@ -5132,11 +5149,6 @@
"<errorMessage>"
]
},
-"_LEGACY_ERROR_TEMP_1334" : {
-"message" : [
-"Cannot specify both version and timestamp when time travelling the table."
-]
-},
"_LEGACY_ERROR_TEMP_1338" : {
"message" : [
"Sinks cannot request distribution and ordering in continuous execution mode."
@@ -33,6 +33,10 @@ Cannot be casted to the "TIMESTAMP" type.

Must be deterministic.

+## OPTION
+
+Timestamp string in the options must be able to cast to TIMESTAMP type.
+
## UNEVALUABLE

Must be evaluable.
12 changes: 12 additions & 0 deletions docs/sql-error-conditions.md
@@ -1222,6 +1222,12 @@ For more details see [INVALID_SUBQUERY_EXPRESSION](sql-error-conditions-invalid-

Cannot create the persistent object `<objName>` of the type `<obj>` because it references to the temporary object `<tempObjName>` of the type `<tempObj>`. Please make the temporary object `<tempObjName>` persistent, or make the persistent object `<objName>` temporary.

+### INVALID_TIME_TRAVEL_SPEC
+
+[SQLSTATE: 42K0E](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Cannot specify both version and timestamp when time travelling the table.
+
### [INVALID_TIME_TRAVEL_TIMESTAMP_EXPR](sql-error-conditions-invalid-time-travel-timestamp-expr-error-class.html)

[SQLSTATE: 42K0E](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
@@ -1360,6 +1366,12 @@ For more details see [MISSING_ATTRIBUTES](sql-error-conditions-missing-attribute

The query does not include a GROUP BY clause. Add GROUP BY or turn it into the window functions using OVER clauses.

+### MULTIPLE_TIME_TRAVEL_SPEC
+
+[SQLSTATE: 42K0E](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Cannot specify time travel in both the time travel clause and options.
+
### MULTI_SOURCES_UNSUPPORTED_FOR_EXPRESSION

[SQLSTATE: 42K0E](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
@@ -1122,7 +1122,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor

case r @ RelationTimeTravel(u: UnresolvedRelation, timestamp, version)
if timestamp.forall(ts => ts.resolved && !SubqueryExpression.hasSubquery(ts)) =>
-resolveRelation(u, TimeTravelSpec.create(timestamp, version, conf)).getOrElse(r)
+val timeTravelSpec = TimeTravelSpec.create(timestamp, version, conf.sessionLocalTimeZone)
+resolveRelation(u, timeTravelSpec).getOrElse(r)

case u @ UnresolvedTable(identifier, cmd, suggestAlternative) =>
lookupTableOrView(identifier).map {
@@ -1255,17 +1256,27 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
private def resolveRelation(
u: UnresolvedRelation,
timeTravelSpec: Option[TimeTravelSpec] = None): Option[LogicalPlan] = {
-resolveTempView(u.multipartIdentifier, u.isStreaming, timeTravelSpec.isDefined).orElse {
+val timeTravelSpecFromOptions = TimeTravelSpec.fromOptions(
+u.options,
+conf.getConf(SQLConf.TIME_TRAVEL_TIMESTAMP_KEY),
+conf.getConf(SQLConf.TIME_TRAVEL_VERSION_KEY),
+conf.sessionLocalTimeZone
+)
+if (timeTravelSpec.nonEmpty && timeTravelSpecFromOptions.nonEmpty) {
+throw new AnalysisException("MULTIPLE_TIME_TRAVEL_SPEC", Map.empty[String, String])
+}
+val finalTimeTravelSpec = timeTravelSpec.orElse(timeTravelSpecFromOptions)
+resolveTempView(u.multipartIdentifier, u.isStreaming, finalTimeTravelSpec.isDefined).orElse {
expandIdentifier(u.multipartIdentifier) match {
case CatalogAndIdentifier(catalog, ident) =>
-val key = ((catalog.name +: ident.namespace :+ ident.name).toSeq, timeTravelSpec)
+val key = ((catalog.name +: ident.namespace :+ ident.name).toSeq, finalTimeTravelSpec)
AnalysisContext.get.relationCache.get(key).map(_.transform {
case multi: MultiInstanceRelation =>
val newRelation = multi.newInstance()
newRelation.copyTagsFrom(multi)
newRelation
}).orElse {
-val table = CatalogV2Util.loadTable(catalog, ident, timeTravelSpec)
+val table = CatalogV2Util.loadTable(catalog, ident, finalTimeTravelSpec)
val loaded = createRelation(catalog, ident, table, u.options, u.isStreaming)
loaded.foreach(AnalysisContext.get.relationCache.update(key, _))
loaded
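
The way `resolveRelation` combines the spec from the SQL clause with the spec from the read options can be modelled in plain Scala. This is a simplified sketch without Spark dependencies: the `TimeTravelSpec` types are local stand-ins, `finalSpec` is a hypothetical name, and a `RuntimeException` stands in for the `AnalysisException` raised with `MULTIPLE_TIME_TRAVEL_SPEC`.

```scala
object ResolveTimeTravelSketch {
  // Local stand-ins for Spark's TimeTravelSpec hierarchy.
  sealed trait TimeTravelSpec
  final case class AsOfTimestamp(timestamp: Long) extends TimeTravelSpec
  final case class AsOfVersion(version: String) extends TimeTravelSpec

  // Stand-in for AnalysisException("MULTIPLE_TIME_TRAVEL_SPEC", ...).
  final class MultipleTimeTravelSpec
    extends RuntimeException("MULTIPLE_TIME_TRAVEL_SPEC")

  // Mirrors the new logic: a spec may come from the time travel clause or
  // from the options, but not from both; whichever is present wins.
  def finalSpec(
      fromClause: Option[TimeTravelSpec],
      fromOptions: Option[TimeTravelSpec]): Option[TimeTravelSpec] = {
    if (fromClause.nonEmpty && fromOptions.nonEmpty) {
      throw new MultipleTimeTravelSpec
    }
    fromClause.orElse(fromOptions)
  }
}
```

In the patch, the resulting spec is also used in the relation cache key and passed to `CatalogV2Util.loadTable`, so the same table read at different versions is cached and loaded separately.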
@@ -17,10 +17,11 @@

package org.apache.spark.sql.catalyst.analysis

-import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, RuntimeReplaceable, SubqueryExpression, Unevaluable}
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, Literal, RuntimeReplaceable, SubqueryExpression, Unevaluable}
import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.TimestampType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap

sealed trait TimeTravelSpec

@@ -31,7 +32,7 @@ object TimeTravelSpec {
def create(
timestamp: Option[Expression],
version: Option[String],
-conf: SQLConf) : Option[TimeTravelSpec] = {
+sessionLocalTimeZone: String) : Option[TimeTravelSpec] = {
if (timestamp.nonEmpty && version.nonEmpty) {
throw QueryCompilationErrors.invalidTimeTravelSpecError()
} else if (timestamp.nonEmpty) {
@@ -50,7 +51,7 @@ object TimeTravelSpec {
throw QueryCompilationErrors.invalidTimestampExprForTimeTravel(
"INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.NON_DETERMINISTIC", ts)
}
-val tz = Some(conf.sessionLocalTimeZone)
+val tz = Some(sessionLocalTimeZone)
// Set `ansiEnabled` to false, so that it can return null for invalid input and we can provide
// better error message.
val value = Cast(tsToEval, TimestampType, tz, ansiEnabled = false).eval()
@@ -65,4 +66,35 @@ object TimeTravelSpec {
None
}
}
+
+def fromOptions(
+options: CaseInsensitiveStringMap,
+timestampKey: String,
+versionKey: String,
+sessionLocalTimeZone: String): Option[TimeTravelSpec] = {
+(Option(options.get(timestampKey)), Option(options.get(versionKey))) match {
+case (Some(_), Some(_)) =>
+throw QueryCompilationErrors.invalidTimeTravelSpecError()
+
+case (Some(timestampStr), None) =>
+val timestampValue = Cast(
+Literal(timestampStr),
+TimestampType,
+Some(sessionLocalTimeZone),
+ansiEnabled = false
+).eval()
+if (timestampValue == null) {
+throw new AnalysisException(
+"INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.OPTION",
+Map("expr" -> s"'$timestampStr'")
+)
+}
+Some(AsOfTimestamp(timestampValue.asInstanceOf[Long]))
+
+case (None, Some(versionStr)) =>
+Some(AsOfVersion(versionStr))
+
+case _ => None
+}
+}
}
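
The decision table in `fromOptions` can be illustrated with a self-contained model. This is only a sketch under simplifying assumptions: it uses `java.time` instead of Spark's `Cast`, accepts only the `yyyy-MM-dd HH:mm:ss` format (Spark's cast accepts more), models the case-insensitive lookup of `CaseInsensitiveStringMap` by lower-casing keys, and throws `IllegalArgumentException` where Spark throws `AnalysisException`. All names in `FromOptionsSketch` are hypothetical.

```scala
import java.time.{Instant, LocalDateTime, ZoneId}
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import java.util.Locale
import scala.util.Try

object FromOptionsSketch {
  // Local stand-ins for Spark's TimeTravelSpec hierarchy.
  sealed trait TimeTravelSpec
  final case class AsOfTimestamp(timestamp: Long) extends TimeTravelSpec
  final case class AsOfVersion(version: String) extends TimeTravelSpec

  private val fmt = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss")

  // Simplified stand-in for Cast(Literal(str), TimestampType, tz, ansiEnabled = false):
  // returns microseconds since the epoch, or None (instead of null) on bad input.
  def toMicros(str: String, zone: String): Option[Long] = Try {
    val instant = LocalDateTime.parse(str.trim, fmt).atZone(ZoneId.of(zone)).toInstant
    ChronoUnit.MICROS.between(Instant.EPOCH, instant)
  }.toOption

  def fromOptions(
      options: Map[String, String],
      timestampKey: String,
      versionKey: String,
      zone: String): Option[TimeTravelSpec] = {
    // Model the case-insensitive key lookup.
    val lower = options.map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }
    def get(key: String): Option[String] = lower.get(key.toLowerCase(Locale.ROOT))

    (get(timestampKey), get(versionKey)) match {
      // Both keys present: rejected before the timestamp is even parsed.
      case (Some(_), Some(_)) =>
        throw new IllegalArgumentException("INVALID_TIME_TRAVEL_SPEC")
      case (Some(ts), None) =>
        val micros = toMicros(ts, zone).getOrElse(
          throw new IllegalArgumentException(
            s"INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.OPTION: '$ts'"))
        Some(AsOfTimestamp(micros))
      case (None, Some(v)) => Some(AsOfVersion(v))
      case _ => None
    }
  }
}
```

Note that the version string is passed through unchanged, which is why both `"Snapshot123456789"` and `"2345678910"` work as `versionAsOf` values in the tests.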
@@ -3343,7 +3343,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat

def invalidTimeTravelSpecError(): Throwable = {
new AnalysisException(
-errorClass = "_LEGACY_ERROR_TEMP_1334",
+errorClass = "INVALID_TIME_TRAVEL_SPEC",
messageParameters = Map.empty)
}

@@ -4474,6 +4474,20 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

+val TIME_TRAVEL_TIMESTAMP_KEY =
+buildConf("spark.sql.timeTravelTimestampKey")
+.doc("The option name to specify the time travel timestamp when reading a table.")
+.version("4.0.0")
+.stringConf
+.createWithDefault("timestampAsOf")
+
+val TIME_TRAVEL_VERSION_KEY =
+buildConf("spark.sql.timeTravelVersionKey")
+.doc("The option name to specify the time travel table version when reading a table.")
+.version("4.0.0")
+.stringConf
+.createWithDefault("versionAsOf")
+
val LEGACY_PERCENTILE_DISC_CALCULATION = buildConf("spark.sql.legacy.percentileDiscCalculation")
.internal()
.doc("If true, the old bogus percentile_disc calculation is used. The old calculation " +
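
Because the option names are read from these two configs, a session could in principle rename them. The following is a hedged sketch, not something exercised by this patch's tests: the custom key `snapshotId` and the helper `readSnapshot` are made up for illustration, and it assumes the config is changed on the same session before the read is analyzed.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object CustomTimeTravelKeyExample {
  // Hypothetical option name chosen for this example.
  val customVersionKey: String = "snapshotId"

  // Hypothetical helper: renames the version option for this session, then
  // reads `table` at the given snapshot using the renamed option.
  def readSnapshot(spark: SparkSession, table: String, snapshot: String): DataFrame = {
    spark.conf.set("spark.sql.timeTravelVersionKey", customVersionKey)
    spark.read.option(customVersionKey, snapshot).table(table)
  }
}
```

With the defaults left untouched, the option names remain `timestampAsOf` and `versionAsOf`.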
@@ -133,7 +133,8 @@ private[sql] object DataSourceV2Utils extends Logging {
} else {
None
}
-val timeTravel = TimeTravelSpec.create(timeTravelTimestamp, timeTravelVersion, conf)
+val timeTravel = TimeTravelSpec.create(
+timeTravelTimestamp, timeTravelVersion, conf.sessionLocalTimeZone)
(CatalogV2Util.getTable(catalog, ident, timeTravel), Some(catalog), Some(ident))
case _ =>
// TODO: Non-catalog paths for DSV2 are currently not well defined.
@@ -2902,10 +2902,14 @@ class DataSourceV2SQLSuiteV1Filter
sql(s"INSERT INTO $t2 VALUES (3)")
sql(s"INSERT INTO $t2 VALUES (4)")

-assert(sql("SELECT * FROM t VERSION AS OF 'Snapshot123456789'").collect()
-=== Array(Row(1), Row(2)))
-assert(sql("SELECT * FROM t VERSION AS OF 2345678910").collect()
-=== Array(Row(3), Row(4)))
+val res1_sql = sql("SELECT * FROM t VERSION AS OF 'Snapshot123456789'").collect()
+assert(res1_sql === Array(Row(1), Row(2)))
+val res1_df = spark.read.option("versionAsOf", "Snapshot123456789").table("t").collect()
+assert(res1_df === Array(Row(1), Row(2)))
+val res2_sql = sql("SELECT * FROM t VERSION AS OF 2345678910").collect()
+assert(res2_sql === Array(Row(3), Row(4)))
+val res2_df = spark.read.option("versionAsOf", "2345678910").table("t").collect()
+assert(res2_df === Array(Row(3), Row(4)))
}

val ts1 = DateTimeUtils.stringToTimestampAnsi(
@@ -2928,29 +2932,35 @@ class DataSourceV2SQLSuiteV1Filter
sql(s"INSERT INTO $t4 VALUES (7)")
sql(s"INSERT INTO $t4 VALUES (8)")

-assert(sql("SELECT * FROM t TIMESTAMP AS OF '2019-01-29 00:37:58'").collect()
-=== Array(Row(5), Row(6)))
-assert(sql("SELECT * FROM t TIMESTAMP AS OF '2021-01-29 00:00:00'").collect()
-=== Array(Row(7), Row(8)))
-assert(sql(s"SELECT * FROM t TIMESTAMP AS OF $ts1InSeconds").collect()
-=== Array(Row(5), Row(6)))
-assert(sql(s"SELECT * FROM t TIMESTAMP AS OF $ts2InSeconds").collect()
-=== Array(Row(7), Row(8)))
-assert(sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF $ts1InSeconds").collect()
-=== Array(Row(5), Row(6)))
-assert(sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF $ts2InSeconds").collect()
-=== Array(Row(7), Row(8)))
-assert(sql("SELECT * FROM t TIMESTAMP AS OF make_date(2021, 1, 29)").collect()
-=== Array(Row(7), Row(8)))
-assert(sql("SELECT * FROM t TIMESTAMP AS OF to_timestamp('2021-01-29 00:00:00')").collect()
-=== Array(Row(7), Row(8)))
+val res1_sql = sql("SELECT * FROM t TIMESTAMP AS OF '2019-01-29 00:37:58'").collect()
+assert(res1_sql === Array(Row(5), Row(6)))
+val res1_df = spark.read.option("timestampAsOf", "2019-01-29 00:37:58").table("t").collect()
+assert(res1_df === Array(Row(5), Row(6)))
+val res2_sql = sql("SELECT * FROM t TIMESTAMP AS OF '2021-01-29 00:00:00'").collect()
+assert(res2_sql === Array(Row(7), Row(8)))
+val res2_df = spark.read.option("timestampAsOf", "2021-01-29 00:00:00").table("t").collect()
+assert(res2_df === Array(Row(7), Row(8)))
+
+val res3 = sql(s"SELECT * FROM t TIMESTAMP AS OF $ts1InSeconds").collect()
+assert(res3 === Array(Row(5), Row(6)))
+val res4 = sql(s"SELECT * FROM t TIMESTAMP AS OF $ts2InSeconds").collect()
+assert(res4 === Array(Row(7), Row(8)))
+val res5 = sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF $ts1InSeconds").collect()
+assert(res5 === Array(Row(5), Row(6)))
+val res6 = sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF $ts2InSeconds").collect()
+assert(res6 === Array(Row(7), Row(8)))
+val res7 = sql("SELECT * FROM t TIMESTAMP AS OF make_date(2021, 1, 29)").collect()
+assert(res7 === Array(Row(7), Row(8)))
+val res8 = sql("SELECT * FROM t TIMESTAMP AS OF to_timestamp('2021-01-29 00:00:00')")
+.collect()
+assert(res8 === Array(Row(7), Row(8)))
// Scalar subquery is also supported.
-assert(sql("SELECT * FROM t TIMESTAMP AS OF (SELECT make_date(2021, 1, 29))").collect()
-=== Array(Row(7), Row(8)))
+val res9 = sql("SELECT * FROM t TIMESTAMP AS OF (SELECT make_date(2021, 1, 29))").collect()
+assert(res9 === Array(Row(7), Row(8)))
// Nested subquery also works
-assert(
-sql("SELECT * FROM t TIMESTAMP AS OF (SELECT (SELECT make_date(2021, 1, 29)))").collect()
-=== Array(Row(7), Row(8)))
+val res10 = sql("SELECT * FROM t TIMESTAMP AS OF (SELECT (SELECT make_date(2021, 1, 29)))")
+.collect()
+assert(res10 === Array(Row(7), Row(8)))

checkError(
exception = intercept[AnalysisException] {
@@ -2967,6 +2977,23 @@ class DataSourceV2SQLSuiteV1Filter
errorClass = "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.INPUT",
parameters = Map("expr" -> "\"abc\""))

+checkError(
+exception = intercept[AnalysisException] {
+spark.read.option("timestampAsOf", "abc").table("t").collect()
+},
+errorClass = "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.OPTION",
+parameters = Map("expr" -> "'abc'"))
+
+checkError(
+exception = intercept[AnalysisException] {
+spark.read
+.option("timestampAsOf", "abc")
+.option("versionAsOf", "1")
+.table("t")
+.collect()
+},
+errorClass = "INVALID_TIME_TRAVEL_SPEC")
+
checkError(
exception = intercept[AnalysisException] {
sql("SELECT * FROM t TIMESTAMP AS OF current_user()").collect()