Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Inheritance Shadowing to add support for Spark 4.0.0 [databricks] #10829

Merged
merged 2 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1215,7 +1215,7 @@ object GpuOverrides extends Logging {
ExprChecks.mathUnaryWithAst,
(a, conf, p, r) => new UnaryAstExprMeta[Acosh](a, conf, p, r) {
override def convertToGpu(child: Expression): GpuExpression =
if (conf.includeImprovedFloat) {
if (this.conf.includeImprovedFloat) {
GpuAcoshImproved(child)
} else {
GpuAcoshCompat(child)
Expand All @@ -1232,14 +1232,14 @@ object GpuOverrides extends Logging {
ExprChecks.mathUnaryWithAst,
(a, conf, p, r) => new UnaryAstExprMeta[Asinh](a, conf, p, r) {
override def convertToGpu(child: Expression): GpuExpression =
if (conf.includeImprovedFloat) {
if (this.conf.includeImprovedFloat) {
GpuAsinhImproved(child)
} else {
GpuAsinhCompat(child)
}

override def tagSelfForAst(): Unit = {
if (!conf.includeImprovedFloat) {
if (!this.conf.includeImprovedFloat) {
// AST is not expressive enough yet to implement the conditional expression needed
// to emulate Spark's behavior
willNotWorkInAst("asinh is not AST compatible unless " +
Expand Down Expand Up @@ -2068,9 +2068,9 @@ object GpuOverrides extends Logging {
Some(RepeatingParamCheck("filter", TypeSig.BOOLEAN, TypeSig.BOOLEAN))),
(a, conf, p, r) => new ExprMeta[AggregateExpression](a, conf, p, r) {
private val filter: Option[BaseExprMeta[_]] =
a.filter.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
a.filter.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))
private val childrenExprMeta: Seq[BaseExprMeta[Expression]] =
a.children.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
a.children.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))
override val childExprs: Seq[BaseExprMeta[_]] =
childrenExprMeta ++ filter.toSeq

Expand Down Expand Up @@ -2224,7 +2224,7 @@ object GpuOverrides extends Logging {
(a, conf, p, r) => new AggExprMeta[Sum](a, conf, p, r) {
override def tagAggForGpu(): Unit = {
val inputDataType = a.child.dataType
checkAndTagFloatAgg(inputDataType, conf, this)
checkAndTagFloatAgg(inputDataType, this.conf, this)
}

override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =
Expand Down Expand Up @@ -2297,7 +2297,7 @@ object GpuOverrides extends Logging {
(a, conf, p, r) => new BinaryExprMeta[BRound](a, conf, p, r) {
override def tagExprForGpu(): Unit = {
a.child.dataType match {
case FloatType | DoubleType if !conf.isIncompatEnabled =>
case FloatType | DoubleType if !this.conf.isIncompatEnabled =>
willNotWorkOnGpu("rounding floating point numbers may be slightly off " +
s"compared to Spark's result, to enable set ${RapidsConf.INCOMPATIBLE_OPS}")
case _ => // NOOP
Expand All @@ -2318,7 +2318,7 @@ object GpuOverrides extends Logging {
(a, conf, p, r) => new BinaryExprMeta[Round](a, conf, p, r) {
override def tagExprForGpu(): Unit = {
a.child.dataType match {
case FloatType | DoubleType if !conf.isIncompatEnabled =>
case FloatType | DoubleType if !this.conf.isIncompatEnabled =>
willNotWorkOnGpu("rounding floating point numbers may be slightly off " +
s"compared to Spark's result, to enable set ${RapidsConf.INCOMPATIBLE_OPS}")
case _ => // NOOP
Expand Down Expand Up @@ -3138,7 +3138,7 @@ object GpuOverrides extends Logging {
(in, conf, p, r) => new BinaryExprMeta[FormatNumber](in, conf, p, r) {
override def tagExprForGpu(): Unit = {
in.children.head.dataType match {
case FloatType | DoubleType if !conf.isFloatFormatNumberEnabled =>
case FloatType | DoubleType if !this.conf.isFloatFormatNumberEnabled =>
willNotWorkOnGpu("format_number with floating point types on the GPU returns " +
"results that have a different precision than the default results of Spark. " +
"To enable this operation on the GPU, set" +
Expand Down Expand Up @@ -3191,7 +3191,7 @@ object GpuOverrides extends Logging {
TypeSig.all))),
(a, conf, p, r) => new ExprMeta[Murmur3Hash](a, conf, p, r) {
override val childExprs: Seq[BaseExprMeta[_]] = a.children
.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))

override def tagExprForGpu(): Unit = {
val arrayWithStructsHashing = a.children.exists(e =>
Expand All @@ -3216,7 +3216,7 @@ object GpuOverrides extends Logging {
XxHash64Shims.supportedTypes, TypeSig.all))),
(a, conf, p, r) => new ExprMeta[XxHash64](a, conf, p, r) {
override val childExprs: Seq[BaseExprMeta[_]] = a.children
.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))

def convertToGpu(): GpuExpression =
GpuXxHash64(childExprs.map(_.convertToGpu()), a.seed)
Expand Down Expand Up @@ -3429,7 +3429,7 @@ object GpuOverrides extends Logging {
TypeSig.all))),
(c, conf, p, r) => new TypedImperativeAggExprMeta[CollectList](c, conf, p, r) {
override def tagAggForGpu(): Unit = {
if (context == WindowAggExprContext && !conf.isWindowCollectListEnabled) {
if (context == WindowAggExprContext && !this.conf.isWindowCollectListEnabled) {
willNotWorkOnGpu("collect_list is disabled for window operations because " +
"the output explodes in size proportional to the window size squared. If " +
"you know the window is small you can try it by setting " +
Expand Down Expand Up @@ -3472,7 +3472,7 @@ object GpuOverrides extends Logging {
TypeSig.all))),
(c, conf, p, r) => new TypedImperativeAggExprMeta[CollectSet](c, conf, p, r) {
override def tagAggForGpu(): Unit = {
if (context == WindowAggExprContext && !conf.isWindowCollectSetEnabled) {
if (context == WindowAggExprContext && !this.conf.isWindowCollectSetEnabled) {
willNotWorkOnGpu("collect_set is disabled for window operations because " +
"the output can explode in size proportional to the window size squared. If " +
"you know the window is small you can try it by setting " +
Expand Down Expand Up @@ -3867,9 +3867,9 @@ object GpuOverrides extends Logging {
a.options,
a.partitionFilters,
a.dataFilters,
conf.maxReadBatchSizeRows,
conf.maxReadBatchSizeBytes,
conf.maxGpuColumnSizeBytes)
this.conf.maxReadBatchSizeRows,
this.conf.maxReadBatchSizeBytes,
this.conf.maxGpuColumnSizeBytes)
}),
GpuOverrides.scan[JsonScan](
"Json parsing",
Expand All @@ -3885,9 +3885,9 @@ object GpuOverrides extends Logging {
a.options,
a.partitionFilters,
a.dataFilters,
conf.maxReadBatchSizeRows,
conf.maxReadBatchSizeBytes,
conf.maxGpuColumnSizeBytes)
this.conf.maxReadBatchSizeRows,
this.conf.maxReadBatchSizeBytes,
this.conf.maxGpuColumnSizeBytes)
})).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap

val scans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] =
Expand All @@ -3913,7 +3913,7 @@ object GpuOverrides extends Logging {
),
(hp, conf, p, r) => new PartMeta[HashPartitioning](hp, conf, p, r) {
override val childExprs: Seq[BaseExprMeta[_]] =
hp.expressions.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
hp.expressions.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))

override def tagPartForGpu(): Unit = {
val arrayWithStructsHashing = hp.expressions.exists(e =>
Expand All @@ -3939,7 +3939,7 @@ object GpuOverrides extends Logging {
TypeSig.orderable)),
(rp, conf, p, r) => new PartMeta[RangePartitioning](rp, conf, p, r) {
override val childExprs: Seq[BaseExprMeta[_]] =
rp.ordering.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
rp.ordering.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))

override def convertToGpu(): GpuPartitioning = {
if (rp.numPartitions > 1) {
Expand Down Expand Up @@ -4047,7 +4047,7 @@ object GpuOverrides extends Logging {
new SparkPlanMeta[RangeExec](range, conf, p, r) {
override def convertToGpu(): GpuExec =
GpuRangeExec(range.start, range.end, range.step, range.numSlices, range.output,
conf.gpuTargetBatchSizeBytes)
this.conf.gpuTargetBatchSizeBytes)
}
}),
exec[BatchScanExec](
Expand Down Expand Up @@ -4075,7 +4075,7 @@ object GpuOverrides extends Logging {
TypeSig.all),
(p, conf, parent, r) => new SparkPlanMeta[DataWritingCommandExec](p, conf, parent, r) {
override val childDataWriteCmds: scala.Seq[DataWritingCommandMeta[_]] =
Seq(GpuOverrides.wrapDataWriteCmds(p.cmd, conf, Some(this)))
Seq(GpuOverrides.wrapDataWriteCmds(p.cmd, this.conf, Some(this)))

override def convertToGpu(): GpuExec =
GpuDataWritingCommandExec(childDataWriteCmds.head.convertToGpu(),
Expand All @@ -4094,9 +4094,9 @@ object GpuOverrides extends Logging {
(takeExec, conf, p, r) =>
new SparkPlanMeta[TakeOrderedAndProjectExec](takeExec, conf, p, r) {
val sortOrder: Seq[BaseExprMeta[SortOrder]] =
takeExec.sortOrder.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
takeExec.sortOrder.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))
val projectList: Seq[BaseExprMeta[NamedExpression]] =
takeExec.projectList.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
takeExec.projectList.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))
override val childExprs: Seq[BaseExprMeta[_]] = sortOrder ++ projectList

override def convertToGpu(): GpuExec = {
Expand Down Expand Up @@ -4162,7 +4162,7 @@ object GpuOverrides extends Logging {
(filter, conf, p, r) => new SparkPlanMeta[FilterExec](filter, conf, p, r) {
override def convertToGpu(): GpuExec = {
GpuFilterExec(childExprs.head.convertToGpu(),
childPlans.head.convertIfNeeded())(useTieredProject = conf.isTieredProjectEnabled)
childPlans.head.convertIfNeeded())(useTieredProject = this.conf.isTieredProjectEnabled)
}
}),
exec[ShuffleExchangeExec](
Expand Down Expand Up @@ -4215,7 +4215,7 @@ object GpuOverrides extends Logging {
TypeSig.all),
(join, conf, p, r) => new SparkPlanMeta[CartesianProductExec](join, conf, p, r) {
val condition: Option[BaseExprMeta[_]] =
join.condition.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
join.condition.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))

override val childExprs: Seq[BaseExprMeta[_]] = condition.toSeq

Expand All @@ -4225,11 +4225,11 @@ object GpuOverrides extends Logging {
left,
right,
None,
conf.gpuTargetBatchSizeBytes)
this.conf.gpuTargetBatchSizeBytes)
// The GPU does not yet support conditional joins, so conditions are implemented
// as a filter after the join when possible.
condition.map(c => GpuFilterExec(c.convertToGpu(),
joinExec)(useTieredProject = conf.isTieredProjectEnabled)).getOrElse(joinExec)
joinExec)(useTieredProject = this.conf.isTieredProjectEnabled)).getOrElse(joinExec)
}
}),
exec[HashAggregateExec](
Expand Down Expand Up @@ -4340,9 +4340,9 @@ object GpuOverrides extends Logging {
(e, conf, p, r) =>
new SparkPlanMeta[ArrowEvalPythonExec](e, conf, p, r) {
val udfs: Seq[BaseExprMeta[PythonUDF]] =
e.udfs.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
e.udfs.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))
val resultAttrs: Seq[BaseExprMeta[Attribute]] =
e.resultAttrs.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
e.resultAttrs.map(GpuOverrides.wrapExpr(_, this.conf, Some(this)))
override val childExprs: Seq[BaseExprMeta[_]] = udfs ++ resultAttrs

override def replaceMessage: String = "partially run on GPU"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,18 +30,18 @@ class IcebergProviderImpl extends IcebergProvider {
Seq(new ScanRule[Scan](
(a, conf, p, r) => new ScanMeta[Scan](a, conf, p, r) {
private lazy val convertedScan: Try[GpuSparkBatchQueryScan] = Try {
GpuSparkBatchQueryScan.fromCpu(a, conf)
GpuSparkBatchQueryScan.fromCpu(a, this.conf)
}

override def supportsRuntimeFilters: Boolean = true

override def tagSelfForGpu(): Unit = {
if (!conf.isIcebergEnabled) {
if (!this.conf.isIcebergEnabled) {
willNotWorkOnGpu("Iceberg input and output has been disabled. To enable set " +
s"${RapidsConf.ENABLE_ICEBERG} to true")
}

if (!conf.isIcebergReadEnabled) {
if (!this.conf.isIcebergReadEnabled) {
willNotWorkOnGpu("Iceberg input has been disabled. To enable set " +
s"${RapidsConf.ENABLE_ICEBERG_READ} to true")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,7 +59,7 @@ class HiveProviderImpl extends HiveProviderCmdShims {
}

override def tagExprForGpu(): Unit = {
if (opRapidsFunc.isEmpty && !conf.isCpuBasedUDFEnabled) {
if (opRapidsFunc.isEmpty && !this.conf.isCpuBasedUDFEnabled) {
willNotWorkOnGpu(s"Hive SimpleUDF ${a.name} implemented by " +
s"${a.funcWrapper.functionClassName} does not provide a GPU implementation " +
s"and CPU-based UDFs are not enabled by `${RapidsConf.ENABLE_CPU_BASED_UDF.key}`")
Expand All @@ -78,7 +78,7 @@ class HiveProviderImpl extends HiveProviderCmdShims {
a.deterministic)
}.getOrElse {
// This `require` is just for double check.
require(conf.isCpuBasedUDFEnabled)
require(this.conf.isCpuBasedUDFEnabled)
GpuRowBasedHiveSimpleUDF(
a.name,
a.funcWrapper,
Expand All @@ -101,7 +101,7 @@ class HiveProviderImpl extends HiveProviderCmdShims {
}

override def tagExprForGpu(): Unit = {
if (opRapidsFunc.isEmpty && !conf.isCpuBasedUDFEnabled) {
if (opRapidsFunc.isEmpty && !this.conf.isCpuBasedUDFEnabled) {
willNotWorkOnGpu(s"Hive GenericUDF ${a.name} implemented by " +
s"${a.funcWrapper.functionClassName} does not provide a GPU implementation " +
s"and CPU-based UDFs are not enabled by `${RapidsConf.ENABLE_CPU_BASED_UDF.key}`")
Expand All @@ -121,7 +121,7 @@ class HiveProviderImpl extends HiveProviderCmdShims {
a.foldable)
}.getOrElse {
// This `require` is just for double check.
require(conf.isCpuBasedUDFEnabled)
require(this.conf.isCpuBasedUDFEnabled)
GpuRowBasedHiveGenericUDF(
a.name,
a.funcWrapper,
Expand Down Expand Up @@ -195,11 +195,11 @@ class HiveProviderImpl extends HiveProviderCmdShims {
}

private def checkIfEnabled(): Unit = {
if (!conf.isHiveDelimitedTextEnabled) {
if (!this.conf.isHiveDelimitedTextEnabled) {
willNotWorkOnGpu("Hive text I/O has been disabled. To enable this, " +
s"set ${RapidsConf.ENABLE_HIVE_TEXT} to true")
}
if (!conf.isHiveDelimitedTextReadEnabled) {
if (!this.conf.isHiveDelimitedTextReadEnabled) {
willNotWorkOnGpu("reading Hive delimited text tables has been disabled, " +
s"to enable this, set ${RapidsConf.ENABLE_HIVE_TEXT_READ} to true")
}
Expand Down Expand Up @@ -268,7 +268,7 @@ class HiveProviderImpl extends HiveProviderCmdShims {
TrampolineUtil.dataTypeExistsRecursively(att.dataType, dt => dt == FloatType)
}

if (!conf.shouldHiveReadFloats && hasFloats) {
if (!this.conf.shouldHiveReadFloats && hasFloats) {
willNotWorkOnGpu("reading of floats has been disabled set " +
s"${RapidsConf.ENABLE_READ_HIVE_FLOATS} to true to enable this.")
}
Expand All @@ -277,7 +277,7 @@ class HiveProviderImpl extends HiveProviderCmdShims {
TrampolineUtil.dataTypeExistsRecursively(att.dataType, dt => dt == DoubleType)
}

if (!conf.shouldHiveReadDoubles && hasDoubles) {
if (!this.conf.shouldHiveReadDoubles && hasDoubles) {
willNotWorkOnGpu("reading of doubles has been disabled set " +
s"${RapidsConf.ENABLE_READ_HIVE_DOUBLES} to true to enable this.")
}
Expand All @@ -287,7 +287,7 @@ class HiveProviderImpl extends HiveProviderCmdShims {
dt => dt.isInstanceOf[DecimalType])
}

if (!conf.shouldHiveReadDecimals && hasDecimals) {
if (!this.conf.shouldHiveReadDecimals && hasDecimals) {
willNotWorkOnGpu("reading of decimal typed values has been disabled set " +
s"${RapidsConf.ENABLE_READ_HIVE_DECIMALS} to true to enable this.")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -88,7 +88,7 @@ class AvroProviderImpl extends AvroProvider {
a.readPartitionSchema,
a.options,
a.pushedFilters,
conf,
this.conf,
a.partitionFilters,
a.dataFilters)
})
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,7 +53,7 @@ object GpuScalaUDFMeta {
lazy val opRapidsFunc = GpuScalaUDF.getRapidsUDFInstance(expr.function)

override def tagExprForGpu(): Unit = {
if (opRapidsFunc.isEmpty && !conf.isCpuBasedUDFEnabled) {
if (opRapidsFunc.isEmpty && !this.conf.isCpuBasedUDFEnabled) {
val udfName = expr.udfName.getOrElse("UDF")
val udfClass = expr.function.getClass
willNotWorkOnGpu(s"neither $udfName implemented by $udfClass provides " +
Expand All @@ -76,7 +76,7 @@ object GpuScalaUDFMeta {
expr.udfDeterministic)
}.getOrElse {
// This `require` is just for double check.
require(conf.isCpuBasedUDFEnabled)
require(this.conf.isCpuBasedUDFEnabled)
GpuRowBasedScalaUDF(
expr.function,
expr.dataType,
Expand Down
Loading