Enable GpuShuffledSymmetricHashJoin by default (#10418)
* Enable GpuShuffledSymmetricHashJoin by default

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Fix unit tests

* Update exception message to include build side

* Fix isHostBatchProducer to recognize ReusedExchangeExec

---------

Signed-off-by: Jason Lowe <jlowe@nvidia.com>
jlowe authored Feb 22, 2024
1 parent e5924be commit 5be63f9
Showing 9 changed files with 128 additions and 72 deletions.
54 changes: 40 additions & 14 deletions integration_tests/src/main/python/join_test.py
@@ -24,7 +24,9 @@

pytestmark = [pytest.mark.nightly_resource_consuming_test]

all_join_types = ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter']
all_non_symmetric_join_types = ['Left', 'Right', 'LeftSemi', 'LeftAnti', 'Cross']
all_symmetric_join_types = ['Inner', 'FullOuter']
all_join_types = all_non_symmetric_join_types + all_symmetric_join_types

all_gen = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(),
BooleanGen(), DateGen(), TimestampGen(), null_gen,
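For context on the split above: a join type is "symmetric" here when swapping the build and stream sides can only change row order, never the set of matches, which is what lets GpuShuffledSymmetricHashJoinExec pick the smaller side as the build side at runtime. A minimal, self-contained sketch of that property (plain Scala, toy data, not plugin code):

```scala
object JoinSymmetrySketch {
  // Toy inner equi-join over integer keys.
  def inner(l: Seq[Int], r: Seq[Int]): Set[(Int, Int)] =
    (for (a <- l; b <- r if a == b) yield (a, b)).toSet

  def main(args: Array[String]): Unit = {
    val left  = Seq(1, 2, 2)
    val right = Seq(2, 3)
    // Inner join is symmetric: swap the inputs, flip each pair, same matches.
    // FullOuter behaves the same way; Left/Right/LeftSemi/LeftAnti do not,
    // because they privilege one specific side.
    assert(inner(left, right) == inner(right, left).map(_.swap))
  }
}
```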
@@ -208,13 +210,7 @@ def do_join(spark):
# 3 times smaller than the other side. So it is not likely to happen
# unless we can give it some help. Parameters are setup to try to make
# this happen, if test fails something might have changed related to that.
@validate_execs_in_gpu_plan('GpuShuffledHashJoinExec')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', basic_nested_gens + [decimal_gen_128bit], ids=idfn)
@pytest.mark.parametrize('join_type', all_join_types, ids=idfn)
@pytest.mark.parametrize('sub_part_enabled', ['false', 'true'], ids=['SubPartition_OFF', 'SubPartition_ON'])
@allow_non_gpu(*non_utc_allow)
def test_hash_join_ridealong(data_gen, join_type, sub_part_enabled):
def hash_join_ridealong(data_gen, join_type, sub_part_enabled):
def do_join(spark):
left, right = create_ridealong_df(spark, short_gen, data_gen, 50, 500)
return left.join(right, left.key == right.r_key, join_type)
@@ -223,6 +219,24 @@ def do_join(spark):
})
assert_gpu_and_cpu_are_equal_collect(do_join, conf=_all_conf)

@validate_execs_in_gpu_plan('GpuShuffledHashJoinExec')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', basic_nested_gens + [decimal_gen_128bit], ids=idfn)
@pytest.mark.parametrize('join_type', all_non_symmetric_join_types, ids=idfn)
@pytest.mark.parametrize('sub_part_enabled', ['false', 'true'], ids=['SubPartition_OFF', 'SubPartition_ON'])
@allow_non_gpu(*non_utc_allow)
def test_hash_join_ridealong_non_symmetric(data_gen, join_type, sub_part_enabled):
hash_join_ridealong(data_gen, join_type, sub_part_enabled)

@validate_execs_in_gpu_plan('GpuShuffledSymmetricHashJoinExec')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', basic_nested_gens + [decimal_gen_128bit], ids=idfn)
@pytest.mark.parametrize('join_type', all_symmetric_join_types, ids=idfn)
@pytest.mark.parametrize('sub_part_enabled', ['false', 'true'], ids=['SubPartition_OFF', 'SubPartition_ON'])
@allow_non_gpu(*non_utc_allow)
def test_hash_join_ridealong_symmetric(data_gen, join_type, sub_part_enabled):
hash_join_ridealong(data_gen, join_type, sub_part_enabled)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@@ -969,12 +983,7 @@ def do_join(spark):

limited_integral_gens = [byte_gen, ShortGen(max_val=BYTE_MAX), IntegerGen(max_val=BYTE_MAX), LongGen(max_val=BYTE_MAX)]

@validate_execs_in_gpu_plan('GpuShuffledHashJoinExec')
@ignore_order(local=True)
@pytest.mark.parametrize('left_gen', limited_integral_gens, ids=idfn)
@pytest.mark.parametrize('right_gen', limited_integral_gens, ids=idfn)
@pytest.mark.parametrize('join_type', all_join_types, ids=idfn)
def test_hash_join_different_key_integral_types(left_gen, right_gen, join_type):
def hash_join_different_key_integral_types(left_gen, right_gen, join_type):
def do_join(spark):
left = unary_op_df(spark, left_gen, length=50)
right = unary_op_df(spark, right_gen, length=500)
@@ -984,6 +993,23 @@ def do_join(spark):
})
assert_gpu_and_cpu_are_equal_collect(do_join, conf=_all_conf)

@validate_execs_in_gpu_plan('GpuShuffledHashJoinExec')
@ignore_order(local=True)
@pytest.mark.parametrize('left_gen', limited_integral_gens, ids=idfn)
@pytest.mark.parametrize('right_gen', limited_integral_gens, ids=idfn)
@pytest.mark.parametrize('join_type', all_non_symmetric_join_types, ids=idfn)
def test_hash_join_different_key_integral_types_non_symmetric(left_gen, right_gen, join_type):
hash_join_different_key_integral_types(left_gen, right_gen, join_type)

@validate_execs_in_gpu_plan('GpuShuffledSymmetricHashJoinExec')
@ignore_order(local=True)
@pytest.mark.parametrize('left_gen', limited_integral_gens, ids=idfn)
@pytest.mark.parametrize('right_gen', limited_integral_gens, ids=idfn)
@pytest.mark.parametrize('join_type', all_symmetric_join_types, ids=idfn)
def test_hash_join_different_key_integral_types_symmetric(left_gen, right_gen, join_type):
hash_join_different_key_integral_types(left_gen, right_gen, join_type)


bloom_filter_confs = {
"spark.sql.autoBroadcastJoinThreshold": "1",
"spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold": 1,
@@ -80,7 +80,8 @@ class GpuShuffledHashJoinMeta(
left,
right,
conf.isGPUShuffle,
conf.gpuTargetBatchSizeBytes)(
conf.gpuTargetBatchSizeBytes,
isSkewJoin = false)(
join.leftKeys,
join.rightKeys)
case _ =>
@@ -111,7 +112,7 @@ case class GpuShuffledHashJoinExec(
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan,
isSkewJoin: Boolean)(
override val isSkewJoin: Boolean)(
cpuLeftKeys: Seq[Expression],
cpuRightKeys: Seq[Expression]) extends ShimBinaryExecNode with GpuHashJoin
with GpuSubPartitionHashJoin {
@@ -26,7 +26,7 @@ import com.nvidia.spark.rapids.GpuShuffledSymmetricHashJoinExec.JoinInfo
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
import com.nvidia.spark.rapids.shims.{GpuHashPartitioning, ShimBinaryExecNode}
import com.nvidia.spark.rapids.shims.GpuHashPartitioning

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -35,7 +35,8 @@ import org.apache.spark.sql.catalyst.plans.{FullOuter, InnerLike, JoinType}
import org.apache.spark.sql.catalyst.plans.physical.Distribution
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
import org.apache.spark.sql.rapids.execution.{ConditionalHashJoinIterator, GpuCustomShuffleReaderExec, GpuHashJoin, GpuShuffleExchangeExecBase, HashFullJoinIterator, HashFullJoinStreamSideIterator, HashJoinIterator}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.rapids.execution.{ConditionalHashJoinIterator, GpuCustomShuffleReaderExec, GpuHashJoin, GpuJoinExec, GpuShuffleExchangeExecBase, HashFullJoinIterator, HashFullJoinStreamSideIterator, HashJoinIterator}
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch

@@ -69,13 +70,15 @@ object GpuShuffledSymmetricHashJoinExec {
val rightTypes = rightOutput.map(_.dataType).toArray
val boundLeftKeys = GpuBindReferences.bindGpuReferences(leftKeys, leftOutput)
val boundRightKeys = GpuBindReferences.bindGpuReferences(rightKeys, rightOutput)
val boundCondition = condition.map { c =>
GpuBindReferences.bindGpuReference(c, leftOutput ++ rightOutput)
}
val (boundBuildKeys, buildTypes, boundStreamKeys, streamTypes, streamOutput) =
val (boundBuildKeys, buildTypes, buildOutput, boundStreamKeys, streamTypes, streamOutput) =
buildSide match {
case GpuBuildRight => (boundRightKeys, rightTypes, boundLeftKeys, leftTypes, leftOutput)
case GpuBuildLeft => (boundLeftKeys, leftTypes, boundRightKeys, rightTypes, rightOutput)
case GpuBuildRight =>
(boundRightKeys, rightTypes, rightOutput, boundLeftKeys, leftTypes, leftOutput)
case GpuBuildLeft =>
(boundLeftKeys, leftTypes, leftOutput, boundRightKeys, rightTypes, rightOutput)
}
val boundCondition = condition.map { c =>
GpuBindReferences.bindGpuReference(c, streamOutput ++ buildOutput)
}
// For join types other than FullOuter, we simply set compareNullsEqual as true to adapt
// struct keys with nullable children. Non-nested keys can also be correctly processed with
@@ -85,7 +88,7 @@
GpuHashJoin.anyNullableStructChild(boundBuildKeys)
val needNullFilter = compareNullsEqual && boundBuildKeys.exists(_.nullable)
BoundJoinExprs(boundBuildKeys, buildTypes, boundStreamKeys, streamTypes, streamOutput,
boundCondition, leftOutput.size, compareNullsEqual, needNullFilter)
boundCondition, streamOutput.size, compareNullsEqual, needNullFilter)
}
}
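The reordering above (and the matching change in GpuHashJoin further down) binds the join condition against streamOutput ++ buildOutput rather than left ++ right, so the compiled condition's column ordinals agree with the table order used when the condition is evaluated. A self-contained sketch of why the concatenation order matters (hypothetical names, not plugin code):

```scala
object BindOrderSketch {
  // A "bound" reference resolves a column name to an ordinal in the
  // concatenated (stream ++ build) output, the convention the diff adopts.
  case class ColRef(ordinal: Int)
  def bind(name: String, streamCols: Seq[String], buildCols: Seq[String]): ColRef =
    ColRef((streamCols ++ buildCols).indexOf(name))

  def main(args: Array[String]): Unit = {
    val left  = Seq("l_key", "l_val")
    val right = Seq("r_key", "r_val")
    // Build-right: stream = left, build = right.
    assert(bind("r_val", streamCols = left, buildCols = right) == ColRef(3))
    // Build-left: the same column lands at a different ordinal, so a
    // condition compiled for one ordering is wrong for the other.
    assert(bind("r_val", streamCols = right, buildCols = left) == ColRef(1))
  }
}
```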

@@ -356,9 +359,10 @@ case class GpuShuffledSymmetricHashJoinExec(
left: SparkPlan,
right: SparkPlan,
isGpuShuffle: Boolean,
gpuBatchSizeBytes: Long)(
gpuBatchSizeBytes: Long,
override val isSkewJoin: Boolean)(
cpuLeftKeys: Seq[Expression],
cpuRightKeys: Seq[Expression]) extends ShimBinaryExecNode with GpuExec {
cpuRightKeys: Seq[Expression]) extends GpuJoinExec {
import GpuShuffledSymmetricHashJoinExec._

override def otherCopyArgs: Seq[AnyRef] = Seq(cpuLeftKeys, cpuRightKeys)
@@ -604,11 +608,16 @@
case _: GpuShuffleCoalesceExec =>
throw new IllegalStateException("Should not have shuffle coalesce before this node")
case _: GpuShuffleExchangeExecBase | _: GpuCustomShuffleReaderExec => true
case _: ReusedExchangeExec => true
case _: ShuffleQueryStageExec => true
case _ => false
}
}
}

override def nodeName: String = {
if (isSkewJoin) super.nodeName + "(skew=true)" else super.nodeName
}
}
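The isHostBatchProducer change above is the commit-message bullet about ReusedExchangeExec: under AQE, Spark may substitute a reused exchange for a shuffle it already planned, and a match that only recognizes the shuffle nodes themselves would wrongly treat that child as a host batch producer. A toy model of the fix (hypothetical mini plan ADT, not Spark classes):

```scala
object ReuseRecognitionSketch {
  sealed trait Plan
  case object ShuffleExchange extends Plan
  case class ReusedExchange(original: Plan) extends Plan
  case object TableScan extends Plan

  def producesGpuShuffleBatches(p: Plan): Boolean = p match {
    case ShuffleExchange   => true
    case ReusedExchange(_) => true // the case this commit adds
    case _                 => false
  }

  def main(args: Array[String]): Unit = {
    assert(producesGpuShuffleBatches(ShuffleExchange))
    assert(producesGpuShuffleBatches(ReusedExchange(ShuffleExchange)))
    assert(!producesGpuShuffleBatches(TableScan))
  }
}
```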

/**
@@ -731,11 +740,10 @@ class NullFilteredBatchIterator(

override def hasNext: Boolean = {
while (onDeck.isEmpty && iter.hasNext) {
val batch = withResource(iter.next()) { batch =>
opTime.ns {
val spillable = SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
GpuHashJoin.filterNullsWithRetryAndClose(spillable, boundKeys)
}
val rawBatch = iter.next()
val batch = opTime.ns {
val spillable = SpillableColumnarBatch(rawBatch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
GpuHashJoin.filterNullsWithRetryAndClose(spillable, boundKeys)
}
if (batch.numRows > 0) {
onDeck = Some(batch)
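The hasNext rewrite above is a resource-ownership fix: the old shape wrapped the incoming batch in withResource while also passing it (via a SpillableColumnarBatch) to filterNullsWithRetryAndClose, whose contract is to close its input, so the batch could be closed twice. A self-contained sketch of the hazard and the fixed shape (toy AutoCloseable, not plugin code):

```scala
object SingleOwnerSketch {
  final class Batch extends AutoCloseable {
    private var closed = false
    override def close(): Unit = {
      require(!closed, "double close detected")
      closed = true
    }
  }
  // Like filterNullsWithRetryAndClose, this helper always consumes its input.
  def filterAndClose(b: Batch): Unit =
    try { /* filtering would happen here */ } finally b.close()

  def main(args: Array[String]): Unit = {
    // Fixed shape: hand the batch to exactly one owner.
    filterAndClose(new Batch)
    // Old shape for contrast; uncommenting trips the double-close guard:
    //   val b = new Batch
    //   try filterAndClose(b) finally b.close()
  }
}
```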
@@ -92,7 +92,8 @@ class GpuSortMergeJoinMeta(
left,
right,
conf.isGPUShuffle,
conf.gpuTargetBatchSizeBytes)(
conf.gpuTargetBatchSizeBytes,
join.isSkewJoin)(
join.leftKeys,
join.rightKeys)
case _ =>
@@ -638,7 +638,7 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
"joins. Requires spark.rapids.sql.shuffledHashJoin.optimizeShuffle=true.")
.internal()
.booleanConf
.createWithDefault(false)
.createWithDefault(true)

val STABLE_SORT = conf("spark.rapids.sql.stableSort.enabled")
.doc("Enable or disable stable sorting. Apache Spark's sorting is typically a stable " +
@@ -22,10 +22,10 @@ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{withRestoreOnRetry, withRetryNoSplit}
import com.nvidia.spark.rapids.jni.GpuOOM
import com.nvidia.spark.rapids.shims.ShimBinaryExecNode

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans.{Cross, ExistenceJoin, FullOuter, Inner, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

@@ -522,9 +522,17 @@ class ConditionalHashJoinIterator(
withResource(GpuColumnVector.from(leftData.getBatch)) { leftTable =>
withResource(GpuColumnVector.from(rightData.getBatch)) { rightTable =>
val maps = joinType match {
case _: InnerLike =>
case _: InnerLike if buildSide == GpuBuildRight =>
Table.mixedInnerJoinGatherMaps(leftKeys, rightKeys, leftTable, rightTable,
compiledCondition, nullEquality)
case _: InnerLike if buildSide == GpuBuildLeft =>
// Even though it's an inner join, we need to switch the join order since the
// condition has been compiled to expect the build side on the left and the stream
// side on the right.
// Reverse the output of the join, because we expect the right gather map to
// always be on the right.
Table.mixedInnerJoinGatherMaps(rightKeys, leftKeys, rightTable, leftTable,
compiledCondition, nullEquality).reverse
case LeftOuter =>
Table.mixedLeftJoinGatherMaps(leftKeys, rightKeys, leftTable, rightTable,
compiledCondition, nullEquality)
@@ -540,8 +548,7 @@
Array(Table.mixedLeftAntiJoinGatherMap(leftKeys, rightKeys, leftTable, rightTable,
compiledCondition, nullEquality))
case _ =>
throw new NotImplementedError(s"Joint Type ${joinType.getClass} is not currently" +
s" supported")
throw new NotImplementedError(s"Join $joinType $buildSide is not currently supported")
}
makeGatherer(maps, leftData, rightData, joinType)
}
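The build-left branch above leans on two facts worth spelling out: the compiled AST condition expects build columns on one fixed side, so the tables are passed to the kernel swapped, and the kernel returns its gather maps in (first table, second table) order, so the pair must be reversed to restore the caller's left/right convention. A toy, order-preserving model of that swap-and-reverse (not the cuDF kernel, which only guarantees matching pairs, not row order):

```scala
object SwapAndReverseSketch {
  // Toy inner equi-join returning gather maps: result(0) indexes the first
  // argument, result(1) the second.
  def innerGatherMaps(l: Seq[Int], r: Seq[Int]): Array[Array[Int]] = {
    val pairs = for {
      (lv, li) <- l.zipWithIndex
      (rv, ri) <- r.zipWithIndex
      if lv == rv
    } yield (li, ri)
    Array(pairs.map(_._1).toArray, pairs.map(_._2).toArray)
  }

  def main(args: Array[String]): Unit = {
    val left  = Seq(1, 2, 3)
    val right = Seq(2, 3, 4)
    val direct  = innerGatherMaps(left, right)
    // Swap the inputs, then reverse the two maps so index 0 still gathers
    // from the caller's left table: the same move the diff makes.
    val swapped = innerGatherMaps(right, left).reverse
    assert(direct.map(_.toSeq).toSeq == swapped.map(_.toSeq).toSeq)
  }
}
```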
@@ -960,13 +967,15 @@ class HashedExistenceJoinIterator(
}
}

trait GpuHashJoin extends GpuExec {
def left: SparkPlan
def right: SparkPlan
trait GpuJoinExec extends ShimBinaryExecNode with GpuExec {
def joinType: JoinType
def condition: Option[Expression]
def leftKeys: Seq[Expression]
def rightKeys: Seq[Expression]
def isSkewJoin: Boolean = false
}

trait GpuHashJoin extends GpuJoinExec {
def buildSide: GpuBuildSide

protected lazy val (buildPlan, streamedPlan) = buildSide match {
@@ -1065,14 +1074,14 @@ trait GpuHashJoin extends GpuExec {
}

protected lazy val (numFirstConditionTableColumns, boundCondition) = {
val (joinLeft, joinRight) = joinType match {
case RightOuter => (right, left)
case _ => (left, right)
val (buildOutput, streamOutput) = buildSide match {
case GpuBuildRight => (right.output, left.output)
case GpuBuildLeft => (left.output, right.output)
}
val boundCondition = condition.map { c =>
GpuBindReferences.bindGpuReference(c, joinLeft.output ++ joinRight.output)
GpuBindReferences.bindGpuReference(c, streamOutput ++ buildOutput)
}
(joinLeft.output.size, boundCondition)
(streamOutput.size, boundCondition)
}

def doJoin(
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.functions.{col, when}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.{ExecutionPlanCaptureCallback, GpuFileSourceScanExec}
import org.apache.spark.sql.rapids.execution.GpuCustomShuffleReaderExec
import org.apache.spark.sql.rapids.execution.{GpuCustomShuffleReaderExec, GpuJoinExec}
import org.apache.spark.sql.types.{ArrayType, DataTypes, DecimalType, IntegerType, StringType, StructField, StructType}

class AdaptiveQueryExecSuite
@@ -77,9 +77,10 @@ class AdaptiveQueryExecSuite
}
}

private def findTopLevelGpuShuffleHashJoin(plan: SparkPlan): Seq[GpuShuffledHashJoinExec] = {
private def findTopLevelGpuShuffleHashJoin(plan: SparkPlan): Seq[GpuJoinExec] = {
collect(plan) {
case j: GpuShuffledHashJoinExec => j
case j: GpuShuffledSymmetricHashJoinExec => j
}
}

@@ -202,25 +203,35 @@ class AdaptiveQueryExecSuite
}

val shj = TestUtils.findOperator(df.queryExecution.executedPlan,
_.isInstanceOf[GpuShuffledHashJoinExec]).get
.asInstanceOf[GpuShuffledHashJoinExec]
_.isInstanceOf[GpuJoinExec]).get
assert(shj.children.length == 2)
val childrenToCheck = if (shouldOptimizeHashJoinShuffle) {
// assert that the stream side of SHJ is coalesced
shj.buildSide match {
case GpuBuildLeft => Seq(shj.right)
case GpuBuildRight => Seq(shj.left)
}
} else {
// assert that both the build and stream side of SHJ are coalesced
// if we are not optimizing the build side shuffle
shj.children
shj match {
case j: GpuShuffledSymmetricHashJoinExec =>
// assert that both children are NOT coalesced, as join will directly handle shuffle
assert(j.children.forall {
case GpuShuffleCoalesceExec(_, _) => false
case GpuCoalesceBatches(GpuShuffleCoalesceExec(_, _), _) => false
case _ => true
})
case j: GpuShuffledHashJoinExec =>
val childrenToCheck = if (shouldOptimizeHashJoinShuffle) {
// assert that the stream side of SHJ is coalesced
j.buildSide match {
case GpuBuildLeft => Seq(j.right)
case GpuBuildRight => Seq(j.left)
}
} else {
// assert that both the build and stream side of SHJ are coalesced
// if we are not optimizing the build side shuffle
j.children
}
assert(childrenToCheck.forall {
case GpuShuffleCoalesceExec(_, _) => true
case GpuCoalesceBatches(GpuShuffleCoalesceExec(_, _), _) => true
case _ => false
})
case j => throw new IllegalStateException(s"Unexpected join: $j")
}
assert(childrenToCheck.forall {
case GpuShuffleCoalesceExec(_, _) => true
case GpuCoalesceBatches(GpuShuffleCoalesceExec(_, _), _) => true
case _ => false
})
}

}, conf)
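The reshaped assertion above encodes the behavioral difference the new exec introduces: GpuShuffledSymmetricHashJoinExec ingests shuffle batches directly, so neither child should be a GpuShuffleCoalesceExec, while the classic GpuShuffledHashJoinExec still expects coalesced input on the side(s) being checked. A toy model of those two expectations (hypothetical mini plan ADT, not Spark classes):

```scala
object CoalesceExpectationSketch {
  sealed trait Node { def children: Seq[Node] }
  case object Shuffle extends Node { val children: Seq[Node] = Nil }
  case class Coalesce(child: Node) extends Node {
    val children: Seq[Node] = Seq(child)
  }
  case class Join(left: Node, right: Node) extends Node {
    val children: Seq[Node] = Seq(left, right)
  }

  def main(args: Array[String]): Unit = {
    // Symmetric join: children must NOT be coalesced.
    val symmetric = Join(Shuffle, Shuffle)
    assert(symmetric.children.forall(!_.isInstanceOf[Coalesce]))
    // Classic shuffled hash join: checked children must be coalesced.
    val classic = Join(Coalesce(Shuffle), Coalesce(Shuffle))
    assert(classic.children.forall(_.isInstanceOf[Coalesce]))
  }
}
```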
@@ -671,7 +682,7 @@ class AdaptiveQueryExecSuite
}

def checkSkewJoin(
joins: Seq[GpuShuffledHashJoinExec],
joins: Seq[GpuJoinExec],
leftSkewNum: Int,
rightSkewNum: Int): Unit = {
assert(joins.size == 1 && joins.head.isSkewJoin)