diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala index e91493188873e..801bd2693af42 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala @@ -198,13 +198,15 @@ import org.apache.spark.util.collection.Utils */ object RewriteDistinctAggregates extends Rule[LogicalPlan] { private def mustRewrite( - aggregateExpressions: Seq[AggregateExpression], + distinctAggs: Seq[AggregateExpression], groupingExpressions: Seq[Expression]): Boolean = { - // If there are any AggregateExpressions with filter, we need to rewrite the query. - // Also, if there are no grouping expressions and all aggregate expressions are foldable, - // we need to rewrite the query, e.g. SELECT COUNT(DISTINCT 1). - aggregateExpressions.exists(_.filter.isDefined) || (groupingExpressions.isEmpty && - aggregateExpressions.exists(_.aggregateFunction.children.forall(_.foldable))) + // If there are any distinct AggregateExpressions with filter, we need to rewrite the query. + // Also, if there are no grouping expressions and all distinct aggregate expressions are + // foldable, we need to rewrite the query, e.g. SELECT COUNT(DISTINCT 1). Without this case, + // non-grouping aggregation queries with distinct aggregate expressions will be incorrectly + // handled by the aggregation strategy, causing wrong results when working with empty tables. + distinctAggs.exists(_.filter.isDefined) || (groupingExpressions.isEmpty && + distinctAggs.exists(_.aggregateFunction.children.forall(_.foldable))) } private def mayNeedtoRewrite(a: Aggregate): Boolean = { @@ -213,7 +215,6 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { // We need at least two distinct aggregates or the single distinct aggregate group exists filter // clause for this rule because aggregation strategy can handle a single distinct aggregate // group without filter clause. - // This check can produce false-positives, e.g., SUM(DISTINCT a) & COUNT(DISTINCT a). distinctAggs.size > 1 || mustRewrite(distinctAggs, a.groupingExpressions) }