Skip to content

Commit

Permalink
lint and minor, need to think about offset log
Browse files Browse the repository at this point in the history
  • Loading branch information
WweiL committed Sep 20, 2024
1 parent 13a83f7 commit 1f3ea93
Showing 1 changed file with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1195,7 +1195,8 @@ class SparkConnectPlanner(
}

private def groupColsFromDropDuplicates(
colNames: Seq[String], allColumns: Seq[Attribute]): Seq[Attribute] = {
colNames: Seq[String],
allColumns: Seq[Attribute]): Seq[Attribute] = {
val resolver = session.sessionState.analyzer.resolver
// [SPARK-31990][SPARK-49722]: We must keep `toSet.toSeq` here because of the backward
// compatibility issue (the Streaming's state store depends on the `groupCols` order).
Expand Down Expand Up @@ -1226,11 +1227,11 @@ class SparkConnectPlanner(
val resolver = session.sessionState.analyzer.resolver
val allColumns = queryExecution.analyzed.output
if (rel.getAllColumnsAsKeys) {
val groupCals = groupColsFromDropDuplicates(allColumns.map(_.name), allColumns)
val groupCols = groupColsFromDropDuplicates(allColumns.map(_.name), allColumns)
if (rel.getWithinWatermark) {
DeduplicateWithinWatermark(groupCals, queryExecution.analyzed)
DeduplicateWithinWatermark(groupCols, queryExecution.analyzed)
} else {
Deduplicate(groupCals, queryExecution.analyzed)
Deduplicate(groupCols, queryExecution.analyzed)
}
} else {
val toGroupColumnNames = rel.getColumnNamesList.asScala.toSeq
Expand Down

0 comments on commit 1f3ea93

Please sign in to comment.