Skip to content

Commit

Permalink
Add support for self-contained profiling (NVIDIA#10870)
Browse files Browse the repository at this point in the history
* Add support for self-contained profiling

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Use Scala regex, add executor-side logging on profile startup/shutdown

* Use reflection to handle potentially missing Hadoop CallerContext

* scala 2.13 fix

---------

Signed-off-by: Jason Lowe <jlowe@nvidia.com>
  • Loading branch information
jlowe authored and nvliyuan committed May 30, 2024
1 parent fab029b commit ed651cb
Show file tree
Hide file tree
Showing 6 changed files with 671 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ class RapidsDriverPlugin extends DriverPlugin with Logging {
}
rapidsShuffleHeartbeatManager.executorHeartbeat(id)
case m: GpuCoreDumpMsg => GpuCoreDumpHandler.handleMsg(m)
case m: ProfileMsg => ProfilerOnDriver.handleMsg(m)
case m => throw new IllegalStateException(s"Unknown message $m")
}
}
Expand All @@ -458,6 +459,7 @@ class RapidsDriverPlugin extends DriverPlugin with Logging {
RapidsPluginUtils.detectMultipleJars(conf)
RapidsPluginUtils.logPluginMode(conf)
GpuCoreDumpHandler.driverInit(sc, conf)
ProfilerOnDriver.init(sc, conf)

if (GpuShuffleEnv.isRapidsShuffleAvailable(conf)) {
GpuShuffleEnv.initShuffleManager()
Expand Down Expand Up @@ -507,6 +509,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
val sparkConf = pluginContext.conf()
val numCores = RapidsPluginUtils.estimateCoresOnExec(sparkConf)
val conf = new RapidsConf(extraConf.asScala.toMap)
ProfilerOnExecutor.init(pluginContext, conf)

// Checks if the current GPU architecture is supported by the
// spark-rapids-jni and cuDF libraries.
Expand Down Expand Up @@ -656,6 +659,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
GpuSemaphore.shutdown()
PythonWorkerSemaphore.shutdown()
GpuDeviceManager.shutdown()
ProfilerOnExecutor.shutdown()
Option(rapidsShuffleHeartbeatEndpoint).foreach(_.close())
extraExecutorPlugins.foreach(_.shutdown())
FileCache.shutdown()
Expand Down Expand Up @@ -692,6 +696,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
// Task-start hook. Runs three steps in order: calls startTaskNvtx with the current
// TaskContext, forwards onTaskStart to every extra executor plugin, then notifies
// ProfilerOnExecutor that a task has started.
override def onTaskStart(): Unit = {
startTaskNvtx(TaskContext.get)
extraExecutorPlugins.foreach(_.onTaskStart())
ProfilerOnExecutor.onTaskStart()
}

override def onTaskSucceeded(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import scala.util.Try

/**
 * Determines if a value is in a comma-separated list of values and/or
 * hyphenated ranges provided by the user for a configuration setting.
 *
 * Entries containing a hyphen are parsed as inclusive integer ranges by
 * `RangeConfMatcher.parseRange`; every other entry is kept as a trimmed string.
 * Blank entries (an empty setting, whitespace only, or a doubled comma) are ignored
 * so that a setting which lists nothing produces an empty matcher.
 *
 * @param configKey name of the setting, used only to build error messages
 * @param configValue raw text of the setting, or None when the setting is absent
 * @throws IllegalArgumentException if a hyphenated entry is not a valid range
 */
class RangeConfMatcher(configKey: String, configValue: Option[String]) {
  /** Builds a matcher from a config entry that always has a value. */
  def this(conf: RapidsConf, entry: ConfEntry[String]) = {
    this(entry.key, Some(conf.get(entry)))
  }

  /** Builds a matcher from a config entry that may be unset. */
  def this(conf: RapidsConf, entry: OptionalConfEntry[String]) = {
    this(entry.key, conf.get(entry))
  }

  // stringSet holds the individually listed values, intRanges the inclusive (start, end) ranges.
  private val (stringSet, intRanges) = {
    configValue.map { cv =>
      val parts = cv.split(',')
      val (rangeParts, singleParts) = parts.partition(_.contains('-'))
      val ranges = try {
        rangeParts.map(RangeConfMatcher.parseRange)
      } catch {
        // NumberFormatException is an IllegalArgumentException, so bad numbers land here too
        case e: IllegalArgumentException =>
          throw new IllegalArgumentException(s"Invalid range settings for $configKey: $cv", e)
      }
      // "".split(',') yields a single empty string; drop blanks so they do not count as values
      (singleParts.map(_.trim).filter(_.nonEmpty).toSet, ranges)
    }.getOrElse((Set.empty[String], Array.empty[(Int, Int)]))
  }

  /** True when no values and no ranges were configured. */
  val isEmpty: Boolean = stringSet.isEmpty && intRanges.isEmpty

  /** True when at least one value or range was configured. */
  val nonEmpty: Boolean = !isEmpty

  /**
   * Number of individually listed values plus the number of integers covered by each range.
   * Overlapping entries are counted once per entry. The result saturates at Int.MaxValue
   * instead of wrapping around for very wide ranges.
   */
  def size: Int = {
    // Accumulate in Long: a single range such as 0-2147483647 already exceeds Int
    val rangeTotal = intRanges.foldLeft(0L) {
      case (acc, (start, end)) => acc + (end.toLong - start.toLong + 1L)
    }
    math.min(stringSet.size.toLong + rangeTotal, Int.MaxValue.toLong).toInt
  }

  /** Returns true if the string value is in the configured values or ranges. */
  def contains(v: String): Boolean = {
    // Only attempt integer parsing when there is a range that could match
    stringSet.contains(v) || (intRanges.nonEmpty && Try(v.toInt).map(checkRanges).getOrElse(false))
  }

  /** Returns true if the integer value is in the configured values or ranges. */
  def contains(v: Int): Boolean = {
    checkRanges(v) || stringSet.contains(v.toString)
  }

  /** Returns true if the value lies within any configured inclusive range. */
  private def checkRanges(v: Int): Boolean = {
    intRanges.exists {
      case (start, end) => start <= v && v <= end
    }
  }
}

object RangeConfMatcher {
  /**
   * Parses a hyphenated range such as "10-30" into an inclusive (start, end) pair.
   * Whitespace around either bound is ignored.
   *
   * @param rangeStr text of the form "start-end"
   * @return the parsed (start, end) pair, where start <= end
   * @throws IllegalArgumentException if the text is not exactly two non-blank bounds
   *         separated by a single hyphen, or if end is less than start
   * @throws NumberFormatException if a bound is not a valid integer
   */
  def parseRange(rangeStr: String): (Int, Int) = {
    // A negative limit keeps trailing empty strings, so input like "1-2-" is seen as
    // three parts and rejected instead of being silently accepted as 1-2.
    val rangePair = rangeStr.split("-", -1)
    if (rangePair.length != 2 || rangePair.exists(_.trim.isEmpty)) {
      throw new IllegalArgumentException(s"Invalid range: $rangeStr")
    }
    val start = rangePair(0).trim.toInt
    val end = rangePair(1).trim.toInt
    if (end < start) {
      throw new IllegalArgumentException(s"Invalid range: $rangeStr")
    }
    (start, end)
  }
}
83 changes: 83 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,71 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.checkValues(Set("DEBUG", "MODERATE", "ESSENTIAL"))
.createWithDefault("MODERATE")

// Optional internal setting with no default. Per its doc text, providing a value
// enables profiling and gives the URI path prefix used when writing profile data.
val PROFILE_PATH = conf("spark.rapids.profile.pathPrefix")
.doc("Enables profiling and specifies a URI path to use when writing profile data")
.internal()
.stringConf
.createOptional

// Internal setting selecting which executors are profiled, given as a list of IDs
// and/or hyphenated ID ranges. Defaults to "0".
val PROFILE_EXECUTORS = conf("spark.rapids.profile.executors")
  .doc("Comma-separated list of executor IDs and hyphenated ranges of executor IDs to " +
    "profile when profiling is enabled")
  .internal()
  .stringConf
  .createWithDefault("0")

// Optional internal setting with no default: start-end time windows, in seconds since
// executor startup, during which profiling runs (see the doc text for an example).
val PROFILE_TIME_RANGES_SECONDS = conf("spark.rapids.profile.timeRangesInSeconds")
.doc("Comma-separated list of start-end ranges of time, in seconds, since executor startup " +
"to start and stop profiling. For example, a value of 10-30,100-110 will have the profiler " +
"wait for 10 seconds after executor startup then profile for 20 seconds, then wait for " +
"70 seconds then profile again for the next 10 seconds")
.internal()
.stringConf
.createOptional

// Optional internal setting with no default: job IDs and/or job ID ranges to profile.
val PROFILE_JOBS = conf("spark.rapids.profile.jobs")
.doc("Comma-separated list of job IDs and hyphenated ranges of job IDs to " +
"profile when profiling is enabled")
.internal()
.stringConf
.createOptional

// Optional internal setting with no default: stage IDs and/or stage ID ranges to profile.
val PROFILE_STAGES = conf("spark.rapids.profile.stages")
.doc("Comma-separated list of stage IDs and hyphenated ranges of stage IDs to " +
"profile when profiling is enabled")
.internal()
.stringConf
.createOptional

// Internal integer setting, default 1000 (milliseconds): executor polling interval
// for job and stage completion.
val PROFILE_DRIVER_POLL_MILLIS = conf("spark.rapids.profile.driverPollMillis")
.doc("Interval in milliseconds the executors will poll for job and stage completion when " +
"stage-level profiling is used.")
.internal()
.integerConf
.createWithDefault(1000)

// Internal setting, default "zstd". The value is lower-cased with the ROOT locale and
// restricted to "zstd" or "none".
// NOTE(review): presumably the transform runs before checkValues, making the setting
// case-insensitive -- confirm against the conf builder.
val PROFILE_COMPRESSION = conf("spark.rapids.profile.compression")
.doc("Specifies the compression codec to use when writing profile data, one of " +
"zstd or none")
.internal()
.stringConf
.transform(_.toLowerCase(java.util.Locale.ROOT))
.checkValues(Set("zstd", "none"))
.createWithDefault("zstd")

// Internal integer setting, default 0. Per its doc text a value <= 0 disables
// time-based flushing, so periodic flushing is off unless configured.
val PROFILE_FLUSH_PERIOD_MILLIS = conf("spark.rapids.profile.flushPeriodMillis")
.doc("Specifies the time period in milliseconds to flush profile records. " +
"A value <= 0 will disable time period flushing.")
.internal()
.integerConf
.createWithDefault(0)

// Internal byte-size setting (ByteUnit.BYTE), default 1024 * 1024 bytes.
val PROFILE_WRITE_BUFFER_SIZE = conf("spark.rapids.profile.writeBufferSize")
.doc("Buffer size to use when writing profile records.")
.internal()
.bytesConf(ByteUnit.BYTE)
.createWithDefault(1024 * 1024)

// ENABLE/DISABLE PROCESSING

val SQL_ENABLED = conf("spark.rapids.sql.enabled")
Expand Down Expand Up @@ -2495,6 +2560,24 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val metricsLevel: String = get(METRICS_LEVEL)

// Accessors for the profiling settings. Each one reads the correspondingly named
// PROFILE_* entry through get(...); entries created as optional surface as Option[String].
lazy val profilePath: Option[String] = get(PROFILE_PATH)

lazy val profileExecutors: String = get(PROFILE_EXECUTORS)

lazy val profileTimeRangesSeconds: Option[String] = get(PROFILE_TIME_RANGES_SECONDS)

lazy val profileJobs: Option[String] = get(PROFILE_JOBS)

lazy val profileStages: Option[String] = get(PROFILE_STAGES)

lazy val profileDriverPollMillis: Int = get(PROFILE_DRIVER_POLL_MILLIS)

lazy val profileCompression: String = get(PROFILE_COMPRESSION)

lazy val profileFlushPeriodMillis: Int = get(PROFILE_FLUSH_PERIOD_MILLIS)

lazy val profileWriteBufferSize: Long = get(PROFILE_WRITE_BUFFER_SIZE)

lazy val isSqlEnabled: Boolean = get(SQL_ENABLED)

lazy val isSqlExecuteOnGPU: Boolean = get(SQL_MODE).equals("executeongpu")
Expand Down
Loading

0 comments on commit ed651cb

Please sign in to comment.