Skip to content

Commit

Permalink
works, ugly
Browse files Browse the repository at this point in the history
Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>

refine code, broken

Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>

fix

Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>

complete version 1

Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>

Introduce lore id

Introduce lore id

Fix type

Fix type

with loreid

Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>

clean ut

Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>

refine log

Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>

clean

Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>

fix ut

Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>

fix idgen

Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>
  • Loading branch information
binmahone committed Jun 12, 2024
1 parent 2cf5934 commit 0c74d38
Show file tree
Hide file tree
Showing 9 changed files with 391 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
*/
package com.nvidia.spark.rapids

import java.io.{File, FileOutputStream}
import java.io.{ByteArrayOutputStream, File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.Random

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import ai.rapids.cudf._
import ai.rapids.cudf.ColumnWriterOptions._
Expand Down Expand Up @@ -99,6 +100,22 @@ object DumpUtils extends Logging {
}
}

def deserializeObject[T: ClassTag](readPath: String): T = {
val fileIn: FileInputStream = new FileInputStream(readPath)
val in: ObjectInputStream = new ObjectInputStream(fileIn)
val ret = in.readObject().asInstanceOf[T]
in.close()
ret
}

def serializeObject(obj: Any): Array[Byte] = {
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(obj)
oos.close()
bos.toByteArray
}

private def dumpToParquetFileImp(table: Table, filePrefix: String): String = {
val path = genPath(filePrefix)
withResource(new ParquetDumper(path, table)) { dumper =>
Expand Down
118 changes: 115 additions & 3 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,26 @@ package com.nvidia.spark.rapids

import ai.rapids.cudf.NvtxColor
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.DumpUtils.serializeObject
import com.nvidia.spark.rapids.filecache.FileCacheConf
import com.nvidia.spark.rapids.lore.IdGen.loreIdOf
import com.nvidia.spark.rapids.profiling.ReplayDumpRDD
import com.nvidia.spark.rapids.shims.SparkShimImpl
import org.apache.hadoop.fs.{FSDataOutputStream, Path}

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.rapids.LocationPreservingMapPartitionsRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, ExprId}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.catalyst.trees.{TreeNodeTag, UnaryLike}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.rapids.GpuTaskMetrics
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration

sealed class MetricsLevel(val num: Integer) extends Serializable {
def >=(other: MetricsLevel): Boolean =
Expand Down Expand Up @@ -212,6 +218,27 @@ object GpuExec {
}

trait GpuExec extends SparkPlan {

// For LORE replay
@transient var loreReplayInputDir: String = null // null is better than None considering ser/der
@transient var loreIsReplayingOperator: Boolean = false
// For LORE dump
@transient var shouldDumpOutput: Boolean = false
@transient var dumpForLOREId: String = ""
@transient lazy val loreDumpOperator: Option[String] = RapidsConf.LORE_DUMP_OPERATOR.get(conf)
@transient lazy val loreDumpLOREIds: String = RapidsConf.LORE_DUMP_LORE_IDS.get(conf)
@transient lazy val loreDumpPartitions: String = RapidsConf.LORE_DUMP_PARTITIONS.get(conf)

// For LORE DumpedExecReplayer, the spark plan is deserialized from the plan.meta file, so
// some of the transient fields will be null, and we need to workaround this
override protected def sparkContext = SparkSession.getActiveSession.get.sparkContext
override protected def waitForSubqueries(): Unit = synchronized {
// only do it when it's not doing LORE replaying
if (!loreIsReplayingOperator && loreReplayInputDir == null) {
super.waitForSubqueries()
}
}

import GpuMetric._
def sparkSession: SparkSession = {
SparkShimImpl.sessionFromPlan(this)
Expand Down Expand Up @@ -363,16 +390,101 @@ trait GpuExec extends SparkPlan {
this.getTagValue(GpuExec.TASK_METRICS_TAG)

final override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val orig = internalDoExecuteColumnar()
val hadoopConf = new SerializableConfiguration(sparkSession.sparkContext.hadoopConfiguration)
def getOutputStream(filePath: String): FSDataOutputStream = {
val hadoopPath = new Path(filePath)
val fs = hadoopPath.getFileSystem(hadoopConf.value)
fs.create(hadoopPath, true)
}

if (loreReplayInputDir != null) {
return new ReplayDumpRDD(sparkSession, loreReplayInputDir)
}
val className = this.getClass.getSimpleName
val myLoreId = loreIdOf(this).getOrElse("unknown")
if (loreDumpOperator.exists(o => o.equals(className)) ||
loreDumpLOREIds.split(',').contains(myLoreId)
) {
val childAsGpuExec = this.asInstanceOf[UnaryLike[SparkPlan]].child.asInstanceOf[GpuExec]
childAsGpuExec.shouldDumpOutput = true
childAsGpuExec.dumpForLOREId = myLoreId
val childPlanId = childAsGpuExec.id
// dump plan node
val planBytes = serializeObject(this)
val fos = getOutputStream(
s"file:/tmp/lore/lore_id=${myLoreId}_plan_id=${childPlanId}/plan.meta")
fos.write(planBytes)
fos.close()
}
val shouldDumpOutputToBroadcast = shouldDumpOutput
val dumpForLOREIdToBroadcast = dumpForLOREId
val loreDumpPartitionsToBroadcast = loreDumpPartitions

var orig: RDD[ColumnarBatch] = null
orig = internalDoExecuteColumnar()

val planId = id
val metrics = getTaskMetrics

metrics.map { gpuMetrics =>
// This is ugly, but it reduces the need to change all exec nodes, so we are doing it here
LocationPreservingMapPartitionsRDD(orig) { iter =>
gpuMetrics.makeSureRegistered()
iter

var batchId = 0
iter.map(cb => {

val tc = TaskContext.get()
if (shouldDumpOutputToBroadcast &&
loreDumpPartitionsToBroadcast.split(',').map(_.toInt).contains(tc.partitionId())) {

println(s"LORE dump activated, for the operator to dump output: " +
s"className: ${className}, " +
s"stage id: ${tc.stageId()}, " +
s"for LORE id: ${dumpForLOREIdToBroadcast}, " +
s"plan id: ${planId}, " +
s"partition id: ${tc.partitionId()}, " +
s"batch id: ${batchId}, " +
s"batch size: ${cb.numRows()} rows.")

val partitionId = TaskContext.get().partitionId()

{
// dump col types for column batch to remote storage
val cbTypes = GpuColumnVector.extractTypes(cb)
val bytes = serializeObject(cbTypes)
val fos = getOutputStream(
s"file:/tmp/lore/lore_id=${dumpForLOREIdToBroadcast}_plan_id=${planId}/" +
s"partition_id=${partitionId}/" +
s"batch_id=${batchId}/col_types.meta")
fos.write(bytes)
fos.close()
}

// dump data for column batch to /tmp dir
withResource(GpuColumnVector.from(cb)) { table =>
val path = s"/tmp/lore/lore_id=${dumpForLOREIdToBroadcast}_plan_id=${planId}/" +
s"partition_id=${partitionId}/" +
s"batch_id=${batchId}/cb_data.parquet"
withResource(new ParquetDumper(path, table)) { dumper =>
dumper.writeTable(table)
path
}
}
}
batchId = batchId + 1
cb
})
}
}.getOrElse(orig)
}

override def nodeName: String = {
loreIdOf(this) match {
case Some(loreId) => s"${super.nodeName} [LOREID=$loreId]"
case None => s"${super.nodeName} [LOREID=unknown]"
}
}

protected def internalDoExecuteColumnar(): RDD[ColumnarBatch]
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import scala.util.control.NonFatal
import ai.rapids.cudf.DType
import com.nvidia.spark.rapids.RapidsConf.{SUPPRESS_PLANNING_FAILURE, TEST_CONF}
import com.nvidia.spark.rapids.jni.GpuTimeZoneDB
import com.nvidia.spark.rapids.lore.IdGen
import com.nvidia.spark.rapids.shims._
import com.nvidia.spark.rapids.window.{GpuDenseRank, GpuLag, GpuLead, GpuPercentRank, GpuRank, GpuRowNumber, GpuSpecialFrameBoundary, GpuWindowExecMeta, GpuWindowSpecDefinitionMeta}
import org.apache.hadoop.fs.Path
Expand Down Expand Up @@ -4707,7 +4708,12 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging {
}
}
}
GpuOverrides.doConvertPlan(wrap, conf, optimizations)
val convertedPlan = GpuOverrides.doConvertPlan(wrap, conf, optimizations)
if (conf.get(RapidsConf.TAG_LORE_ID_ENABLED)) {
IdGen.tagLoreId(convertedPlan)
} else {
convertedPlan
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
import scala.collection.mutable

import com.nvidia.spark.rapids.lore.IdGen
import com.nvidia.spark.rapids.shims.{GpuBatchScanExec, SparkShimImpl}

import org.apache.spark.SparkContext
Expand Down Expand Up @@ -823,6 +824,10 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
updatedPlan = fixupAdaptiveExchangeReuse(updatedPlan)
}

if (rapidsConf.get(RapidsConf.TAG_LORE_ID_ENABLED)) {
updatedPlan = IdGen.tagLoreId(updatedPlan)
}

if (rapidsConf.logQueryTransformations) {
logWarning(s"Transformed query:" +
s"\nOriginal Plan:\n$plan\nTransformed Plan:\n$updatedPlan")
Expand Down
31 changes: 31 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,33 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.checkValues(Set("DEBUG", "MODERATE", "ESSENTIAL"))
.createWithDefault("MODERATE")

val TAG_LORE_ID_ENABLED = conf("spark.rapids.LORE.tagging")
.doc("Enable tagging a LORE id to each gpu plan node")
.booleanConf
.createWithDefault(true)

val LORE_DUMP_OPERATOR = conf("spark.rapids.LORE.operatorToDump")
.doc("The name of SparkPlan to dump, e.g. GpuHashAggregateExec")
.internal()
.stringConf
.createOptional

val LORE_DUMP_LORE_IDS = conf("spark.rapids.LORE.idsToDump")
.doc("Specify which operator(s) to dump by LORE ID. " +
"Specify multiple partitions by using comma, e.g. \"12,31\"")
.internal()
.stringConf
.createWithDefault("")

val LORE_DUMP_PARTITIONS = conf("spark.rapids.LORE.partitionsToDump")
.doc("Which partition of the operator(the operator relates to a fixed stage, " +
"each stage is divided into many tasks by partition id) to dump. User can " +
"specify multiple partitions by using comma, e.g. \"0,3,5\"." +
"By default it will dump the first partitions.")
.internal()
.stringConf
.createWithDefault("0")

val PROFILE_PATH = conf("spark.rapids.profile.pathPrefix")
.doc("Enables profiling and specifies a URI path to use when writing profile data")
.internal()
Expand Down Expand Up @@ -2462,6 +2489,10 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val metricsLevel: String = get(METRICS_LEVEL)

lazy val loreDumpOperator: Option[String] = get(LORE_DUMP_OPERATOR)

lazy val loreDumpPartitions: String = get(LORE_DUMP_PARTITIONS)

lazy val profilePath: Option[String] = get(PROFILE_PATH)

lazy val profileExecutors: String = get(PROFILE_EXECUTORS)
Expand Down
61 changes: 61 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.lore

import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import java.util.concurrent.atomic.AtomicInteger

import com.nvidia.spark.rapids.GpuExec

import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}

object IdGen {
val LORE_ID_TAG: TreeNodeTag[String] = new TreeNodeTag[String]("rapids.gpu.lore.id")

/**
* LORE id generator. Key is [[SQLExecution.EXECUTION_ID_KEY]].
*/
private val idGen: ConcurrentMap[String, AtomicInteger] =
new ConcurrentHashMap[String, AtomicInteger]()

private def nextLoreIdOfSparkPlan(plan: SparkPlan): Option[Int] = {
// When the execution id is not set, it means there is no actual execution happening, in this
// case we don't need to generate lore id.
Option(plan.session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY))
.map { executionId =>
idGen.computeIfAbsent(executionId, _ => new AtomicInteger(0)).getAndIncrement()
}
}

def tagLoreId(sparkPlan: SparkPlan): SparkPlan = {
sparkPlan.foreachUp {
case g: GpuExec => {
nextLoreIdOfSparkPlan(g).foreach { id =>
g.setTagValue(LORE_ID_TAG, id.toString)
}
}
case _ =>
}

sparkPlan
}

def loreIdOf(node: SparkPlan): Option[String] = {
node.getTagValue(LORE_ID_TAG)
}
}
Loading

0 comments on commit 0c74d38

Please sign in to comment.