Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flatten simple 4+ nesting of withResource #6833

Merged
merged 31 commits into from
Oct 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
87fc915
wip
gerashegalov Oct 17, 2022
8d14610
Flatten simple 4+ nesting of withResource
gerashegalov Oct 18, 2022
70d049a
andThen
gerashegalov Oct 18, 2022
68e1427
parquet
gerashegalov Oct 18, 2022
15b32a9
wip
gerashegalov Oct 18, 2022
b3facf3
wip
gerashegalov Oct 18, 2022
c60d56a
wip
gerashegalov Oct 18, 2022
5edaa88
scalastyle and spark330
gerashegalov Oct 18, 2022
ce30bc3
unleak
gerashegalov Oct 18, 2022
b936c41
review
gerashegalov Oct 19, 2022
5c28444
Fix reduceLeft
gerashegalov Oct 19, 2022
e3b56b3
revert stringFunctions
gerashegalov Oct 19, 2022
ee3ed4c
Merge branch 'branch-22.12' into reduceWithResourceScope
gerashegalov Oct 19, 2022
b229af7
reformat for review
gerashegalov Oct 19, 2022
0c26da0
more reviews
gerashegalov Oct 19, 2022
aaad636
more formatting
gerashegalov Oct 19, 2022
27a9580
scalastyle
gerashegalov Oct 19, 2022
6f04fd0
Parquet condition flipped
gerashegalov Oct 20, 2022
fdda831
Merge remote-tracking branch 'origin/branch-22.12' into reduceWithRes…
gerashegalov Oct 20, 2022
bc7424d
restore RapidsErrorUtis
gerashegalov Oct 20, 2022
9d95272
Merge remote-tracking branch 'origin/branch-22.12' into reduceWithRes…
gerashegalov Oct 21, 2022
9164def
WIP on detangling withresource
gerashegalov Oct 21, 2022
7506ce5
more unnesting
gerashegalov Oct 21, 2022
537751a
unnest more
gerashegalov Oct 21, 2022
d12b773
wip
gerashegalov Oct 22, 2022
cad5763
regex
gerashegalov Oct 22, 2022
9b32ba3
review
gerashegalov Oct 24, 2022
00127ad
revews
gerashegalov Oct 25, 2022
fbdf705
Merge remote-tracking branch 'gerashegalov/reduceWithResourceScope' i…
gerashegalov Oct 25, 2022
7056c94
fix double close
gerashegalov Oct 25, 2022
050e3ad
another leak
gerashegalov Oct 25, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{ColumnVector, ColumnView, DType, Scalar}
import com.nvidia.spark.rapids.{Arm, BoolUtils, CloseableHolder}
import com.nvidia.spark.rapids.RapidsPluginImplicits._

