diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index e06fd5ceb16..5e32e07d230 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -24,7 +24,9 @@ pytestmark = [pytest.mark.nightly_resource_consuming_test] -all_join_types = ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter'] +all_non_symmetric_join_types = ['Left', 'Right', 'LeftSemi', 'LeftAnti', 'Cross'] +all_symmetric_join_types = ['Inner', 'FullOuter'] +all_join_types = all_non_symmetric_join_types + all_symmetric_join_types all_gen = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(), BooleanGen(), DateGen(), TimestampGen(), null_gen, @@ -208,13 +210,7 @@ def do_join(spark): # 3 times smaller than the other side. So it is not likely to happen # unless we can give it some help. Parameters are setup to try to make # this happen, if test fails something might have changed related to that. -@validate_execs_in_gpu_plan('GpuShuffledHashJoinExec') -@ignore_order(local=True) -@pytest.mark.parametrize('data_gen', basic_nested_gens + [decimal_gen_128bit], ids=idfn) -@pytest.mark.parametrize('join_type', all_join_types, ids=idfn) -@pytest.mark.parametrize('sub_part_enabled', ['false', 'true'], ids=['SubPartition_OFF', 'SubPartition_ON']) -@allow_non_gpu(*non_utc_allow) -def test_hash_join_ridealong(data_gen, join_type, sub_part_enabled): +def hash_join_ridealong(data_gen, join_type, sub_part_enabled): def do_join(spark): left, right = create_ridealong_df(spark, short_gen, data_gen, 50, 500) return left.join(right, left.key == right.r_key, join_type) @@ -223,6 +219,24 @@ def do_join(spark): }) assert_gpu_and_cpu_are_equal_collect(do_join, conf=_all_conf) +@validate_execs_in_gpu_plan('GpuShuffledHashJoinExec') +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', basic_nested_gens + [decimal_gen_128bit], ids=idfn) +@pytest.mark.parametrize('join_type', all_non_symmetric_join_types, ids=idfn) +@pytest.mark.parametrize('sub_part_enabled', ['false', 'true'], ids=['SubPartition_OFF', 'SubPartition_ON']) +@allow_non_gpu(*non_utc_allow) +def test_hash_join_ridealong_non_symmetric(data_gen, join_type, sub_part_enabled): + hash_join_ridealong(data_gen, join_type, sub_part_enabled) + +@validate_execs_in_gpu_plan('GpuShuffledSymmetricHashJoinExec') +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', basic_nested_gens + [decimal_gen_128bit], ids=idfn) +@pytest.mark.parametrize('join_type', all_symmetric_join_types, ids=idfn) +@pytest.mark.parametrize('sub_part_enabled', ['false', 'true'], ids=['SubPartition_OFF', 'SubPartition_ON']) +@allow_non_gpu(*non_utc_allow) +def test_hash_join_ridealong_symmetric(data_gen, join_type, sub_part_enabled): + hash_join_ridealong(data_gen, join_type, sub_part_enabled) + # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @ignore_order(local=True) @@ -969,12 +983,7 @@ def do_join(spark): limited_integral_gens = [byte_gen, ShortGen(max_val=BYTE_MAX), IntegerGen(max_val=BYTE_MAX), LongGen(max_val=BYTE_MAX)] -@validate_execs_in_gpu_plan('GpuShuffledHashJoinExec') -@ignore_order(local=True) -@pytest.mark.parametrize('left_gen', limited_integral_gens, ids=idfn) -@pytest.mark.parametrize('right_gen', limited_integral_gens, ids=idfn) -@pytest.mark.parametrize('join_type', all_join_types, ids=idfn) -def test_hash_join_different_key_integral_types(left_gen, right_gen, join_type): +def hash_join_different_key_integral_types(left_gen, right_gen, join_type): def do_join(spark): left = unary_op_df(spark, left_gen, length=50) right = unary_op_df(spark, right_gen, length=500) @@ -984,6 +993,23 @@ def do_join(spark): }) assert_gpu_and_cpu_are_equal_collect(do_join, conf=_all_conf) +@validate_execs_in_gpu_plan('GpuShuffledHashJoinExec') +@ignore_order(local=True) +@pytest.mark.parametrize('left_gen', limited_integral_gens, ids=idfn) +@pytest.mark.parametrize('right_gen', limited_integral_gens, ids=idfn) +@pytest.mark.parametrize('join_type', all_non_symmetric_join_types, ids=idfn) +def test_hash_join_different_key_integral_types_non_symmetric(left_gen, right_gen, join_type): + hash_join_different_key_integral_types(left_gen, right_gen, join_type) + +@validate_execs_in_gpu_plan('GpuShuffledSymmetricHashJoinExec') +@ignore_order(local=True) +@pytest.mark.parametrize('left_gen', limited_integral_gens, ids=idfn) +@pytest.mark.parametrize('right_gen', limited_integral_gens, ids=idfn) +@pytest.mark.parametrize('join_type', all_symmetric_join_types, ids=idfn) +def test_hash_join_different_key_integral_types_symmetric(left_gen, right_gen, join_type): + hash_join_different_key_integral_types(left_gen, right_gen, join_type) + + bloom_filter_confs = { "spark.sql.autoBroadcastJoinThreshold": "1", "spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold": 1, diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala index 83879282aa7..67c7433aed6 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala @@ -80,7 +80,8 @@ class GpuShuffledHashJoinMeta( left, right, conf.isGPUShuffle, - conf.gpuTargetBatchSizeBytes)( + conf.gpuTargetBatchSizeBytes, + isSkewJoin = false)( join.leftKeys, join.rightKeys) case _ => @@ -111,7 +112,7 @@ case class GpuShuffledHashJoinExec( override val condition: Option[Expression], left: SparkPlan, right: SparkPlan, - isSkewJoin: Boolean)( + override val isSkewJoin: Boolean)( cpuLeftKeys: Seq[Expression], cpuRightKeys: Seq[Expression]) extends ShimBinaryExecNode with GpuHashJoin with GpuSubPartitionHashJoin { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSymmetricHashJoinExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSymmetricHashJoinExec.scala index d4086dedf54..ee9b28b0d1f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSymmetricHashJoinExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSymmetricHashJoinExec.scala @@ -26,7 +26,7 @@ import com.nvidia.spark.rapids.GpuShuffledSymmetricHashJoinExec.JoinInfo import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion -import com.nvidia.spark.rapids.shims.{GpuHashPartitioning, ShimBinaryExecNode} +import com.nvidia.spark.rapids.shims.GpuHashPartitioning import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -35,7 +35,8 @@ import org.apache.spark.sql.catalyst.plans.{FullOuter, InnerLike, JoinType} import org.apache.spark.sql.catalyst.plans.physical.Distribution import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec -import org.apache.spark.sql.rapids.execution.{ConditionalHashJoinIterator, GpuCustomShuffleReaderExec, GpuHashJoin, GpuShuffleExchangeExecBase, HashFullJoinIterator, HashFullJoinStreamSideIterator, HashJoinIterator} +import org.apache.spark.sql.execution.exchange.ReusedExchangeExec +import org.apache.spark.sql.rapids.execution.{ConditionalHashJoinIterator, GpuCustomShuffleReaderExec, GpuHashJoin, GpuJoinExec, GpuShuffleExchangeExecBase, HashFullJoinIterator, HashFullJoinStreamSideIterator, HashJoinIterator} import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -69,13 +70,15 @@ object GpuShuffledSymmetricHashJoinExec { val rightTypes = rightOutput.map(_.dataType).toArray val boundLeftKeys = GpuBindReferences.bindGpuReferences(leftKeys, leftOutput) val boundRightKeys = GpuBindReferences.bindGpuReferences(rightKeys, rightOutput) - val boundCondition = condition.map { c => - GpuBindReferences.bindGpuReference(c, leftOutput ++ rightOutput) - } - val (boundBuildKeys, buildTypes, boundStreamKeys, streamTypes, streamOutput) = + val (boundBuildKeys, buildTypes, buildOutput, boundStreamKeys, streamTypes, streamOutput) = buildSide match { - case GpuBuildRight => (boundRightKeys, rightTypes, boundLeftKeys, leftTypes, leftOutput) - case GpuBuildLeft => (boundLeftKeys, leftTypes, boundRightKeys, rightTypes, rightOutput) + case GpuBuildRight => + (boundRightKeys, rightTypes, rightOutput, boundLeftKeys, leftTypes, leftOutput) + case GpuBuildLeft => + (boundLeftKeys, leftTypes, leftOutput, boundRightKeys, rightTypes, rightOutput) + } + val boundCondition = condition.map { c => + GpuBindReferences.bindGpuReference(c, streamOutput ++ buildOutput) } // For join types other than FullOuter, we simply set compareNullsEqual as true to adapt // struct keys with nullable children. Non-nested keys can also be correctly processed with @@ -85,7 +88,7 @@ object GpuShuffledSymmetricHashJoinExec { GpuHashJoin.anyNullableStructChild(boundBuildKeys) val needNullFilter = compareNullsEqual && boundBuildKeys.exists(_.nullable) BoundJoinExprs(boundBuildKeys, buildTypes, boundStreamKeys, streamTypes, streamOutput, - boundCondition, leftOutput.size, compareNullsEqual, needNullFilter) + boundCondition, streamOutput.size, compareNullsEqual, needNullFilter) } } @@ -356,9 +359,10 @@ case class GpuShuffledSymmetricHashJoinExec( left: SparkPlan, right: SparkPlan, isGpuShuffle: Boolean, - gpuBatchSizeBytes: Long)( + gpuBatchSizeBytes: Long, + override val isSkewJoin: Boolean)( cpuLeftKeys: Seq[Expression], - cpuRightKeys: Seq[Expression]) extends ShimBinaryExecNode with GpuExec { + cpuRightKeys: Seq[Expression]) extends GpuJoinExec { import GpuShuffledSymmetricHashJoinExec._ override def otherCopyArgs: Seq[AnyRef] = Seq(cpuLeftKeys, cpuRightKeys) @@ -604,11 +608,16 @@ case class GpuShuffledSymmetricHashJoinExec( case _: GpuShuffleCoalesceExec => throw new IllegalStateException("Should not have shuffle coalesce before this node") case _: GpuShuffleExchangeExecBase | _: GpuCustomShuffleReaderExec => true + case _: ReusedExchangeExec => true case _: ShuffleQueryStageExec => true case _ => false } } } + + override def nodeName: String = { + if (isSkewJoin) super.nodeName + "(skew=true)" else super.nodeName + } } /** @@ -731,11 +740,10 @@ class NullFilteredBatchIterator( override def hasNext: Boolean = { while (onDeck.isEmpty && iter.hasNext) { - val batch = withResource(iter.next()) { batch => - opTime.ns { - val spillable = SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY) - GpuHashJoin.filterNullsWithRetryAndClose(spillable, boundKeys) - } + val rawBatch = iter.next() + val batch = opTime.ns { + val spillable = SpillableColumnarBatch(rawBatch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY) + GpuHashJoin.filterNullsWithRetryAndClose(spillable, boundKeys) } if (batch.numRows > 0) { onDeck = Some(batch) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala index acc4b2a80e1..d4b87f0b094 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala @@ -92,7 +92,8 @@ class GpuSortMergeJoinMeta( left, right, conf.isGPUShuffle, - conf.gpuTargetBatchSizeBytes)( + conf.gpuTargetBatchSizeBytes, + join.isSkewJoin)( join.leftKeys, join.rightKeys) case _ => diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 05bc5dcbf66..f52c3b5f334 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -638,7 +638,7 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern") "joins. Requires spark.rapids.sql.shuffledHashJoin.optimizeShuffle=true.") .internal() .booleanConf - .createWithDefault(false) + .createWithDefault(true) val STABLE_SORT = conf("spark.rapids.sql.stableSort.enabled") .doc("Enable or disable stable sorting. Apache Spark's sorting is typically a stable " + diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala index df9a96639b5..46d796540c1 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala @@ -22,10 +22,10 @@ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{withRestoreOnRetry, withRetryNoSplit} import com.nvidia.spark.rapids.jni.GpuOOM +import com.nvidia.spark.rapids.shims.ShimBinaryExecNode import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, NamedExpression} import org.apache.spark.sql.catalyst.plans.{Cross, ExistenceJoin, FullOuter, Inner, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter} -import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch @@ -522,9 +522,17 @@ class ConditionalHashJoinIterator( withResource(GpuColumnVector.from(leftData.getBatch)) { leftTable => withResource(GpuColumnVector.from(rightData.getBatch)) { rightTable => val maps = joinType match { - case _: InnerLike => + case _: InnerLike if buildSide == GpuBuildRight => Table.mixedInnerJoinGatherMaps(leftKeys, rightKeys, leftTable, rightTable, compiledCondition, nullEquality) + case _: InnerLike if buildSide == GpuBuildLeft => + // Even though it's an inner join, we need to switch the join order since the + // condition has been compiled to expect the build side on the left and the stream + // side on the right. + // Reverse the output of the join, because we expect the right gather map to + // always be on the right. + Table.mixedInnerJoinGatherMaps(rightKeys, leftKeys, rightTable, leftTable, + compiledCondition, nullEquality).reverse case LeftOuter => Table.mixedLeftJoinGatherMaps(leftKeys, rightKeys, leftTable, rightTable, compiledCondition, nullEquality) @@ -540,8 +548,7 @@ class ConditionalHashJoinIterator( Array(Table.mixedLeftAntiJoinGatherMap(leftKeys, rightKeys, leftTable, rightTable, compiledCondition, nullEquality)) case _ => - throw new NotImplementedError(s"Joint Type ${joinType.getClass} is not currently" + - s" supported") + throw new NotImplementedError(s"Join $joinType $buildSide is not currently supported") } makeGatherer(maps, leftData, rightData, joinType) } @@ -960,13 +967,15 @@ class HashedExistenceJoinIterator( } } -trait GpuHashJoin extends GpuExec { - def left: SparkPlan - def right: SparkPlan +trait GpuJoinExec extends ShimBinaryExecNode with GpuExec { def joinType: JoinType def condition: Option[Expression] def leftKeys: Seq[Expression] def rightKeys: Seq[Expression] + def isSkewJoin: Boolean = false +} + +trait GpuHashJoin extends GpuJoinExec { def buildSide: GpuBuildSide protected lazy val (buildPlan, streamedPlan) = buildSide match { @@ -1065,14 +1074,14 @@ trait GpuHashJoin extends GpuExec { } protected lazy val (numFirstConditionTableColumns, boundCondition) = { - val (joinLeft, joinRight) = joinType match { - case RightOuter => (right, left) - case _ => (left, right) + val (buildOutput, streamOutput) = buildSide match { + case GpuBuildRight => (right.output, left.output) + case GpuBuildLeft => (left.output, right.output) } val boundCondition = condition.map { c => - GpuBindReferences.bindGpuReference(c, joinLeft.output ++ joinRight.output) + GpuBindReferences.bindGpuReference(c, streamOutput ++ buildOutput) } - (joinLeft.output.size, boundCondition) + (streamOutput.size, boundCondition) } def doJoin( diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/AdaptiveQueryExecSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/AdaptiveQueryExecSuite.scala index 85c5806b47e..8aa792c23a4 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/AdaptiveQueryExecSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/AdaptiveQueryExecSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.joins.SortMergeJoinExec import org.apache.spark.sql.functions.{col, when} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.{ExecutionPlanCaptureCallback, GpuFileSourceScanExec} -import org.apache.spark.sql.rapids.execution.GpuCustomShuffleReaderExec +import org.apache.spark.sql.rapids.execution.{GpuCustomShuffleReaderExec, GpuJoinExec} import org.apache.spark.sql.types.{ArrayType, DataTypes, DecimalType, IntegerType, StringType, StructField, StructType} class AdaptiveQueryExecSuite @@ -77,9 +77,10 @@ class AdaptiveQueryExecSuite } } - private def findTopLevelGpuShuffleHashJoin(plan: SparkPlan): Seq[GpuShuffledHashJoinExec] = { + private def findTopLevelGpuShuffleHashJoin(plan: SparkPlan): Seq[GpuJoinExec] = { collect(plan) { case j: GpuShuffledHashJoinExec => j + case j: GpuShuffledSymmetricHashJoinExec => j } } @@ -202,25 +203,35 @@ class AdaptiveQueryExecSuite } val shj = TestUtils.findOperator(df.queryExecution.executedPlan, - _.isInstanceOf[GpuShuffledHashJoinExec]).get - .asInstanceOf[GpuShuffledHashJoinExec] + _.isInstanceOf[GpuJoinExec]).get assert(shj.children.length == 2) - val childrenToCheck = if (shouldOptimizeHashJoinShuffle) { - // assert that the stream side of SHJ is coalesced - shj.buildSide match { - case GpuBuildLeft => Seq(shj.right) - case GpuBuildRight => Seq(shj.left) - } - } else { - // assert that both the build and stream side of SHJ are coalesced - // if we are not optimizing the build side shuffle - shj.children + shj match { + case j: GpuShuffledSymmetricHashJoinExec => + // assert that both children are NOT coalesced, as join will directly handle shuffle + assert(j.children.forall { + case GpuShuffleCoalesceExec(_, _) => false + case GpuCoalesceBatches(GpuShuffleCoalesceExec(_, _), _) => false + case _ => true + }) + case j: GpuShuffledHashJoinExec => + val childrenToCheck = if (shouldOptimizeHashJoinShuffle) { + // assert that the stream side of SHJ is coalesced + j.buildSide match { + case GpuBuildLeft => Seq(j.right) + case GpuBuildRight => Seq(j.left) + } + } else { + // assert that both the build and stream side of SHJ are coalesced + // if we are not optimizing the build side shuffle + j.children + } + assert(childrenToCheck.forall { + case GpuShuffleCoalesceExec(_, _) => true + case GpuCoalesceBatches(GpuShuffleCoalesceExec(_, _), _) => true + case _ => false + }) + case j => throw new IllegalStateException(s"Unexpected join: $j") } - assert(childrenToCheck.forall { - case GpuShuffleCoalesceExec(_, _) => true - case GpuCoalesceBatches(GpuShuffleCoalesceExec(_, _), _) => true - case _ => false - }) } }, conf) @@ -671,7 +682,7 @@ class AdaptiveQueryExecSuite } def checkSkewJoin( - joins: Seq[GpuShuffledHashJoinExec], + joins: Seq[GpuJoinExec], leftSkewNum: Int, rightSkewNum: Int): Unit = { assert(joins.size == 1 && joins.head.isSkewJoin) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/BroadcastHashJoinSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/BroadcastHashJoinSuite.scala index 22b35ab76ce..70fbf438f1d 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/BroadcastHashJoinSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/BroadcastHashJoinSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,7 +43,7 @@ class BroadcastHashJoinSuite extends SparkQueryCompareTestSuite { val bhjCount = PlanUtils.findOperators(plan, _.isInstanceOf[GpuBroadcastHashJoinExec]) assert(bhjCount.size === 1) - val shjCount = PlanUtils.findOperators(plan, _.isInstanceOf[GpuShuffledHashJoinExec]) + val shjCount = PlanUtils.findOperators(plan, _.isInstanceOf[GpuShuffledSymmetricHashJoinExec]) assert(shjCount.size === 1) }, conf) } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/HashSortOptimizeSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/HashSortOptimizeSuite.scala index 3b306bb2769..7f417af464d 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/HashSortOptimizeSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/HashSortOptimizeSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023 NVIDIA CORPORATION. + * Copyright (c) 2019-2024 NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -89,8 +89,8 @@ class HashSortOptimizeSuite extends SparkQueryCompareTestSuite with FunSuiteWith val plan = rdf.queryExecution.executedPlan // execute the plan so that the final adaptive plan is available when AQE is on rdf.collect() - val joinNode = findOperator(plan, _.isInstanceOf[GpuShuffledHashJoinExec]) - assert(joinNode.isDefined, "No broadcast join node found") + val joinNode = findOperator(plan, _.isInstanceOf[GpuShuffledSymmetricHashJoinExec]) + assert(joinNode.isDefined, "No shuffled hash join node found") // should not have sort, because of not have GpuDataWritingCommandExec val sortNode = findOperator(plan, _.isInstanceOf[GpuSortExec]) assert(sortNode.isEmpty)