From 25549273d0abd7e1bd6a980e520224b597430a53 Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Wed, 1 Dec 2021 19:34:49 +0800
Subject: [PATCH] Reuse some code

Signed-off-by: Chong Gao
---
 .../spark/rapids/shims/v2/Spark30XShims.scala | 326 +----------------
 .../shims/v2/Spark30Xuntil33XShims.scala      | 332 +++++++++++++++++-
 .../spark/rapids/shims/v2/Spark31XShims.scala | 323 +----------------
 3 files changed, 341 insertions(+), 640 deletions(-)

diff --git a/sql-plugin/src/main/301until310-nondb/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala b/sql-plugin/src/main/301until310-nondb/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala
index ecb4c989ca3..e27e3eb6c8c 100644
--- a/sql-plugin/src/main/301until310-nondb/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala
+++ b/sql-plugin/src/main/301until310-nondb/scala/com/nvidia/spark/rapids/shims/v2/Spark30XShims.scala
@@ -16,59 +16,39 @@
 package com.nvidia.spark.rapids.shims.v2
 
-import java.net.URI
 import java.nio.ByteBuffer
 
-import com.esotericsoftware.kryo.Kryo
-import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
 import com.nvidia.spark.rapids._
 import org.apache.arrow.memory.ReferenceManager
 import org.apache.arrow.vector.ValueVector
-import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
 import org.apache.spark.rapids.shims.v2.GpuShuffleExchangeExec
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.Resolver
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.errors.attachTree
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.Average
-import org.apache.spark.sql.catalyst.plans.JoinType
-import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.trees.TreeNode
-import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, ShuffleQueryStageExec}
-import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand}
-import org.apache.spark.sql.execution.datasources.{FileIndex, FilePartition, FileScanRDD, HadoopFsRelation, InMemoryFileIndex, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex}
-import org.apache.spark.sql.execution.datasources.rapids.GpuPartitioningUtils
-import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
-import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeExec}
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, ShuffledHashJoinExec, SortMergeJoinExec}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
 import 
