Fix NestedLoopJoin performance regression #12531

Merged

Conversation

@alihan-synnada alihan-synnada commented Sep 19, 2024

Which issue does this PR close?

Closes #12528.

Rationale for this change

Iterating over the right rows instead of the left while calling build_join_indices increased the number of calls to apply_join_filter_to_indices whenever the right table has more rows, which covers most common use cases (for example, with a 25-row build side and an 8192-row right batch, the filter would be applied 8192 times instead of once). Building all the indices inside build_join_indices reduces the number of calls to apply_join_filter_to_indices.

The same indices are created inside build_join_indices every time, using an expensive operation, so it makes sense to cache them.

What changes are included in this PR?

This PR builds the indices in one go inside build_join_indices, removing the outer iteration, and reduces the number of calls to apply_join_filter_to_indices to 1 per join_left_and_right_batch call.

Also adds caching for join indices.
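
For illustration, a minimal sketch of what the one-pass construction could look like (a hypothetical helper, not the PR's exact code; all names here are assumptions):

```rust
use arrow::array::{UInt32Array, UInt64Array};

// Hypothetical sketch: materialize the full cross-product indices in one
// pass, so the filter runs once per (left batch, right batch) pair.
fn build_all_indices(
    left_row_count: usize,
    right_row_count: usize,
) -> (UInt64Array, UInt32Array) {
    let capacity = left_row_count * right_row_count;
    // Left indices: 0..left_row_count, repeated right_row_count times.
    let mut left_indices = Vec::with_capacity(capacity);
    // Right indices: each right row index, repeated left_row_count times.
    let mut right_indices = Vec::with_capacity(capacity);
    for right_index in 0..right_row_count {
        for left_index in 0..left_row_count {
            left_indices.push(left_index as u64);
            right_indices.push(right_index as u32);
        }
    }
    (
        UInt64Array::from(left_indices),
        UInt32Array::from(right_indices),
    )
}
```

Because these arrays depend only on the two row counts, they can be cached and reused across calls, which is what the caching change exploits.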

Are these changes tested?

Tested with the following query using TPC-H data (SF=1):

```sql
EXPLAIN ANALYZE SELECT count(1) FROM nation n JOIN lineitem li ON n.n_nationkey < li.l_orderkey;
```

|                   | join time     | percent change |
|-------------------|---------------|----------------|
| before regression | 9.421251914s  | 0%             |
| after regression  | 51.073303629s | +442.11%       |
| fix without cache | 10.450086252s | +10.92%        |
| fix with cache    | 7.390388154s  | -21.56%        |

Are there any user-facing changes?

Only performance changes.

@github-actions github-actions bot added the physical-expr Physical Expressions label Sep 19, 2024
@alihan-synnada alihan-synnada changed the title Optimize apply_join_filter_to_indices calls Fix NestedLoopJoin performance regression Sep 19, 2024
```rust
let capacity = left_row_count * right_row_count;

// Left indices are 0..left_row_count repeated right_row_count times
let mut left_indices_builder = UInt64Array::builder(capacity);
```
Contributor:

Using Vec to build the indices is slightly faster and has some nicer syntax.
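
A sketch of the Vec-based construction being suggested (hypothetical; `left_row_count` and `right_row_count` come from the surrounding diff):

```rust
// Hypothetical: collect the repeated 0..left_row_count sequence into a Vec,
// then convert it into an arrow array without going through a builder.
let left_indices: UInt64Array = (0..right_row_count)
    .flat_map(|_| 0..left_row_count as u64)
    .collect::<Vec<u64>>()
    .into();
```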

```rust
// Right indices are each right row index repeated left_row_count times
let mut right_indices_builder = UInt32Array::builder(capacity);
for right_index in 0..right_row_count {
    right_indices_builder.extend(vec![Some(right_index as u32); left_row_count])
}
```
Contributor:

We should avoid this intermediate Vec

Contributor Author:

Thanks for the heads up
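
One way to avoid the intermediate Vec is to extend the builder straight from an iterator (a sketch, relying on the same PrimitiveBuilder Extend impl the diff above already uses):

```rust
use std::iter::repeat;
use arrow::array::UInt32Array;

// Hypothetical sketch: each right row index repeated left_row_count times,
// fed to the builder without allocating a temporary Vec per right row.
fn right_indices(left_row_count: usize, right_row_count: usize) -> UInt32Array {
    let mut builder = UInt32Array::builder(left_row_count * right_row_count);
    for right_index in 0..right_row_count {
        builder.extend(repeat(Some(right_index as u32)).take(left_row_count));
    }
    builder.finish()
}
```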

@ozankabak (Contributor):

/benchmark

@ozankabak ozankabak marked this pull request as ready for review September 19, 2024 15:07

@ozankabak ozankabak left a comment

LGTM but let's get some more eyes on this.

The accidental regression made us look into index calculations more closely, enabling us to optimize the code relative to how it was before the regression-inducing PR.

datafusion/physical-plan/src/joins/nested_loop_join.rs (review thread resolved)
@berkaysynnada (Contributor):

cc @korowa, @comphead

@korowa korowa left a comment

LGTM overall -- I have some doubts about whether caching is really required here -- we can discuss it.

Also, I've noticed that this PR increases the size of intermediate batches, which seems acceptable for now, as long as NLJ is allowed to emit massive batches (significantly larger than the configured batch size) as its output -- it works as before, and there is an issue for fixing this.


```rust
// We always use the same indices before applying the filter, so we can cache them
let (left_indices_cache, right_indices_cache) = indices_cache;
let cached_output_row_count = left_indices_cache.len();
```
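
A guess at the control flow around this excerpt (hypothetical, not the PR's exact code; it assumes the cache grows monotonically and is sliced per batch, and reuses the hypothetical build_all_indices helper sketched earlier):

```rust
// Hypothetical: rebuild the cache only when this batch pair needs more output
// rows than are cached; otherwise take zero-copy slices of the cached arrays.
let output_row_count = left_row_count * right_row_count;
if output_row_count > cached_output_row_count {
    let (left, right) = build_all_indices(left_row_count, right_row_count);
    *left_indices_cache = left;
    *right_indices_cache = right;
}
let left_indices = left_indices_cache.slice(0, output_row_count);
let right_indices = right_indices_cache.slice(0, output_row_count);
```

Slicing a longer cached array is valid here because the NLJ build side is fixed, so a prefix of the cached pattern is exactly the pattern for a smaller right batch.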
Contributor:

In the case of a 25-row build side, these cached arrays hold ~200k entries; for 500 rows, ~4 million, and so on (I suppose we don't need that much data on the build side for these arrays to reach GBs in size).

I understand that we will still have to create intermediate batches to apply the filter and produce output batches, but I suppose that, starting from some point, the size of these caches will become meaningful.

@alihan-synnada alihan-synnada Sep 20, 2024

I guess we can do away with the cache or make it optional. If we remove the cache, we could create the indices and apply the filter in chunks, similar to before. If we pass in a range to calculate the indices for, instead of creating right_batch.num_rows() chunks, we can control the size of the intermediate batches too. Something like (0..output_row_count).chunks(CHUNK_SIZE) should do the trick, now that we create the indices by mapping the current row index.

I believe this can bring the uncached performance down to a level similar to before the regression, maybe even better. I'll run a few benchmarks with this setup (without a cache) and update the benchmarks table.
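
A minimal sketch of the chunked variant described above (hypothetical; CHUNK_SIZE is an assumption, step_by stands in for the chunks call, and the index math assumes left indices vary fastest, matching the diff above):

```rust
use arrow::array::{UInt32Array, UInt64Array};

const CHUNK_SIZE: usize = 8192; // assumption; e.g. the configured batch size

// Hypothetical sketch: derive each chunk's indices directly from the flat
// output row range, so intermediate batches stay bounded by CHUNK_SIZE.
fn for_each_chunk(left_row_count: usize, right_row_count: usize) {
    let output_row_count = left_row_count * right_row_count;
    for start in (0..output_row_count).step_by(CHUNK_SIZE) {
        let end = (start + CHUNK_SIZE).min(output_row_count);
        // Map each flat output index i back to a (left, right) index pair.
        let left_indices = UInt64Array::from_iter_values(
            (start..end).map(|i| (i % left_row_count) as u64),
        );
        let right_indices = UInt32Array::from_iter_values(
            (start..end).map(|i| (i / left_row_count) as u32),
        );
        // ...apply the join filter to just this chunk's indices...
        let _ = (left_indices, right_indices);
    }
}
```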

Contributor Author:

The chunks approach didn't change the performance, but it helped reduce the sizes of the intermediate batches. The 10% performance hit without a cache comes from the way the arrays are constructed, and I couldn't find a faster approach for now. I suggest we go with the cached approach for now; when the issue that enables NLJ to emit massive batches is implemented, we can choose between the cached and chunked approaches depending on NLJ's output size. I'll open an issue about it.

Contributor:

Sounds good. I will merge this soon to avoid performance issues in any upcoming release, unless there is more feedback. We seem to gain 20% performance relative to before the regression with the cache, and we can migrate to a cached-vs-chunked approach (depending on output batch size) in the future.

Contributor:

> The chunks approach didn't change the performance, but it helped reduce the sizes of the intermediate batches.

Thank you for checking this option.

@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Sep 20, 2024
@ozankabak ozankabak merged commit 8397855 into apache:main Sep 20, 2024
24 checks passed