Skip to content

Commit

Permalink
Remove InMemoryTableScanExec support for Spark 3.5+
Browse files Browse the repository at this point in the history
  • Loading branch information
razajafri committed Mar 14, 2024
1 parent e0ef44a commit 8d38f8a
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, QueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.command.{DataWritingCommand, DataWritingCommandExec, ExecutedCommandExec, RunnableCommand}
import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, SaveIntoDataSourceCommand}
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
Expand Down Expand Up @@ -4373,11 +4372,6 @@ object GpuOverrides extends Logging {
ExecChecks((TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT).nested(),
TypeSig.all),
(mapPy, conf, p, r) => new GpuMapInPandasExecMeta(mapPy, conf, p, r)),
exec[InMemoryTableScanExec](
"Implementation of InMemoryTableScanExec to use GPU accelerated caching",
ExecChecks((TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + TypeSig.STRUCT + TypeSig.ARRAY +
TypeSig.MAP + GpuTypeShims.additionalCommonOperatorSupportedTypes).nested(), TypeSig.all),
(scan, conf, p, r) => new InMemoryTableScanMeta(scan, conf, p, r)),
neverReplaceExec[AlterNamespaceSetPropertiesExec]("Namespace metadata operation"),
neverReplaceExec[CreateNamespaceExec]("Namespace metadata operation"),
neverReplaceExec[DescribeNamespaceExec]("Namespace metadata operation"),
Expand All @@ -4403,6 +4397,7 @@ object GpuOverrides extends Logging {

lazy val execs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] =
commonExecs ++ GpuHiveOverrides.execs ++ ExternalSource.execRules ++
InMemoryTableScanOverrides.execs ++
SparkShimImpl.getExecs // Shim execs at the end; shims get the last word in substitutions.

def getTimeParserPolicy: TimeParserPolicy = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*** spark-rapids-shim-json-lines
{"spark": "311"}
{"spark": "312"}
{"spark": "313"}
{"spark": "320"}
{"spark": "321"}
{"spark": "321cdh"}
{"spark": "322"}
{"spark": "323"}
{"spark": "324"}
{"spark": "330"}
{"spark": "330cdh"}
{"spark": "330db"}
{"spark": "331"}
{"spark": "332"}
{"spark": "332cdh"}
{"spark": "332db"}
{"spark": "333"}
{"spark": "334"}
{"spark": "340"}
{"spark": "341"}
{"spark": "341db"}
{"spark": "342"}
spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

import com.nvidia.spark.rapids._

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.rapids.InMemoryTableScanMeta

object InMemoryTableScanOverrides {
val execs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = Seq(
GpuOverrides.exec[InMemoryTableScanExec](
"Implementation of InMemoryTableScanExec to use GPU accelerated caching",
ExecChecks((TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + TypeSig.STRUCT + TypeSig.ARRAY +
TypeSig.MAP + GpuTypeShims.additionalCommonOperatorSupportedTypes).nested(), TypeSig.all),
(scan, conf, p, r) => new InMemoryTableScanMeta(scan, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*** spark-rapids-shim-json-lines
{"spark": "350"}
{"spark": "351"}
spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

import com.nvidia.spark.rapids.ExecRule

import org.apache.spark.sql.execution.SparkPlan

object InMemoryTableScanOverrides {
val execs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = Map.empty
}
1 change: 0 additions & 1 deletion tools/generated_files/350/operatorsScore.csv
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ AQEShuffleReadExec,3.0
HashAggregateExec,4.5
ObjectHashAggregateExec,3.0
SortAggregateExec,3.0
InMemoryTableScanExec,3.0
DataWritingCommandExec,3.0
ExecutedCommandExec,3.0
WriteFilesExec,3.0
Expand Down
1 change: 0 additions & 1 deletion tools/generated_files/350/supportedExecs.csv
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ AQEShuffleReadExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS
HashAggregateExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,S,PS,NS,PS,PS,PS,NS,NS,NS
ObjectHashAggregateExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,S,PS,NS,PS,PS,PS,NS,NS,NS
SortAggregateExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,S,PS,NS,PS,PS,PS,NS,NS,NS
InMemoryTableScanExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,NS,NS,NS,PS,PS,PS,NS,S,S
DataWritingCommandExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,PS,NS,S,NS,PS,PS,PS,NS,S,S
ExecutedCommandExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,S,S,S,PS,PS,PS,S,S,S
WriteFilesExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,S,S,S,PS,PS,PS,S,S,S
Expand Down

0 comments on commit 8d38f8a

Please sign in to comment.