Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up and document metrics [databricks] #3681

Merged
merged 4 commits into from
Sep 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build/buildall
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ SPARK_SHIM_VERSIONS="
BUILD_PARALLELISM=1

time (
echo -n "$SPARK_SHIM_VERSIONS" | xargs -I% -P $BUILD_PARALLELISM -n 1 \
echo -n "$SPARK_SHIM_VERSIONS" | xargs -t -I% -P $BUILD_PARALLELISM -n 1 \
abellina marked this conversation as resolved.
Show resolved Hide resolved
bash -c "
mvn -U clean install \
-Dbuildver=% \
Expand Down
92 changes: 82 additions & 10 deletions docs/tuning-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,14 @@ Configuration key: [`spark.rapids.sql.concurrentGpuTasks`](configs.md#sql.concur
Default value: `1`

The RAPIDS Accelerator can further limit the number of tasks that are actively sharing the GPU.
This is useful for avoiding GPU out of memory errors while still allowing full concurrency for the
portions of the job that are not executing on the GPU. Some queries benefit significantly from
It does this using a semaphore. When metrics or documentation refers to the GPU semaphore it
is referring to this. This restriction is useful for avoiding GPU out of memory errors while
still allowing full concurrency for the portions of the job that are not executing on the GPU.
Care is taken to try and avoid doing I/O or other CPU operations while the GPU semaphore is held.
But in the case of a join two batches are required for processing, and it is not always possible
to avoid this case.

Some queries benefit significantly from
setting this to a value between `2` and `4`, with `2` typically providing the most benefit, and
higher numbers giving diminishing returns, but a lot of it depends on the size of the GPU you have.
An 80 GiB A100 will be able to run a lot more in parallel without seeing degradation
Expand Down Expand Up @@ -302,37 +308,103 @@ Custom Spark SQL Metrics are available which can help identify performance bottl

| Key | Name | Description |
|------------------|--------------------------|---------------------------------------------------|
| bufferTime | buffer time | Time spent buffering input from file data sources. |
| bufferTime | buffer time | Time spent buffering input from file data sources. This buffering time happens on the CPU, typically with no GPU semaphore held.|
| buildDataSize | build side size | Size in bytes of the build-side of a join. |
| buildTime | build time | Time to load the build-side of a join. |
| collectTime | collect time | Time spent collecting data from child operator(s).|
| computeAggTime | aggregation time | Time performing aggregation. |
| collectTime | collect time | For a broadcast the amount of time it took to collect the broadcast data back to the driver before broadcasting it back out.|
| computeAggTime | aggregation time | Time computing an aggregation. |
| concatTime | concat batch time | Time to concatenate batches. |
| filterTime | filter time | Time spent applying filters within other operators, such as joins. |
| gpuDecodeTime | GPU decode time | Time spent on GPU decoding encrypted or compressed data. |
| gpuOpTime | GPU op time | Time that an operator spends performing computation on the GPU. |
| joinOutputRows | join output rows | The number of rows produced by a join before any filter expression is applied. |
| joinTime | join time | Total time for performing a join. |
| joinTime | join time | Time doing a join operation. |
| numInputBatches | input columnar batches | Number of columnar batches that the operator received from its child operator(s). |
| numInputRows | input rows | Number of rows that the operator received from its child operator(s). |
| numOutputBatches | output columnar batches | Number of columnar batches that the operator outputs. |
| numOutputRows | output rows | Number of rows that the operator outputs. |
| numPartitions | partitions | Number of output partitions from a file scan or shuffle exchange. |
| opTime | op time | Time that an operator takes, exclusive of the time for executing or fetching results from child operators. |
| opTime | op time | Time that an operator takes, exclusive of the time for executing or fetching results from child operators, and typically outside of the time it takes to acquire the GPU semaphore. |
| partitionSize | partition data size | Total size in bytes of output partitions. |
| peakDevMemory | peak device memory | Peak GPU memory used during execution of an operator. |
| semaphoreWaitTime| GPU semaphore wait time | Time spent waiting for the GPU semaphore. |
| sortTime | sort time | Time spent in sort operations in GpuSortExec and GpuTopN. |
| spillData | bytes spilled from GPU | Total bytes spilled from GPU. |
| spillDisk | bytes spilled to disk | Total bytes spilled from GPU to disk. |
| spillHost | bytes spilled to host | Total bytes spilled from GPU to host memory. |
| streamTime | stream time | Time spent processing stream-side of a hash join. |
| totalTime | total time | Total execution time for the operator, including the time spent executing and fetching data from child operator(s). |
| streamTime | stream time | Time spent reading data from a child. This generally happens for the stream side of a hash join or for columnar to row and row to columnar operations. |

Not all metrics are enabled by default. The configuration setting `spark.rapids.sql.metrics.level` can be set
to `DEBUG`, `MODERATE`, or `ESSENTIAL`, with `MODERATE` being the default value. More information about this
configuration option is available in the <a href="configs.md#sql.metrics.level">configuration</a> documentation.

Output row and batch counts show up for operators where the number of output rows or batches are
expected to change. For example a filter operation would show the number of rows that passed the
filter condition. These can be used to detect small batches. The GPU is much more efficient when the
batch size is large enough to offset the overhead of launching CUDA kernels.

Input rows and batches are really only for debugging and can mostly be ignored.

Many of the questions people really want to answer with the metrics are around how long various
operators take. Where is the bottleneck in my query? How much of my query is executing on the GPU?
How long does operator X take on the GPU vs the CPU?

### Time taken on the GPU

Nearly all GPU operators will have an `op time` metric. This metric times how long a given
operation took to complete on the GPU separate from anything upstream or down stream of the
operator. By looking at the `op time` for each operator you should be able to get a feeling of
how long each section of a query took in terms of time on the GPU.

For many complex operations, like joins and aggregations, the time taken can be broken down further.
These metrics typically only appear in `DEBUG` mode for the metrics though. But can provide extra
information when trying to understand what is happening in a query without having to do profiling
of the GPU query execution.

#### Spilling

Some operators provide out of core algorithms, or algorithms that can process data that is larger
than can fit in GPU memory. This is often done by breaking the problem up into smaller pieces and
letting some of those pieces be moved out of GPU memory when not being worked on. Apache Spark does
similar things when processing data on the CPU. When these types of algorithms are used
`bytes spilled from GPU` will show up as a metric to indicate how much data was transferred off of
the GPU to either host memory or disk to make room for more data to be processed. Generally this
spilling happens while the GPU semaphore is held, and can really slow down processing. Details
about how much data was spilled to host memory vs spilled to disk show up in `DEBUG` mode for the
metrics.

### Time taken on the CPU

Operations that deal with the CPU as well as the GPU will often have multiple metrics broken out,
like in the case of reading data from a parquet file. There will be metrics for how long it took
to read the data to CPU memory, `buffer time`, along with how much time was taken to transfer the
data to the GPU and decode it, `GPU decode time`.

There is also a metric for how long an operation was blocked waiting on the GPU semaphore before
it got a chance to run on the GPU. This metric is enabled in `DEBUG` mode mostly because it is not
complete. Spark does not provide a way for us to accurately report that metric during a shuffle.

Apache Spark provides a `duration` metric for code generation blocks that is intended to measure how
long it took to do the given processing. However, `duration` is measured from the time that the
first row is processed to the time that the operation is done. In most cases this is very close to
the total runtime for the task, and ends up not being that useful in practice. Apache Spark does
not want to try and measure it more accurately, because it processes data one row
at a time with multiple different operators intermixed in the same generated code. In this case
the overhead of measuring how long a single row took to process would likely be very large compared
to the amount of time to actually process the data.

But the RAPIDS Accelerator for Apache Spark does provide a workaround. When data is transferred
from the CPU to the GPU or from the GPU to the CPU the `stream time` is reported. When going from
the CPU to the GPU it is the amount of time take to collect a batches worth of data on CPU before
sending it to the GPU. When going from the GPU to the CPU it is the amount of time taken to get the
batch and put it into a format that the GPU can start to process one row at a time. This can allow
you to get an approximate measurement of the amount of time taken to process a section of the query
on the CPU by subtracting the `stream time` before going to the CPU from the `stream time` after
coming back to the GPU. Please note that this is really only valid in showing the amount of time
a section of a mixed GPU/CPU query took. It should not be used to indicate how long an operation on
the CPU is likely to take in a pure CPU only workload. This is because the memory access patterns
when going from the GPU to the CPU and vise versa are very different from when the data stays on
the CPU the entire time. This can result in very different timings between the different situations.

## Window Operations

Apache Spark supports a few optimizations for different windows patterns. Generally Spark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ case class GpuFlatMapGroupsInPandasExec(
pythonRunnerConf,
// The whole group data should be written in a single call, so here is unlimited
Int.MaxValue,
spillCallback.semaphoreWaitTime,
onDataWriteFinished = null,
pythonOutputSchema,
// We can not assert the result batch from Python has the same row number with the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims.v2

import com.nvidia.spark.rapids._
Expand Down Expand Up @@ -100,10 +101,10 @@ case class GpuBroadcastHashJoinExec(
override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
TOTAL_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_TOTAL_TIME),
OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME),
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME)) ++ spillMetrics
JOIN_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_JOIN_TIME)) ++ spillMetrics

