Add an explain only mode to the plugin #4322

Merged
merged 33 commits on Jan 14, 2022
Commits
8eb81f6
Add an explain only mode configuration to run plugin on CPU to get wh…
tgravescs Dec 1, 2021
1fca5ef
Updates
tgravescs Dec 1, 2021
f043f86
Don't allocate gpu or enable shuffle for explain only mode
tgravescs Dec 2, 2021
431426c
explain only mode check for rapids shuffle internal manager
tgravescs Dec 2, 2021
3541832
update doc
tgravescs Dec 2, 2021
6ec51d0
Change how we check explain with sql enabled
tgravescs Dec 3, 2021
21a79ca
Merge branch 'explainonlymode' of github.com:tgravescs/spark-rapids i…
tgravescs Dec 3, 2021
b8e9e26
update not work message:
tgravescs Dec 3, 2021
c15b786
Merge remote-tracking branch 'origin/branch-22.02' into explainonlymode
tgravescs Dec 3, 2021
ba3b355
fix spacing
tgravescs Dec 6, 2021
42aa80f
Update doc adding explain option
tgravescs Dec 6, 2021
278d96c
update docs
tgravescs Dec 6, 2021
2a3b228
Add explain only mode test to make sure runs on cpu
tgravescs Dec 6, 2021
3bdf4f2
add note about adaptive
tgravescs Dec 7, 2021
4b88718
get rest of broadcast shims
tgravescs Dec 7, 2021
8f9301a
Update logging of enabled and explain only mode
tgravescs Dec 7, 2021
d79c9a4
update config docs
tgravescs Dec 7, 2021
6402631
Merge remote-tracking branch 'origin/branch-22.02' into explainonlymode
tgravescs Jan 12, 2022
ab49521
Update to use the spark.rapids.sql.mode config
tgravescs Jan 12, 2022
ce38223
update config doc and formatting
tgravescs Jan 12, 2022
b1dd70e
update docs
tgravescs Jan 12, 2022
c94d732
fix typo
tgravescs Jan 12, 2022
0003e4e
update configs.md
tgravescs Jan 12, 2022
77c82a2
Change to check for isSqlEnabled and the mode separately because we may
tgravescs Jan 13, 2022
c02dae5
fix spacing
tgravescs Jan 13, 2022
2afeb2b
update auto generated configs doc
tgravescs Jan 13, 2022
dae6749
Update docs/get-started/getting-started-workload-qualification.md
tgravescs Jan 14, 2022
0ec5e1f
Update docs/get-started/getting-started-workload-qualification.md
tgravescs Jan 14, 2022
8e84659
update copyrights and change text from plugin to Rapids Accelerator
tgravescs Jan 14, 2022
6aaacf8
Update sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.s…
tgravescs Jan 14, 2022
f6f9d67
Update sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.s…
tgravescs Jan 14, 2022
fc5a1c2
Update sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.s…
tgravescs Jan 14, 2022
a3f1039
update configs doc
tgravescs Jan 14, 2022
1 change: 1 addition & 0 deletions docs/configs.md
@@ -109,6 +109,7 @@ Name | Description | Default Value
<a name="sql.join.leftSemi.enabled"></a>spark.rapids.sql.join.leftSemi.enabled|When set to true left semi joins are enabled on the GPU|true
<a name="sql.join.rightOuter.enabled"></a>spark.rapids.sql.join.rightOuter.enabled|When set to true right outer joins are enabled on the GPU|true
<a name="sql.metrics.level"></a>spark.rapids.sql.metrics.level|GPU plans can produce a lot more metrics than CPU plans do. In very large queries this can sometimes result in going over the max result size limit for the driver. Supported values include DEBUG which will enable all metrics supported and typically only needs to be enabled when debugging the plugin. MODERATE which should output enough metrics to understand how long each part of the query is taking and how much data is going to each part of the query. ESSENTIAL which disables most metrics except those Apache Spark CPU plans will also report or their equivalents.|MODERATE
<a name="sql.mode"></a>spark.rapids.sql.mode|Set the mode for the Rapids Accelerator. The supported modes are explainOnly and executeOnGPU. This config cannot be changed at runtime; you must restart the application for it to take effect. The default mode is executeOnGPU, which means the Rapids Accelerator plugin converts the Spark operations and executes them on the GPU when possible. The explainOnly mode allows running queries on the CPU while the Rapids Accelerator evaluates the queries as if they were going to run on the GPU. The explanations of what would have run on the GPU and why are output in log messages. When using explainOnly mode, the default explain output is ALL; this can be changed by setting spark.rapids.sql.explain. See that config for more details.|executeongpu
<a name="sql.python.gpu.enabled"></a>spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false
<a name="sql.reader.batchSizeBytes"></a>spark.rapids.sql.reader.batchSizeBytes|Soft limit on the maximum number of bytes the reader reads per batch. The readers will read chunks of data until this limit is met or exceeded. Note that the reader may estimate the number of bytes that will be used on the GPU in some cases based on the schema and number of rows in each batch.|2147483647
<a name="sql.reader.batchSizeRows"></a>spark.rapids.sql.reader.batchSizeRows|Soft limit on the maximum number of rows the reader will read per batch. The orc and parquet readers will read row groups until this limit is met or exceeded. The limit is respected by the csv reader.|2147483647
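
For illustration, here is a minimal sketch of setting this config at session startup (sketch only; the app name is arbitrary, and since the mode cannot be changed at runtime it must be set before the session starts):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: start a session with the RAPIDS Accelerator in explain only mode.
val spark = SparkSession.builder()
  .appName("rapids-explain-only") // hypothetical app name
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.rapids.sql.mode", "explainOnly")
  .config("spark.rapids.sql.explain", "ALL") // optional; ALL is already the default in explainOnly mode
  .getOrCreate()
```
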
69 changes: 65 additions & 4 deletions docs/get-started/getting-started-workload-qualification.md
@@ -54,15 +54,76 @@ Since the two tools are only analyzing Spark event logs, they do not have the detail that can be
captured from a running Spark job. However, this approach is very convenient because you can run the tools on
existing logs and do not need a GPU cluster to run them.

## 2. Function `explainPotentialGpuPlan`
## 2. Get the Explain Output

This allows running queries on the CPU while the RAPIDS Accelerator evaluates them as if they
were going to run on the GPU, and reports what would and wouldn't have run on the GPU.
There are two ways to run this: one is running with the RAPIDS Accelerator set to explain only mode, and
the other is to modify your existing Spark application code to call a function directly.

Please note that if adaptive query execution (AQE) is enabled in Spark, the explain output may not
be exact, because the plan can change at runtime in ways that are not visible from looking at just
the CPU plan.

### Requirements

- A Spark 3.x CPU cluster
- The `rapids-4-spark` and `cudf` [jars](../download.md)
- Ability to modify the existing Spark application code
- Ability to modify the existing Spark application code if using the function call directly

### How to use
### Using the Configuration Flag for Explain Only Mode

Starting with version 22.02, the RAPIDS Accelerator can be run in explain only mode.
This mode allows you to run on a CPU cluster while reporting the potential GPU plan and
any unsupported features. It logs the same output that the driver logs would contain with
`spark.rapids.sql.explain=all`.

1. In `spark-shell`, add the `rapids-4-spark` and `cudf` jars to the `--jars` option or put them on the
Spark classpath, and set the configs `spark.rapids.sql.mode=explainOnly` and
`spark.plugins=com.nvidia.spark.SQLPlugin`.

For example:

```bash
spark-shell --jars /PathTo/cudf-<version>.jar,/PathTo/rapids-4-spark_<version>.jar --conf spark.rapids.sql.mode=explainOnly --conf spark.plugins=com.nvidia.spark.SQLPlugin
```
2. Enable optional RAPIDS Accelerator related parameters based on your setup.

Enabling optional parameters may allow more operations to run on the GPU, but please understand
the meaning and risk of each parameter before enabling it. Please refer to the
[configuration documentation](../configs.md) for details on the RAPIDS Accelerator
parameters.

For example, if your jobs have `double`, `float` and `decimal` operators together with some Scala
UDFs, you can set the following parameters:

```scala
spark.conf.set("spark.rapids.sql.incompatibleOps.enabled", true)
spark.conf.set("spark.rapids.sql.variableFloatAgg.enabled", true)
spark.conf.set("spark.rapids.sql.decimalType.enabled", true)
spark.conf.set("spark.rapids.sql.castFloatToDecimal.enabled",true)
spark.conf.set("spark.rapids.sql.castDecimalToFloat.enabled",true)
spark.conf.set("spark.rapids.sql.udfCompiler.enabled",true)
```

3. Run your query and check the driver logs for the explain output.

Below is a sample driver log message; the `!` prefix indicates a feature that is unsupported in
this version:

```
! <RowDataSourceScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RowDataSourceScanExec
```

This log shows which operators (and on which data types) cannot run on the GPU, and why.
If it names a specific RAPIDS Accelerator parameter that can be turned on to enable a feature,
first understand the risk and applicability of that parameter based on the [configs
doc](../configs.md), then enable it and try the tool again.

Since the output is based directly on the specific version of the `rapids-4-spark` jar in use, the
gap analysis is quite accurate.
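
As a concrete illustration, here is a minimal sketch of a query run under explain only mode (the
column names are made up); the explain output appears in the driver log rather than as a return value:

```scala
// Runs entirely on the CPU; the RAPIDS Accelerator logs what would have run on the GPU.
val left = spark.range(500).selectExpr("id", "id % 7 as key")
val right = spark.range(500).selectExpr("id % 7 as key", "id as value")

// After this action completes, check the driver log for lines starting with `!`.
left.join(right, "key").collect()
```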

### How to use the Function Call

Starting with version 21.12 of the RAPIDS Accelerator, a new function named
`explainPotentialGpuPlan` is added which can help us understand the potential GPU plan and if there
@@ -157,4 +218,4 @@ For example, the log lines starting with `!` are the so-called not-supported messages.
The indentation indicates the parent and child relationship for those expressions.
If not all of the child expressions can run on the GPU, the parent cannot run on the GPU either.
The above example shows that the missing feature is the `ReplicateRows` expression, so we filed a feature request
[issue-4104](https://github.com/NVIDIA/spark-rapids/issues/4104) based on the 21.12 version.
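
For reference, a minimal sketch of the function-call path (the DataFrame here is hypothetical; the
signature matches the one shown in the `GpuOverrides.scala` diff below):

```scala
import org.apache.spark.sql.DataFrame
import com.nvidia.spark.rapids.GpuOverrides

// Any DataFrame works; this one is made up for the example.
val df: DataFrame = spark.range(1000).selectExpr("id", "id % 10 as key")

// Returns the explain output as a string instead of logging it.
// "ALL" reports both the supported and unsupported parts of the plan.
val output = GpuOverrides.explainPotentialGpuPlan(df, "ALL")
println(output)
```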
49 changes: 49 additions & 0 deletions integration_tests/src/main/python/explain_mode_test.py
@@ -0,0 +1,49 @@
# Copyright (c) 2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
from pyspark.sql.types import *
from asserts import assert_gpu_fallback_collect
from data_gen import *
from marks import ignore_order

# copied from sort_test and added explainOnly mode
_explain_mode_conf = {'spark.rapids.sql.mode': 'explainOnly',
'spark.sql.join.preferSortMergeJoin': 'True',
'spark.sql.shuffle.partitions': '2',
'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'
}

def create_df(spark, data_gen, left_length, right_length):
left = binary_op_df(spark, data_gen, length=left_length)
right = binary_op_df(spark, data_gen, length=right_length).withColumnRenamed("a", "r_a")\
.withColumnRenamed("b", "r_b")
return left, right


# just run with one join type since not testing join itself
all_join_types = ['Left']

# use a subset of types just to test explain only mode
all_gen = [StringGen(), ByteGen()]

# here we use the assert_gpu_fallback_collect to make sure explain only mode runs on the CPU
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', all_join_types, ids=idfn)
def test_explain_only_sortmerge_join(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 500)
return left.join(right, left.a == right.r_a, join_type)
assert_gpu_fallback_collect(do_join, 'SortMergeJoinExec', conf=_explain_mode_conf)
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -328,7 +328,8 @@ class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer with Arm {
val bytesAllowedPerBatch = getBytesAllowedPerBatch(conf)
val (schemaWithUnambiguousNames, _) = getSupportedSchemaFromUnsupported(schema)
val structSchema = schemaWithUnambiguousNames.toStructType
if (rapidsConf.isSqlEnabled && isSchemaSupportedByCudf(schema)) {
if (rapidsConf.isSqlEnabled && rapidsConf.isSqlExecuteOnGPU &&
isSchemaSupportedByCudf(schema)) {
def putOnGpuIfNeeded(batch: ColumnarBatch): ColumnarBatch = {
if (!batch.column(0).isInstanceOf[GpuColumnVector]) {
val s: StructType = structSchema
@@ -553,7 +554,7 @@ class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer with Arm {
val rapidsConf = new RapidsConf(conf)
val (cachedSchemaWithNames, selectedSchemaWithNames) =
getSupportedSchemaFromUnsupported(cacheAttributes, newSelectedAttributes)
if (rapidsConf.isSqlEnabled &&
if (rapidsConf.isSqlEnabled && rapidsConf.isSqlExecuteOnGPU &&
isSchemaSupportedByCudf(cachedSchemaWithNames)) {
val batches = convertCachedBatchToColumnarInternal(input, cachedSchemaWithNames,
selectedSchemaWithNames, newSelectedAttributes)
@@ -1454,7 +1455,8 @@ class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer with Arm {
val rapidsConf = new RapidsConf(conf)
val bytesAllowedPerBatch = getBytesAllowedPerBatch(conf)
val (schemaWithUnambiguousNames, _) = getSupportedSchemaFromUnsupported(schema)
if (rapidsConf.isSqlEnabled && isSchemaSupportedByCudf(schema)) {
if (rapidsConf.isSqlEnabled && rapidsConf.isSqlExecuteOnGPU &&
isSchemaSupportedByCudf(schema)) {
val structSchema = schemaWithUnambiguousNames.toStructType
val converters = new GpuRowToColumnConverter(structSchema)
val columnarBatchRdd = input.mapPartitions(iter => {
@@ -60,7 +60,12 @@ class GpuBroadcastHashJoinMeta(
}

if (!canBuildSideBeReplaced(buildSideMeta)) {
willNotWorkOnGpu("the broadcast for this join must be on the GPU too")
if (conf.isSqlExplainOnlyEnabled && wrapped.conf.adaptiveExecutionEnabled) {
willNotWorkOnGpu("explain only mode with AQE, we cannot determine " +
"if the broadcast for this join is on the GPU too")
} else {
willNotWorkOnGpu("the broadcast for this join must be on the GPU too")
}
}

if (!canThisBeReplaced) {
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -127,12 +127,15 @@ object GpuDeviceManager extends Logging {

def initializeGpuAndMemory(resources: Map[String, ResourceInformation],
conf: RapidsConf): Unit = {
// Set the GPU before RMM is initialized if spark provided the GPU address so that RMM
// uses that GPU. We only need to initialize RMM once per Executor because we are relying on
// only 1 GPU per executor.
// If Spark didn't provide the address we just use the default GPU.
val addr = initializeGpu(resources, conf)
initializeMemory(addr)
// As long as we are in execute mode, initialize everything, because SQL could be enabled after startup
if (conf.isSqlExecuteOnGPU) {
// Set the GPU before RMM is initialized if spark provided the GPU address so that RMM
// uses that GPU. We only need to initialize RMM once per Executor because we are relying on
// only 1 GPU per executor.
// If Spark didn't provide the address we just use the default GPU.
val addr = initializeGpu(resources, conf)
initializeMemory(addr)
}
}

def shutdown(): Unit = synchronized {
@@ -3852,7 +3852,12 @@ object GpuOverrides extends Logging {
override def getChecks: Option[TypeChecks[_]] = None
}

// Only run the explain and don't actually convert or run on GPU.
/**
* Only run the explain and don't actually convert or run on GPU.
* This gets the plan from the DataFrame, so Catalyst has already run all of its
* rules to modify the plan. This means we have to try to undo some of the last rules
* to bring the plan close to what the columnar rules would normally see.
*/
def explainPotentialGpuPlan(df: DataFrame, explain: String): String = {
val plan = df.queryExecution.executedPlan
val conf = new RapidsConf(plan.conf)
@@ -3880,6 +3885,23 @@
}
}

/**
* Use explain mode on an active SQL plan as it's processed through Catalyst.
* This path is the same one the plugin takes when running on hosts with
* GPUs.
*/
private def explainCatalystSQLPlan(updatedPlan: SparkPlan, conf: RapidsConf): Unit = {
val explainSetting = if (conf.shouldExplain) {
conf.explain
} else {
"ALL"
}
val explainOutput = explainSinglePlan(updatedPlan, conf, explainSetting)
if (explainOutput.nonEmpty) {
logWarning(s"\n$explainOutput")
}
}

private def getSubqueryExpressions(e: Expression): Seq[ExecSubqueryExpression] = {
val childExprs = e.children.flatMap(getSubqueryExpressions(_))
val res = e match {
@@ -3953,27 +3975,36 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging {
// gets called once for each query stage (where a query stage is an `Exchange`).
override def apply(sparkPlan: SparkPlan): SparkPlan = GpuOverrideUtil.tryOverride { plan =>
val conf = new RapidsConf(plan.conf)
if (conf.isSqlEnabled) {
if (conf.isSqlEnabled && conf.isSqlExecuteOnGPU) {
GpuOverrides.logDuration(conf.shouldExplain,
t => f"Plan conversion to the GPU took $t%.2f ms") {
val updatedPlan = if (plan.conf.adaptiveExecutionEnabled) {
// AQE can cause Spark to inject undesired CPU shuffles into the plan because GPU and CPU
// distribution expressions are not semantically equal.
val newPlan = GpuOverrides.removeExtraneousShuffles(plan, conf)

// AQE can cause ReusedExchangeExec instance to cache the wrong aggregation buffer type
// compared to the desired buffer type from a reused GPU shuffle.
GpuOverrides.fixupReusedExchangeExecs(newPlan)
} else {
plan
}
val updatedPlan = updateForAdaptivePlan(plan, conf)
applyOverrides(updatedPlan, conf)
}
} else if (conf.isSqlEnabled && conf.isSqlExplainOnlyEnabled) {
// this mode logs the explain output and returns the original CPU plan
val updatedPlan = updateForAdaptivePlan(plan, conf)
GpuOverrides.explainCatalystSQLPlan(updatedPlan, conf)
plan
} else {
plan
}
}(sparkPlan)

private def updateForAdaptivePlan(plan: SparkPlan, conf: RapidsConf): SparkPlan = {
if (plan.conf.adaptiveExecutionEnabled) {
// AQE can cause Spark to inject undesired CPU shuffles into the plan because GPU and CPU
// distribution expressions are not semantically equal.
val newPlan = GpuOverrides.removeExtraneousShuffles(plan, conf)

// AQE can cause ReusedExchangeExec instance to cache the wrong aggregation buffer type
// compared to the desired buffer type from a reused GPU shuffle.
GpuOverrides.fixupReusedExchangeExecs(newPlan)
} else {
plan
}
}

private def applyOverrides(plan: SparkPlan, conf: RapidsConf): SparkPlan = {
val wrap = GpuOverrides.wrapAndTagPlan(plan, conf)
val reasonsToNotReplaceEntirePlan = wrap.getReasonsNotToReplaceEntirePlan
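
The `RapidsConf.scala` changes themselves are collapsed in this view. The following is a
hypothetical sketch of the mode accessors used above, assuming the `spark.rapids.sql.mode`
values documented in `configs.md`; the real implementation lives in the elided diff:

```scala
// Hypothetical sketch only; the actual accessors are in RapidsConf.scala.
class RapidsModeConfSketch(get: String => Option[String]) {
  // spark.rapids.sql.mode cannot be changed at runtime.
  private val mode: String =
    get("spark.rapids.sql.mode").getOrElse("executeongpu").toLowerCase

  // spark.rapids.sql.enabled can still be toggled at runtime.
  def isSqlEnabled: Boolean =
    get("spark.rapids.sql.enabled").forall(_.toBoolean)

  // executeOnGPU: convert supported operations and run them on the GPU.
  def isSqlExecuteOnGPU: Boolean = mode == "executeongpu"

  // explainOnly: log what would have run on the GPU, then run the CPU plan.
  def isSqlExplainOnlyEnabled: Boolean = mode == "explainonly"
}
```
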
@@ -532,7 +532,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {

override def apply(sparkPlan: SparkPlan): SparkPlan = GpuOverrideUtil.tryOverride { plan =>
this.rapidsConf = new RapidsConf(plan.conf)
if (rapidsConf.isSqlEnabled) {
if (rapidsConf.isSqlEnabled && rapidsConf.isSqlExecuteOnGPU) {
GpuOverrides.logDuration(rapidsConf.shouldExplain,
t => f"GPU plan transition optimization took $t%.2f ms") {
var updatedPlan = insertHashOptimizeSorts(plan)
@@ -603,4 +603,4 @@ object GpuTransitionOverrides {
case _: InputFileBlockLength => true
case e => e.children.exists(checkHasInputFileExpressions)
}
}