
Support GpuCollectList and GpuCollectSet as TypedImperativeAggregate #2971

Merged: 17 commits, Aug 3, 2021

Conversation

@sperlingxx (Collaborator, author) commented Jul 20, 2021:

Signed-off-by: sperlingxx <lovedreamf@gmail.com>

This PR supports GpuCollectList and GpuCollectSet as TypedImperativeAggregate, which is task 3 of #2916. It also introduces TypedImperativeAggExprMeta and GpuNoHashAggregateMeta to provide general support for TypedImperativeAggregate functions.

In addition, Aggregate stacks with TypedImperativeAggregate functions may lead to unexpected crashes if the stack partially falls back to CPU, because the GPU data types are inconsistent with their CPU counterparts. This problem will be fixed in task 4 of #2916. To avoid this kind of crash for now, this PR introduces the "associated fallback" mechanism, which only affects Aggregate plans containing TypedImperativeAggregate functions.

The "associated fallback" falls back all stages of an Aggregate (logical plan) to CPU once any stage of the plan needs to fall back. It is triggered on each final stage of an Aggregate that contains TypedImperativeAggregate functions: it traverses the plan tree to collect all stages of the current Aggregate and decides whether to fall them all back. The "associated fallback" also works when AQE is on.

Signed-off-by: sperlingxx <lovedreamf@gmail.com>
@sperlingxx (author):

build

Signed-off-by: sperlingxx <lovedreamf@gmail.com>
Signed-off-by: sperlingxx <lovedreamf@gmail.com>
Signed-off-by: sperlingxx <lovedreamf@gmail.com>
@sperlingxx (author):

build

@jlowe (Member) commented Jul 20, 2021:

#2916 was postponed to the 21.10 release. Given we're in burndown, I think this should be retargeted to branch-21.10 when that is available (hopefully soon).

@pxLi (Collaborator) commented Jul 21, 2021:

Yes, I am still waiting for cudf to get their CI ready for 21.10 (planned for Jul 22). After that I can start working on setting up CI/CD for our plugin 21.10.0.

pre-merge for 21.10 is ready

Signed-off-by: sperlingxx <lovedreamf@gmail.com>
@sperlingxx (author):

build

@sperlingxx sperlingxx changed the base branch from branch-21.08 to branch-21.10 July 21, 2021 07:21
@sperlingxx (author):

> #2916 was postponed to the 21.10 release. Given we're in burndown, I think this should be retargeted to branch-21.10 when that is available (hopefully soon).

Re-targeted to the new branch.

Signed-off-by: sperlingxx <lovedreamf@gmail.com>
@sperlingxx (author):

build

@abellina self-requested a review July 21, 2021 12:37
@sameerz added the `task` label (work required that improves the product but is not user facing) Jul 21, 2021
@abellina (Collaborator) left a comment:

@sperlingxx first pass through your changes.

* Base class for metadata around `SortAggregateExec` and `ObjectHashAggregateExec`, which may
* contain TypedImperativeAggregate functions in aggregate expressions.
*/
abstract class GpuNoHashAggregateMeta[INPUT <: SparkPlan](
Reviewer (Collaborator):

so this is called GpuNoHashAggregateMeta, but ObjectHashAggregateExec inherits from it. I think we should come up with a different name for GpuNoHashAggregateMeta but I can understand why you chose this.

@sperlingxx (author):

Because GpuSortAggregateMeta also inherits from it. When spark.sql.execution.useObjectHashAggregateExec is set to false, Spark Catalyst plans a SortAggregateExec instead of an ObjectHashAggregateExec for an Aggregate (logical plan) with TypedImperativeAggregate functions.
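This planner behavior can be observed directly; spark.sql.execution.useObjectHashAggregateExec is Spark's own configuration key. A sketch, assuming a df with collect aggregations:

```python
# Force Spark to plan SortAggregateExec instead of ObjectHashAggregateExec
# for TypedImperativeAggregate functions such as collect_list/collect_set.
spark.conf.set("spark.sql.execution.useObjectHashAggregateExec", "false")

df.groupby("a").agg(f.collect_list("b")).explain()
# The physical plan now shows SortAggregate nodes in place of
# ObjectHashAggregate ones.
```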

Reviewer (Collaborator):

I also think we can come up with a better name, even a long name like GpuTypeImperativeSupportedAggregateExecMeta.

val column = result.getColumn(i)
val rapidsType = GpuColumnVector.getRapidsType(dataTypes(i))
// extra type conversion check for nested types
if ((rapidsType.equals(DType.LIST) || rapidsType.equals(DType.STRUCT)) &&
Reviewer (Collaborator):
Why not call typeConversionAllowed for all columns (i.e., no need to special-case LIST and STRUCT)?

@sperlingxx (author):

For non-nested types, type casting may happen here, such as casting INT to LONG, so the typeConversionAllowed check may fail when the type conversion is necessary. For nested types, no type conversion is necessary (or available), which means the check is safe. What's more, we don't match any children types in GpuColumnVector.getRapidsType, so we check whether they match via typeConversionAllowed.

Reviewer (Collaborator):

I would prefer to see us handle things in terms of DataType instead of DType. getRapidsType is something we removed when we started to work on nested types because it loses a lot of information and it can easily be misused.

As a side note, are we seeing issues with this? Are we collecting a list/struct and the types are not correct?
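As background on the INT-to-LONG example above: Spark widens integral sums and counts to LongType, so a 32-bit aggregation buffer coming back from the GPU must be cast up. A small pyspark illustration of the widening (not code from this PR):

```python
import pyspark.sql.functions as f

df = spark.range(10).selectExpr("CAST(id AS INT) AS i")
df.agg(f.sum("i"), f.count("i")).printSchema()
# root
#  |-- sum(i): long (nullable = true)
#  |-- count(i): long (nullable = false)
# The input column is INT, yet both results are LONG; this is the kind of
# non-nested cast the check above has to allow.
```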

sperlingxx and others added 4 commits July 23, 2021 10:50
….scala

Co-authored-by: Alessandro Bellina <abellina@gmail.com>
Co-authored-by: Alessandro Bellina <abellina@gmail.com>
Signed-off-by: sperlingxx <lovedreamf@gmail.com>
@sperlingxx (author):

build

Signed-off-by: sperlingxx <lovedreamf@gmail.com>
@sperlingxx (author):

build

Signed-off-by: sperlingxx <lovedreamf@gmail.com>
Signed-off-by: sperlingxx <lovedreamf@gmail.com>
Signed-off-by: sperlingxx <lovedreamf@gmail.com>
@sperlingxx (author):

build

@abellina (Collaborator):

Thanks @sperlingxx. I am sorry for the delay; I'll take a look again today.

@revans2 (Collaborator) left a comment:

This mostly looks good. My main problem with this is:

> In addition, Aggregate stacks with TypedImperativeAggregate functions may lead to unexpected crashes if the stack partially falls back to CPU, because the GPU data types are inconsistent with their CPU counterparts. This problem will be fixed in task 4 of #2916. To avoid this kind of crash for now, this PR introduces the "associated fallback" mechanism, which only affects Aggregate plans containing TypedImperativeAggregate functions.

We cannot have CollectList and CollectSet on by default if there are chances that we can crash.

@@ -486,16 +486,32 @@ private static DType toRapidsOrNull(DataType type) {
} else {
return DecimalUtil.createCudfDecimal(dt.precision(), dt.scale());
}
} else if (supportNestedType) {
Reviewer (Collaborator):
Why do we need this? We removed nested types because a DType.LIST is missing a lot of information, and if we are not careful with this type of API it can cause bugs.

@sperlingxx (author):

I agree. So, I reverted this change.

@sperlingxx (author) commented Jul 30, 2021:

"As a side note, are we seeing issues with this? Are we collecting a list/struct and the types are not correct?"

For CollectList and CollectSet, I believe the types are always correct. But I am not sure whether there are some aggregations which we plan to support in future producing the inconsistent nested types.



# Queries with multiple distinct aggregations will fall back to CPU if they also contain
# collect aggregations, because the Spark optimizer inserts expressions like `If` and `First`
# when rewriting distinct aggregates, while `GpuIf` and `GpuFirst` don't support the data type
# of collect aggregations (ArrayType).
Reviewer (Collaborator):

Can you add in references to the issues to support these for GpuIf and GpuFirst? If they do not exist, then could you please file them?

@sperlingxx (author):

Yes, I filed the issue.

count(distinct b),
count(distinct c)
from tbl group by a"""
assert_gpu_and_cpu_are_equal_sql(
Reviewer (Collaborator):

Typically when we have a fallback test, we want an assertion that verifies part of the code actually did fall back, like assert_gpu_sql_fallback_collect.

@sperlingxx (author):

I added the fallback-capture check.
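For readers unfamiliar with the test helpers: the fallback assertions run the query on both CPU and GPU, compare results, and additionally assert that the named exec stayed on the CPU. A rough sketch of the pattern, modeled on the snippets later in this thread (the data-gen fixtures and conf name are assumed from the integration-test framework):

```python
import pyspark.sql.functions as f
from asserts import assert_gpu_fallback_collect  # spark-rapids integration-test helper

def test_multi_distinct_collect_fallback(data_gen):
    assert_gpu_fallback_collect(
        lambda spark: gen_df(spark, data_gen, length=100)
            .groupby('a')
            .agg(f.sort_array(f.collect_list('b')),
                 f.countDistinct('b'),
                 f.countDistinct('c')),
        # The exec we expect to remain on the CPU.
        cpu_fallback_class_name='ObjectHashAggregateExec',
        conf=_nans_float_conf_partial)
```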

@pytest.mark.parametrize('conf', [_nans_float_conf_partial, _nans_float_conf_final], ids=idfn)
@pytest.mark.parametrize('aqe_enabled', ['true', 'false'], ids=idfn)
def test_hash_groupby_collect_partial_replace_fallback(data_gen, conf, aqe_enabled):
conf.update({'spark.sql.adaptive.enabled': aqe_enabled})
Reviewer (Collaborator):

Please always copy conf before doing an update. We have seen issues with global values being modified by tests doing this, and it is just good practice.

@sperlingxx (author):

Fixed.
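The fix presumably follows the usual copy-then-update pattern; a hedged sketch rather than the literal diff:

```python
@pytest.mark.parametrize('aqe_enabled', ['true', 'false'], ids=idfn)
def test_hash_groupby_collect_partial_replace_fallback(data_gen, conf, aqe_enabled):
    # Copy the shared conf dict before mutating it, so module-level globals
    # like _nans_float_conf_partial are left untouched by this test.
    local_conf = dict(conf)
    local_conf.update({'spark.sql.adaptive.enabled': aqe_enabled})
    # ... run the assertions with conf=local_conf ...
```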

@sperlingxx (author):

> This mostly looks good. My main problem with this is:
>
> > In addition, Aggregate stacks with TypedImperativeAggregate functions may lead to unexpected crashes if the stack partially falls back to CPU, because the GPU data types are inconsistent with their CPU counterparts. This problem will be fixed in task 4 of #2916. To avoid this kind of crash for now, this PR introduces the "associated fallback" mechanism, which only affects Aggregate plans containing TypedImperativeAggregate functions.
>
> We cannot have CollectList and CollectSet on by default if there are chances that we can crash.

Yes, and I believe we can get rid of the potential crashes with "associated fallback".

Signed-off-by: sperlingxx <lovedreamf@gmail.com>
@sperlingxx (author):

build

.groupby('a')
.agg(f.sort_array(f.collect_list('b')), f.sort_array(f.collect_set('b'))),
conf=local_conf)
assert_gpu_fallback_collect(
Reviewer (Collaborator):
This also verifies that the CPU and the GPU are equal so you don't need both parts.

@sperlingxx (author):

Refined.

cpu_fallback_class_name='ObjectHashAggregateExec',
conf=local_conf)
# test with single Distinct
assert_gpu_and_cpu_are_equal_collect(
Reviewer (Collaborator):

Here too: if the tests are slightly different, then let's have a different test function for each test case.

@sperlingxx (author):

Refined.

val stageMetas = mutable.ListBuffer[GpuBaseAggregateMeta[_]]()
// Go through all Aggregate stages to check whether all stages are GPU-supported. If not,
// we fall back all GPU-supported stages to CPU.
if (recursiveCheckForFallback(meta, logicalPlan, stageMetas)) {
Reviewer (Collaborator):

So just to be sure that I understand this correctly. When AQE is not enabled we go through and see if we can fall back or not and if any one of them fell back then we mark all of them as needing to fall back. Is that correct? What about when AQE is enabled and the first aggregation (the partial one) may have already executed? We can mark it to fall back to the CPU, but it will do nothing because it has already executed. How do we handle that case?

@sperlingxx (author) commented Aug 2, 2021:

To adapt to AQE, I took advantage of the gpuSupportedTag introduced by @andygrove. I added the line

wrapped.getTagValue(gpuSupportedTag).foreach(_.foreach(willNotWorkOnGpu))

in GpuTypedImperativeSupportedAggregateExecMeta.tagPlanForGpu to retrieve the information about GPU support that was captured and cached during GpuQueryStagePrepOverrides.

Reviewer (Collaborator):

I understand, but does that run over the entire plan at some point, or is it just sections of the plan? If it is the entire plan, it would be good to explain that in a comment, because otherwise it looks like we have cases where we can crash.

@sperlingxx (author):

I added some comments to this section.

meta: GpuTypedImperativeSupportedAggregateExecMeta[_]): Unit = {
// We only run the check for final stages which contain TypedImperativeAggregate.
val needToCheck = meta.agg.aggregateExpressions.exists(e =>
(e.mode == Final || e.mode == Complete) &&
Reviewer (Collaborator):

FYI, Complete means that the entire aggregation is happening in one pass, so there should be no need to check for a corresponding first part of the aggregation, because there should be none. This only shows up on Databricks right now, so it is not super simple to test.

@sperlingxx (author):

I removed Complete.

Reviewer (Collaborator):

We should also test this on Databricks if you have not done so already.


Signed-off-by: sperlingxx <lovedreamf@gmail.com>
@sperlingxx (author):

build

revans2 previously approved these changes Aug 2, 2021
@revans2 (Collaborator) left a comment:

It looks good. I mostly want to be sure that we have run the tests on Databricks. And it would be nice to have some of the comments in the fallback code updated to explain how it works with AQE, so it is simpler to follow.

assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100)
.groupby('a')
.agg(f.sort_array(f.collect_list('b')), f.count('b')),
Reviewer (Collaborator):

It seems every collect_* function is wrapped in a sort_array. Is that on purpose? Could a comment be added somewhere on why? Especially because we have @ignore_order so I was curious.

@sperlingxx (author):

It is because @ignore_order only ensures the order between rows, while in these cases we also need to take care of the order within each array produced by the collect ops. I added this comment to the test file.
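To make the two orderings concrete (illustrative data, not from the PR):

```python
import pyspark.sql.functions as f

df = spark.createDataFrame([(1, 'x'), (1, 'y')], ['a', 'b'])

# collect_list may emit ['x', 'y'] or ['y', 'x'] depending on task ordering;
# @ignore_order only sorts the result rows, not the arrays inside them, so a
# CPU run and a GPU run could still differ.
unsorted = df.groupby('a').agg(f.collect_list('b'))

# Wrapping in sort_array canonicalizes the order inside each array, making
# the CPU/GPU comparison deterministic.
deterministic = df.groupby('a').agg(f.sort_array(f.collect_list('b')))
```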

Reviewer (Collaborator):

Makes sense. I'd extend ignore_order to do the sorting after the collect for the array case. That way you would be able to test the aggregate in another way a user is likely to invoke it.

I’m ok if you want to do that as a follow up also.

Reviewer (Collaborator):

I would be fine with an @ignore_array_order or something like that. I'd rather not have @ignore_order cover both.

Reviewer (Collaborator):

But do it as a follow-on issue, if we do it at all.

@abellina (Collaborator) left a comment:

At this point I only had the comment on the tests, but otherwise it is looking good so far.

Signed-off-by: sperlingxx <lovedreamf@gmail.com>
@sperlingxx (author):

build
