Skip to content

Commit

Permalink
Distinct inner join (#10503)
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Lowe <jlowe@nvidia.com>
  • Loading branch information
jlowe authored Mar 4, 2024
1 parent a8141bc commit dfc18b2
Showing 1 changed file with 31 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ abstract class BaseHashJoinIterator(
opTime = opTime,
joinTime = joinTime) {
// We can cache this because the build side is not changing
private lazy val streamMagnificationFactor = joinType match {
protected lazy val (streamMagnificationFactor, isDistinctJoin) = joinType match {
case _: InnerLike | LeftOuter | RightOuter | FullOuter =>
built.checkpoint()
withRetryNoSplit {
Expand All @@ -289,7 +289,7 @@ abstract class BaseHashJoinIterator(
}
case _ =>
// existence joins don't change size
1.0
(1.0, false)
}

override def computeNumJoinRows(cb: LazySpillableColumnarBatch): Long = {
Expand Down Expand Up @@ -406,17 +406,19 @@ abstract class BaseHashJoinIterator(
}

/**
* Guess the magnification factor for a stream side batch.
* Guess the magnification factor for a stream side batch and detect if the build side contains
* only unique join keys.
* This is temporary until cudf gives us APIs to get the actual gather map size.
*/
private def guessStreamMagnificationFactor(builtKeys: ColumnarBatch): Double = {
private def guessStreamMagnificationFactor(builtKeys: ColumnarBatch): (Double, Boolean) = {
// Use the build-side keys to estimate how many output rows each stream-side input row
// will produce. The estimate ignores the join type, data skew, and whether any keys
// actually match.
withResource(countGroups(builtKeys)) { builtCount =>
// The build side is treated as having only distinct keys when the grouped result has
// exactly as many rows as the key batch itself.
val isDistinct = builtCount.getRowCount == builtKeys.numRows()
// NOTE(review): presumably the last column of the countGroups result holds the per-key
// row counts -- confirm against countGroups.
val counts = builtCount.getColumn(builtCount.getNumberOfColumns - 1)
// The mean of that column, reduced to a FLOAT64 scalar, is used as the magnification
// factor.
withResource(counts.reduce(ReductionAggregation.mean(), DType.FLOAT64)) { scalarAverage =>
scalarAverage.getDouble
(scalarAverage.getDouble, isDistinct)
}
}
}
Expand Down Expand Up @@ -466,20 +468,32 @@ class HashJoinIterator(
rightKeys: Table,
rightData: LazySpillableColumnarBatch): Option[JoinGatherer] = {
withResource(new NvtxWithMetrics("hash join gather map", NvtxColor.ORANGE, joinTime)) { _ =>
val maps = joinType match {
case LeftOuter => leftKeys.leftJoinGatherMaps(rightKeys, compareNullsEqual)
case RightOuter =>
// Reverse the output of the join, because we expect the right gather map to
// always be on the right
rightKeys.leftJoinGatherMaps(leftKeys, compareNullsEqual).reverse
case _: InnerLike => leftKeys.innerJoinGatherMaps(rightKeys, compareNullsEqual)
case LeftSemi => Array(leftKeys.leftSemiJoinGatherMap(rightKeys, compareNullsEqual))
case LeftAnti => Array(leftKeys.leftAntiJoinGatherMap(rightKeys, compareNullsEqual))
case _ =>
throw new NotImplementedError(s"Joint Type ${joinType.getClass} is not currently" +
// hack to work around unique_join not handling empty tables
if (joinType.isInstanceOf[InnerLike] &&
(leftKeys.getRowCount == 0 || rightKeys.getRowCount == 0)) {
None
} else {
val maps = joinType match {
case LeftOuter => leftKeys.leftJoinGatherMaps(rightKeys, compareNullsEqual)
case RightOuter =>
// Reverse the output of the join, because we expect the right gather map to
// always be on the right
rightKeys.leftJoinGatherMaps(leftKeys, compareNullsEqual).reverse
case _: InnerLike if isDistinctJoin =>
if (buildSide == GpuBuildRight) {
leftKeys.innerDistinctJoinGatherMaps(rightKeys, compareNullsEqual)
} else {
rightKeys.innerDistinctJoinGatherMaps(leftKeys, compareNullsEqual).reverse
}
case _: InnerLike => leftKeys.innerJoinGatherMaps(rightKeys, compareNullsEqual)
case LeftSemi => Array(leftKeys.leftSemiJoinGatherMap(rightKeys, compareNullsEqual))
case LeftAnti => Array(leftKeys.leftAntiJoinGatherMap(rightKeys, compareNullsEqual))
case _ =>
throw new NotImplementedError(s"Joint Type ${joinType.getClass} is not currently" +
s" supported")
}
makeGatherer(maps, leftData, rightData, joinType)
}
makeGatherer(maps, leftData, rightData, joinType)
}
}
}
Expand Down

0 comments on commit dfc18b2

Please sign in to comment.