
Batching support for row-based bounded window functions #9973

Merged: 25 commits into NVIDIA:branch-24.02 on Dec 13, 2023

Conversation

@mythrocks mythrocks commented Dec 6, 2023

Fixes #1860.

This commit adds support for batched processing of window aggregations where the window-extents are row-based and (finitely) bounded.

Example query:

SELECT 
  COUNT(1) OVER (PARTITION BY part ORDER BY ord ROWS BETWEEN 10 PRECEDING AND 20 FOLLOWING),
  MIN(col) OVER (PARTITION BY part ORDER BY ord ROWS BETWEEN 10 PRECEDING AND CURRENT ROW),
  AVG(nuther) OVER (PARTITION BY part ORDER BY ord ROWS BETWEEN CURRENT ROW AND 20 FOLLOWING)
FROM my_table;

The algorithm is described at length in #1860. In brief:

  1. A new exec, GpuBatchedBoundedWindowExec, batches the input into chunks that fit into GPU memory.
  2. Depending on the window specification, some rows towards the end of an input batch might not have the context (i.e. the "following" rows) necessary to finalize their output. Those rows are carried over to the next batch for recomputation.
  3. At every stage, enough rows from the previous batch are carried forward to provide the "preceding" context for the window computation, as sketched below.
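
To make the carry-over bookkeeping concrete, here is a minimal, self-contained Scala sketch of the scheme, using a bounded SUM as the aggregation. All names are illustrative; the plugin's actual implementation operates on columnar batches and also handles partition boundaries and negative offsets, which this sketch elides.

object BoundedWindowSketch {
  // Reference implementation: SUM over ROWS BETWEEN p PRECEDING AND f FOLLOWING,
  // computed in one pass over the whole input.
  def boundedSum(rows: IndexedSeq[Long], p: Int, f: Int): IndexedSeq[Long] =
    rows.indices.map { i =>
      val lo = math.max(0, i - p)
      val hi = math.min(rows.size - 1, i + f)
      (lo to hi).map(rows).sum
    }

  // Batched evaluation: produces the same output as boundedSum over the
  // concatenated input, one batch at a time.
  def processBatches(batches: Iterator[IndexedSeq[Long]], p: Int, f: Int): IndexedSeq[Long] = {
    var carry = IndexedSeq.empty[Long] // tail rows carried over from the previous batch
    var done = 0 // prefix of `carry` already emitted; kept only as "preceding" context
    val out = IndexedSeq.newBuilder[Long]
    while (batches.hasNext) {
      val batch = carry ++ batches.next()
      // Step 2: the last `f` rows lack their "following" context, unless this is the final batch.
      val finalizable = math.max(done, if (batches.hasNext) batch.size - f else batch.size)
      out ++= boundedSum(batch, p, f).slice(done, finalizable)
      // Step 3: carry forward the unfinished tail, plus `p` rows of "preceding" context.
      val tailStart = math.max(0, finalizable - p)
      carry = batch.drop(tailStart)
      done = finalizable - tailStart
    }
    out.result()
  }
}

With p = 1 and f = 1, for instance, processBatches(Iterator((1L to 5L).toIndexedSeq, (6L to 8L).toIndexedSeq), 1, 1) produces the same result as boundedSum((1L to 8L).toIndexedSeq, 1, 1).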

Note that window bounds might be specified with negative offsets. These are also supported. As a consequence, LEAD() and LAG() are supported as well.

SELECT
  COUNT(1)  OVER (PARTITION BY part ORDER BY ord ROWS BETWEEN 5 PRECEDING AND -3 FOLLOWING),
  LAG(col, 10)  OVER (PARTITION BY part ORDER BY ord),
  LEAD(col, 5) OVER (PARTITION BY part ORDER BY ord) ...

This implementation falls back to unbatched processing (via GpuWindowExec) if a window's preceding/following bounds exceed a configurable maximum (defaulting to 100 rows in either direction). The maximum may be reconfigured via:

spark.conf.set("spark.rapids.sql.window.batched.bounded.row.extent", 500)
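
For illustration, the fallback decision amounts to a bounds check along these lines (a sketch with assumed names, not the plugin's actual code):

def canUseBatchedWindowExec(frames: Seq[(Int, Int)], maxRowExtent: Int): Boolean =
  // Each frame is a (preceding, following) pair of row offsets. Offsets may be
  // negative, so compare magnitudes against the configured maximum.
  frames.forall { case (preceding, following) =>
    math.abs(preceding) <= maxRowExtent && math.abs(following) <= maxRowExtent
  }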

@mythrocks mythrocks added feature request New feature or request reliability Features to improve reliability or bugs that severely impact the reliability of the plugin labels Dec 6, 2023
@mythrocks mythrocks self-assigned this Dec 6, 2023
@mythrocks mythrocks marked this pull request as draft December 6, 2023 07:20
@mythrocks (Collaborator Author)

Still WIP. Need to sort out the outputBatching spec, and add tests for fallback.

override def next(): ColumnarBatch = {
  var outputBatch: ColumnarBatch = null
  while (outputBatch == null && hasNext) {
    withResource(getNextInputBatch) { inputCbSpillable =>
revans2 (Collaborator) commented on this snippet:

We need to have withRetry put in here somewhere. The hard part is making sure that we can roll back any of the caching.

We can calculate/get the inputRowCount, noMoreInput and numUnprocessedInCache without needing to get the input batch from inputCbSpillable so that might make it simpler to add in the retry logic.

I am fine if this is a follow-on issue, but we need it fixed at some point.
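
For context, the suggested shape would be roughly as follows. This is a sketch only: RmmRapidsRetryIterator.withRetryNoSplit is the plugin's retry helper for spillable inputs, computeWindowOutput is a hypothetical stand-in for the window computation, and the hard part noted above (rolling back the cached carry-over state when a retry fires) is elided.

// Sketch only: assumes com.nvidia.spark.rapids.RmmRapidsRetryIterator is in scope.
val outputBatch = RmmRapidsRetryIterator.withRetryNoSplit(inputCbSpillable) { spillable =>
  // Materialize the (possibly spilled) input and compute the window output;
  // on a retryable OOM the framework can spill the input and invoke this block again.
  withResource(spillable.getColumnarBatch()) { inputCb =>
    computeWindowOutput(inputCb) // hypothetical helper
  }
}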

mythrocks (Collaborator Author) replied:

I'll have to address this in a follow-up. I'm still trying to sort out the missing rows problem.

mythrocks (Collaborator Author) replied:

#10046 will address the withRetry part of the problem.

This now allows for `LEAD()`, `LAG()`, and regular window functions with
negative values for `preceding`/`following` window bounds.
@mythrocks mythrocks changed the title [WIP] Batching support for row-based bounded window functions (with non-negative offsets) [WIP] Batching support for row-based bounded window functions Dec 7, 2023
@mythrocks mythrocks changed the title [WIP] Batching support for row-based bounded window functions Batching support for row-based bounded window functions Dec 7, 2023
This commit fixes the batching.  The new exec should not have to receive
batched input.
@mythrocks mythrocks marked this pull request as ready for review December 7, 2023 02:36
@mythrocks (Collaborator Author)

Build

revans2 previously approved these changes Dec 7, 2023

@revans2 revans2 (Collaborator) left a comment:

Looks good to me

1. Renamed config '.extent' to '.max'.
2. Fixed documentation for said config.
3. Removed TODOs that were already handled.
@mythrocks (Collaborator Author)

I'll rebase and retest this shortly, after regenerating docs.

@mythrocks (Collaborator Author)

Build

@mythrocks (Collaborator Author)

There seems to be a bug in the handling for LEAD()/LAG(). I'm trying to sort this out now.

@mythrocks (Collaborator Author)

The failing test is a weird one. I've boiled it down to the following:

# Imports as used in the plugin's integration-test suite (asserts, data_gen, marks):
import pytest
import pyspark.sql.functions as f
from pyspark.sql import Window
from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import *
from marks import *

@ignore_order(local=True)
@approximate_float
@pytest.mark.parametrize('batch_size', ['1g'], ids=idfn)
@pytest.mark.parametrize('a_b_gen', [long_gen], ids=meta_idfn('partAndOrderBy:'))
@pytest.mark.parametrize('c_gen', [StructGen(children=[['child_int', IntegerGen()]])], ids=idfn)
@allow_non_gpu(*non_utc_allow)
def test_myth_repro(a_b_gen, c_gen, batch_size):
    conf = {'spark.rapids.sql.batchSizeBytes': batch_size}
    base_window_spec = Window.partitionBy('a').orderBy('b', 'c')

    def do_it(spark):
        # Repro: FAILS.
        df = spark.read.parquet("/tmp/repro_input_cpu_parquet")
        df = df.withColumn('row_num', f.row_number().over(base_window_spec))
        df = df.withColumn('lead_def_c', f.lead('c', 2, None).over(base_window_spec))
        return df

        # Repro: WORKS (equivalent query via selectExpr).
        # return df.selectExpr(
        #     "ROW_NUMBER() OVER (PARTITION BY a ORDER BY b, c) row_num",
        #     "LEAD(c, 2, NULL)   OVER (PARTITION BY a ORDER BY b, c) lead_def_c"
        # )

    assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf)

Calling ROW_NUMBER() and LEAD() (with a default) leads to the GPU seemingly not returning any rows, compared to the CPU:

FAILED ../../src/main/python/window_function_test.py::test_myth_repro[Struct(['child_int', Integer])-partAndOrderBy:Long-1g][DATAGEN_SEED=1702330945, IGNORE_ORDER({'local': True}), APPROXIMATE_FLOAT] - AssertionError: CPU and GPU list have different lengths at [] CPU: 20 GPU: 0

This is very odd. Instrumentation in the GpuBatchedBoundedWindowIterator code indicates that the right number of rows is being returned. Somewhere after the iterator has returned, the rows don't make it to the output.

This does not happen from SQL, for the same query:

    SELECT ROW_NUMBER() OVER (PARTITION BY a ORDER BY b, c) row_num,
           LEAD(c, 2, NULL) OVER(PARTITION BY a ORDER BY b, c) lead_def_c
    FROM my_repro_table

Nor does this repro from the command line. Nor with LEAD() called without a default.

@revans2 revans2 commented Dec 11, 2023

This is really odd because ROW_NUMBER is supposed to be a running window agg, so they should not be in the same window operation at all. Unless it is something to do with lead by itself being a problem.

@mythrocks (Collaborator Author)

This is really odd because ROW_NUMBER is supposed to be a running window agg, so they should not be in the same window operation at all.

You're right about that. And the plan does indicate that these operations are addressed in separate execs:

== Physical Plan ==
GpuColumnarToRow false
+- GpuBatchedBoundedWindow [a#212L, b#213L, c#214, gpulead(c#214, 2, null) gpuwindowspecdefinition(a#212L, b#213L ASC NULLS FIRST, c#214 ASC NULLS FIRST, gpuspecifiedwindowframe(RowFrame, 2, 2)) AS lead_def_c#225, row_num#219], [a#212L], [b#213L ASC NULLS FIRST, c#214 ASC NULLS FIRST]
   +- GpuRunningWindow [a#212L, b#213L, c#214, gpurownumber$() gpuwindowspecdefinition(a#212L, b#213L ASC NULLS FIRST, c#214 ASC NULLS FIRST, gpuspecifiedwindowframe(RowFrame, gpuspecialframeboundary(unboundedpreceding$()), gpuspecialframeboundary(currentrow$()))) AS row_num#219], [a#212L], [b#213L ASC NULLS FIRST, c#214 ASC NULLS FIRST]
...

The progress has been slow, but I was wrong about the following:

Nor does this repro from the command line...

I have a repro from the shell, not just from pytest. The df.show() does return the right number of rows. df.collect() seems to return 0 rows, for this combination of operations.

The operations work fine individually. 😕

@mythrocks (Collaborator Author)

@revans2 has cracked it: it looks like reordering the execs causes the output columns to be reordered as well.

I'm testing out the fix. I should have an update to this PR shortly.
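
A minimal, self-contained Scala illustration of that failure mode (names invented for this sketch; the actual fix lives in the plan code): if the parent operator expects columns in the original window exec's output order, the combined output of the split execs has to be projected back into that order.

case class Col(name: String)

// The parent expects the original output order...
val expected = Seq(Col("a"), Col("b"), Col("c"), Col("lead_def_c"), Col("row_num"))
// ...but after splitting/reordering the window execs, the columns come out differently.
val produced = Seq(Col("a"), Col("b"), Col("c"), Col("row_num"), Col("lead_def_c"))

// Restore the expected order with a final projection.
val reordered = expected.map(e => produced.find(_.name == e.name).get)
assert(reordered == expected)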

@mythrocks (Collaborator Author)

Build

@revans2 revans2 (Collaborator) left a comment:

My only request is that we have a follow-on issue to add retry to this.

@mythrocks (Collaborator Author)

mythrocks commented Dec 13, 2023

I'll raise one and start on it shortly.

Edit: I have filed #10046 for the follow-on.

@mythrocks mythrocks merged commit 3720faf into NVIDIA:branch-24.02 Dec 13, 2023
37 of 38 checks passed
@mythrocks (Collaborator Author)

Thank you for the review and guidance, @revans2. This has been merged.
