Update Spark 2.x explain API with changes in 22.04 (#5062)
* Updated spark2 for changes in 22.04

* More Spark2 updates

* Document EqualNullSafe example in spark 2.x explain api docs

* remove TODO

* Sign off

Signed-off-by: Thomas Graves <tgraves@nvidia.com>
tgravescs authored Mar 28, 2022
1 parent 0063053 commit f07a0aa
Showing 67 changed files with 1,639 additions and 1,011 deletions.
7 changes: 6 additions & 1 deletion docs/get-started/getting-started-workload-qualification.md
@@ -64,7 +64,12 @@ the other is to modify your existing Spark application code to call a function d
Please note that if using adaptive execution in Spark the explain output may not be perfect
as the plan could have changed along the way in a way that we wouldn't see by looking at just
the CPU plan. The same applies if you are using an older version of Spark. Spark planning
may be slightly different if you go up to a newer version of Spark.
may be slightly different when you go up to a newer version of Spark. One example where we have
seen Spark 2.4.X plan differently is its use of the EqualNullSafe expression. Spark 2.4.X
uses EqualNullSafe where Spark 3.X uses other expressions for the same comparison. As a result,
the Spark 2.X explain output reports that the GPU does not support EqualNullSafe, but on
Spark 3.X those parts of the plan would run on the GPU because different operators are used.
This is something to keep in mind when doing the analysis.
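
For illustration, here is a minimal sketch (the DataFrame names and values are hypothetical, not
taken from this change) of a query that Spark 2.4.X typically plans with EqualNullSafe:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("equal-null-safe-sketch").getOrCreate()
import spark.implicits._

val left  = Seq((Some(1), "a"), (None, "b")).toDF("key", "lval")
val right = Seq((Some(1), "x"), (None, "y")).toDF("key", "rval")

// <=> is Spark's null-safe equality test. Spark 2.4.X plans it as EqualNullSafe, so the
// Spark 2.X explain output flags this comparison as unsupported on the GPU, while Spark 3.X
// may plan the same comparison with different, supported expressions.
val joined = left.join(right, left("key") <=> right("key"))
joined.explain()
```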

### Using the Configuration Flag for Explain Only Mode
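
As a minimal sketch only, assuming the RAPIDS Accelerator configuration names
`spark.rapids.sql.mode` and `spark.rapids.sql.explain` (verify the exact names and values
against the plugin documentation), explain-only mode can be enabled when building the session:

```scala
import org.apache.spark.sql.SparkSession

// Hedged sketch: run the application on the CPU while logging which operators would,
// and would not, be placed on the GPU.
val spark = SparkSession.builder()
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.rapids.sql.mode", "explainOnly")  // assumed value: plan only, do not execute on GPU
  .config("spark.rapids.sql.explain", "ALL")       // assumed value: log supported and unsupported operators
  .getOrCreate()
```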

115 changes: 63 additions & 52 deletions scripts/rundiffspark2.sh

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions scripts/spark2diffs/CastExprMeta.diff
@@ -10,7 +10,7 @@
< // 2.x doesn't have the SQLConf.LEGACY_COMPLEX_TYPES_TO_STRING config, so set it to true
< val legacyCastToString: Boolean = true
---
> val legacyCastToString: Boolean = ShimLoader.getSparkShims.getLegacyComplexTypeToString()
> val legacyCastToString: Boolean = SparkShimImpl.getLegacyComplexTypeToString()
46c45
< if (dt.precision > GpuOverrides.DECIMAL128_MAX_PRECISION) {
---
@@ -27,13 +27,13 @@
> YearParseUtil.tagParseStringAsDate(conf, this)
85,91c85
< // Spark 2.x: removed check for
< // !ShimLoader.getSparkShims.isCastingStringToNegDecimalScaleSupported
< // !SparkShimImpl.isCastingStringToNegDecimalScaleSupported
< // this dealt with handling a bug fix that is only in newer versions of Spark
< // (https://issues.apache.org/jira/browse/SPARK-37451)
< // Since we don't know what version of Spark 3 they will be using
< // just always say it won't work and they can hopefully figure it out from warning.
< if (dt.scale < 0) {
---
> if (dt.scale < 0 && !ShimLoader.getSparkShims.isCastingStringToNegDecimalScaleSupported) {
120a115
> if (dt.scale < 0 && !SparkShimImpl.isCastingStringToNegDecimalScaleSupported) {
124a119
>
24 changes: 17 additions & 7 deletions scripts/spark2diffs/DateUtils.diff
@@ -1,7 +1,7 @@
2c2
< * Copyright (c) 2022, NVIDIA CORPORATION.
---
> * Copyright (c) 2020-2021, NVIDIA CORPORATION.
> * Copyright (c) 2020-2022, NVIDIA CORPORATION.
19c19
< import java.time._
---
@@ -12,35 +12,35 @@
>
23a27
> import org.apache.spark.sql.catalyst.util.DateTimeUtils.localDateToDays
59,60c63,65
61,62c65,67
< // Spark 2.x - removed isSpark320orlater checks
< def specialDatesDays: Map[String, Int] = {
---
> def specialDatesDays: Map[String, Int] = if (isSpark320OrLater) {
> Map.empty
> } else {
71c76,78
73c78,80
< def specialDatesSeconds: Map[String, Long] = {
---
> def specialDatesSeconds: Map[String, Long] = if (isSpark320OrLater) {
> Map.empty
> } else {
73,74c80
75,76c82
< // spark 2.4 Date utils are different
< val now = DateTimeUtils.instantToMicros(Instant.now())
---
> val now = DateTimeUtils.currentTimestamp()
84c90,92
86c92,94
< def specialDatesMicros: Map[String, Long] = {
---
> def specialDatesMicros: Map[String, Long] = if (isSpark320OrLater) {
> Map.empty
> } else {
86c94
88c96
< val now = DateTimeUtils.instantToMicros(Instant.now())
---
> val now = DateTimeUtils.currentTimestamp()
96c104,121
98c106,123
< def currentDate(): Int = Math.toIntExact(LocalDate.now().toEpochDay)
---
> def fetchSpecialDates(unit: DType): Map[String, () => Scalar] = unit match {
@@ -61,3 +61,13 @@
> }
>
> def currentDate(): Int = localDateToDays(LocalDate.now())
193c218
< meta: RapidsMeta[_, _],
---
> meta: RapidsMeta[_, _, _],
209,211c234
< // Spark 2.x doesn't support, assume false
< val ansiEnabled = false
< if (ansiEnabled) {
---
> if (SQLConf.get.ansiEnabled) {
10 changes: 3 additions & 7 deletions scripts/spark2diffs/GpuCSVScan.diff
@@ -38,17 +38,13 @@
< /*
113d103
< */
143c133
< dateFormatInRead(parsedOptions).foreach { dateFormat =>
---
> ShimLoader.getSparkShims.dateFormatInRead(parsedOptions).foreach { dateFormat =>
190,192c180
154,156c144
<
< // Spark 2.x doesn't have zoneId, so use timeZone and then to id
< if (!TypeChecks.areTimestampsSupported(parsedOptions.timeZone.toZoneId)) {
---
> if (!TypeChecks.areTimestampsSupported(parsedOptions.zoneId)) {
195c183
159c147
< timestampFormatInRead(parsedOptions).foreach { tsFormat =>
---
> ShimLoader.getSparkShims.timestampFormatInRead(parsedOptions).foreach { tsFormat =>
> FileOptionsShims.timestampFormatInRead(parsedOptions).foreach { tsFormat =>
5 changes: 5 additions & 0 deletions scripts/spark2diffs/GpuCsvUtils.diff
@@ -0,0 +1,5 @@
2,3c2
< // spark 2.x uses FastDateFormat, use getPattern
< def dateFormatInRead(options: CSVOptions): String = options.dateFormat.getPattern
---
> def dateFormatInRead(options: CSVOptions): String = options.dateFormat
14 changes: 9 additions & 5 deletions scripts/spark2diffs/GpuFileSourceScanExec.diff
@@ -1,12 +1,16 @@
11a12,22
8,10c8,18
< // SPARK 2.x - We leave off Avro here since its a datasource v2 thing and off by default
< case f =>
< meta.willNotWorkOnGpu(s"unsupported file format: ${f.getClass.getCanonicalName}")
---
> case _ => ExternalSource.tagSupportForGpuFileSourceScanExec(meta)
> }
> }
>
> def convertFileFormat(format: FileFormat): FileFormat = {
> format match {
> case _: CSVFileFormat => new GpuReadCSVFileFormat
> case f if GpuOrcFileFormat.isSparkOrcFormat(f) => new GpuReadOrcFileFormat
> case _: ParquetFileFormat => new GpuReadParquetFileFormat
> case _: JsonFileFormat => new GpuReadJsonFileFormat
> case f =>
> throw new IllegalArgumentException(s"${f.getClass.getCanonicalName} is not supported")
> }
> }
> case _ => ExternalSource.convertFileFormatForGpuFileSourceScanExec(format)
2 changes: 2 additions & 0 deletions scripts/spark2diffs/GpuGetArrayItemMeta.diff
@@ -2,3 +2,5 @@
< parent: Option[RapidsMeta[_, _]],
---
> parent: Option[RapidsMeta[_, _, _]],
6a7
>
6 changes: 3 additions & 3 deletions scripts/spark2diffs/GpuHashJoin.diff
@@ -2,13 +2,13 @@
< def tagForGpu(joinType: JoinType, meta: RapidsMeta[_, _]): Unit = {
---
> def tagForGpu(joinType: JoinType, meta: RapidsMeta[_, _, _]): Unit = {
69c69
72c72
< object GpuHashJoin {
---
> object GpuHashJoin extends Arm {
101a102
98a99
>
122c123
119c120
< }
---
>
6 changes: 3 additions & 3 deletions scripts/spark2diffs/GpuJoinUtils.diff
@@ -1,7 +1,7 @@
16,18d15
< package com.nvidia.spark.rapids.shims.v2
< package com.nvidia.spark.rapids.shims
<
< import com.nvidia.spark.rapids.shims.v2._
< import com.nvidia.spark.rapids.shims._
20,26c17
< import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
<
@@ -11,7 +11,7 @@
< */
< sealed abstract class GpuBuildSide
---
> package com.nvidia.spark.rapids.shims.v2
> package com.nvidia.spark.rapids.shims
28c19
< case object GpuBuildRight extends GpuBuildSide
---
26 changes: 11 additions & 15 deletions scripts/spark2diffs/GpuJsonScan.diff
@@ -1,10 +1,6 @@
3,16d2
< def dateFormatInRead(fileOptions: Serializable): Option[String] = {
< fileOptions match {
< case jsonOpts: JSONOptions => Option(jsonOpts.dateFormat.getPattern)
< case _ => throw new RuntimeException("Wrong file options.")
< }
< }
3,12d2
< // spark 2.x uses FastDateFormat, use getPattern
< def dateFormatInRead(options: JSONOptions): String = options.dateFormat.getPattern
<
< def timestampFormatInRead(fileOptions: Serializable): Option[String] = {
< fileOptions match {
@@ -13,7 +9,7 @@
< }
< }
<
41a28,37
37a28,37
> def tagSupport(scanMeta: ScanMeta[JsonScan]) : Unit = {
> val scan = scanMeta.wrapped
> tagSupport(
@@ -24,20 +20,20 @@
> scanMeta)
> }
>
47c43
43c43
< meta: RapidsMeta[_, _]): Unit = {
---
> meta: RapidsMeta[_, _, _]): Unit = {
109c105
< dateFormatInRead(parsedOptions).foreach { dateFormat =>
106c106
< dateFormatInRead(parsedOptions), parseString = true)
---
> ShimLoader.getSparkShims.dateFormatInRead(parsedOptions).foreach { dateFormat =>
117,118c113
> GpuJsonUtils.dateFormatInRead(parsedOptions), parseString = true)
110,111c110
< // Spark 2.x doesn't have zoneId, so use timeZone and then to id
< if (!TypeChecks.areTimestampsSupported(parsedOptions.timeZone.toZoneId)) {
---
> if (!TypeChecks.areTimestampsSupported(parsedOptions.zoneId)) {
121c116
114c113
< timestampFormatInRead(parsedOptions).foreach { tsFormat =>
---
> ShimLoader.getSparkShims.timestampFormatInRead(parsedOptions).foreach { tsFormat =>
> FileOptionsShims.timestampFormatInRead(parsedOptions).foreach { tsFormat =>