org.apache.spark.sql.execution.python.{AggregateInPandasExec, ArrowEvalPythonExec, FlatMapGroupsInPandasExec, MapInPandasExec, WindowInPandasExec} -import org.apache.spark.sql.execution.window.WindowExecBase import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.{GpuAbs, GpuAverage, GpuFileSourceScanExec, GpuTimeSub} -import org.apache.spark.sql.rapids.execution.{GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase, JoinTypeChecks, SerializeBatchDeserializeHostBuffer, SerializeConcatHostBuffersDeserializeBatch} +import org.apache.spark.sql.rapids.execution.{GpuShuffleExchangeExecBase, JoinTypeChecks} import org.apache.spark.sql.rapids.execution.python._ import org.apache.spark.sql.rapids.execution.python.shims.v2._ -import org.apache.spark.sql.rapids.shims.v2.GpuSchemaUtils -import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types._ import org.apache.spark.storage.{BlockId, BlockManagerId} import org.apache.spark.unsafe.types.CalendarInterval abstract class Spark30XShims extends Spark301util320Shims with Logging { - override def v1RepairTableCommand(tableName: TableIdentifier): RunnableCommand = - AlterTableRecoverPartitionsCommand(tableName) - override def getScalaUDFAsExpression( function: AnyRef, dataType: DataType, @@ -92,16 +72,6 @@ abstract class Spark30XShims extends Spark301util320Shims with Logging { startMapIndex, endMapIndex, startPartition, endPartition) } - override def getGpuBroadcastNestedLoopJoinShim( - left: SparkPlan, - right: SparkPlan, - join: BroadcastNestedLoopJoinExec, - joinType: JoinType, - condition: Option[Expression], - targetSizeBytes: Long): GpuBroadcastNestedLoopJoinExecBase = { - GpuBroadcastNestedLoopJoinExec(left, right, join, joinType, condition, targetSizeBytes) - } - override def getGpuShuffleExchangeExec( gpuOutputPartitioning: GpuPartitioning, child: SparkPlan, @@ -117,22 +87,6 @@ abstract class Spark30XShims extends Spark301util320Shims with Logging { queryStage.shuffle.asInstanceOf[GpuShuffleExchangeExecBase] } - override def isGpuBroadcastHashJoin(plan: SparkPlan): Boolean = { - plan match { - case _: GpuBroadcastHashJoinExec => true - case _ => false - } - } - - override def isWindowFunctionExec(plan: SparkPlan): Boolean = plan.isInstanceOf[WindowExecBase] - - override def isGpuShuffledHashJoin(plan: SparkPlan): Boolean = { - plan match { - case _: GpuShuffledHashJoinExec => true - case _ => false - } - } - override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = { Seq( GpuOverrides.exec[WindowInPandasExec]( @@ -352,141 +306,14 @@ abstract class Spark30XShims extends Spark301util320Shims with Logging { })) } - override def getScans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] = Seq( - GpuOverrides.scan[ParquetScan]( - "Parquet parsing", - (a, conf, p, r) => new ScanMeta[ParquetScan](a, conf, p, r) { - override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this) - - override def convertToGpu(): Scan = { - GpuParquetScan(a.sparkSession, - a.hadoopConf, - a.fileIndex, - a.dataSchema, - a.readDataSchema, - a.readPartitionSchema, - a.pushedFilters, - a.options, - a.partitionFilters, - a.dataFilters, - conf) - } - }), - GpuOverrides.scan[OrcScan]( - "ORC parsing", - (a, conf, p, r) => new ScanMeta[OrcScan](a, conf, p, r) { - override def tagSelfForGpu(): Unit = - GpuOrcScanBase.tagSupport(this) - - override def convertToGpu(): Scan = - GpuOrcScan(a.sparkSession, - a.hadoopConf, - a.fileIndex, - a.dataSchema, - a.readDataSchema, - a.readPartitionSchema, - 
a.options, - a.pushedFilters, - a.partitionFilters, - a.dataFilters, - conf) - }) - ).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap - - override def getBuildSide(join: HashJoin): GpuBuildSide = { - GpuJoinUtils.getGpuBuildSide(join.buildSide) - } - - override def getBuildSide(join: BroadcastNestedLoopJoinExec): GpuBuildSide = { - GpuJoinUtils.getGpuBuildSide(join.buildSide) - } - - override def getPartitionFileNames( - partitions: Seq[PartitionDirectory]): Seq[String] = { - val files = partitions.flatMap(partition => partition.files) - files.map(_.getPath.getName) - } - - override def getPartitionFileStatusSize(partitions: Seq[PartitionDirectory]): Long = { - partitions.map(_.files.map(_.getLen).sum).sum - } - - override def getPartitionedFiles( - partitions: Array[PartitionDirectory]): Array[PartitionedFile] = { - partitions.flatMap { p => - p.files.map { f => - PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values) - } - } - } - - override def getPartitionSplitFiles( - partitions: Array[PartitionDirectory], - maxSplitBytes: Long, - relation: HadoopFsRelation): Array[PartitionedFile] = { - partitions.flatMap { partition => - partition.files.flatMap { file => - // getPath() is very expensive so we only want to call it once in this block: - val filePath = file.getPath - val isSplitable = relation.fileFormat.isSplitable( - relation.sparkSession, relation.options, filePath) - PartitionedFileUtil.splitFiles( - sparkSession = relation.sparkSession, - file = file, - filePath = filePath, - isSplitable = isSplitable, - maxSplitBytes = maxSplitBytes, - partitionValues = partition.values - ) - } - } - } - - override def getFileScanRDD( - sparkSession: SparkSession, - readFunction: PartitionedFile => Iterator[InternalRow], - filePartitions: Seq[FilePartition]): RDD[InternalRow] = { - new FileScanRDD(sparkSession, readFunction, filePartitions) - } - // Hardcoded for Spark-3.0.* override def getFileSourceMaxMetadataValueLength(sqlConf: SQLConf): Int = 100 - override def createFilePartition(index: Int, files: Array[PartitionedFile]): FilePartition = { - FilePartition(index, files) - } - - override def copyBatchScanExec( - batchScanExec: GpuBatchScanExec, - queryUsesInputFile: Boolean): GpuBatchScanExec = { - val scanCopy = batchScanExec.scan match { - case parquetScan: GpuParquetScan => - parquetScan.copy(queryUsesInputFile=queryUsesInputFile) - case orcScan: GpuOrcScan => - orcScan.copy(queryUsesInputFile=queryUsesInputFile) - case _ => throw new RuntimeException("Wrong format") // never reach here - } - batchScanExec.copy(scan=scanCopy) - } - - override def copyFileSourceScanExec( - scanExec: GpuFileSourceScanExec, - queryUsesInputFile: Boolean): GpuFileSourceScanExec = { - scanExec.copy(queryUsesInputFile=queryUsesInputFile)(scanExec.rapidsConf) - } - override def getGpuColumnarToRowTransition(plan: SparkPlan, exportColumnRdd: Boolean): GpuColumnarToRowExecParent = { GpuColumnarToRowExec(plan, exportColumnRdd) } - override def checkColumnNameDuplication( - schema: StructType, - colType: String, - resolver: Resolver): Unit = { - GpuSchemaUtils.checkColumnNameDuplication(schema, colType, resolver) - } - override def sortOrder( child: Expression, direction: SortDirection, @@ -496,13 +323,6 @@ abstract class Spark30XShims extends Spark301util320Shims with Logging { s.copy(child = child) } - override def alias(child: Expression, name: String)( - exprId: ExprId, - qualifier: Seq[String], - explicitMetadata: Option[Metadata]): Alias = { - Alias(child, name)(exprId, qualifier, 
explicitMetadata) - } - override def shouldIgnorePath(path: String): Boolean = { InMemoryFileIndex.shouldFilterOut(path) } @@ -515,112 +335,8 @@ abstract class Spark30XShims extends Spark301util320Shims with Logging { (arrowBuf.nioBuffer(), arrowBuf.getReferenceManager) } - override def getArrowValidityBuf(vec: ValueVector): (ByteBuffer, ReferenceManager) = { - val arrowBuf = vec.getValidityBuffer - (arrowBuf.nioBuffer(), arrowBuf.getReferenceManager) - } - - override def getArrowOffsetsBuf(vec: ValueVector): (ByteBuffer, ReferenceManager) = { - val arrowBuf = vec.getOffsetBuffer - (arrowBuf.nioBuffer(), arrowBuf.getReferenceManager) - } - - override def replaceWithAlluxioPathIfNeeded( - conf: RapidsConf, - relation: HadoopFsRelation, - partitionFilters: Seq[Expression], - dataFilters: Seq[Expression]): FileIndex = { - - val alluxioPathsReplace: Option[Seq[String]] = conf.getAlluxioPathsToReplace - - if (alluxioPathsReplace.isDefined) { - // alluxioPathsReplace: Seq("key->value", "key1->value1") - // turn the rules to the Map with eg - // { s3:/foo -> alluxio://0.1.2.3:19998/foo, - // gs:/bar -> alluxio://0.1.2.3:19998/bar, - // /baz -> alluxio://0.1.2.3:19998/baz } - val replaceMapOption = alluxioPathsReplace.map(rules => { - rules.map(rule => { - val split = rule.split("->") - if (split.size == 2) { - split(0).trim -> split(1).trim - } else { - throw new IllegalArgumentException(s"Invalid setting for " + - s"${RapidsConf.ALLUXIO_PATHS_REPLACE.key}") - } - }).toMap - }) - - replaceMapOption.map(replaceMap => { - - def isDynamicPruningFilter(e: Expression): Boolean = - e.find(_.isInstanceOf[PlanExpression[_]]).isDefined - - val partitionDirs = relation.location.listFiles( - partitionFilters.filterNot(isDynamicPruningFilter), dataFilters) - - // replacement func to check if the file path is prefixed with the string user configured - // if yes, replace it - val replaceFunc = (f: Path) => { - val pathStr = f.toString - val matchedSet = replaceMap.keySet.filter(reg => pathStr.startsWith(reg)) - if (matchedSet.size > 1) { - // never reach here since replaceMap is a Map - throw new IllegalArgumentException(s"Found ${matchedSet.size} same replacing rules " + - s"from ${RapidsConf.ALLUXIO_PATHS_REPLACE.key} which requires only 1 rule for each " + - s"file path") - } else if (matchedSet.size == 1) { - new Path(pathStr.replaceFirst(matchedSet.head, replaceMap(matchedSet.head))) - } else { - f - } - } - - // replace all of input files - val inputFiles: Seq[Path] = partitionDirs.flatMap(partitionDir => { - replacePartitionDirectoryFiles(partitionDir, replaceFunc) - }) - - // replace all of rootPaths which are already unique - val rootPaths = relation.location.rootPaths.map(replaceFunc) - - val parameters: Map[String, String] = relation.options - - // infer PartitionSpec - val partitionSpec = GpuPartitioningUtils.inferPartitioning( - relation.sparkSession, - rootPaths, - inputFiles, - parameters, - Option(relation.dataSchema), - replaceFunc) - - // generate a new InMemoryFileIndex holding paths with alluxio schema - new InMemoryFileIndex( - relation.sparkSession, - inputFiles, - parameters, - Option(relation.dataSchema), - userSpecifiedPartitionSpec = Some(partitionSpec)) - }).getOrElse(relation.location) - - } else { - relation.location - } - } - - override def replacePartitionDirectoryFiles(partitionDir: PartitionDirectory, - replaceFunc: Path => Path): Seq[Path] = { - partitionDir.files.map(f => replaceFunc(f.getPath)) - } - override def shouldFailDivByZero(): Boolean = false - override def 
reusedExchangeExecPfn: PartialFunction[SparkPlan, ReusedExchangeExec] = { - case ShuffleQueryStageExec(_, e: ReusedExchangeExec) => e - case BroadcastQueryStageExec(_, e: ReusedExchangeExec) => e - } - /** dropped by SPARK-34234 */ override def attachTreeIfSupported[TreeType <: TreeNode[_], A]( tree: TreeType, @@ -630,38 +346,8 @@ abstract class Spark30XShims extends Spark301util320Shims with Logging { attachTree(tree, msg)(f) } - override def createTable(table: CatalogTable, - sessionCatalog: SessionCatalog, - tableLocation: Option[URI], - result: BaseRelation) = { - val newTable = table.copy( - storage = table.storage.copy(locationUri = tableLocation), - // We will use the schema of resolved.relation as the schema of the table (instead of - // the schema of df). It is important since the nullability may be changed by the relation - // provider (for example, see org.apache.spark.sql.parquet.DefaultSource). - schema = result.schema) - // Table location is already validated. No need to check it again during table creation. - sessionCatalog.createTable(newTable, ignoreIfExists = false, validateLocation = false) - } - - override def hasAliasQuoteFix: Boolean = false - override def hasCastFloatTimestampUpcast: Boolean = false - override def filesFromFileIndex(fileIndex: PartitioningAwareFileIndex): Seq[FileStatus] = { - fileIndex.allFiles() - } - - override def broadcastModeTransform(mode: BroadcastMode, rows: Array[InternalRow]): Any = - mode.transform(rows) - - override def registerKryoClasses(kryo: Kryo): Unit = { - kryo.register(classOf[SerializeConcatHostBuffersDeserializeBatch], - new KryoJavaSerializer()) - kryo.register(classOf[SerializeBatchDeserializeHostBuffer], - new KryoJavaSerializer()) - } - override def getAdaptiveInputPlan(adaptivePlan: AdaptiveSparkPlanExec): SparkPlan = { adaptivePlan.initialPlan } diff --git a/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/Spark30Xuntil33XShims.scala b/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/Spark30Xuntil33XShims.scala index e35465fa433..04f3c7ee3e4 100644 --- a/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/Spark30Xuntil33XShims.scala +++ b/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/Spark30Xuntil33XShims.scala @@ -16,13 +16,341 @@ package com.nvidia.spark.rapids.shims.v2 +import java.net.URI +import java.nio.ByteBuffer + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} import com.nvidia.spark.rapids._ +import org.apache.arrow.memory.ReferenceManager +import org.apache.arrow.vector.ValueVector +import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.spark.sql.catalyst.csv._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.Resolver +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog} +import org.apache.spark.sql.catalyst.csv.CSVOptions +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode +import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.datasources.v2._ +import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, ShuffleQueryStageExec} +import 
org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand} +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.rapids.GpuPartitioningUtils +import org.apache.spark.sql.execution.datasources.v2.ShowCurrentNamespaceExec +import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan +import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan +import org.apache.spark.sql.execution.exchange.ReusedExchangeExec +import org.apache.spark.sql.execution.joins._ +import org.apache.spark.sql.execution.window.WindowExecBase +import org.apache.spark.sql.rapids._ +import org.apache.spark.sql.rapids.execution.{GpuBroadcastNestedLoopJoinExecBase, SerializeBatchDeserializeHostBuffer, SerializeConcatHostBuffersDeserializeBatch} +import org.apache.spark.sql.rapids.shims.v2.GpuSchemaUtils +import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.types._ trait Spark30Xuntil33XShims extends SparkShims { + + override def v1RepairTableCommand(tableName: TableIdentifier): RunnableCommand = + AlterTableRecoverPartitionsCommand(tableName) + + override def getGpuBroadcastNestedLoopJoinShim( + left: SparkPlan, + right: SparkPlan, + join: BroadcastNestedLoopJoinExec, + joinType: JoinType, + condition: Option[Expression], + targetSizeBytes: Long): GpuBroadcastNestedLoopJoinExecBase = { + GpuBroadcastNestedLoopJoinExec(left, right, join, joinType, condition, targetSizeBytes) + } + + override def isGpuBroadcastHashJoin(plan: SparkPlan): Boolean = { + plan match { + case _: GpuBroadcastHashJoinExec => true + case _ => false + } + } + + override def isWindowFunctionExec(plan: SparkPlan): Boolean = plan.isInstanceOf[WindowExecBase] + + override def isGpuShuffledHashJoin(plan: SparkPlan): Boolean = { + plan match { + case _: GpuShuffledHashJoinExec => true + case _ => false + } + } + + override def getScans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] = Seq( + GpuOverrides.scan[ParquetScan]( + "Parquet parsing", + (a, conf, p, r) => new ScanMeta[ParquetScan](a, conf, p, r) { + override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this) + + override def convertToGpu(): Scan = { + GpuParquetScan(a.sparkSession, + a.hadoopConf, + a.fileIndex, + a.dataSchema, + a.readDataSchema, + a.readPartitionSchema, + a.pushedFilters, + a.options, + a.partitionFilters, + a.dataFilters, + conf) + } + }), + GpuOverrides.scan[OrcScan]( + "ORC parsing", + (a, conf, p, r) => new ScanMeta[OrcScan](a, conf, p, r) { + override def tagSelfForGpu(): Unit = + GpuOrcScanBase.tagSupport(this) + + override def convertToGpu(): Scan = + GpuOrcScan(a.sparkSession, + a.hadoopConf, + a.fileIndex, + a.dataSchema, + a.readDataSchema, + a.readPartitionSchema, + a.options, + a.pushedFilters, + a.partitionFilters, + a.dataFilters, + conf) + }) + ).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap + + override def getBuildSide(join: HashJoin): GpuBuildSide = { + GpuJoinUtils.getGpuBuildSide(join.buildSide) + } + + override def getBuildSide(join: BroadcastNestedLoopJoinExec): GpuBuildSide = { + GpuJoinUtils.getGpuBuildSide(join.buildSide) + } + + override def getPartitionFileNames( + partitions: Seq[PartitionDirectory]): Seq[String] = { + val files = partitions.flatMap(partition => partition.files) + files.map(_.getPath.getName) + } + + override def getPartitionFileStatusSize(partitions: Seq[PartitionDirectory]): Long = { + partitions.map(_.files.map(_.getLen).sum).sum + } + + override def getPartitionedFiles( + partitions: 
Array[PartitionDirectory]): Array[PartitionedFile] = { + partitions.flatMap { p => + p.files.map { f => + PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values) + } + } + } + + override def getPartitionSplitFiles( + partitions: Array[PartitionDirectory], + maxSplitBytes: Long, + relation: HadoopFsRelation): Array[PartitionedFile] = { + partitions.flatMap { partition => + partition.files.flatMap { file => + // getPath() is very expensive so we only want to call it once in this block: + val filePath = file.getPath + val isSplitable = relation.fileFormat.isSplitable( + relation.sparkSession, relation.options, filePath) + PartitionedFileUtil.splitFiles( + sparkSession = relation.sparkSession, + file = file, + filePath = filePath, + isSplitable = isSplitable, + maxSplitBytes = maxSplitBytes, + partitionValues = partition.values + ) + } + } + } + + override def getFileScanRDD( + sparkSession: SparkSession, + readFunction: PartitionedFile => Iterator[InternalRow], + filePartitions: Seq[FilePartition]): RDD[InternalRow] = { + new FileScanRDD(sparkSession, readFunction, filePartitions) + } + + override def createFilePartition(index: Int, files: Array[PartitionedFile]): FilePartition = { + FilePartition(index, files) + } + + override def copyBatchScanExec( + batchScanExec: GpuBatchScanExec, + queryUsesInputFile: Boolean): GpuBatchScanExec = { + val scanCopy = batchScanExec.scan match { + case parquetScan: GpuParquetScan => + parquetScan.copy(queryUsesInputFile = queryUsesInputFile) + case orcScan: GpuOrcScan => + orcScan.copy(queryUsesInputFile = queryUsesInputFile) + case _ => throw new RuntimeException("Wrong format") // never reach here + } + batchScanExec.copy(scan = scanCopy) + } + + override def copyFileSourceScanExec( + scanExec: GpuFileSourceScanExec, + queryUsesInputFile: Boolean): GpuFileSourceScanExec = { + scanExec.copy(queryUsesInputFile = queryUsesInputFile)(scanExec.rapidsConf) + } + + override def checkColumnNameDuplication( + schema: StructType, + colType: String, + resolver: Resolver): Unit = { + GpuSchemaUtils.checkColumnNameDuplication(schema, colType, resolver) + } + + override def alias(child: Expression, name: String)( + exprId: ExprId, + qualifier: Seq[String], + explicitMetadata: Option[Metadata]): Alias = { + Alias(child, name)(exprId, qualifier, explicitMetadata) + } + + override def getArrowValidityBuf(vec: ValueVector): (ByteBuffer, ReferenceManager) = { + val arrowBuf = vec.getValidityBuffer + (arrowBuf.nioBuffer(), arrowBuf.getReferenceManager) + } + + override def getArrowOffsetsBuf(vec: ValueVector): (ByteBuffer, ReferenceManager) = { + val arrowBuf = vec.getOffsetBuffer + (arrowBuf.nioBuffer(), arrowBuf.getReferenceManager) + } + + override def replaceWithAlluxioPathIfNeeded( + conf: RapidsConf, + relation: HadoopFsRelation, + partitionFilters: Seq[Expression], + dataFilters: Seq[Expression]): FileIndex = { + + val alluxioPathsReplace: Option[Seq[String]] = conf.getAlluxioPathsToReplace + + if (alluxioPathsReplace.isDefined) { + // alluxioPathsReplace: Seq("key->value", "key1->value1") + // turn the rules to the Map with eg + // { s3:/foo -> alluxio://0.1.2.3:19998/foo, + // gs:/bar -> alluxio://0.1.2.3:19998/bar, + // /baz -> alluxio://0.1.2.3:19998/baz } + val replaceMapOption = alluxioPathsReplace.map(rules => { + rules.map(rule => { + val split = rule.split("->") + if (split.size == 2) { + split(0).trim -> split(1).trim + } else { + throw new IllegalArgumentException(s"Invalid setting for " + + s"${RapidsConf.ALLUXIO_PATHS_REPLACE.key}") + } + }).toMap + 
}) + + replaceMapOption.map(replaceMap => { + + def isDynamicPruningFilter(e: Expression): Boolean = + e.find(_.isInstanceOf[PlanExpression[_]]).isDefined + + val partitionDirs = relation.location.listFiles( + partitionFilters.filterNot(isDynamicPruningFilter), dataFilters) + + // replacement func to check if the file path is prefixed with the string user configured + // if yes, replace it + val replaceFunc = (f: Path) => { + val pathStr = f.toString + val matchedSet = replaceMap.keySet.filter(reg => pathStr.startsWith(reg)) + if (matchedSet.size > 1) { + // never reach here since replaceMap is a Map + throw new IllegalArgumentException(s"Found ${matchedSet.size} same replacing rules " + + s"from ${RapidsConf.ALLUXIO_PATHS_REPLACE.key} which requires only 1 rule for each " + + s"file path") + } else if (matchedSet.size == 1) { + new Path(pathStr.replaceFirst(matchedSet.head, replaceMap(matchedSet.head))) + } else { + f + } + } + + // replace all of input files + val inputFiles: Seq[Path] = partitionDirs.flatMap(partitionDir => { + replacePartitionDirectoryFiles(partitionDir, replaceFunc) + }) + + // replace all of rootPaths which are already unique + val rootPaths = relation.location.rootPaths.map(replaceFunc) + + val parameters: Map[String, String] = relation.options + + // infer PartitionSpec + val partitionSpec = GpuPartitioningUtils.inferPartitioning( + relation.sparkSession, + rootPaths, + inputFiles, + parameters, + Option(relation.dataSchema), + replaceFunc) + + // generate a new InMemoryFileIndex holding paths with alluxio schema + new InMemoryFileIndex( + relation.sparkSession, + inputFiles, + parameters, + Option(relation.dataSchema), + userSpecifiedPartitionSpec = Some(partitionSpec)) + }).getOrElse(relation.location) + + } else { + relation.location + } + } + + override def replacePartitionDirectoryFiles(partitionDir: PartitionDirectory, + replaceFunc: Path => Path): Seq[Path] = { + partitionDir.files.map(f => replaceFunc(f.getPath)) + } + + override def hasAliasQuoteFix: Boolean = false + + override def filesFromFileIndex(fileIndex: PartitioningAwareFileIndex): Seq[FileStatus] = { + fileIndex.allFiles() + } + + override def broadcastModeTransform(mode: BroadcastMode, rows: Array[InternalRow]): Any = + mode.transform(rows) + + override def registerKryoClasses(kryo: Kryo): Unit = { + kryo.register(classOf[SerializeConcatHostBuffersDeserializeBatch], + new KryoJavaSerializer()) + kryo.register(classOf[SerializeBatchDeserializeHostBuffer], + new KryoJavaSerializer()) + } + override def reusedExchangeExecPfn: PartialFunction[SparkPlan, ReusedExchangeExec] = { + case ShuffleQueryStageExec(_, e: ReusedExchangeExec) => e + case BroadcastQueryStageExec(_, e: ReusedExchangeExec) => e + } + + override def createTable(table: CatalogTable, + sessionCatalog: SessionCatalog, + tableLocation: Option[URI], + result: BaseRelation) = { + val newTable = table.copy( + storage = table.storage.copy(locationUri = tableLocation), + // We will use the schema of resolved.relation as the schema of the table (instead of + // the schema of df). It is important since the nullability may be changed by the relation + // provider (for example, see org.apache.spark.sql.parquet.DefaultSource). + schema = result.schema) + // Table location is already validated. No need to check it again during table creation. 
+ sessionCatalog.createTable(newTable, ignoreIfExists = false, validateLocation = false) + } + def dateFormatInRead(csvOpts: CSVOptions): Option[String] = { Option(csvOpts.dateFormat) } diff --git a/sql-plugin/src/main/311until320-nondb/scala/com/nvidia/spark/rapids/shims/v2/Spark31XShims.scala b/sql-plugin/src/main/311until320-nondb/scala/com/nvidia/spark/rapids/shims/v2/Spark31XShims.scala index 00dfdd3aae7..18bff4c7d27 100644 --- a/sql-plugin/src/main/311until320-nondb/scala/com/nvidia/spark/rapids/shims/v2/Spark31XShims.scala +++ b/sql-plugin/src/main/311until320-nondb/scala/com/nvidia/spark/rapids/shims/v2/Spark31XShims.scala @@ -16,61 +16,42 @@ package com.nvidia.spark.rapids.shims.v2 -import java.net.URI import java.nio.ByteBuffer -import com.esotericsoftware.kryo.Kryo -import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} import com.nvidia.spark.InMemoryTableScanMeta import com.nvidia.spark.rapids._ import org.apache.arrow.memory.ReferenceManager import org.apache.arrow.vector.ValueVector -import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.rapids.shims.v2.GpuShuffleExchangeExec -import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.Resolver -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.errors.attachTree import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.Average -import org.apache.spark.sql.catalyst.plans.JoinType -import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.trees.TreeNode -import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, ShuffleQueryStageExec} +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec} import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec -import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand} import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.rapids.GpuPartitioningUtils -import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan -import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan -import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ReusedExchangeExec, ShuffleExchangeExec} +import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.execution.python._ -import org.apache.spark.sql.execution.window.WindowExecBase import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.rapids._ -import org.apache.spark.sql.rapids.execution.{GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase, JoinTypeChecks, SerializeBatchDeserializeHostBuffer, SerializeConcatHostBuffersDeserializeBatch} +import org.apache.spark.sql.rapids.execution.{GpuShuffleExchangeExecBase, JoinTypeChecks} import org.apache.spark.sql.rapids.execution.python._ import 
org.apache.spark.sql.rapids.execution.python.shims.v2._ -import org.apache.spark.sql.rapids.shims.v2.{GpuColumnarToRowTransitionExec, GpuSchemaUtils, HadoopFSUtilsShim} -import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.rapids.shims.v2.{GpuColumnarToRowTransitionExec, HadoopFSUtilsShim} import org.apache.spark.sql.types._ import org.apache.spark.storage.{BlockId, BlockManagerId} // 31x nondb shims, used by 311cdh and 31x abstract class Spark31XShims extends Spark301util320Shims with Logging { - override def v1RepairTableCommand(tableName: TableIdentifier): RunnableCommand = - AlterTableRecoverPartitionsCommand(tableName) - override def getScalaUDFAsExpression( function: AnyRef, dataType: DataType, @@ -94,32 +75,6 @@ abstract class Spark31XShims extends Spark301util320Shims with Logging { startMapIndex, endMapIndex, startPartition, endPartition) } - override def getGpuBroadcastNestedLoopJoinShim( - left: SparkPlan, - right: SparkPlan, - join: BroadcastNestedLoopJoinExec, - joinType: JoinType, - condition: Option[Expression], - targetSizeBytes: Long): GpuBroadcastNestedLoopJoinExecBase = { - GpuBroadcastNestedLoopJoinExec(left, right, join, joinType, condition, targetSizeBytes) - } - - override def isGpuBroadcastHashJoin(plan: SparkPlan): Boolean = { - plan match { - case _: GpuBroadcastHashJoinExec => true - case _ => false - } - } - - override def isWindowFunctionExec(plan: SparkPlan): Boolean = plan.isInstanceOf[WindowExecBase] - - override def isGpuShuffledHashJoin(plan: SparkPlan): Boolean = { - plan match { - case _: GpuShuffledHashJoinExec => true - case _ => false - } - } - override def getFileSourceMaxMetadataValueLength(sqlConf: SQLConf): Int = sqlConf.maxMetadataStringLength @@ -484,126 +439,6 @@ abstract class Spark31XShims extends Spark301util320Shims with Logging { ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap } - override def getScans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] = Seq( - GpuOverrides.scan[ParquetScan]( - "Parquet parsing", - (a, conf, p, r) => new ScanMeta[ParquetScan](a, conf, p, r) { - override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this) - - override def convertToGpu(): Scan = { - GpuParquetScan(a.sparkSession, - a.hadoopConf, - a.fileIndex, - a.dataSchema, - a.readDataSchema, - a.readPartitionSchema, - a.pushedFilters, - a.options, - a.partitionFilters, - a.dataFilters, - conf) - } - }), - GpuOverrides.scan[OrcScan]( - "ORC parsing", - (a, conf, p, r) => new ScanMeta[OrcScan](a, conf, p, r) { - override def tagSelfForGpu(): Unit = - GpuOrcScanBase.tagSupport(this) - - override def convertToGpu(): Scan = - GpuOrcScan(a.sparkSession, - a.hadoopConf, - a.fileIndex, - a.dataSchema, - a.readDataSchema, - a.readPartitionSchema, - a.options, - a.pushedFilters, - a.partitionFilters, - a.dataFilters, - conf) - }) - ).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap - - override def getBuildSide(join: HashJoin): GpuBuildSide = { - GpuJoinUtils.getGpuBuildSide(join.buildSide) - } - - override def getBuildSide(join: BroadcastNestedLoopJoinExec): GpuBuildSide = { - GpuJoinUtils.getGpuBuildSide(join.buildSide) - } - - override def getPartitionFileNames( - partitions: Seq[PartitionDirectory]): Seq[String] = { - val files = partitions.flatMap(partition => partition.files) - files.map(_.getPath.getName) - } - - override def getPartitionFileStatusSize(partitions: Seq[PartitionDirectory]): Long = { - partitions.map(_.files.map(_.getLen).sum).sum - } - - override def getPartitionedFiles( - 
partitions: Array[PartitionDirectory]): Array[PartitionedFile] = { - partitions.flatMap { p => - p.files.map { f => - PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values) - } - } - } - - override def getPartitionSplitFiles( - partitions: Array[PartitionDirectory], - maxSplitBytes: Long, - relation: HadoopFsRelation): Array[PartitionedFile] = { - partitions.flatMap { partition => - partition.files.flatMap { file => - // getPath() is very expensive so we only want to call it once in this block: - val filePath = file.getPath - val isSplitable = relation.fileFormat.isSplitable( - relation.sparkSession, relation.options, filePath) - PartitionedFileUtil.splitFiles( - sparkSession = relation.sparkSession, - file = file, - filePath = filePath, - isSplitable = isSplitable, - maxSplitBytes = maxSplitBytes, - partitionValues = partition.values - ) - } - } - } - - override def getFileScanRDD( - sparkSession: SparkSession, - readFunction: PartitionedFile => Iterator[InternalRow], - filePartitions: Seq[FilePartition]): RDD[InternalRow] = { - new FileScanRDD(sparkSession, readFunction, filePartitions) - } - - override def createFilePartition(index: Int, files: Array[PartitionedFile]): FilePartition = { - FilePartition(index, files) - } - - override def copyBatchScanExec( - batchScanExec: GpuBatchScanExec, - queryUsesInputFile: Boolean): GpuBatchScanExec = { - val scanCopy = batchScanExec.scan match { - case parquetScan: GpuParquetScan => - parquetScan.copy(queryUsesInputFile=queryUsesInputFile) - case orcScan: GpuOrcScan => - orcScan.copy(queryUsesInputFile=queryUsesInputFile) - case _ => throw new RuntimeException("Wrong format") // never reach here - } - batchScanExec.copy(scan=scanCopy) - } - - override def copyFileSourceScanExec( - scanExec: GpuFileSourceScanExec, - queryUsesInputFile: Boolean): GpuFileSourceScanExec = { - scanExec.copy(queryUsesInputFile=queryUsesInputFile)(scanExec.rapidsConf) - } - override def getGpuColumnarToRowTransition(plan: SparkPlan, exportColumnRdd: Boolean): GpuColumnarToRowExecParent = { val serName = plan.conf.getConf(StaticSQLConf.SPARK_CACHE_SERIALIZER) @@ -615,13 +450,6 @@ abstract class Spark31XShims extends Spark301util320Shims with Logging { } } - override def checkColumnNameDuplication( - schema: StructType, - colType: String, - resolver: Resolver): Unit = { - GpuSchemaUtils.checkColumnNameDuplication(schema, colType, resolver) - } - override def getGpuShuffleExchangeExec( gpuOutputPartitioning: GpuPartitioning, child: SparkPlan, @@ -645,13 +473,6 @@ abstract class Spark31XShims extends Spark301util320Shims with Logging { s.copy(child = child) } - override def alias(child: Expression, name: String)( - exprId: ExprId, - qualifier: Seq[String], - explicitMetadata: Option[Metadata]): Alias = { - Alias(child, name)(exprId, qualifier, explicitMetadata) - } - override def shouldIgnorePath(path: String): Boolean = { HadoopFSUtilsShim.shouldIgnorePath(path) } @@ -666,127 +487,9 @@ abstract class Spark31XShims extends Spark301util320Shims with Logging { (arrowBuf.nioBuffer(), arrowBuf.getReferenceManager) } - override def getArrowValidityBuf(vec: ValueVector): (ByteBuffer, ReferenceManager) = { - val arrowBuf = vec.getValidityBuffer - (arrowBuf.nioBuffer(), arrowBuf.getReferenceManager) - } - - override def createTable(table: CatalogTable, - sessionCatalog: SessionCatalog, - tableLocation: Option[URI], - result: BaseRelation) = { - val newTable = table.copy( - storage = table.storage.copy(locationUri = tableLocation), - // We will use the schema of 
resolved.relation as the schema of the table (instead of - // the schema of df). It is important since the nullability may be changed by the relation - // provider (for example, see org.apache.spark.sql.parquet.DefaultSource). - schema = result.schema) - // Table location is already validated. No need to check it again during table creation. - sessionCatalog.createTable(newTable, ignoreIfExists = false, validateLocation = false) - } - - override def getArrowOffsetsBuf(vec: ValueVector): (ByteBuffer, ReferenceManager) = { - val arrowBuf = vec.getOffsetBuffer - (arrowBuf.nioBuffer(), arrowBuf.getReferenceManager) - } - /** matches SPARK-33008 fix in 3.1.1 */ override def shouldFailDivByZero(): Boolean = SQLConf.get.ansiEnabled - override def replaceWithAlluxioPathIfNeeded( - conf: RapidsConf, - relation: HadoopFsRelation, - partitionFilters: Seq[Expression], - dataFilters: Seq[Expression]): FileIndex = { - - val alluxioPathsReplace: Option[Seq[String]] = conf.getAlluxioPathsToReplace - - if (alluxioPathsReplace.isDefined) { - // alluxioPathsReplace: Seq("key->value", "key1->value1") - // turn the rules to the Map with eg - // { s3:/foo -> alluxio://0.1.2.3:19998/foo, - // gs:/bar -> alluxio://0.1.2.3:19998/bar, - // /baz -> alluxio://0.1.2.3:19998/baz } - val replaceMapOption = alluxioPathsReplace.map(rules => { - rules.map(rule => { - val split = rule.split("->") - if (split.size == 2) { - split(0).trim -> split(1).trim - } else { - throw new IllegalArgumentException(s"Invalid setting for " + - s"${RapidsConf.ALLUXIO_PATHS_REPLACE.key}") - } - }).toMap - }) - - replaceMapOption.map(replaceMap => { - - def isDynamicPruningFilter(e: Expression): Boolean = - e.find(_.isInstanceOf[PlanExpression[_]]).isDefined - - val partitionDirs = relation.location.listFiles( - partitionFilters.filterNot(isDynamicPruningFilter), dataFilters) - - // replacement func to check if the file path is prefixed with the string user configured - // if yes, replace it - val replaceFunc = (f: Path) => { - val pathStr = f.toString - val matchedSet = replaceMap.keySet.filter(reg => pathStr.startsWith(reg)) - if (matchedSet.size > 1) { - // never reach here since replaceMap is a Map - throw new IllegalArgumentException(s"Found ${matchedSet.size} same replacing rules " + - s"from ${RapidsConf.ALLUXIO_PATHS_REPLACE.key} which requires only 1 rule for each " + - s"file path") - } else if (matchedSet.size == 1) { - new Path(pathStr.replaceFirst(matchedSet.head, replaceMap(matchedSet.head))) - } else { - f - } - } - - // replace all of input files - val inputFiles: Seq[Path] = partitionDirs.flatMap(partitionDir => { - replacePartitionDirectoryFiles(partitionDir, replaceFunc) - }) - - // replace all of rootPaths which are already unique - val rootPaths = relation.location.rootPaths.map(replaceFunc) - - val parameters: Map[String, String] = relation.options - - // infer PartitionSpec - val partitionSpec = GpuPartitioningUtils.inferPartitioning( - relation.sparkSession, - rootPaths, - inputFiles, - parameters, - Option(relation.dataSchema), - replaceFunc) - - // generate a new InMemoryFileIndex holding paths with alluxio schema - new InMemoryFileIndex( - relation.sparkSession, - inputFiles, - parameters, - Option(relation.dataSchema), - userSpecifiedPartitionSpec = Some(partitionSpec)) - }).getOrElse(relation.location) - - } else { - relation.location - } - } - - override def replacePartitionDirectoryFiles(partitionDir: PartitionDirectory, - replaceFunc: Path => Path): Seq[Path] = { - partitionDir.files.map(f => 
replaceFunc(f.getPath)) - } - - override def reusedExchangeExecPfn: PartialFunction[SparkPlan, ReusedExchangeExec] = { - case ShuffleQueryStageExec(_, e: ReusedExchangeExec) => e - case BroadcastQueryStageExec(_, e: ReusedExchangeExec) => e - } - /** dropped by SPARK-34234 */ override def attachTreeIfSupported[TreeType <: TreeNode[_], A]( tree: TreeType, @@ -796,22 +499,6 @@ abstract class Spark31XShims extends Spark301util320Shims with Logging { attachTree(tree, msg)(f) } - override def hasAliasQuoteFix: Boolean = false - - override def filesFromFileIndex(fileIndex: PartitioningAwareFileIndex): Seq[FileStatus] = { - fileIndex.allFiles() - } - - override def broadcastModeTransform(mode: BroadcastMode, rows: Array[InternalRow]): Any = - mode.transform(rows) - - override def registerKryoClasses(kryo: Kryo): Unit = { - kryo.register(classOf[SerializeConcatHostBuffersDeserializeBatch], - new KryoJavaSerializer()) - kryo.register(classOf[SerializeBatchDeserializeHostBuffer], - new KryoJavaSerializer()) - } - override def shouldFallbackOnAnsiTimestamp(): Boolean = SQLConf.get.ansiEnabled override def getAdaptiveInputPlan(adaptivePlan: AdaptiveSparkPlanExec): SparkPlan = {