Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Support GpuCollectList/GpuCollectSet in groupBy aggregation #2804

Closed

Conversation

sperlingxx
Copy link
Collaborator

@sperlingxx sperlingxx commented Jun 24, 2021

Current PR is to support GpuCollectList and GpuCollectSet in groupBy aggregation.

CollectList and CollectSet are the first two TypedImperativeAggregate, which we attempt to provide GPU support. Unlike DeclativeAggregate and other ImperativeAggregate (such as PivotFirst), TypedImperativeAggregate will create aggBufferAttributes of BinaryType (for serialization). Meanwhile, in GPU counterparts, we store aggBufferAttributes just as datatype of update expressions (Array[elementType] for collectOps). Therefore, we involve three additional tasks in this PR:

  1. Replaces Binary typed aggBufferAttributes with corresponding GPU buffers, which is done in GpuObjectHashAggregateMeta.convertToGpu. Although ObjectHashAggregateExec doesn't cover all cases of TypedImperativeAggregate, it covers the most common usages and we can support another situations later (for instance, aggregation contains multiple distinct, which rewritten by optimizer).
  2. Bypasses the type check system through marking TypeSig.BINARY as plugin-supported for ObjectHashAggregateExec, HashPartition and ShuffleExchangeExec. And checks the validity of BinaryTypes in the tagForGpu methods.
  3. Reworked (simplified) GpuHashAggregateExec.setupReferences to make it work with TypedImperativeAggregate in PartialMerge mode.

I labeled this PR as WIP, since I am not quite confident whether the above approaches are appropriate or not.

Signed-off-by: sperlingxx <lovedreamf@gmail.com>
Signed-off-by: sperlingxx <lovedreamf@gmail.com>
Signed-off-by: sperlingxx <lovedreamf@gmail.com>
Signed-off-by: sperlingxx <lovedreamf@gmail.com>
Signed-off-by: sperlingxx <lovedreamf@gmail.com>
@@ -371,6 +374,107 @@ def test_hash_reduction_pivot_without_nans(data_gen, conf):
.agg(f.sum('c')),
conf=conf)

_repeat_agg_column_for_collect_op = [
Copy link
Collaborator

@ttnghia ttnghia Jun 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why "hash_aggregate"? In cudf, collect list/set are only sort_aggregages.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ttnghia This test is not named after how cudf implements something. What is more cudf hides that detail from users so any test outside of cudf itself That is a hidden implementation detail. It is named after how Spark implemented it, and we are trying to get coverage on that operator.

That said CollectList and CollectSet apparently are showing up as ObjectHashAggregateExec and there are also some SortAggregateExec tests in here too. At some point we should either split this up into separate files or rename it.

throw new UnsupportedOperationException("CollectSet is not yet supported in reduction")
override lazy val mergeReductionAggregate: cudf.ColumnVector => cudf.Scalar =
throw new UnsupportedOperationException("CollectSet is not yet supported in reduction")
override lazy val updateAggregate: Aggregation = Aggregation.collectSet()
Copy link
Collaborator

@ttnghia ttnghia Jun 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Humnn, is there any way to call Aggregation.collectList if the data is partitioned into more than one batch? As we only need lists (which may contain duplicates) for the intermediate results. Calling collectSet to generate the intermediate results is expensive, as that involves unnecessarily executing drop_list_duplicates on the temporary lists.

Copy link
Collaborator

@revans2 revans2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a lot of change and I think we need to look into what is the proper way to deal with ObjectHashAggregate. It feels kind of hacked together and I want to get a real design put in place before we push this in.

@@ -371,6 +374,107 @@ def test_hash_reduction_pivot_without_nans(data_gen, conf):
.agg(f.sum('c')),
conf=conf)

_repeat_agg_column_for_collect_op = [
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ttnghia This test is not named after how cudf implements something. What is more cudf hides that detail from users so any test outside of cudf itself That is a hidden implementation detail. It is named after how Spark implemented it, and we are trying to get coverage on that operator.

That said CollectList and CollectSet apparently are showing up as ObjectHashAggregateExec and there are also some SortAggregateExec tests in here too. At some point we should either split this up into separate files or rename it.

(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.STRUCT).nested(),
(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.BINARY +
TypeSig.STRUCT).nested()
.withPsNote(TypeEnum.BINARY, "Marking BINARY as plugin-supported is only to " +
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What exactly does this mean and how does this help an end user?

.withPsNote(TypeEnum.STRUCT, "Round-robin partitioning is not supported for nested " +
s"structs if ${SQLConf.SORT_BEFORE_REPARTITION.key} is true")
.withPsNote(TypeEnum.ARRAY, "Round-robin partitioning is not supported if " +
s"${SQLConf.SORT_BEFORE_REPARTITION.key} is true")
.withPsNote(TypeEnum.MAP, "Round-robin partitioning is not supported if " +
s"${SQLConf.SORT_BEFORE_REPARTITION.key} is true"),
s"${SQLConf.SORT_BEFORE_REPARTITION.key} is true")
.withPsNote(TypeEnum.BINARY, "Marking BINARY as plugin-supported is only to " +
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a restriction on this?

@@ -3004,6 +3034,19 @@ object GpuOverrides {
.withPsNote(TypeEnum.STRUCT, "not allowed for grouping expressions"),
TypeSig.all),
(agg, conf, p, r) => new GpuHashAggregateMeta(agg, conf, p, r)),
exec[ObjectHashAggregateExec](
"The backend for hash based aggregations",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need a clearer description to distinguish it from the regular HashAggregateExec.


def filterNonAggBufBinaryExpressions(expressions: Seq[Expression]): Seq[Expression] = {
expressions.filter {
case AttributeReference("buf", BinaryType, _, _) => false
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What makes "buf" special? This is not good enough.

mutableAggBufferOffset: Int = 0,
inputAggBufferOffset: Int = 0)
extends GpuCollectBase[CollectSetAggregation] {

override lazy val updateExpressions: Seq[GpuExpression] = new CudfCollectSet(inputBuf) :: Nil

override lazy val mergeExpressions: Seq[GpuExpression] = new CudfMergeSets(outputBuf) :: Nil
Copy link
Collaborator

@ttnghia ttnghia Jun 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking of totally removing CudfCollectSet and CudfMergeSet. We just call CudfCollectList in updateExpressions, and CudfMergeLists in mergeExpressions. During evaluateExpression, we call CudfDropListDuplicates on the merged lists.

@jlowe
Copy link
Member

jlowe commented Jul 23, 2021

Is this superceded by #2971?

@sperlingxx
Copy link
Collaborator Author

Is this superceded by #2971?

Yes, I closed it.

@sperlingxx sperlingxx closed this Jul 26, 2021
@sperlingxx sperlingxx deleted the collect_ops_groupby branch December 2, 2021 02:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature request New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants