Refactor join code to reduce duplicated code (NVIDIA#1839)
* Refactor join code to reduce duplicated code

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Move nodeName override to base class
jlowe authored Mar 2, 2021
1 parent 057f682 commit 05e08ce
Showing 6 changed files with 123 additions and 276 deletions.
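Each of the three shim files below carried an identical metrics block and doExecuteColumnar() implementation; this commit deletes those copies and routes the shims through the shared GpuShuffledHashJoinBase. A minimal, self-contained Scala sketch of that template-method refactor follows; every type in it (Batch, ShuffledJoinBase, the shim classes) is an illustrative stand-in, not one of the real Spark/RAPIDS classes:

```scala
// Illustrative sketch only: duplicated per-shim logic is hoisted into an
// abstract base class, and each shim becomes a thin constructor adapter.
object JoinRefactorSketch {
  final case class Batch(rows: Long)

  // Shared logic lives once in the base class (compare with the removed
  // doExecuteColumnar() bodies in the three shim diffs below).
  abstract class ShuffledJoinBase(val isSkewJoin: Boolean) {
    // Per-shim configuration stays abstract, mirroring the constructor
    // arguments the real shims still supply.
    protected def buildSide: String

    def execute(stream: Seq[Batch], build: Seq[Batch]): Long = {
      val built = build.map(_.rows).sum   // "build the hash table"
      stream.map(_.rows * built).sum      // "probe with the stream side"
    }

    // The nodeName override is consolidated here too, per the second
    // bullet of the commit message.
    def nodeName: String =
      if (isSkewJoin) "GpuShuffledHashJoin(skew=true)" else "GpuShuffledHashJoin"
  }

  // Shims reduce to constructor adapters, as in the new hunks below.
  final class Spark300Join(side: String, skew: Boolean)
      extends ShuffledJoinBase(skew) {
    protected def buildSide: String = side
  }
  final class Spark301dbJoin(side: String)
      extends ShuffledJoinBase(false) {
    protected def buildSide: String = side
  }

  def main(args: Array[String]): Unit = {
    val j = new Spark300Join("right", skew = true)
    println(j.nodeName)                                          // GpuShuffledHashJoin(skew=true)
    println(j.execute(Seq(Batch(3)), Seq(Batch(2), Batch(2))))   // 12
  }
}
```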
@@ -18,16 +18,11 @@ package com.nvidia.spark.rapids.shims.spark300

import com.nvidia.spark.rapids._

import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, ShuffledHashJoinExec}
import org.apache.spark.sql.rapids.execution.{GpuHashJoin, GpuShuffledHashJoinBase}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.sql.rapids.execution.GpuHashJoin

object GpuJoinUtils {
def getGpuBuildSide(buildSide: BuildSide): GpuBuildSide = {
@@ -85,79 +80,6 @@ case class GpuShuffledHashJoinExec(
extends GpuShuffledHashJoinBase(
leftKeys,
rightKeys,
joinType,
buildSide,
condition,
left,
right,
isSkewJoin)
with GpuHashJoin {
import GpuMetric._

override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
BUILD_DATA_SIZE -> createSizeMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_DATA_SIZE),
BUILD_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_TIME),
STREAM_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))

override def requiredChildDistribution: Seq[Distribution] =
HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil

override protected def doExecute(): RDD[InternalRow] = {
throw new UnsupportedOperationException(
"GpuShuffledHashJoin does not support the execute() code path.")
}

override def childrenCoalesceGoal: Seq[CoalesceGoal] = buildSide match {
case GpuBuildLeft => Seq(RequireSingleBatch, null)
case GpuBuildRight => Seq(null, RequireSingleBatch)
}

override def doExecuteColumnar() : RDD[ColumnarBatch] = {
val buildDataSize = gpuLongMetric(BUILD_DATA_SIZE)
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val totalTime = gpuLongMetric(TOTAL_TIME)
val buildTime = gpuLongMetric(BUILD_TIME)
val streamTime = gpuLongMetric(STREAM_TIME)
val joinTime = gpuLongMetric(JOIN_TIME)
val filterTime = gpuLongMetric(FILTER_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)

val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))

streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) {
(streamIter, buildIter) => {
var combinedSize = 0

val startTime = System.nanoTime()
val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(
buildIter, localBuildOutput)) { buildBatch: ColumnarBatch =>
withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch))
withResource(combined) { combined =>
combinedSize =
GpuColumnVector.extractColumns(combined)
.map(_.getBase.getDeviceMemorySize).sum.toInt
GpuColumnVector.from(combined)
}
}
}

val delta = System.nanoTime() - startTime
buildTime += delta
totalTime += delta
buildDataSize += combinedSize
val context = TaskContext.get()
context.addTaskCompletionListener[Unit](_ => builtTable.close())

doJoin(builtTable, streamIter, boundCondition,
numOutputRows, joinOutputRows, numOutputBatches,
streamTime, joinTime, filterTime, totalTime)
}
}
}
}
isSkewJoin = isSkewJoin)
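Taken together, the call sites in this commit imply a base-class constructor along the following lines. This is an inference from the diff (the spark301db hunk below names all five arguments), not a copy of the GpuShuffledHashJoinBase source, which is among the changed files not rendered here:

```scala
import com.nvidia.spark.rapids.GpuBuildSide

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.BinaryExecNode
import org.apache.spark.sql.rapids.execution.GpuHashJoin

// Approximate shape inferred from the call sites in this diff; abstract
// members required by BinaryExecNode and GpuHashJoin are left to the
// concrete shims.
abstract class GpuShuffledHashJoinBase(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    buildSide: GpuBuildSide,
    condition: Option[Expression],
    val isSkewJoin: Boolean) extends BinaryExecNode with GpuHashJoin
```

Passing isSkewJoin by name at each call site keeps the shim constructors readable and makes the one behavioral difference between shims (see the spark301db hunk below) stand out in the diff.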
@@ -18,17 +18,12 @@ package com.nvidia.spark.rapids.shims.spark301db

import com.nvidia.spark.rapids._

import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution}
import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
import org.apache.spark.sql.rapids.execution.GpuHashJoin
import org.apache.spark.sql.vectorized.ColumnarBatch

object GpuJoinUtils {
def getGpuBuildSide(buildSide: BuildSide): GpuBuildSide = {
@@ -80,74 +75,10 @@ case class GpuShuffledHashJoinExec(
buildSide: GpuBuildSide,
condition: Option[Expression],
left: SparkPlan,
right: SparkPlan) extends BinaryExecNode with GpuHashJoin {
import GpuMetric._

override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
BUILD_DATA_SIZE -> createSizeMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_DATA_SIZE),
BUILD_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_TIME),
STREAM_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))

override def requiredChildDistribution: Seq[Distribution] =
HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil

override protected def doExecute(): RDD[InternalRow] = {
throw new UnsupportedOperationException(
"GpuShuffledHashJoin does not support the execute() code path.")
}

override def childrenCoalesceGoal: Seq[CoalesceGoal] = buildSide match {
case GpuBuildLeft => Seq(RequireSingleBatch, null)
case GpuBuildRight => Seq(null, RequireSingleBatch)
}

override def doExecuteColumnar() : RDD[ColumnarBatch] = {
val buildDataSize = gpuLongMetric(BUILD_DATA_SIZE)
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val totalTime = gpuLongMetric(TOTAL_TIME)
val buildTime = gpuLongMetric(BUILD_TIME)
val streamTime = gpuLongMetric(STREAM_TIME)
val joinTime = gpuLongMetric(JOIN_TIME)
val filterTime = gpuLongMetric(FILTER_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)

val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))

streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) {
(streamIter, buildIter) => {
var combinedSize = 0

val startTime = System.nanoTime()
val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(
buildIter, localBuildOutput)) { buildBatch: ColumnarBatch =>
withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch))
withResource(combined) { combined =>
combinedSize =
GpuColumnVector.extractColumns(combined)
.map(_.getBase.getDeviceMemorySize).sum.toInt
GpuColumnVector.from(combined)
}
}
}

val delta = System.nanoTime() - startTime
buildTime += delta
totalTime += delta
buildDataSize += combinedSize
val context = TaskContext.get()
context.addTaskCompletionListener[Unit](_ => builtTable.close())

doJoin(builtTable, streamIter, boundCondition,
numOutputRows, joinOutputRows, numOutputBatches,
streamTime, joinTime, filterTime, totalTime)
}
}
}
}
right: SparkPlan)
extends GpuShuffledHashJoinBase(
leftKeys,
rightKeys,
buildSide,
condition,
isSkewJoin = false)
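Unlike the spark300 and spark311 shims, the Databricks spark301db variant hard-codes isSkewJoin = false: its GpuShuffledHashJoinExec parameter list above carries no isSkewJoin field, presumably because that Spark fork's shuffled-hash-join path does not surface the skew-join flag.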
@@ -18,17 +18,12 @@ package com.nvidia.spark.rapids.shims.spark311

import com.nvidia.spark.rapids._

import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
import org.apache.spark.sql.rapids.execution.{GpuHashJoin, GpuShuffledHashJoinBase}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.sql.rapids.execution.GpuHashJoin

object GpuJoinUtils {
def getGpuBuildSide(buildSide: BuildSide): GpuBuildSide = {
@@ -86,79 +81,6 @@ case class GpuShuffledHashJoinExec(
extends GpuShuffledHashJoinBase(
leftKeys,
rightKeys,
joinType,
buildSide,
condition,
left,
right,
isSkewJoin)
with GpuHashJoin {
import GpuMetric._

override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
BUILD_DATA_SIZE -> createSizeMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_DATA_SIZE),
BUILD_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_TIME),
STREAM_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))

override def requiredChildDistribution: Seq[Distribution] =
HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil

override protected def doExecute(): RDD[InternalRow] = {
throw new UnsupportedOperationException(
"GpuShuffledHashJoin does not support the execute() code path.")
}

override def childrenCoalesceGoal: Seq[CoalesceGoal] = buildSide match {
case GpuBuildLeft => Seq(RequireSingleBatch, null)
case GpuBuildRight => Seq(null, RequireSingleBatch)
}

override def doExecuteColumnar() : RDD[ColumnarBatch] = {
val buildDataSize = gpuLongMetric(BUILD_DATA_SIZE)
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val totalTime = gpuLongMetric(TOTAL_TIME)
val buildTime = gpuLongMetric(BUILD_TIME)
val streamTime = gpuLongMetric(STREAM_TIME)
val joinTime = gpuLongMetric(JOIN_TIME)
val filterTime = gpuLongMetric(FILTER_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)

val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))

streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) {
(streamIter, buildIter) => {
var combinedSize = 0

val startTime = System.nanoTime()
val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(
buildIter, localBuildOutput)) { buildBatch: ColumnarBatch =>
withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch))
withResource(combined) { combined =>
combinedSize =
GpuColumnVector.extractColumns(combined)
.map(_.getBase.getDeviceMemorySize).sum.toInt
GpuColumnVector.from(combined)
}
}
}

val delta = System.nanoTime() - startTime
buildTime += delta
totalTime += delta
buildDataSize += combinedSize
val context = TaskContext.get()
context.addTaskCompletionListener[Unit](_ => builtTable.close())

doJoin(builtTable, streamIter, boundCondition,
numOutputRows, joinOutputRows, numOutputBatches,
streamTime, joinTime, filterTime, totalTime)
}
}
}
}
isSkewJoin = isSkewJoin)
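The spark311 hunk above mirrors the spark300 one almost line for line, which is exactly the duplication this commit eliminates. Note that only three of the six changed files are rendered here; GpuShuffledHashJoinBase itself, where the shared doExecuteColumnar() logic and the nodeName override presumably now live, is among those not shown.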