Skip to content

Commit

Permalink
Fix Inheritance Shadowing to add support for Spark 4.0.0 [databricks] (#10829)
Browse files Browse the repository at this point in the history

* Scala 2.13: Inheritance shadowing

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* Signing off

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

---------

Signed-off-by: Raza Jafri <rjafri@nvidia.com>
  • Loading branch information
razajafri authored May 17, 2024
1 parent b88d19f commit 4276bd8
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1215,7 +1215,7 @@ object GpuOverrides extends Logging {
ExprChecks.mathUnaryWithAst,
(a, conf, p, r) => new UnaryAstExprMeta[Acosh](a, conf, p, r) {
override def convertToGpu(child: Expression): GpuExpression =
if (conf.includeImprovedFloat) {
if (this.conf.includeImprovedFloat) {
GpuAcoshImproved(child)
} else {
GpuAcoshCompat(child)
Expand All @@ -1232,14 +1232,14 @@ object GpuOverrides extends Logging {
ExprChecks.mathUnaryWithAst,
(a, conf, p, r) => new UnaryAstExprMeta[Asinh](a, conf, p, r) {
override def convertToGpu(child: Expression): GpuExpression =
if (conf.includeImprovedFloat) {
if (this.conf.includeImprovedFloat) {
GpuAsinhImproved(child)
} else {
GpuAsinhCompat(child)
}

override def tagSelfForAst(): Unit = {
if (!conf.includeImprovedFloat) {
if (!this.conf.includeImprovedFloat) {
// AST is not expressive enough yet to implement the conditional expression needed
// to emulate Spark's behavior
willNotWorkInAst("asinh is not AST compatible unless " +
Expand Down Expand Up @@ -2068,9 +2068,9 @@ object GpuOverrides extends Logging {
Some(RepeatingParamCheck("filter", TypeSig.BOOLEAN, TypeSig.BOOLEAN))),
(a, conf, p, r) => new ExprMeta[AggregateExpression](a, conf, p, r) {
private val filter: Option[BaseExprMeta[_]] =
a.filter.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
a.filter.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))
private val childrenExprMeta: Seq[BaseExprMeta[Expression]] =
a.children.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
a.children.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))
override val childExprs: Seq[BaseExprMeta[_]] =
childrenExprMeta ++ filter.toSeq

Expand Down Expand Up @@ -2224,7 +2224,7 @@ object GpuOverrides extends Logging {
(a, conf, p, r) => new AggExprMeta[Sum](a, conf, p, r) {
override def tagAggForGpu(): Unit = {
val inputDataType = a.child.dataType
checkAndTagFloatAgg(inputDataType, conf, this)
checkAndTagFloatAgg(inputDataType, this.conf, this)
}

override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =
Expand Down Expand Up @@ -2297,7 +2297,7 @@ object GpuOverrides extends Logging {
(a, conf, p, r) => new BinaryExprMeta[BRound](a, conf, p, r) {
override def tagExprForGpu(): Unit = {
a.child.dataType match {
case FloatType | DoubleType if !conf.isIncompatEnabled =>
case FloatType | DoubleType if !this.conf.isIncompatEnabled =>
willNotWorkOnGpu("rounding floating point numbers may be slightly off " +
s"compared to Spark's result, to enable set ${RapidsConf.INCOMPATIBLE_OPS}")
case _ => // NOOP
Expand All @@ -2318,7 +2318,7 @@ object GpuOverrides extends Logging {
(a, conf, p, r) => new BinaryExprMeta[Round](a, conf, p, r) {
override def tagExprForGpu(): Unit = {
a.child.dataType match {
case FloatType | DoubleType if !conf.isIncompatEnabled =>
case FloatType | DoubleType if !this.conf.isIncompatEnabled =>
willNotWorkOnGpu("rounding floating point numbers may be slightly off " +
s"compared to Spark's result, to enable set ${RapidsConf.INCOMPATIBLE_OPS}")
case _ => // NOOP
Expand Down Expand Up @@ -3138,7 +3138,7 @@ object GpuOverrides extends Logging {
(in, conf, p, r) => new BinaryExprMeta[FormatNumber](in, conf, p, r) {
override def tagExprForGpu(): Unit = {
in.children.head.dataType match {
case FloatType | DoubleType if !conf.isFloatFormatNumberEnabled =>
case FloatType | DoubleType if !this.conf.isFloatFormatNumberEnabled =>
willNotWorkOnGpu("format_number with floating point types on the GPU returns " +
"results that have a different precision than the default results of Spark. " +
"To enable this operation on the GPU, set" +
Expand Down Expand Up @@ -3191,7 +3191,7 @@ object GpuOverrides extends Logging {
TypeSig.all))),
(a, conf, p, r) => new ExprMeta[Murmur3Hash](a, conf, p, r) {
override val childExprs: Seq[BaseExprMeta[_]] = a.children
.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))

override def tagExprForGpu(): Unit = {
val arrayWithStructsHashing = a.children.exists(e =>
Expand All @@ -3216,7 +3216,7 @@ object GpuOverrides extends Logging {
XxHash64Shims.supportedTypes, TypeSig.all))),
(a, conf, p, r) => new ExprMeta[XxHash64](a, conf, p, r) {
override val childExprs: Seq[BaseExprMeta[_]] = a.children
.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))

def convertToGpu(): GpuExpression =
GpuXxHash64(childExprs.map(_.convertToGpu()), a.seed)
Expand Down Expand Up @@ -3429,7 +3429,7 @@ object GpuOverrides extends Logging {
TypeSig.all))),
(c, conf, p, r) => new TypedImperativeAggExprMeta[CollectList](c, conf, p, r) {
override def tagAggForGpu(): Unit = {
if (context == WindowAggExprContext && !conf.isWindowCollectListEnabled) {
if (context == WindowAggExprContext && !this.conf.isWindowCollectListEnabled) {
willNotWorkOnGpu("collect_list is disabled for window operations because " +
"the output explodes in size proportional to the window size squared. If " +
"you know the window is small you can try it by setting " +
Expand Down Expand Up @@ -3472,7 +3472,7 @@ object GpuOverrides extends Logging {
TypeSig.all))),
(c, conf, p, r) => new TypedImperativeAggExprMeta[CollectSet](c, conf, p, r) {
override def tagAggForGpu(): Unit = {
if (context == WindowAggExprContext && !conf.isWindowCollectSetEnabled) {
if (context == WindowAggExprContext && !this.conf.isWindowCollectSetEnabled) {
willNotWorkOnGpu("collect_set is disabled for window operations because " +
"the output can explode in size proportional to the window size squared. If " +
"you know the window is small you can try it by setting " +
Expand Down Expand Up @@ -3867,9 +3867,9 @@ object GpuOverrides extends Logging {
a.options,
a.partitionFilters,
a.dataFilters,
conf.maxReadBatchSizeRows,
conf.maxReadBatchSizeBytes,
conf.maxGpuColumnSizeBytes)
this.conf.maxReadBatchSizeRows,
this.conf.maxReadBatchSizeBytes,
this.conf.maxGpuColumnSizeBytes)
}),
GpuOverrides.scan[JsonScan](
"Json parsing",
Expand All @@ -3885,9 +3885,9 @@ object GpuOverrides extends Logging {
a.options,
a.partitionFilters,
a.dataFilters,
conf.maxReadBatchSizeRows,
conf.maxReadBatchSizeBytes,
conf.maxGpuColumnSizeBytes)
this.conf.maxReadBatchSizeRows,
this.conf.maxReadBatchSizeBytes,
this.conf.maxGpuColumnSizeBytes)
})).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap

val scans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] =
Expand All @@ -3913,7 +3913,7 @@ object GpuOverrides extends Logging {
),
(hp, conf, p, r) => new PartMeta[HashPartitioning](hp, conf, p, r) {
override val childExprs: Seq[BaseExprMeta[_]] =
hp.expressions.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
hp.expressions.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))

override def tagPartForGpu(): Unit = {
val arrayWithStructsHashing = hp.expressions.exists(e =>
Expand All @@ -3939,7 +3939,7 @@ object GpuOverrides extends Logging {
TypeSig.orderable)),
(rp, conf, p, r) => new PartMeta[RangePartitioning](rp, conf, p, r) {
override val childExprs: Seq[BaseExprMeta[_]] =
rp.ordering.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
rp.ordering.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))

override def convertToGpu(): GpuPartitioning = {
if (rp.numPartitions > 1) {
Expand Down Expand Up @@ -4047,7 +4047,7 @@ object GpuOverrides extends Logging {
new SparkPlanMeta[RangeExec](range, conf, p, r) {
override def convertToGpu(): GpuExec =
GpuRangeExec(range.start, range.end, range.step, range.numSlices, range.output,
conf.gpuTargetBatchSizeBytes)
this.conf.gpuTargetBatchSizeBytes)
}
}),
exec[BatchScanExec](
Expand Down Expand Up @@ -4075,7 +4075,7 @@ object GpuOverrides extends Logging {
TypeSig.all),
(p, conf, parent, r) => new SparkPlanMeta[DataWritingCommandExec](p, conf, parent, r) {
override val childDataWriteCmds: scala.Seq[DataWritingCommandMeta[_]] =
Seq(GpuOverrides.wrapDataWriteCmds(p.cmd, conf, Some(this)))
Seq(GpuOverrides.wrapDataWriteCmds(p.cmd, this.conf, Some(this)))

override def convertToGpu(): GpuExec =
GpuDataWritingCommandExec(childDataWriteCmds.head.convertToGpu(),
Expand All @@ -4094,9 +4094,9 @@ object GpuOverrides extends Logging {
(takeExec, conf, p, r) =>
new SparkPlanMeta[TakeOrderedAndProjectExec](takeExec, conf, p, r) {
val sortOrder: Seq[BaseExprMeta[SortOrder]] =
takeExec.sortOrder.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
takeExec.sortOrder.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))
val projectList: Seq[BaseExprMeta[NamedExpression]] =
takeExec.projectList.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
takeExec.projectList.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))
override val childExprs: Seq[BaseExprMeta[_]] = sortOrder ++ projectList

override def convertToGpu(): GpuExec = {
Expand Down Expand Up @@ -4162,7 +4162,7 @@ object GpuOverrides extends Logging {
(filter, conf, p, r) => new SparkPlanMeta[FilterExec](filter, conf, p, r) {
override def convertToGpu(): GpuExec = {
GpuFilterExec(childExprs.head.convertToGpu(),
childPlans.head.convertIfNeeded())(useTieredProject = conf.isTieredProjectEnabled)
childPlans.head.convertIfNeeded())(useTieredProject = this.conf.isTieredProjectEnabled)
}
}),
exec[ShuffleExchangeExec](
Expand Down Expand Up @@ -4215,7 +4215,7 @@ object GpuOverrides extends Logging {
TypeSig.all),
(join, conf, p, r) => new SparkPlanMeta[CartesianProductExec](join, conf, p, r) {
val condition: Option[BaseExprMeta[_]] =
join.condition.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
join.condition.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))

override val childExprs: Seq[BaseExprMeta[_]] = condition.toSeq

Expand All @@ -4225,11 +4225,11 @@ object GpuOverrides extends Logging {
left,
right,
None,
conf.gpuTargetBatchSizeBytes)
this.conf.gpuTargetBatchSizeBytes)
// The GPU does not yet support conditional joins, so conditions are implemented
// as a filter after the join when possible.
condition.map(c => GpuFilterExec(c.convertToGpu(),
joinExec)(useTieredProject = conf.isTieredProjectEnabled)).getOrElse(joinExec)
joinExec)(useTieredProject = this.conf.isTieredProjectEnabled)).getOrElse(joinExec)
}
}),
exec[HashAggregateExec](
Expand Down Expand Up @@ -4340,9 +4340,9 @@ object GpuOverrides extends Logging {
(e, conf, p, r) =>
new SparkPlanMeta[ArrowEvalPythonExec](e, conf, p, r) {
val udfs: Seq[BaseExprMeta[PythonUDF]] =
e.udfs.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
e.udfs.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))
val resultAttrs: Seq[BaseExprMeta[Attribute]] =
e.resultAttrs.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
e.resultAttrs.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))
override val childExprs: Seq[BaseExprMeta[_]] = udfs ++ resultAttrs

override def replaceMessage: String = "partially run on GPU"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,18 +30,18 @@ class IcebergProviderImpl extends IcebergProvider {
Seq(new ScanRule[Scan](
(a, conf, p, r) => new ScanMeta[Scan](a, conf, p, r) {
private lazy val convertedScan: Try[GpuSparkBatchQueryScan] = Try {
GpuSparkBatchQueryScan.fromCpu(a, conf)
GpuSparkBatchQueryScan.fromCpu(a, this.conf)
}

override def supportsRuntimeFilters: Boolean = true

override def tagSelfForGpu(): Unit = {
if (!conf.isIcebergEnabled) {
if (!this.conf.isIcebergEnabled) {
willNotWorkOnGpu("Iceberg input and output has been disabled. To enable set " +
s"${RapidsConf.ENABLE_ICEBERG} to true")
}

if (!conf.isIcebergReadEnabled) {
if (!this.conf.isIcebergReadEnabled) {
willNotWorkOnGpu("Iceberg input has been disabled. To enable set " +
s"${RapidsConf.ENABLE_ICEBERG_READ} to true")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,7 +59,7 @@ class HiveProviderImpl extends HiveProviderCmdShims {
}

override def tagExprForGpu(): Unit = {
if (opRapidsFunc.isEmpty && !conf.isCpuBasedUDFEnabled) {
if (opRapidsFunc.isEmpty && !this.conf.isCpuBasedUDFEnabled) {
willNotWorkOnGpu(s"Hive SimpleUDF ${a.name} implemented by " +
s"${a.funcWrapper.functionClassName} does not provide a GPU implementation " +
s"and CPU-based UDFs are not enabled by `${RapidsConf.ENABLE_CPU_BASED_UDF.key}`")
Expand All @@ -78,7 +78,7 @@ class HiveProviderImpl extends HiveProviderCmdShims {
a.deterministic)
}.getOrElse {
// This `require` is just for double check.
require(conf.isCpuBasedUDFEnabled)
require(this.conf.isCpuBasedUDFEnabled)
GpuRowBasedHiveSimpleUDF(
a.name,
a.funcWrapper,
Expand All @@ -101,7 +101,7 @@ class HiveProviderImpl extends HiveProviderCmdShims {
}

override def tagExprForGpu(): Unit = {
if (opRapidsFunc.isEmpty && !conf.isCpuBasedUDFEnabled) {
if (opRapidsFunc.isEmpty && !this.conf.isCpuBasedUDFEnabled) {
willNotWorkOnGpu(s"Hive GenericUDF ${a.name} implemented by " +
s"${a.funcWrapper.functionClassName} does not provide a GPU implementation " +
s"and CPU-based UDFs are not enabled by `${RapidsConf.ENABLE_CPU_BASED_UDF.key}`")
Expand All @@ -121,7 +121,7 @@ class HiveProviderImpl extends HiveProviderCmdShims {
a.foldable)
}.getOrElse {
// This `require` is just for double check.
require(conf.isCpuBasedUDFEnabled)
require(this.conf.isCpuBasedUDFEnabled)
GpuRowBasedHiveGenericUDF(
a.name,
a.funcWrapper,
Expand Down Expand Up @@ -195,11 +195,11 @@ class HiveProviderImpl extends HiveProviderCmdShims {
}

private def checkIfEnabled(): Unit = {
if (!conf.isHiveDelimitedTextEnabled) {
if (!this.conf.isHiveDelimitedTextEnabled) {
willNotWorkOnGpu("Hive text I/O has been disabled. To enable this, " +
s"set ${RapidsConf.ENABLE_HIVE_TEXT} to true")
}
if (!conf.isHiveDelimitedTextReadEnabled) {
if (!this.conf.isHiveDelimitedTextReadEnabled) {
willNotWorkOnGpu("reading Hive delimited text tables has been disabled, " +
s"to enable this, set ${RapidsConf.ENABLE_HIVE_TEXT_READ} to true")
}
Expand Down Expand Up @@ -268,7 +268,7 @@ class HiveProviderImpl extends HiveProviderCmdShims {
TrampolineUtil.dataTypeExistsRecursively(att.dataType, dt => dt == FloatType)
}

if (!conf.shouldHiveReadFloats && hasFloats) {
if (!this.conf.shouldHiveReadFloats && hasFloats) {
willNotWorkOnGpu("reading of floats has been disabled set " +
s"${RapidsConf.ENABLE_READ_HIVE_FLOATS} to true to enable this.")
}
Expand All @@ -277,7 +277,7 @@ class HiveProviderImpl extends HiveProviderCmdShims {
TrampolineUtil.dataTypeExistsRecursively(att.dataType, dt => dt == DoubleType)
}

if (!conf.shouldHiveReadDoubles && hasDoubles) {
if (!this.conf.shouldHiveReadDoubles && hasDoubles) {
willNotWorkOnGpu("reading of doubles has been disabled set " +
s"${RapidsConf.ENABLE_READ_HIVE_DOUBLES} to true to enable this.")
}
Expand All @@ -287,7 +287,7 @@ class HiveProviderImpl extends HiveProviderCmdShims {
dt => dt.isInstanceOf[DecimalType])
}

if (!conf.shouldHiveReadDecimals && hasDecimals) {
if (!this.conf.shouldHiveReadDecimals && hasDecimals) {
willNotWorkOnGpu("reading of decimal typed values has been disabled set " +
s"${RapidsConf.ENABLE_READ_HIVE_DECIMALS} to true to enable this.")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -88,7 +88,7 @@ class AvroProviderImpl extends AvroProvider {
a.readPartitionSchema,
a.options,
a.pushedFilters,
conf,
this.conf,
a.partitionFilters,
a.dataFilters)
})
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,7 +53,7 @@ object GpuScalaUDFMeta {
lazy val opRapidsFunc = GpuScalaUDF.getRapidsUDFInstance(expr.function)

override def tagExprForGpu(): Unit = {
if (opRapidsFunc.isEmpty && !conf.isCpuBasedUDFEnabled) {
if (opRapidsFunc.isEmpty && !this.conf.isCpuBasedUDFEnabled) {
val udfName = expr.udfName.getOrElse("UDF")
val udfClass = expr.function.getClass
willNotWorkOnGpu(s"neither $udfName implemented by $udfClass provides " +
Expand All @@ -76,7 +76,7 @@ object GpuScalaUDFMeta {
expr.udfDeterministic)
}.getOrElse {
// This `require` is just for double check.
require(conf.isCpuBasedUDFEnabled)
require(this.conf.isCpuBasedUDFEnabled)
GpuRowBasedScalaUDF(
expr.function,
expr.dataType,
Expand Down
Loading

0 comments on commit 4276bd8

Please sign in to comment.