
Fixing conflicts in branch-0.3 #762

Merged 7 commits on Sep 14, 2020
395 changes: 395 additions & 0 deletions CHANGELOG.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/get-started/getting-started-on-prem.md
@@ -512,7 +512,7 @@ To enable _GPU Scheduling for Pandas UDF_, you need to configure your spark job
On Standalone, you need to add
```shell
...
-  --conf spark.executorEnv.PYTHONPATH=rapids-4-spark_2.12-0.2.0-SNAPSHOT.jar \
+  --conf spark.executorEnv.PYTHONPATH=rapids-4-spark_2.12-0.2.0.jar \
--py-files ${SPARK_RAPIDS_PLUGIN_JAR}
```

16 changes: 14 additions & 2 deletions integration_tests/src/main/python/tpcds_test.py
@@ -23,8 +23,8 @@
'q30', 'q31', 'q32', 'q33', 'q34', 'q35', 'q36', 'q37', 'q38', 'q39a', 'q39b',
'q40', 'q41', 'q42', 'q43', 'q44', 'q45', 'q46', 'q47', 'q48', 'q49',
'q50', 'q51', 'q52', 'q53', 'q54', 'q55', 'q56', 'q57', 'q58', 'q59',
-'q60', 'q61', 'q62', 'q63', 'q64', 'q65', 'q66', 'q67', 'q68', 'q69',
-'q70', 'q71', 'q72', 'q73', 'q74', 'q75', 'q76', 'q77', 'q78', 'q79',
+'q60', 'q61', 'q62', 'q63', 'q64', 'q65', 'q66', 'q68', 'q69',
+'q71', 'q72', 'q73', 'q74', 'q75', 'q76', 'q77', 'q78', 'q79',
'q80', 'q81', 'q82', 'q83', 'q84', 'q85', 'q86', 'q87', 'q88', 'q89',
'q90', 'q91', 'q92', 'q93', 'q94', 'q95', 'q96', 'q97', 'q98', 'q99',
'ss_max', 'ss_maxb']
@@ -35,5 +35,17 @@
@allow_non_gpu(any=True)
@pytest.mark.parametrize('query', queries)
def test_tpcds(tpcds, query):
    assert_gpu_and_cpu_are_equal_collect(
            lambda spark : tpcds.do_test_query(query),
            conf={'spark.rapids.sql.variableFloatAgg.enabled': 'true'})

no_var_agg_queries = ['q67', 'q70']

@incompat
@ignore_order
@approximate_float
@allow_non_gpu(any=True)
@pytest.mark.parametrize('query', no_var_agg_queries)
def test_tpcds_no_var_agg(tpcds, query):
    assert_gpu_and_cpu_are_equal_collect(
            lambda spark : tpcds.do_test_query(query))
1 change: 1 addition & 0 deletions integration_tests/src/main/python/udf_test.py
@@ -74,6 +74,7 @@ def pandas_sum(to_process: pd.Series) -> float:
pandas_sum(f.col('a'))),
conf=arrow_udf_conf)

@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/757")
@ignore_order
@allow_non_gpu('AggregateInPandasExec', 'PythonUDF', 'Alias')
@pytest.mark.parametrize('data_gen', integral_gens, ids=idfn)
2 changes: 1 addition & 1 deletion jenkins/Dockerfile.integration.centos7
@@ -51,7 +51,7 @@ ENV PATH="/opt/conda/bin:${PATH}"
RUN conda --version

ARG CUDA_TOOLKIT_VER=10.1
-RUN conda install -c rapidsai-nightly -c nvidia -c conda-forge \
+RUN conda install -c rapidsai -c nvidia -c conda-forge \
-c defaults cudf=0.15 python=3.7 cudatoolkit=${CUDA_TOOLKIT_VER}

RUN conda install spacy && \
9 changes: 4 additions & 5 deletions scripts/generate-changelog
@@ -257,14 +257,13 @@ def form_changelog(path: str, changelog: dict):
    sorted_dict = OrderedDict(sorted(changelog.items(), reverse=True))
    subsections = ""
    for project_name, issues in sorted_dict.items():
-        subsections += f"\n## {project_name}\n"
+        subsections += f"\n\n## {project_name}"
        subsections += form_subsection(issues, FEATURES)
        subsections += form_subsection(issues, PERFORMANCE)
        subsections += form_subsection(issues, BUGS_FIXED)
        subsections += form_subsection(issues, PRS)
    markdown = f"""# Change log
-Generated on {date.today()}
-{subsections}
+Generated on {date.today()}{subsections}
"""
    with open(path, "w") as file:
        file.write(markdown)
@@ -273,8 +272,8 @@ Generated on {date.today()}
def form_subsection(issues: dict, subtitle: str):
    if len(issues[subtitle]) == 0:
        return ''
-    subsection = f"\n### {subtitle}\n"
-    subsection += "|||\n|:---|:---|"
+    subsection = f"\n\n### {subtitle}"
+    subsection += "\n|||\n|:---|:---|"
    for issue in sorted(issues[subtitle], key=lambda x: x['time'], reverse=True):
        subsection += f"\n|[#{issue['number']}]({issue['url']})|{issue['title']}|"
    return subsection
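
Taken together, these changes move each blank line so it precedes a heading instead of trailing the previous one. With a hypothetical project name, issue number, and title, the regenerated file would lay out like this (a sketch of the emitted markdown, not actual output):

```
# Change log
Generated on 2020-09-14

## Example Project

### Bugs Fixed
|||
|:---|:---|
|[#123](https://github.com/NVIDIA/spark-rapids/issues/123)|Example issue title|
```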
@@ -137,10 +137,15 @@ case class GpuBroadcastHashJoinExec(
val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))

    lazy val builtTable = {
-      // TODO clean up intermediate results...
-      val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)
-      val combined = combine(keys, broadcastRelation.value.batch)
-      val ret = GpuColumnVector.from(combined)
+      val ret = withResource(
+        GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
+        val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
+        val filtered = filterBuiltTableIfNeeded(combined)
+        withResource(filtered) { filtered =>
+          GpuColumnVector.from(filtered)
+        }
+      }

      // Don't warn for a leak, because we cannot control when we are done with this
      (0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
      ret
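
The `withResource` blocks introduced throughout this PR replace the hand-rolled try/finally cleanup. The helper itself is not part of this diff; a minimal sketch of the shape its usage implies (the project defines the real one, presumably in an Arm-style resource trait) is:

```scala
object ResourceSketch {
  // Run `block` against an AutoCloseable and guarantee the resource is
  // closed whether the block returns normally or throws.
  def withResource[T <: AutoCloseable, V](resource: T)(block: T => V): V = {
    try {
      block(resource)
    } finally {
      resource.close()
    }
  }
}
```

Together with `GpuHashJoin.incRefCount` below, this is what lets the new code drop the `// TODO clean up intermediate results...` comment: every intermediate batch is now closed exactly once on all paths.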
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, HashJoin}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.rapids.GpuAnd
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

object GpuHashJoin {
@@ -39,6 +40,11 @@
}
case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported")
}

  def incRefCount(cb: ColumnarBatch): ColumnarBatch = {
    GpuColumnVector.extractBases(cb).foreach(_.incRefCount())
    cb
  }
}
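
`combine` reuses the device columns of its inputs without bumping their reference counts, yet after this PR both the input batch and the combined batch get closed. `incRefCount` restores the balance so each column is released exactly once. A toy model of that contract (illustrative only, not the cudf API):

```scala
// Hypothetical stand-in for a ref-counted device buffer.
final class FakeColumn(name: String) extends AutoCloseable {
  private var refs = 1
  def incRefCount(): this.type = { refs += 1; this }
  override def close(): Unit = {
    refs -= 1
    if (refs == 0) println(s"$name: freed")
    else if (refs < 0) throw new IllegalStateException(s"$name: double free")
  }
}

object RefCountDemo extends App {
  val col = new FakeColumn("buildKey0")
  col.incRefCount() // what incRefCount does for each base column of the batch
  col.close()       // closing the original batch
  col.close()       // closing the combined batch: frees exactly once, no double free
}
```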

trait GpuHashJoin extends GpuExec with HashJoin {
@@ -110,6 +116,67 @@ trait GpuHashJoin extends GpuExec with HashJoin {
output.indices.map (v => v + joinLength)
}

  // Spark adds in rules to filter out nulls for some types of joins, but it does
  // not guarantee that all nulls will be filtered out by the time they get to
  // this point. Because of https://github.com/rapidsai/cudf/issues/6052 we need
  // to filter out the nulls ourselves until it is fixed:
  //   InnerLike | LeftSemi  => filter left and right keys
  //   RightOuter            => filter left keys
  //   LeftOuter | LeftAnti  => filter right keys

  private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = {
    val builtAnyNullable = gpuBuildKeys.exists(_.nullable)
    (joinType, buildSide) match {
      case (_: InnerLike | LeftSemi, _) => builtAnyNullable
      case (RightOuter, BuildLeft) => builtAnyNullable
      case (LeftOuter | LeftAnti, BuildRight) => builtAnyNullable
      case _ => false
    }
  }

  private[this] lazy val shouldFilterStreamTableForNulls: Boolean = {
    val streamedAnyNullable = gpuStreamedKeys.exists(_.nullable)
    (joinType, buildSide) match {
      case (_: InnerLike | LeftSemi, _) => streamedAnyNullable
      case (RightOuter, BuildRight) => streamedAnyNullable
      case (LeftOuter | LeftAnti, BuildLeft) => streamedAnyNullable
      case _ => false
    }
  }

  private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression =
    exprs.zipWithIndex.map { kv =>
      GpuIsNotNull(GpuBoundReference(kv._2, kv._1.dataType, kv._1.nullable))
    }.reduce(GpuAnd)
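
The expression this builds is just `IsNotNull(key0) AND IsNotNull(key1) AND ...` over the bound key columns. The same shape in stock Catalyst nodes, with two hypothetical nullable integer keys (a sketch for illustration, using the CPU classes in place of GpuIsNotNull/GpuAnd/GpuBoundReference):

```scala
import org.apache.spark.sql.catalyst.expressions.{And, BoundReference, Expression, IsNotNull}
import org.apache.spark.sql.types.IntegerType

object NullFilterSketch {
  // Two hypothetical nullable build-side keys, bound by ordinal as above.
  val keys: Seq[Expression] = Seq(
    BoundReference(0, IntegerType, nullable = true),
    BoundReference(1, IntegerType, nullable = true))

  // (isnotnull(key0) AND isnotnull(key1))
  val filter: Expression = keys.map(IsNotNull(_)).reduce[Expression](And(_, _))
}
```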

  private[this] lazy val builtTableNullFilterExpression: GpuExpression =
    mkNullFilterExpr(gpuBuildKeys)

  private[this] lazy val streamedTableNullFilterExpression: GpuExpression =
    mkNullFilterExpr(gpuStreamedKeys)

  /**
   * Filter the builtBatch if needed. builtBatch will be closed.
   * @param builtBatch the build-side batch to filter for null join keys
   * @return the filtered batch, or the original batch if no filtering was needed
   */
  def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch =
    if (shouldFilterBuiltTableForNulls) {
      GpuFilter(builtBatch, builtTableNullFilterExpression)
    } else {
      builtBatch
    }

  private[this] def filterStreamedTableIfNeeded(streamedBatch: ColumnarBatch): ColumnarBatch =
    if (shouldFilterStreamTableForNulls) {
      GpuFilter(streamedBatch, streamedTableNullFilterExpression)
    } else {
      streamedBatch
    }

def doJoin(builtTable: Table,
stream: Iterator[ColumnarBatch],
boundCondition: Option[Expression],
@@ -172,16 +239,14 @@ trait GpuHashJoin extends GpuExec with HashJoin {
joinTime: SQLMetric,
filterTime: SQLMetric): Option[ColumnarBatch] = {

-    val streamedTable = try {
-      val streamedKeysBatch = GpuProjectExec.project(streamedBatch, gpuStreamedKeys)
-      try {
-        val combined = combine(streamedKeysBatch, streamedBatch)
-        GpuColumnVector.from(combined)
-      } finally {
-        streamedKeysBatch.close()
-      }
-    } finally {
-      streamedBatch.close()
-    }
+    val combined = withResource(streamedBatch) { streamedBatch =>
+      withResource(GpuProjectExec.project(streamedBatch, gpuStreamedKeys)) {
+        streamedKeysBatch =>
+          GpuHashJoin.incRefCount(combine(streamedKeysBatch, streamedBatch))
+      }
+    }
+    val streamedTable = withResource(filterStreamedTableIfNeeded(combined)) { cb =>
+      GpuColumnVector.from(cb)
+    }

val nvtxRange = new NvtxWithMetrics("hash join", NvtxColor.ORANGE, joinTime)
@@ -129,20 +129,20 @@ case class GpuShuffledHashJoinExec(
streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) {
(streamIter, buildIter) => {
var combinedSize = 0

val startTime = System.nanoTime()
-        val buildBatch =
-          ConcatAndConsumeAll.getSingleBatchWithVerification(buildIter, localBuildOutput)
-        val keys = GpuProjectExec.project(buildBatch, gpuBuildKeys)
-        val builtTable = try {
-          // Combine does not inc any reference counting
-          val combined = combine(keys, buildBatch)
-          combinedSize =
-            GpuColumnVector.extractColumns(combined)
-              .map(_.getBase.getDeviceMemorySize).sum.toInt
-          GpuColumnVector.from(combined)
-        } finally {
-          keys.close()
-          buildBatch.close()
-        }
+        val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(
+          buildIter, localBuildOutput)) { buildBatch: ColumnarBatch =>
+          withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys =>
+            val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch))
+            val filtered = filterBuiltTableIfNeeded(combined)
+            combinedSize =
+              GpuColumnVector.extractColumns(filtered)
+                .map(_.getBase.getDeviceMemorySize).sum.toInt
+            withResource(filtered) { filtered =>
+              GpuColumnVector.from(filtered)
+            }
+          }
+        }

val delta = System.nanoTime() - startTime
@@ -138,10 +138,15 @@ case class GpuBroadcastHashJoinExec(
val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))

    lazy val builtTable = {
-      // TODO clean up intermediate results...
-      val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)
-      val combined = combine(keys, broadcastRelation.value.batch)
-      val ret = GpuColumnVector.from(combined)
+      val ret = withResource(
+        GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
+        val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
+        val filtered = filterBuiltTableIfNeeded(combined)
+        withResource(filtered) { filtered =>
+          GpuColumnVector.from(filtered)
+        }
+      }

      // Don't warn for a leak, because we cannot control when we are done with this
      (0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
      ret
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.execution.joins.HashJoin
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.rapids.GpuAnd
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

object GpuHashJoin {
@@ -40,6 +41,11 @@
}
case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported")
}

  def incRefCount(cb: ColumnarBatch): ColumnarBatch = {
    GpuColumnVector.extractBases(cb).foreach(_.incRefCount())
    cb
  }
}

trait GpuHashJoin extends GpuExec with HashJoin {
@@ -111,6 +117,67 @@ trait GpuHashJoin extends GpuExec with HashJoin {
output.indices.map (v => v + joinLength)
}

  // Spark adds in rules to filter out nulls for some types of joins, but it does
  // not guarantee that all nulls will be filtered out by the time they get to
  // this point. Because of https://github.com/rapidsai/cudf/issues/6052 we need
  // to filter out the nulls ourselves until it is fixed:
  //   InnerLike | LeftSemi  => filter left and right keys
  //   RightOuter            => filter left keys
  //   LeftOuter | LeftAnti  => filter right keys

  private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = {
    val builtAnyNullable = gpuBuildKeys.exists(_.nullable)
    (joinType, buildSide) match {
      case (_: InnerLike | LeftSemi, _) => builtAnyNullable
      case (RightOuter, BuildLeft) => builtAnyNullable
      case (LeftOuter | LeftAnti, BuildRight) => builtAnyNullable
      case _ => false
    }
  }

  private[this] lazy val shouldFilterStreamTableForNulls: Boolean = {
    val streamedAnyNullable = gpuStreamedKeys.exists(_.nullable)
    (joinType, buildSide) match {
      case (_: InnerLike | LeftSemi, _) => streamedAnyNullable
      case (RightOuter, BuildRight) => streamedAnyNullable
      case (LeftOuter | LeftAnti, BuildLeft) => streamedAnyNullable
      case _ => false
    }
  }

  private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression =
    exprs.zipWithIndex.map { kv =>
      GpuIsNotNull(GpuBoundReference(kv._2, kv._1.dataType, kv._1.nullable))
    }.reduce(GpuAnd)

  private[this] lazy val builtTableNullFilterExpression: GpuExpression =
    mkNullFilterExpr(gpuBuildKeys)

  private[this] lazy val streamedTableNullFilterExpression: GpuExpression =
    mkNullFilterExpr(gpuStreamedKeys)

  /**
   * Filter the builtBatch if needed. builtBatch will be closed.
   * @param builtBatch the build-side batch to filter for null join keys
   * @return the filtered batch, or the original batch if no filtering was needed
   */
  def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch =
    if (shouldFilterBuiltTableForNulls) {
      GpuFilter(builtBatch, builtTableNullFilterExpression)
    } else {
      builtBatch
    }

  private[this] def filterStreamedTableIfNeeded(streamedBatch: ColumnarBatch): ColumnarBatch =
    if (shouldFilterStreamTableForNulls) {
      GpuFilter(streamedBatch, streamedTableNullFilterExpression)
    } else {
      streamedBatch
    }

def doJoin(builtTable: Table,
stream: Iterator[ColumnarBatch],
boundCondition: Option[Expression],
@@ -173,16 +240,14 @@ trait GpuHashJoin extends GpuExec with HashJoin {
joinTime: SQLMetric,
filterTime: SQLMetric): Option[ColumnarBatch] = {

-    val streamedTable = try {
-      val streamedKeysBatch = GpuProjectExec.project(streamedBatch, gpuStreamedKeys)
-      try {
-        val combined = combine(streamedKeysBatch, streamedBatch)
-        GpuColumnVector.from(combined)
-      } finally {
-        streamedKeysBatch.close()
-      }
-    } finally {
-      streamedBatch.close()
-    }
+    val combined = withResource(streamedBatch) { streamedBatch =>
+      withResource(GpuProjectExec.project(streamedBatch, gpuStreamedKeys)) {
+        streamedKeysBatch =>
+          GpuHashJoin.incRefCount(combine(streamedKeysBatch, streamedBatch))
+      }
+    }
+    val streamedTable = withResource(filterStreamedTableIfNeeded(combined)) { cb =>
+      GpuColumnVector.from(cb)
+    }

val nvtxRange = new NvtxWithMetrics("hash join", NvtxColor.ORANGE, joinTime)