override def requiredChildDistribution: Seq[Distribution] = {
val mode = HashedRelationBroadcastMode(buildKeys)
Expand Down Expand Up @@ -138,7 +139,7 @@ case class GpuBroadcastHashJoinExec(
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val totalTime = gpuLongMetric(TOTAL_TIME)
val opTime = gpuLongMetric(OP_TIME)
val streamTime = gpuLongMetric(STREAM_TIME)
val joinTime = gpuLongMetric(JOIN_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)
Expand All @@ -152,11 +153,11 @@ case class GpuBroadcastHashJoinExec(

val rdd = streamedPlan.executeColumnar()
rdd.mapPartitions { it =>
val stIt = new CollectTimeIterator("broadcast join stream", it, streamTime)
val builtBatch = broadcastRelation.value.batch
GpuColumnVector.extractBases(builtBatch).foreach(_.noWarnLeakExpected())
doJoin(builtBatch, it, targetSize, spillCallback,
numOutputRows, joinOutputRows, numOutputBatches, streamTime, joinTime,
totalTime)
doJoin(builtBatch, stIt, targetSize, spillCallback,
numOutputRows, joinOutputRows, numOutputBatches, opTime, joinTime)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ case class GpuFlatMapGroupsInPandasExec(
pythonRunnerConf,
// The whole group data should be written in a single call, so here is unlimited
Int.MaxValue,
spillCallback.semaphoreWaitTime,
onDataWriteFinished = null,
pythonOutputSchema,
// We can not assert the result batch from Python has the same row number with the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ case class GpuBroadcastHashJoinExec(
override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
TOTAL_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_TOTAL_TIME),
OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME),
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME)) ++ spillMetrics
JOIN_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_JOIN_TIME)) ++ spillMetrics

