
[FEA][AUDIT][SPARK-37099][SQL] Introduce the group limit of Window for rank-based filter to optimize top-k computation #8208

Closed
abellina opened this issue Apr 28, 2023 · 3 comments · Fixed by #10500
Labels
audit_3.5.0 feature request New feature or request performance A performance related task/issue

Comments

@abellina (Collaborator)

This is a new window exec added in Spark 3.5: WindowGroupLimitExec improves some TPC-DS queries, specifically q67, when rank-like functions are used, by reducing skew and shuffle writes.

apache/spark@0e8a20e6da

We should look at implementing something similar for the GPU.

@abellina added the feature request, ? - Needs Triage, performance, and audit_3.5.0 labels Apr 28, 2023
@revans2 (Collaborator) commented May 1, 2023

I agree this looks like a big performance win. They end up doing more sorting of the data in earlier stages, but the GPU is really good at that. This does not look too difficult. It looks like we are going to do a regular window operation followed by a filter on the partial result, which then throws away the rank. We might be able to generalize this pattern to any running window aggregation that follows it: compute a window, then filter rows based on the result.
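
To make the shape of that pattern concrete, here is a minimal DataFrame-level sketch in Spark's Scala API of the kind of query this covers; the column names, the `df` input, and the `< 10` limit are illustrative, not taken from the plugin:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Illustrative only: compute a rank over a window, filter on it, then
// discard the rank column. This is the query shape the optimization targets.
def topKPerGroup(df: DataFrame): DataFrame = {
  val w = Window.partitionBy("foo").orderBy("bar")
  df.withColumn("rnk", rank().over(w)) // regular window operation
    .filter(col("rnk") < 10)           // filter on the partial result
    .drop("rnk")                       // the rank itself is thrown away
}
```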

@mythrocks (Collaborator)

I had initially considered that this would need a CUDF component to process the filter. I'm now thinking this might be doable directly in the plugin.

We would need to hook up a GpuFilter to filter out rows on a per-group basis, based on the rank-function predicate.

I'll file a separate bug with details. In the first pass, I'm inclined to address the case where the entire group fits in memory. I'll consider larger groups in a follow-on.
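
As a rough illustration of that first pass (whole group in memory), here is a plain-Scala sketch, not plugin or cuDF code, of filtering each group by its `RANK()` under a hypothetical `rank < limit` predicate:

```scala
// Plain-Scala sketch only: within each group, compute RANK() over the sorted
// rows and keep the rows satisfying rank < limit. Assumes the whole group
// fits in memory, as in the first pass described above.
def groupLimitByRank[K, V](rows: Seq[(K, V)], limit: Int)
                          (implicit ord: Ordering[V]): Seq[(K, V)] =
  rows.groupBy(_._1).values.flatMap { group =>
    val sorted = group.sortBy(_._2)
    // RANK() semantics: ties share a rank; -1 marks "same as previous row"
    val raw = sorted.indices.map { i =>
      if (i > 0 && ord.equiv(sorted(i)._2, sorted(i - 1)._2)) -1 else i + 1
    }
    // carry the previous rank forward across ties
    val ranks = raw.scanLeft(0)((prev, r) => if (r == -1) prev else r).tail
    sorted.zip(ranks).collect { case (row, rnk) if rnk < limit => row }
  }.toSeq
```

For example, `groupLimitByRank(Seq(("a", 3), ("a", 1), ("b", 2)), limit = 2)` keeps only the rank-1 row of each group.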

@revans2 (Collaborator) commented Feb 7, 2024

I see a few things that we can do here, each with different levels of performance improvement.

In the simplest case we do a regular window operation like we do today and then add in a filter like @mythrocks suggested.

Longer term it might be nice to see if we could combine the sort and do something like we do with TopN (TakeOrderedAndProject). In fact, WindowGroupLimitExec on RowNumber without any partitions is TopN, with the sort order the same as the requiredChildOrdering. But that assumes K is small enough that keeping all of it in memory is going to work out.
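
For illustration, a sketch of that equivalence in the Spark DataFrame API, with a made-up sort column and a limit of 9; this just restates the observation and is not plugin code:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// With no PARTITION BY, a row_number filter over a global sort order selects
// the same rows as a plain top-K, which Spark plans as TakeOrderedAndProject.
def viaRowNumber(df: DataFrame): DataFrame =
  df.withColumn("rn", row_number().over(Window.orderBy("bar")))
    .filter(col("rn") <= 9)
    .drop("rn")

def viaTopN(df: DataFrame): DataFrame =
  df.orderBy("bar").limit(9)   // same result set, already handled as TopN
```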

mythrocks added a commit that referenced this issue Feb 29, 2024
Fixes #8208.

This commit adds support for `WindowGroupLimitExec` to run on the GPU. This optimization was added in Apache Spark 3.5 to reduce the number of rows that participate in shuffles for queries that contain filters on the result of ranking functions. For example:

```sql
SELECT foo, bar FROM (
  SELECT foo, bar, 
         RANK() OVER (PARTITION BY foo ORDER BY bar) AS rnk
  FROM mytable )
WHERE rnk < 10
```

Such a query would require a shuffle to bring all rows in a window group into the same task.
In Spark 3.5, an optimization was added in [SPARK-37099](https://issues.apache.org/jira/browse/SPARK-37099) to take advantage of the `rnk < 10` predicate to reduce shuffle load.
Specifically, since only 9 (i.e. 10-1) ranks participate in the window function, only that many rows need to be shuffled into the task per input batch. By pre-filtering rows that can't possibly satisfy the condition, the number of shuffled records can be reduced.

The GPU implementation (i.e. `GpuWindowGroupLimitExec`) differs slightly from the CPU implementation, because it needs to execute on entire input columnar batches. As a result, `GpuWindowGroupLimitExec` runs the rank scan on each input batch, and then filters out ranks that exceed the limit specified in the predicate (`rnk < 10`). After the shuffle, `RANK()` is calculated again by `GpuRunningWindowExec` to produce the final result.
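
A simplified, CPU-side sketch of that two-phase flow may help; the helper and types are made up, and the real exec works on cuDF columnar batches rather than Scala collections:

```scala
// Phase 1 (before the shuffle): rank each input batch independently and drop
// rows whose within-batch rank already violates the predicate. Adding the
// rest of the group later can only push a row's rank higher, never lower,
// so such rows can never satisfy `rnk < limit` in the final result either.
// `rankWithinGroups` is assumed to compute RANK() per group on a batch
// (see the earlier per-group sketch); it is not a real plugin API.
def preFilterBatch[R](batch: Seq[R], limit: Int)
                     (rankWithinGroups: Seq[R] => Seq[(R, Int)]): Seq[R] =
  rankWithinGroups(batch).collect { case (row, rnk) if rnk < limit => row }

// Phase 2 (after the shuffle): the surviving rows from all batches are
// re-ranked over the complete groups (by GpuRunningWindowExec in the plugin)
// and the `rnk < limit` predicate is applied once more for the final rows.
```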

The current implementation addresses the `RANK()` and `DENSE_RANK()` window functions. Other ranking functions (like `ROW_NUMBER()`) can be added at a later date.

Signed-off-by: MithunR <mythrocks@gmail.com>