Fix broadcast nested loop join for the no column case #353

Merged: 1 commit, Jul 16, 2020
integration_tests/src/main/python/join_test.py (1 change: 0 additions & 1 deletion)
@@ -98,7 +98,6 @@ def do_join(spark):
# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/323')
def test_broadcast_nested_loop_join_special_case(data_gen):
def do_join(spark):
left, right = create_df(spark, data_gen, 50, 25)
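The deleted xfail marker means this special-case test is now expected to pass once zero-column batches are handled. As a rough Scala sketch (not taken from the test suite; the session setup and row counts are assumed), this is the kind of query that exercises a broadcast nested loop join whose result needs no columns, only a row count:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object NoColumnCrossJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("no-column-cross-join-sketch")
      .master("local[*]")
      .getOrCreate()

    // A cross join whose result is only counted: no columns from either side
    // survive pruning, so the join can emit zero-column batches that carry
    // nothing but a row count.
    val left = spark.range(50)
    val right = broadcast(spark.range(25))
    val count = left.crossJoin(right).count()
    println(count) // 50 * 25 = 1250

    spark.stop()
  }
}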
@@ -178,6 +178,35 @@ class GpuCartesianRDD(
}
}

object GpuNoColumnCrossJoin {
def divideIntoBatches(rowCounts: RDD[Long],
targetSizeBytes: Long,
numOutputRows: SQLMetric,
numOutputBatches: SQLMetric): RDD[ColumnarBatch] = {
// Hash aggregate explodes the rows out, so if we go too large
// it can blow up. The size of a Long is 8 bytes so we just go with
// that as our estimate, no nulls.
val maxRowCount = targetSizeBytes/8

def divideIntoBatches(rows: Long): Iterable[ColumnarBatch] = {
val numBatches = (rows + maxRowCount - 1)/maxRowCount
(0L until numBatches).map(i => {
val ret = new ColumnarBatch(new Array[ColumnVector](0))
if ((i + 1) * maxRowCount > rows) {
ret.setNumRows((rows - (i * maxRowCount)).toInt)
} else {
ret.setNumRows(maxRowCount.toInt)
}
numOutputRows += ret.numRows()
numOutputBatches += 1
ret
})
}

rowCounts.flatMap(divideIntoBatches)
}
}

case class GpuCartesianProductExec(
left: SparkPlan,
right: SparkPlan,
@@ -229,32 +258,16 @@ case class GpuCartesianProductExec(
ret
}

// Hash aggregate explodes the rows out, so if we go too large
// it can blow up. The size of a Long is 8 bytes so we just go with
// that as our estimate, no nulls.
val maxRowCount = targetSizeBytes/8

def divideIntoBatches(rows: Long): Iterable[ColumnarBatch] = {
val numBatches = (rows + maxRowCount - 1)/maxRowCount
(0L until numBatches).map(i => {
val ret = new ColumnarBatch(new Array[ColumnVector](0))
if ((i + 1) * maxRowCount > rows) {
ret.setNumRows((rows - (i * maxRowCount)).toInt)
} else {
ret.setNumRows(maxRowCount.toInt)
}
numOutputRows += ret.numRows()
numOutputBatches += 1
ret
})
}
val l = left.executeColumnar().map(getRowCountAndClose)
val r = right.executeColumnar().map(getRowCountAndClose)
// TODO here too it would probably be best to avoid doing any re-computation
// that happens with the built in cartesian, but writing another custom RDD
// just for this use case is not worth it without an explicit use case.
val prods = l.cartesian(r).map(p => p._1 * p._2)
prods.flatMap(divideIntoBatches)
GpuNoColumnCrossJoin.divideIntoBatches(
l.cartesian(r).map(p => p._1 * p._2),
targetSizeBytes,
numOutputRows,
numOutputBatches)
} else {
new GpuCartesianRDD(sparkContext,
boundCondition,
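GpuNoColumnCrossJoin.divideIntoBatches only has to size empty batches, so it budgets one 8-byte Long per row against targetSizeBytes and splits each partition's row count with ceiling division. A minimal sketch of that arithmetic in isolation (the object name and example numbers are illustrative, not from the plugin):

// Minimal sketch of the batching arithmetic, without the RDD, metrics or
// ColumnarBatch plumbing. Uses the same 8-bytes-per-row estimate.
object BatchSplitSketch {
  def batchSizes(rows: Long, targetSizeBytes: Long): Seq[Long] = {
    val maxRowCount = targetSizeBytes / 8                     // rows that fit in the target size
    val numBatches = (rows + maxRowCount - 1) / maxRowCount   // ceiling division
    (0L until numBatches).map { i =>
      // Every batch is full except possibly the last one.
      if ((i + 1) * maxRowCount > rows) rows - i * maxRowCount else maxRowCount
    }
  }

  def main(args: Array[String]): Unit = {
    // 1,000,000 rows against a 1 MiB target: maxRowCount = 131,072, so seven
    // full batches plus a final batch of 82,496 rows.
    println(batchSizes(1000000L, 1024L * 1024L))
  }
}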
@@ -41,7 +41,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNes
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.rapids.execution
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

@SerialVersionUID(100L)
class SerializeConcatHostBuffersDeserializeBatch(
@@ -75,6 +75,8 @@ class SerializeConcatHostBuffersDeserializeBatch(
} finally {
empty.close()
}
} else if (headers.head.getNumColumns == 0) {
JCudfSerialization.writeRowsToStream(out, numRows)
} else {
JCudfSerialization.writeConcatedStream(headers, buffers, out)
}
@@ -83,12 +85,19 @@
private def readObject(in: ObjectInputStream): Unit = {
val range = new NvtxRange("DeserializeBatch", NvtxColor.PURPLE)
try {
val tableInfo: JCudfSerialization.TableAndRowCountPair = JCudfSerialization.readTableFrom(in)
val tableInfo: JCudfSerialization.TableAndRowCountPair =
JCudfSerialization.readTableFrom(in)
try {
val table = tableInfo.getTable
// This is read as part of the broadcast join so we expect it to leak.
(0 until table.getNumberOfColumns).foreach(table.getColumn(_).noWarnLeakExpected())
this.batchInternal = GpuColumnVector.from(table)
if (table == null) {
val numRows = tableInfo.getNumRows
this.batchInternal = new ColumnarBatch(new Array[ColumnVector](0))
batchInternal.setNumRows(numRows.toInt)
} else {
// This is read as part of the broadcast join so we expect it to leak.
(0 until table.getNumberOfColumns).foreach(table.getColumn(_).noWarnLeakExpected())
this.batchInternal = GpuColumnVector.from(table)
}
} finally {
tableInfo.close()
}
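On the deserialization side, a null table from JCudfSerialization.readTableFrom now maps to a ColumnarBatch with zero columns that carries only a row count. A short sketch of just that Spark-level construct, independent of the cuDF serialization path (the row count is made up):

import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

object ZeroColumnBatchSketch {
  def main(args: Array[String]): Unit = {
    // A batch with no columns is still a valid ColumnarBatch: it holds no
    // vectors, only a row count, which is all a no-column join result needs.
    val batch = new ColumnarBatch(new Array[ColumnVector](0))
    batch.setNumRows(1250)
    println(s"columns=${batch.numCols()} rows=${batch.numRows()}") // columns=0 rows=1250
    batch.close()
  }
}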
@@ -17,9 +17,10 @@
package org.apache.spark.sql.rapids.execution

import ai.rapids.cudf.{NvtxColor, Table}
import com.nvidia.spark.rapids.{Arm, BaseExprMeta, ConfKeysAndIncompat, GpuBindReferences, GpuColumnVector, GpuExec, GpuExpression, GpuFilter, GpuOverrides, NvtxWithMetrics, RapidsConf, RapidsMeta, SparkPlanMeta}
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.GpuMetricNames.{NUM_OUTPUT_BATCHES, NUM_OUTPUT_ROWS, TOTAL_TIME}

import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
@@ -29,6 +30,7 @@ import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.joins.{BroadcastNestedLoopJoinExec, BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.rapids.GpuNoColumnCrossJoin
import org.apache.spark.sql.vectorized.ColumnarBatch

class GpuBroadcastNestedLoopJoinMeta(
Expand Down Expand Up @@ -133,7 +135,7 @@ case class GpuBroadcastNestedLoopJoinExec(
buildSide: BuildSide,
joinType: JoinType,
condition: Option[Expression],
targetSize: Long) extends BinaryExecNode with GpuExec {
targetSizeBytes: Long) extends BinaryExecNode with GpuExec {

override protected def doExecute(): RDD[InternalRow] =
throw new IllegalStateException("This should only be called from columnar")
@@ -199,26 +201,50 @@ case class GpuBroadcastNestedLoopJoinExec(
val broadcastedRelation =
broadcastExchange.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()

lazy val builtTable: Table = {
withResource(new NvtxWithMetrics("build join table", NvtxColor.GREEN, buildTime)) { _ =>
val ret = GpuColumnVector.from(broadcastedRelation.value.batch)
// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach( i => {
val column = ret.getColumn(i)
column.noWarnLeakExpected()
buildDataSize += column.getDeviceMemorySize
})
if (output.isEmpty) {
assert(boundCondition.isEmpty)

lazy val buildCount: Long = {
withResource(new NvtxWithMetrics("build join table", NvtxColor.GREEN, buildTime)) { _ =>
broadcastedRelation.value.batch.numRows()
}
}

def getRowCountAndClose(cb: ColumnarBatch): Long = {
val ret = cb.numRows()
cb.close()
GpuSemaphore.releaseIfNecessary(TaskContext.get())
ret
}
}

streamed.executeColumnar().mapPartitions { streamedIter =>
joinType match {
case _: InnerLike => GpuBroadcastNestedLoopJoinExec.innerLikeJoin(streamedIter,
builtTable, buildSide, boundCondition,
joinTime, joinOutputRows, numOutputRows, numOutputBatches, filterTime, totalTime)
case _ => throw new IllegalArgumentException(s"$joinType + $buildSide is not supported" +
s" and should be run on the CPU")
val counts = streamed.executeColumnar().map(getRowCountAndClose)
GpuNoColumnCrossJoin.divideIntoBatches(
counts.map(s => s * buildCount),
targetSizeBytes,
numOutputRows,
numOutputBatches)
} else {
lazy val builtTable: Table = {
withResource(new NvtxWithMetrics("build join table", NvtxColor.GREEN, buildTime)) { _ =>
val ret = GpuColumnVector.from(broadcastedRelation.value.batch)
// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach(i => {
val column = ret.getColumn(i)
column.noWarnLeakExpected()
buildDataSize += column.getDeviceMemorySize
})
ret
}
}

streamed.executeColumnar().mapPartitions { streamedIter =>
joinType match {
case _: InnerLike => GpuBroadcastNestedLoopJoinExec.innerLikeJoin(streamedIter,
builtTable, buildSide, boundCondition,
joinTime, joinOutputRows, numOutputRows, numOutputBatches, filterTime, totalTime)
case _ => throw new IllegalArgumentException(s"$joinType + $buildSide is not supported" +
s" and should be run on the CPU")
}
}
}
}
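When the join output is empty, the exec skips building a cuDF table entirely: each streamed partition contributes its row count times the broadcast side's row count, and those products are handed to GpuNoColumnCrossJoin.divideIntoBatches. A tiny sketch of that cardinality step with assumed counts (illustrative only, not from the PR):

object NoColumnJoinCardinalitySketch {
  def main(args: Array[String]): Unit = {
    val buildCount = 25L                        // rows on the broadcast (build) side
    val streamPartitionCounts = Seq(50L, 17L)   // rows per streamed partition
    // Each partition of the cross product yields streamRows * buildRows rows
    // of no columns; only these counts are carried forward and later split
    // into zero-column batches by the target size.
    val outputRowsPerPartition = streamPartitionCounts.map(_ * buildCount)
    println(outputRowsPerPartition)             // Vector(1250, 425)
  }
}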