override def requiredChildDistribution: Seq[Distribution] = {
val mode = HashedRelationBroadcastMode(buildKeys)
Expand Down Expand Up @@ -138,7 +138,7 @@ case class GpuBroadcastHashJoinExec(
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val totalTime = gpuLongMetric(TOTAL_TIME)
val opTime = gpuLongMetric(OP_TIME)
val streamTime = gpuLongMetric(STREAM_TIME)
val joinTime = gpuLongMetric(JOIN_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)
Expand All @@ -152,11 +152,11 @@ case class GpuBroadcastHashJoinExec(

val rdd = streamedPlan.executeColumnar()
rdd.mapPartitions { it =>
val stIt = new CollectTimeIterator("broadcast join stream", it, streamTime)
val builtBatch = broadcastRelation.value.batch
GpuColumnVector.extractBases(builtBatch).foreach(_.noWarnLeakExpected())
doJoin(builtBatch, it, targetSize, spillCallback,
numOutputRows, joinOutputRows, numOutputBatches, streamTime, joinTime,
totalTime)
doJoin(builtBatch, stIt, targetSize, spillCallback,
numOutputRows, joinOutputRows, numOutputBatches, opTime, joinTime)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ case class GpuBroadcastHashJoinExec(
override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
TOTAL_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_TOTAL_TIME),
OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME),
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME)) ++ spillMetrics
JOIN_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_JOIN_TIME)) ++ spillMetrics

override def requiredChildDistribution: Seq[Distribution] = {
val mode = HashedRelationBroadcastMode(buildKeys)
Expand Down Expand Up @@ -141,7 +141,7 @@ case class GpuBroadcastHashJoinExec(
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val totalTime = gpuLongMetric(TOTAL_TIME)
val opTime = gpuLongMetric(OP_TIME)
val streamTime = gpuLongMetric(STREAM_TIME)
val joinTime = gpuLongMetric(JOIN_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)
Expand All @@ -155,11 +155,11 @@ case class GpuBroadcastHashJoinExec(

val rdd = streamedPlan.executeColumnar()
rdd.mapPartitions { it =>
val stIt = new CollectTimeIterator("broadcast join stream", it, streamTime)
val builtBatch = broadcastRelation.value.batch
GpuColumnVector.extractBases(builtBatch).foreach(_.noWarnLeakExpected())
doJoin(builtBatch, it, targetSize, spillCallback,
numOutputRows, joinOutputRows, numOutputBatches, streamTime, joinTime,
totalTime)
doJoin(builtBatch, stIt, targetSize, spillCallback,
numOutputRows, joinOutputRows, numOutputBatches, opTime, joinTime)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims.v2

import com.nvidia.spark.rapids._
Expand Down Expand Up @@ -100,10 +101,10 @@ case class GpuBroadcastHashJoinExec(
override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
TOTAL_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_TOTAL_TIME),
OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME),
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME)) ++ spillMetrics
JOIN_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_JOIN_TIME)) ++ spillMetrics

override def requiredChildDistribution: Seq[Distribution] = {
val mode = HashedRelationBroadcastMode(buildKeys)
Expand Down Expand Up @@ -138,7 +139,7 @@ case class GpuBroadcastHashJoinExec(
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val totalTime = gpuLongMetric(TOTAL_TIME)
val opTime = gpuLongMetric(OP_TIME)
val streamTime = gpuLongMetric(STREAM_TIME)
val joinTime = gpuLongMetric(JOIN_TIME)
val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)
Expand All @@ -152,11 +153,11 @@ case class GpuBroadcastHashJoinExec(

val rdd = streamedPlan.executeColumnar()
rdd.mapPartitions { it =>
val stIt = new CollectTimeIterator("broadcast join stream", it, streamTime)
val builtBatch = broadcastRelation.value.batch
GpuColumnVector.extractBases(builtBatch).foreach(_.noWarnLeakExpected())
doJoin(builtBatch, it, targetSize, spillCallback,
numOutputRows, joinOutputRows, numOutputBatches, streamTime, joinTime,
totalTime)
doJoin(builtBatch, stIt, targetSize, spillCallback,
numOutputRows, joinOutputRows, numOutputBatches, opTime, joinTime)
}
}
}
Loading