import org.apache.spark.sql.catalyst.util.DateTimeConstants.{MICROS_PER_DAY, MICROS_PER_HOUR, MICROS_PER_MINUTE, MICROS_PER_SECOND, MONTHS_PER_YEAR}
import org.apache.spark.sql.rapids.shims.IntervalUtils
Expand Down Expand Up @@ -292,17 +293,15 @@ object GpuIntervalUtils extends Arm {
// Do not close firstSignInTable or secondSignInTable here; the outer table.close will close them
// Combines two sign columns into a single column of long multipliers.
// NOTE(review): this listing is a diff rendered without +/- markers, so two alternative bodies
// appear back to back below; they are not meant to run one after the other. Judging by the
// change title ("Flatten ... nesting of withResource"), the nested form is presumably the
// removed side and the flattened form the added side - confirm against the real file.
private def finalSign(
firstSignInTable: ColumnVector, secondSignInTable: ColumnVector): ColumnVector = {
// --- Alternative 1: nested form ---
// Compares each input column with the "-" scalar, XORs the two comparison results, and feeds
// the XOR to ifElse(negOne, one); every intermediate is scoped by its own withResource.
withResource(Scalar.fromString("-")) { negScalar =>
withResource(negScalar.equalTo(firstSignInTable)) { neg1 =>
withResource(negScalar.equalTo(secondSignInTable)) { neg2 =>
withResource(neg1.bitXor(neg2)) { s =>
withResource(Scalar.fromLong(1L)) { one =>
withResource(Scalar.fromLong(-1L)) { negOne =>
// presumably yields -1 where exactly one of the two signs is "-", otherwise 1
s.ifElse(negOne, one)
}
}
}
}
// --- Alternative 2: flattened form ---
// Step 1: build both "-" comparisons through safeMap, scope them with a single withResource,
// and keep only their XOR as `negatives`.
val negatives = withResource(Scalar.fromString("-")) { negScalar =>
withResource(Seq(firstSignInTable, secondSignInTable).safeMap(negScalar.equalTo)) {
case Seq(neg1, neg2) => neg1.bitXor(neg2)
}
}

// Step 2: `negatives` is scoped here while the 1 / -1 scalars are created and applied with
// the same ifElse(negOne, posOne) selection as in the nested form.
withResource(negatives) { _ =>
withResource(Seq(1L, -1L).safeMap(Scalar.fromLong)) { case Seq(posOne, negOne) =>
negatives.ifElse(negOne, posOne)
}
}
}
Expand All @@ -315,15 +314,18 @@ object GpuIntervalUtils extends Arm {
* @return micros column
*/
// NOTE(review): this listing is a diff rendered without +/- markers, so the nested and the
// flattened bodies appear interleaved below and share some closing braces; they are
// alternatives, not sequential code. Both perform the same chain of operations:
// cast `decimal` to DECIMAL64 with scale -6, multiply by 1000000, convert to longs, then
// multiply by `sign`.
private def getMicrosFromDecimal(sign: ColumnVector, decimal: ColumnVector): ColumnVector = {
// --- Alternative 1: nested form (note the inner `decimal` shadows the parameter) ---
withResource(decimal.castTo(DType.create(DType.DTypeEnum.DECIMAL64, -6))) { decimal =>
withResource(Scalar.fromLong(1000000L)) { million =>
withResource(decimal.mul(million)) { r =>
withResource(r.asLongs()) { l =>
l.mul(sign)
}
}
}
// --- Alternative 2: flattened form, one named intermediate per step ---
val decimalType64_6 = DType.create(DType.DTypeEnum.DECIMAL64, -6)
// The cast column and the million scalar are only scoped while the product is computed.
val timesMillion = withResource(Scalar.fromLong(1000000L)) { million =>
withResource(decimal.castTo(decimalType64_6)) {
_.mul(million)
}
}
// The product is scoped here; only its long representation is kept.
val timesMillionLongs = withResource(timesMillion) {
_.asLongs()
}
// Apply the sign column last; the result of mul(sign) is what the method returns.
withResource(timesMillionLongs) {
_.mul(sign)
}
}

private def addFromDayToDay(
Expand Down
14 changes: 8 additions & 6 deletions sql-plugin/src/main/scala/com/nvidia/spark/RebaseHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ object RebaseHelper extends Arm {
// https://github.com/NVIDIA/spark-rapids/issues/1126
val dtype = column.getType
if (dtype == DType.TIMESTAMP_DAYS) {
withResource(Scalar.timestampDaysFromInt(startDay)) { minGood =>
withResource(column.lessThan(minGood)) { hasBad =>
withResource(hasBad.any()) { a =>
a.isValid && a.getBoolean
}
}
val hasBad = withResource(Scalar.timestampDaysFromInt(startDay)) {
column.lessThan
}
val anyBad = withResource(hasBad) {
_.any()
}
withResource(anyBad) { _ =>
anyBad.isValid && anyBad.getBoolean
}
} else {
false
Expand Down
31 changes: 17 additions & 14 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCSVScan.scala
Original file line number Diff line number Diff line change
Expand Up @@ -340,22 +340,25 @@ class CSVPartitionReader(
* CSV supports "true" and "false" (case-insensitive) as valid boolean values.
*/
// NOTE(review): this listing is a diff rendered without +/- markers, so the nested and the
// flattened bodies appear back to back below and share the trailing closing braces; they are
// alternatives, not sequential code.
override def castStringToBool(input: ColumnVector): ColumnVector = {
// --- Alternative 1: nested form ---
// Strips and lower-cases the input, compares it with "true" and with "false", ORs the two
// comparisons into a validity mask, and calls ifElse(isTrue, nullBool) on that mask.
withResource(input.strip()) { stripped =>
withResource(stripped.lower()) { lower =>
withResource(Scalar.fromString("true")) { t =>
withResource(Scalar.fromString("false")) { f =>
withResource(lower.equalTo(t)) { isTrue =>
withResource(lower.equalTo(f)) { isFalse =>
withResource(isTrue.or(isFalse)) { isValidBool =>
withResource(Scalar.fromNull(DType.BOOL8)) { nullBool =>
// presumably: keep isTrue where the text was a valid boolean, null elsewhere
isValidBool.ifElse(isTrue, nullBool)
}
}
}
}
}
// --- Alternative 2: flattened form ---
// Step 1: strip then lower-case; the stripped intermediate is scoped to this expression.
val lowerStripped = withResource(input.strip()) {
_.lower()
}

// Step 2: compute both boolean columns while `lowerStripped` is scoped.
val (isTrue, isValidBool) = withResource(lowerStripped) { _ =>
// true.toString is the string "true"
val isTrueRes = withResource(Scalar.fromString(true.toString)) {
lowerStripped.equalTo
}
// closeOnExcept presumably closes isTrueRes only if building the second column throws
// - confirm against the Arm trait.
val isValidBoolRes = closeOnExcept(isTrueRes) { _ =>
// The validity mask comes from a lookup against the two-element column ("true", "false")
// instead of OR-ing two equality checks.
withResource(ColumnVector.fromStrings(true.toString, false.toString)) {
lowerStripped.contains
}
}
(isTrueRes, isValidBoolRes)
}
// Step 3: both columns are scoped here while the final ifElse result is produced.
withResource(Seq(isTrue, isValidBool)) { _ =>
withResource(Scalar.fromNull(DType.BOOL8)) {
isValidBool.ifElse(isTrue, _)
}
}
}

Expand Down
Loading