Enable GpuShuffledSymmetricHashJoin by default #10418

Merged · 4 commits · Feb 22, 2024
54 changes: 40 additions & 14 deletions integration_tests/src/main/python/join_test.py
@@ -24,7 +24,9 @@

pytestmark = [pytest.mark.nightly_resource_consuming_test]

all_join_types = ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter']
all_non_symmetric_join_types = ['Left', 'Right', 'LeftSemi', 'LeftAnti', 'Cross']
all_symmetric_join_types = ['Inner', 'FullOuter']
all_join_types = all_non_symmetric_join_types + all_symmetric_join_types

all_gen = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(),
BooleanGen(), DateGen(), TimestampGen(), null_gen,
@@ -208,13 +210,7 @@ def do_join(spark):
# 3 times smaller than the other side. So it is not likely to happen
# unless we can give it some help. Parameters are set up to try to make
# this happen; if the test fails, something related to that might have changed.
@validate_execs_in_gpu_plan('GpuShuffledHashJoinExec')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', basic_nested_gens + [decimal_gen_128bit], ids=idfn)
@pytest.mark.parametrize('join_type', all_join_types, ids=idfn)
@pytest.mark.parametrize('sub_part_enabled', ['false', 'true'], ids=['SubPartition_OFF', 'SubPartition_ON'])
@allow_non_gpu(*non_utc_allow)
def test_hash_join_ridealong(data_gen, join_type, sub_part_enabled):
def hash_join_ridealong(data_gen, join_type, sub_part_enabled):
def do_join(spark):
left, right = create_ridealong_df(spark, short_gen, data_gen, 50, 500)
return left.join(right, left.key == right.r_key, join_type)
@@ -223,6 +219,24 @@ def do_join(spark):
})
assert_gpu_and_cpu_are_equal_collect(do_join, conf=_all_conf)

@validate_execs_in_gpu_plan('GpuShuffledHashJoinExec')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', basic_nested_gens + [decimal_gen_128bit], ids=idfn)
@pytest.mark.parametrize('join_type', all_non_symmetric_join_types, ids=idfn)
@pytest.mark.parametrize('sub_part_enabled', ['false', 'true'], ids=['SubPartition_OFF', 'SubPartition_ON'])
@allow_non_gpu(*non_utc_allow)
def test_hash_join_ridealong_non_symmetric(data_gen, join_type, sub_part_enabled):
hash_join_ridealong(data_gen, join_type, sub_part_enabled)

@validate_execs_in_gpu_plan('GpuShuffledSymmetricHashJoinExec')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', basic_nested_gens + [decimal_gen_128bit], ids=idfn)
@pytest.mark.parametrize('join_type', all_symmetric_join_types, ids=idfn)
@pytest.mark.parametrize('sub_part_enabled', ['false', 'true'], ids=['SubPartition_OFF', 'SubPartition_ON'])
@allow_non_gpu(*non_utc_allow)
def test_hash_join_ridealong_symmetric(data_gen, join_type, sub_part_enabled):
hash_join_ridealong(data_gen, join_type, sub_part_enabled)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# Once 3.1.0 is the min Spark version, we can drop this
@ignore_order(local=True)
@@ -969,12 +983,7 @@ def do_join(spark):

limited_integral_gens = [byte_gen, ShortGen(max_val=BYTE_MAX), IntegerGen(max_val=BYTE_MAX), LongGen(max_val=BYTE_MAX)]

@validate_execs_in_gpu_plan('GpuShuffledHashJoinExec')
@ignore_order(local=True)
@pytest.mark.parametrize('left_gen', limited_integral_gens, ids=idfn)
@pytest.mark.parametrize('right_gen', limited_integral_gens, ids=idfn)
@pytest.mark.parametrize('join_type', all_join_types, ids=idfn)
def test_hash_join_different_key_integral_types(left_gen, right_gen, join_type):
def hash_join_different_key_integral_types(left_gen, right_gen, join_type):
def do_join(spark):
left = unary_op_df(spark, left_gen, length=50)
right = unary_op_df(spark, right_gen, length=500)
@@ -984,6 +993,23 @@ def do_join(spark):
})
assert_gpu_and_cpu_are_equal_collect(do_join, conf=_all_conf)

@validate_execs_in_gpu_plan('GpuShuffledHashJoinExec')
@ignore_order(local=True)
@pytest.mark.parametrize('left_gen', limited_integral_gens, ids=idfn)
@pytest.mark.parametrize('right_gen', limited_integral_gens, ids=idfn)
@pytest.mark.parametrize('join_type', all_non_symmetric_join_types, ids=idfn)
def test_hash_join_different_key_integral_types_non_symmetric(left_gen, right_gen, join_type):
hash_join_different_key_integral_types(left_gen, right_gen, join_type)

@validate_execs_in_gpu_plan('GpuShuffledSymmetricHashJoinExec')
@ignore_order(local=True)
@pytest.mark.parametrize('left_gen', limited_integral_gens, ids=idfn)
@pytest.mark.parametrize('right_gen', limited_integral_gens, ids=idfn)
@pytest.mark.parametrize('join_type', all_symmetric_join_types, ids=idfn)
def test_hash_join_different_key_integral_types_symmetric(left_gen, right_gen, join_type):
hash_join_different_key_integral_types(left_gen, right_gen, join_type)


bloom_filter_confs = {
"spark.sql.autoBroadcastJoinThreshold": "1",
"spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold": 1,
(next file)
@@ -79,7 +79,8 @@ class GpuShuffledHashJoinMeta(
left,
right,
conf.isGPUShuffle,
conf.gpuTargetBatchSizeBytes)(
conf.gpuTargetBatchSizeBytes,
isSkewJoin = false)(
join.leftKeys,
join.rightKeys)
case _ =>
@@ -110,7 +111,7 @@ case class GpuShuffledHashJoinExec(
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan,
isSkewJoin: Boolean)(
override val isSkewJoin: Boolean)(
cpuLeftKeys: Seq[Expression],
cpuRightKeys: Seq[Expression]) extends ShimBinaryExecNode with GpuHashJoin
with GpuSubPartitionHashJoin {
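Editor's note: the switch to `override val isSkewJoin` works because `isSkewJoin` is now declared with a default of `false` on the shared `GpuJoinExec` trait introduced later in this diff, so a concrete plan node overrides it with a constructor value. A minimal sketch of that Scala pattern, using stand-in class names rather than the real plan nodes:

```scala
// Sketch of the trait-default / constructor-override pattern; the isSkewJoin
// member mirrors the diff, the classes here are stand-ins.
trait JoinExecLike {
  def isSkewJoin: Boolean = false // default: most join execs are not skew joins
}

case class ShuffledHashJoinLike(
    left: String,
    right: String,
    override val isSkewJoin: Boolean) extends JoinExecLike

object TraitOverrideDemo extends App {
  val j = ShuffledHashJoinLike("leftPlan", "rightPlan", isSkewJoin = true)
  println(j.isSkewJoin) // true: the constructor value overrides the trait default
}
```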
(next file)
@@ -26,7 +26,7 @@ import com.nvidia.spark.rapids.GpuShuffledSymmetricHashJoinExec.JoinInfo
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
import com.nvidia.spark.rapids.shims.{GpuHashPartitioning, ShimBinaryExecNode}
import com.nvidia.spark.rapids.shims.GpuHashPartitioning

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -35,7 +35,8 @@ import org.apache.spark.sql.catalyst.plans.{FullOuter, InnerLike, JoinType}
import org.apache.spark.sql.catalyst.plans.physical.Distribution
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
import org.apache.spark.sql.rapids.execution.{ConditionalHashJoinIterator, GpuCustomShuffleReaderExec, GpuHashJoin, GpuShuffleExchangeExecBase, HashFullJoinIterator, HashFullJoinStreamSideIterator, HashJoinIterator}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.rapids.execution.{ConditionalHashJoinIterator, GpuCustomShuffleReaderExec, GpuHashJoin, GpuJoinExec, GpuShuffleExchangeExecBase, HashFullJoinIterator, HashFullJoinStreamSideIterator, HashJoinIterator}
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch

@@ -69,13 +70,15 @@ object GpuShuffledSymmetricHashJoinExec {
val rightTypes = rightOutput.map(_.dataType).toArray
val boundLeftKeys = GpuBindReferences.bindGpuReferences(leftKeys, leftOutput)
val boundRightKeys = GpuBindReferences.bindGpuReferences(rightKeys, rightOutput)
val boundCondition = condition.map { c =>
GpuBindReferences.bindGpuReference(c, leftOutput ++ rightOutput)
}
val (boundBuildKeys, buildTypes, boundStreamKeys, streamTypes, streamOutput) =
val (boundBuildKeys, buildTypes, buildOutput, boundStreamKeys, streamTypes, streamOutput) =
buildSide match {
case GpuBuildRight => (boundRightKeys, rightTypes, boundLeftKeys, leftTypes, leftOutput)
case GpuBuildLeft => (boundLeftKeys, leftTypes, boundRightKeys, rightTypes, rightOutput)
case GpuBuildRight =>
(boundRightKeys, rightTypes, rightOutput, boundLeftKeys, leftTypes, leftOutput)
case GpuBuildLeft =>
(boundLeftKeys, leftTypes, leftOutput, boundRightKeys, rightTypes, rightOutput)
}
val boundCondition = condition.map { c =>
GpuBindReferences.bindGpuReference(c, streamOutput ++ buildOutput)
}
// For join types other than FullOuter, we simply set compareNullsEqual as true to adapt
// struct keys with nullable children. Non-nested keys can also be correctly processed with
@@ -85,7 +88,7 @@
GpuHashJoin.anyNullableStructChild(boundBuildKeys)
val needNullFilter = compareNullsEqual && boundBuildKeys.exists(_.nullable)
BoundJoinExprs(boundBuildKeys, buildTypes, boundStreamKeys, streamTypes, streamOutput,
boundCondition, leftOutput.size, compareNullsEqual, needNullFilter)
boundCondition, streamOutput.size, compareNullsEqual, needNullFilter)
}
}
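Editor's note: the reordering above is the substantive fix in this object — the condition is now bound against `streamOutput ++ buildOutput` (and the recorded column count is `streamOutput.size`), so bound ordinals agree with the stream-first table order the join iterators present regardless of build side. A toy illustration of why ordinal-based binding is sensitive to that order (stub types; not the plugin's API):

```scala
// A "bound" reference is just an ordinal into a concatenated schema, so the
// same condition bound against (left ++ right) would point at the wrong
// columns whenever the build side is the left child.
case class AttrStub(name: String)
case class BoundRef(ordinal: Int, name: String)

def bindReference(attr: AttrStub, input: Seq[AttrStub]): BoundRef = {
  val ord = input.indexWhere(_.name == attr.name)
  require(ord >= 0, s"${attr.name} not found")
  BoundRef(ord, attr.name)
}

object BindDemo extends App {
  val leftOut  = Seq(AttrStub("l_key"), AttrStub("l_val"))
  val rightOut = Seq(AttrStub("r_key"))
  // With GpuBuildLeft, the stream side is the right child:
  val (buildOut, streamOut) = (leftOut, rightOut)
  // Binding against stream ++ build matches the table order the kernel sees.
  println(bindReference(AttrStub("l_val"), streamOut ++ buildOut)) // BoundRef(2,l_val)
  // Binding against left ++ right would yield ordinal 1 -- the wrong column.
  println(bindReference(AttrStub("l_val"), leftOut ++ rightOut))   // BoundRef(1,l_val)
}
```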

@@ -356,9 +359,10 @@ case class GpuShuffledSymmetricHashJoinExec(
left: SparkPlan,
right: SparkPlan,
isGpuShuffle: Boolean,
gpuBatchSizeBytes: Long)(
gpuBatchSizeBytes: Long,
override val isSkewJoin: Boolean)(
cpuLeftKeys: Seq[Expression],
cpuRightKeys: Seq[Expression]) extends ShimBinaryExecNode with GpuExec {
cpuRightKeys: Seq[Expression]) extends GpuJoinExec {
import GpuShuffledSymmetricHashJoinExec._

override def otherCopyArgs: Seq[AnyRef] = Seq(cpuLeftKeys, cpuRightKeys)
@@ -604,11 +608,16 @@
case _: GpuShuffleCoalesceExec =>
throw new IllegalStateException("Should not have shuffle coalesce before this node")
case _: GpuShuffleExchangeExecBase | _: GpuCustomShuffleReaderExec => true
case _: ReusedExchangeExec => true
case _: ShuffleQueryStageExec => true
case _ => false
}
}
}

override def nodeName: String = {
if (isSkewJoin) super.nodeName + "(skew=true)" else super.nodeName
}
}

/**
@@ -731,11 +740,10 @@ class NullFilteredBatchIterator(

override def hasNext: Boolean = {
while (onDeck.isEmpty && iter.hasNext) {
val batch = withResource(iter.next()) { batch =>
opTime.ns {
val spillable = SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
GpuHashJoin.filterNullsWithRetryAndClose(spillable, boundKeys)
}
val rawBatch = iter.next()
val batch = opTime.ns {
val spillable = SpillableColumnarBatch(rawBatch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
GpuHashJoin.filterNullsWithRetryAndClose(spillable, boundKeys)
}
if (batch.numRows > 0) {
onDeck = Some(batch)
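Editor's note: the restructuring in `hasNext` looks like an ownership fix — `SpillableColumnarBatch` takes over the batch and `filterNullsWithRetryAndClose` closes it, so the old `withResource(iter.next())` wrapper would have closed the same batch twice. A toy sketch of that hazard under this reading (stand-in types; only the call shape follows the diff):

```scala
// Sketch of the double-close hazard the diff removes. Stand-in types only.
class Batch(var closed: Boolean = false) extends AutoCloseable {
  override def close(): Unit = {
    require(!closed, "double close!") // the failure withResource would trigger
    closed = true
  }
}

def withResource[T <: AutoCloseable, R](r: T)(body: T => R): R =
  try body(r) finally r.close()

// Takes ownership of `b` and closes it itself, like SpillableColumnarBatch +
// filterNullsWithRetryAndClose in the diff.
def filterAndClose(b: Batch): Batch = { b.close(); new Batch() }

object OwnershipDemo extends App {
  // Correct (new shape): hand the batch off once; the callee closes it.
  val filtered = filterAndClose(new Batch())
  println(s"ok, got a fresh batch (closed=${filtered.closed})")
  // Incorrect (old shape): withResource(new Batch())(filterAndClose)
  // would close the already-closed input on exit and throw.
}
```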
(next file)
@@ -92,7 +92,8 @@ class GpuSortMergeJoinMeta(
left,
right,
conf.isGPUShuffle,
conf.gpuTargetBatchSizeBytes)(
conf.gpuTargetBatchSizeBytes,
join.isSkewJoin)(
join.leftKeys,
join.rightKeys)
case _ =>
(next file)
@@ -638,7 +638,7 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
"joins. Requires spark.rapids.sql.shuffledHashJoin.optimizeShuffle=true.")
.internal()
.booleanConf
.createWithDefault(false)
.createWithDefault(true)
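Editor's note: with the default flipped to `true`, the symmetric join now handles Inner and FullOuter shuffled joins out of the box. If it needed to be disabled for debugging, a session-level override would presumably look like the sketch below; the conf key is shown as a placeholder because this diff view cuts off the key name:

```scala
// Hedged sketch: turning the feature back off for a session. The conf key is
// a PLACEHOLDER -- the real key name is not visible in this diff excerpt.
import org.apache.spark.sql.SparkSession

object DisableSymmetricJoinDemo extends App {
  val spark = SparkSession.builder()
    .appName("symmetric-join-toggle")
    .master("local[*]")
    .getOrCreate()
  // The PR flips this internal conf's default from false to true; setting it
  // back to false would restore the previous GpuShuffledHashJoinExec path.
  spark.conf.set("spark.rapids.sql.<symmetricJoinKey>", "false") // placeholder key
  spark.stop()
}
```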

val STABLE_SORT = conf("spark.rapids.sql.stableSort.enabled")
.doc("Enable or disable stable sorting. Apache Spark's sorting is typically a stable " +
(next file)
@@ -22,10 +22,10 @@ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{withRestoreOnRetry, withRetryNoSplit}
import com.nvidia.spark.rapids.jni.GpuOOM
import com.nvidia.spark.rapids.shims.ShimBinaryExecNode

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans.{Cross, ExistenceJoin, FullOuter, Inner, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

@@ -522,9 +522,17 @@ class ConditionalHashJoinIterator(
withResource(GpuColumnVector.from(leftData.getBatch)) { leftTable =>
withResource(GpuColumnVector.from(rightData.getBatch)) { rightTable =>
val maps = joinType match {
case _: InnerLike =>
case _: InnerLike if buildSide == GpuBuildRight =>
Table.mixedInnerJoinGatherMaps(leftKeys, rightKeys, leftTable, rightTable,
compiledCondition, nullEquality)
case _: InnerLike if buildSide == GpuBuildLeft =>
[Review comment] nit: Should we update the error message in the exception to include the build side?
// Even though it's an inner join, we need to switch the join order since the
// condition has been compiled to expect the build side on the left and the stream
// side on the right.
// Reverse the output of the join, because we expect the right gather map to
// always be on the right.
Table.mixedInnerJoinGatherMaps(rightKeys, leftKeys, rightTable, leftTable,
compiledCondition, nullEquality).reverse
case LeftOuter =>
Table.mixedLeftJoinGatherMaps(leftKeys, rightKeys, leftTable, rightTable,
compiledCondition, nullEquality)
@@ -540,8 +548,7 @@
Array(Table.mixedLeftAntiJoinGatherMap(leftKeys, rightKeys, leftTable, rightTable,
compiledCondition, nullEquality))
case _ =>
throw new NotImplementedError(s"Joint Type ${joinType.getClass} is not currently" +
s" supported")
throw new NotImplementedError(s"Join $joinType $buildSide is not currently supported")
}
makeGatherer(maps, leftData, rightData, joinType)
}
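Editor's note: the swapped inner-join call above works because the kernel returns a pair of gather maps in argument order — calling it with the tables exchanged yields (build, stream) maps, and `.reverse` restores the (left, right) convention `makeGatherer` expects, as the in-code comments say. A toy model of that invariant, with arrays of row indices standing in for cudf gather maps:

```scala
// Toy model: a gather map is just row indices into one side's table. Swapping
// the join inputs swaps which side each map indexes; reversing the pair then
// restores the (left map, right map) convention the caller expects.
object GatherMapSwapDemo extends App {
  def innerJoinMaps(lKeys: Seq[Int], rKeys: Seq[Int]): Array[Array[Int]] = {
    val pairs = for {
      (lk, li) <- lKeys.zipWithIndex
      (rk, ri) <- rKeys.zipWithIndex
      if lk == rk
    } yield (li, ri)
    Array(pairs.map(_._1).toArray, pairs.map(_._2).toArray)
  }

  val left  = Seq(1, 2, 3)
  val right = Seq(2, 3, 4)
  val direct  = innerJoinMaps(left, right)
  val swapped = innerJoinMaps(right, left).reverse // swap inputs, reverse maps
  // Both call orders produce equivalent (left, right) gather maps.
  println(direct.map(_.mkString(",")).mkString(" | "))  // 1,2 | 0,1
  println(swapped.map(_.mkString(",")).mkString(" | ")) // 1,2 | 0,1
}
```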
@@ -960,13 +967,15 @@ class HashedExistenceJoinIterator(
}
}

trait GpuHashJoin extends GpuExec {
def left: SparkPlan
def right: SparkPlan
trait GpuJoinExec extends ShimBinaryExecNode with GpuExec {
def joinType: JoinType
def condition: Option[Expression]
def leftKeys: Seq[Expression]
def rightKeys: Seq[Expression]
def isSkewJoin: Boolean = false
}

trait GpuHashJoin extends GpuJoinExec {
def buildSide: GpuBuildSide

protected lazy val (buildPlan, streamedPlan) = buildSide match {
@@ -1065,14 +1074,14 @@ trait GpuHashJoin extends GpuExec {
}

protected lazy val (numFirstConditionTableColumns, boundCondition) = {
val (joinLeft, joinRight) = joinType match {
case RightOuter => (right, left)
case _ => (left, right)
val (buildOutput, streamOutput) = buildSide match {
case GpuBuildRight => (right.output, left.output)
case GpuBuildLeft => (left.output, right.output)
}
val boundCondition = condition.map { c =>
GpuBindReferences.bindGpuReference(c, joinLeft.output ++ joinRight.output)
GpuBindReferences.bindGpuReference(c, streamOutput ++ buildOutput)
}
(joinLeft.output.size, boundCondition)
(streamOutput.size, boundCondition)
}

def doJoin(
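Editor's note: taken together, `GpuJoinExec` factors the common join surface (keys, condition, `isSkewJoin`) out of `GpuHashJoin`, giving `GpuShuffledHashJoinExec` and `GpuShuffledSymmetricHashJoinExec` one supertype — which is what lets the AQE suite below collect both exec types through a single match. A stub-level sketch of that usage (only the trait shape follows the diff):

```scala
// Sketch: one supertype lets callers collect both join implementations.
// Trait members mirror the diff; the concrete classes are stubs.
trait GpuJoinExecLike {
  def isSkewJoin: Boolean = false
}
case class ShuffledHashJoinStub(override val isSkewJoin: Boolean)
    extends GpuJoinExecLike
case class SymmetricHashJoinStub(override val isSkewJoin: Boolean)
    extends GpuJoinExecLike

object CollectJoinsDemo extends App {
  val plan: Seq[Any] =
    Seq(ShuffledHashJoinStub(false), "scan", SymmetricHashJoinStub(true))
  // Equivalent of findTopLevelGpuShuffleHashJoin: match on the shared trait.
  val joins = plan.collect { case j: GpuJoinExecLike => j }
  println(joins.count(_.isSkewJoin)) // 1
}
```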
(next file)
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.functions.{col, when}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.{ExecutionPlanCaptureCallback, GpuFileSourceScanExec}
import org.apache.spark.sql.rapids.execution.GpuCustomShuffleReaderExec
import org.apache.spark.sql.rapids.execution.{GpuCustomShuffleReaderExec, GpuJoinExec}
import org.apache.spark.sql.types.{ArrayType, DataTypes, DecimalType, IntegerType, StringType, StructField, StructType}

class AdaptiveQueryExecSuite
@@ -77,9 +77,10 @@
}
}

private def findTopLevelGpuShuffleHashJoin(plan: SparkPlan): Seq[GpuShuffledHashJoinExec] = {
private def findTopLevelGpuShuffleHashJoin(plan: SparkPlan): Seq[GpuJoinExec] = {
collect(plan) {
case j: GpuShuffledHashJoinExec => j
case j: GpuShuffledSymmetricHashJoinExec => j
}
}

@@ -202,25 +203,35 @@
}

val shj = TestUtils.findOperator(df.queryExecution.executedPlan,
_.isInstanceOf[GpuShuffledHashJoinExec]).get
.asInstanceOf[GpuShuffledHashJoinExec]
_.isInstanceOf[GpuJoinExec]).get
assert(shj.children.length == 2)
val childrenToCheck = if (shouldOptimizeHashJoinShuffle) {
// assert that the stream side of SHJ is coalesced
shj.buildSide match {
case GpuBuildLeft => Seq(shj.right)
case GpuBuildRight => Seq(shj.left)
}
} else {
// assert that both the build and stream side of SHJ are coalesced
// if we are not optimizing the build side shuffle
shj.children
shj match {
case j: GpuShuffledSymmetricHashJoinExec =>
// assert that both children are NOT coalesced, as join will directly handle shuffle
assert(j.children.forall {
case GpuShuffleCoalesceExec(_, _) => false
case GpuCoalesceBatches(GpuShuffleCoalesceExec(_, _), _) => false
case _ => true
})
case j: GpuShuffledHashJoinExec =>
val childrenToCheck = if (shouldOptimizeHashJoinShuffle) {
// assert that the stream side of SHJ is coalesced
j.buildSide match {
case GpuBuildLeft => Seq(j.right)
case GpuBuildRight => Seq(j.left)
}
} else {
// assert that both the build and stream side of SHJ are coalesced
// if we are not optimizing the build side shuffle
j.children
}
assert(childrenToCheck.forall {
case GpuShuffleCoalesceExec(_, _) => true
case GpuCoalesceBatches(GpuShuffleCoalesceExec(_, _), _) => true
case _ => false
})
case j => throw new IllegalStateException(s"Unexpected join: $j")
}
assert(childrenToCheck.forall {
case GpuShuffleCoalesceExec(_, _) => true
case GpuCoalesceBatches(GpuShuffleCoalesceExec(_, _), _) => true
case _ => false
})
}

}, conf)
@@ -671,7 +682,7 @@
}

def checkSkewJoin(
joins: Seq[GpuShuffledHashJoinExec],
joins: Seq[GpuJoinExec],
leftSkewNum: Int,
rightSkewNum: Int): Unit = {
assert(joins.size == 1 && joins.head.isSkewJoin)