Fix broadcast nested loop join for the no column case (NVIDIA#353)
revans2 authored Jul 16, 2020
1 parent 88820ac commit 462a729
Showing 4 changed files with 93 additions and 46 deletions.
1 change: 0 additions & 1 deletion integration_tests/src/main/python/join_test.py
@@ -98,7 +98,6 @@ def do_join(spark):
# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/323')
def test_broadcast_nested_loop_join_special_case(data_gen):
def do_join(spark):
left, right = create_df(spark, data_gen, 50, 25)
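For context, a minimal self-contained sketch of the kind of query that exercises this "no column" path; the DataFrames and the count-only cross join below are illustrative assumptions, not code from join_test.py:

import org.apache.spark.sql.SparkSession

// Illustrative sketch only: a cross join whose result is merely counted needs no
// output columns, which is the "no column case" this commit fixes.
object NoColumnJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("no-column-join-sketch")
      .getOrCreate()
    val left = spark.range(50)   // 50 rows, single column "id"
    val right = spark.range(25)  // 25 rows, single column "id"
    // No join condition and no projected columns: only the row count matters.
    println(left.crossJoin(right).count())  // 1250
    spark.stop()
  }
}

Depending on size estimates and configuration, Spark plans such a query as a BroadcastNestedLoopJoin or a CartesianProduct, the two operators changed below.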
@@ -178,6 +178,35 @@ class GpuCartesianRDD(
}
}

object GpuNoColumnCrossJoin {
def divideIntoBatches(rowCounts: RDD[Long],
targetSizeBytes: Long,
numOutputRows: SQLMetric,
numOutputBatches: SQLMetric): RDD[ColumnarBatch] = {
// Hash aggregate explodes the rows out, so if we go too large
// it can blow up. The size of a Long is 8 bytes so we just go with
// that as our estimate, no nulls.
val maxRowCount = targetSizeBytes/8

def divideIntoBatches(rows: Long): Iterable[ColumnarBatch] = {
val numBatches = (rows + maxRowCount - 1)/maxRowCount
(0L until numBatches).map(i => {
val ret = new ColumnarBatch(new Array[ColumnVector](0))
if ((i + 1) * maxRowCount > rows) {
ret.setNumRows((rows - (i * maxRowCount)).toInt)
} else {
ret.setNumRows(maxRowCount.toInt)
}
numOutputRows += ret.numRows()
numOutputBatches += 1
ret
})
}

rowCounts.flatMap(divideIntoBatches)
}
}

case class GpuCartesianProductExec(
left: SparkPlan,
right: SparkPlan,
@@ -229,32 +258,16 @@ case class GpuCartesianProductExec(
ret
}

// Hash aggregate explodes the rows out, so if we go too large
// it can blow up. The size of a Long is 8 bytes so we just go with
// that as our estimate, no nulls.
val maxRowCount = targetSizeBytes/8

def divideIntoBatches(rows: Long): Iterable[ColumnarBatch] = {
val numBatches = (rows + maxRowCount - 1)/maxRowCount
(0L until numBatches).map(i => {
val ret = new ColumnarBatch(new Array[ColumnVector](0))
if ((i + 1) * maxRowCount > rows) {
ret.setNumRows((rows - (i * maxRowCount)).toInt)
} else {
ret.setNumRows(maxRowCount.toInt)
}
numOutputRows += ret.numRows()
numOutputBatches += 1
ret
})
}
val l = left.executeColumnar().map(getRowCountAndClose)
val r = right.executeColumnar().map(getRowCountAndClose)
// TODO here too it would probably be best to avoid doing any re-computation
// that happens with the built in cartesian, but writing another custom RDD
// just for this use case is not worth it without an explicit use case.
val prods = l.cartesian(r).map(p => p._1 * p._2)
prods.flatMap(divideIntoBatches)
GpuNoColumnCrossJoin.divideIntoBatches(
l.cartesian(r).map(p => p._1 * p._2),
targetSizeBytes,
numOutputRows,
numOutputBatches)
} else {
new GpuCartesianRDD(sparkContext,
boundCondition,
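The batching arithmetic used by GpuNoColumnCrossJoin.divideIntoBatches above can be seen in isolation. A dependency-free sketch with made-up names, mirroring the ceiling division in the diff:

object BatchSplitSketch {
  // Splits a total row count into per-batch row counts, capping each batch at
  // targetSizeBytes / 8 rows (8 bytes per Long, no nulls), the same bound the
  // divideIntoBatches helper applies before building empty ColumnarBatches.
  def splitRowCounts(rows: Long, targetSizeBytes: Long): Seq[Long] = {
    val maxRowCount = targetSizeBytes / 8
    val numBatches = (rows + maxRowCount - 1) / maxRowCount
    (0L until numBatches).map { i =>
      if ((i + 1) * maxRowCount > rows) rows - i * maxRowCount else maxRowCount
    }
  }

  def main(args: Array[String]): Unit = {
    // A 1 KiB target caps each batch at 128 rows, so 300 rows split as 128, 128, 44.
    println(splitRowCounts(rows = 300L, targetSizeBytes = 1024L))
  }
}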
@@ -41,7 +41,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNes
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.rapids.execution
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

@SerialVersionUID(100L)
class SerializeConcatHostBuffersDeserializeBatch(
@@ -75,6 +75,8 @@ class SerializeConcatHostBuffersDeserializeBatch(
} finally {
empty.close()
}
} else if (headers.head.getNumColumns == 0) {
JCudfSerialization.writeRowsToStream(out, numRows)
} else {
JCudfSerialization.writeConcatedStream(headers, buffers, out)
}
@@ -83,12 +85,19 @@
private def readObject(in: ObjectInputStream): Unit = {
val range = new NvtxRange("DeserializeBatch", NvtxColor.PURPLE)
try {
val tableInfo: JCudfSerialization.TableAndRowCountPair = JCudfSerialization.readTableFrom(in)
val tableInfo: JCudfSerialization.TableAndRowCountPair =
JCudfSerialization.readTableFrom(in)
try {
val table = tableInfo.getTable
// This is read as part of the broadcast join so we expect it to leak.
(0 until table.getNumberOfColumns).foreach(table.getColumn(_).noWarnLeakExpected())
this.batchInternal = GpuColumnVector.from(table)
if (table == null) {
val numRows = tableInfo.getNumRows
this.batchInternal = new ColumnarBatch(new Array[ColumnVector](0))
batchInternal.setNumRows(numRows.toInt)
} else {
// This is read as part of the broadcast join so we expect it to leak.
(0 until table.getNumberOfColumns).foreach(table.getColumn(_).noWarnLeakExpected())
this.batchInternal = GpuColumnVector.from(table)
}
} finally {
tableInfo.close()
}
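The zero-column ColumnarBatch built in the readObject path above uses only stock Spark classes; a standalone sketch of the same pattern (the object and method names here are assumptions for illustration):

import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

object RowsOnlyBatchSketch {
  // A ColumnarBatch may hold zero columns and still carry a row count, which is
  // what the broadcast deserialization above produces when the table is null.
  def rowsOnlyBatch(numRows: Long): ColumnarBatch = {
    val batch = new ColumnarBatch(new Array[ColumnVector](0))
    batch.setNumRows(numRows.toInt)  // metadata only; there are no column buffers
    batch
  }

  def main(args: Array[String]): Unit = {
    val b = rowsOnlyBatch(1250L)
    println(s"cols=${b.numCols()} rows=${b.numRows()}")  // cols=0 rows=1250
  }
}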
@@ -17,9 +17,10 @@
package org.apache.spark.sql.rapids.execution

import ai.rapids.cudf.{NvtxColor, Table}
import com.nvidia.spark.rapids.{Arm, BaseExprMeta, ConfKeysAndIncompat, GpuBindReferences, GpuColumnVector, GpuExec, GpuExpression, GpuFilter, GpuOverrides, NvtxWithMetrics, RapidsConf, RapidsMeta, SparkPlanMeta}
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.GpuMetricNames.{NUM_OUTPUT_BATCHES, NUM_OUTPUT_ROWS, TOTAL_TIME}

import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
@@ -29,6 +30,7 @@ import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.joins.{BroadcastNestedLoopJoinExec, BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.rapids.GpuNoColumnCrossJoin
import org.apache.spark.sql.vectorized.ColumnarBatch

class GpuBroadcastNestedLoopJoinMeta(
@@ -133,7 +135,7 @@ case class GpuBroadcastNestedLoopJoinExec(
buildSide: BuildSide,
joinType: JoinType,
condition: Option[Expression],
targetSize: Long) extends BinaryExecNode with GpuExec {
targetSizeBytes: Long) extends BinaryExecNode with GpuExec {

override protected def doExecute(): RDD[InternalRow] =
throw new IllegalStateException("This should only be called from columnar")
@@ -199,26 +201,50 @@
val broadcastedRelation =
broadcastExchange.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()

lazy val builtTable: Table = {
withResource(new NvtxWithMetrics("build join table", NvtxColor.GREEN, buildTime)) { _ =>
val ret = GpuColumnVector.from(broadcastedRelation.value.batch)
// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach( i => {
val column = ret.getColumn(i)
column.noWarnLeakExpected()
buildDataSize += column.getDeviceMemorySize
})
if (output.isEmpty) {
assert(boundCondition.isEmpty)

lazy val buildCount: Long = {
withResource(new NvtxWithMetrics("build join table", NvtxColor.GREEN, buildTime)) { _ =>
broadcastedRelation.value.batch.numRows()
}
}

def getRowCountAndClose(cb: ColumnarBatch): Long = {
val ret = cb.numRows()
cb.close()
GpuSemaphore.releaseIfNecessary(TaskContext.get())
ret
}
}

streamed.executeColumnar().mapPartitions { streamedIter =>
joinType match {
case _: InnerLike => GpuBroadcastNestedLoopJoinExec.innerLikeJoin(streamedIter,
builtTable, buildSide, boundCondition,
joinTime, joinOutputRows, numOutputRows, numOutputBatches, filterTime, totalTime)
case _ => throw new IllegalArgumentException(s"$joinType + $buildSide is not supported" +
s" and should be run on the CPU")
val counts = streamed.executeColumnar().map(getRowCountAndClose)
GpuNoColumnCrossJoin.divideIntoBatches(
counts.map(s => s * buildCount),
targetSizeBytes,
numOutputRows,
numOutputBatches)
} else {
lazy val builtTable: Table = {
withResource(new NvtxWithMetrics("build join table", NvtxColor.GREEN, buildTime)) { _ =>
val ret = GpuColumnVector.from(broadcastedRelation.value.batch)
// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach(i => {
val column = ret.getColumn(i)
column.noWarnLeakExpected()
buildDataSize += column.getDeviceMemorySize
})
ret
}
}

streamed.executeColumnar().mapPartitions { streamedIter =>
joinType match {
case _: InnerLike => GpuBroadcastNestedLoopJoinExec.innerLikeJoin(streamedIter,
builtTable, buildSide, boundCondition,
joinTime, joinOutputRows, numOutputRows, numOutputBatches, filterTime, totalTime)
case _ => throw new IllegalArgumentException(s"$joinType + $buildSide is not supported" +
s" and should be run on the CPU")
}
}
}
}
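Taken together, the output.isEmpty branch above reduces the join to arithmetic on row counts: an unconditioned inner join with no output columns yields exactly streamedRows * buildRows rows per streamed batch. A rows-only sketch with illustrative names and no Spark or RAPIDS dependencies:

object NoColumnJoinCountSketch {
  // Each streamed batch of s rows joined against a broadcast side of buildCount
  // rows contributes s * buildCount output rows; only these counts need to flow
  // into the batch splitter.
  def outputRowCounts(streamedBatchRows: Seq[Long], buildCount: Long): Seq[Long] =
    streamedBatchRows.map(_ * buildCount)

  def main(args: Array[String]): Unit = {
    // Three streamed batches of 10, 7 and 3 rows against a 25-row broadcast side.
    println(outputRowCounts(Seq(10L, 7L, 3L), 25L))  // 250, 175, 75
  }
}

In the operator above, those per-partition counts are handed to GpuNoColumnCrossJoin.divideIntoBatches, which chops them into empty batches of bounded size.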