Skip to content

Commit

Permalink
[SPARK-49146][SS] Move assertion errors related to watermark missing …
Browse files Browse the repository at this point in the history
…in append mode streaming queries to error framework

### What changes were proposed in this pull request?

Move assertion errors related to watermark missing in append mode streaming queries to error framework.

### Why are the changes needed?

This is a followup of #43370 to make sure we throw classified user errors rather than assertion errors for better monitoring.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #47656 from bogao007/watermark-error.

Authored-by: bogao007 <bo.gao@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
  • Loading branch information
bogao007 authored and HeartSaVioR committed Aug 13, 2024
1 parent e7e0826 commit 2e96fc2
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 7 deletions.
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -5004,6 +5004,12 @@
},
"sqlState" : "0A000"
},
"UNSUPPORTED_STREAMING_OPERATOR_WITHOUT_WATERMARK" : {
"message" : [
"<outputMode> output mode not supported for <statefulOperator> on streaming DataFrames/DataSets without watermark."
],
"sqlState" : "0A000"
},
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY" : {
"message" : [
"Unsupported subquery expression:"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2838,6 +2838,17 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
"invalidValue" -> s"'$invalidValue'"))
}

def unsupportedStreamingOperatorWithoutWatermark(
outputMode: String,
statefulOperator: String): AnalysisException = {
new AnalysisException(
errorClass = "UNSUPPORTED_STREAMING_OPERATOR_WITHOUT_WATERMARK",
messageParameters = Map(
"outputMode" -> outputMode,
"statefulOperator" -> statefulOperator)
)
}

def conflictingPartitionColumnNamesError(
distinctPartColLists: Seq[String],
suspiciousPaths: Seq[Path]): SparkRuntimeException = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,11 +602,11 @@ case class StateStoreSaveExec(
// Update and output only rows being evicted from the StateStore
// Assumption: watermark predicates must be non-empty if append mode is allowed
case Some(Append) =>
assert(watermarkPredicateForDataForLateEvents.isDefined,
"Watermark needs to be defined for streaming aggregation query in append mode")

assert(watermarkPredicateForKeysForEviction.isDefined,
"Watermark needs to be defined for streaming aggregation query in append mode")
if (watermarkPredicateForDataForLateEvents.isEmpty ||
watermarkPredicateForKeysForEviction.isEmpty) {
throw QueryExecutionErrors.unsupportedStreamingOperatorWithoutWatermark(
"Append", "aggregations")
}

allUpdatesTimeMs += timeTakenMs {
val filteredIter = applyRemovingRowsOlderThanWatermark(iter,
Expand Down Expand Up @@ -903,8 +903,10 @@ case class SessionWindowStateStoreSaveExec(
// Update and output only rows being evicted from the StateStore
// Assumption: watermark predicates must be non-empty if append mode is allowed
case Some(Append) =>
assert(watermarkPredicateForDataForEviction.isDefined,
"Watermark needs to be defined for session window query in append mode")
if (watermarkPredicateForDataForEviction.isEmpty) {
throw QueryExecutionErrors.unsupportedStreamingOperatorWithoutWatermark(
"Append", "session window aggregations")
}

allUpdatesTimeMs += timeTakenMs {
putToStore(iter, store)
Expand Down

0 comments on commit 2e96fc2

Please sign in to comment.