Skip to content

Commit

Permalink
Add bytes-based solution and shims
Browse files Browse the repository at this point in the history
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
  • Loading branch information
thirtiseven committed Dec 27, 2023
1 parent 8b03fa1 commit e6d3f97
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 90 deletions.
2 changes: 1 addition & 1 deletion docs/additional-functionality/advanced_configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.ArrayUnion"></a>spark.rapids.sql.expression.ArrayUnion|`array_union`|Returns an array of the elements in the union of array1 and array2, without duplicates.|true|This is not 100% compatible with the Spark version because the GPU implementation treats -0.0 and 0.0 as equal, but the CPU implementation currently does not (see SPARK-39845). Also, Apache Spark 3.1.3 fixed issue SPARK-36741 where NaNs in these set like operators were not treated as being equal. We have chosen to break with compatibility for the older versions of Spark in this instance and handle NaNs the same as 3.1.3+|
<a name="sql.expression.ArraysOverlap"></a>spark.rapids.sql.expression.ArraysOverlap|`arrays_overlap`|Returns true if a1 contains at least a non-null element present also in a2. If the arrays have no common element and they are both non-empty and either of them contains a null element null is returned, false otherwise.|true|This is not 100% compatible with the Spark version because the GPU implementation treats -0.0 and 0.0 as equal, but the CPU implementation currently does not (see SPARK-39845). Also, Apache Spark 3.1.3 fixed issue SPARK-36741 where NaNs in these set like operators were not treated as being equal. We have chosen to break with compatibility for the older versions of Spark in this instance and handle NaNs the same as 3.1.3+|
<a name="sql.expression.ArraysZip"></a>spark.rapids.sql.expression.ArraysZip|`arrays_zip`|Returns a merged array of structs in which the N-th struct contains all N-th values of input arrays.|true|None|
<a name="sql.expression.Ascii"></a>spark.rapids.sql.expression.Ascii|`ascii`|The numeric value of the first character of string data.|false|This is disabled by default because it only supports strings starting with ASCII or Latin-1 characters. Otherwise the results will not match the CPU.|
<a name="sql.expression.Ascii"></a>spark.rapids.sql.expression.Ascii|`ascii`|The numeric value of the first character of string data.|false|This is disabled by default because it only supports strings starting with ASCII or Latin-1 characters after Spark 3.2.3, 3.3.1 and 3.4.0. Otherwise the results will not match the CPU.|
<a name="sql.expression.Asin"></a>spark.rapids.sql.expression.Asin|`asin`|Inverse sine|true|None|
<a name="sql.expression.Asinh"></a>spark.rapids.sql.expression.Asinh|`asinh`|Inverse hyperbolic sine|true|None|
<a name="sql.expression.AtLeastNNonNulls"></a>spark.rapids.sql.expression.AtLeastNNonNulls| |Checks if number of non null/Nan values is greater than a given value|true|None|
Expand Down
2 changes: 1 addition & 1 deletion docs/supported_ops.md
Original file line number Diff line number Diff line change
Expand Up @@ -2866,7 +2866,7 @@ are limited.
<td rowSpan="2">Ascii</td>
<td rowSpan="2">`ascii`</td>
<td rowSpan="2">The numeric value of the first character of string data.</td>
<td rowSpan="2">This is disabled by default because it only supports strings starting with ASCII or Latin-1 characters. Otherwise the results will not match the CPU.</td>
<td rowSpan="2">This is disabled by default because it only supports strings starting with ASCII or Latin-1 characters after Spark 3.2.3, 3.3.1 and 3.4.0. Otherwise the results will not match the CPU.</td>
<td rowSpan="2">project</td>
<td>input</td>
<td> </td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ import org.apache.spark.sql.rapids.catalyst.expressions.GpuRand
import org.apache.spark.sql.rapids.execution._
import org.apache.spark.sql.rapids.execution.python._
import org.apache.spark.sql.rapids.execution.python.GpuFlatMapGroupsInPandasExecMeta
import org.apache.spark.sql.rapids.shims.GpuTimeAdd
import org.apache.spark.sql.rapids.shims.{GpuAscii, GpuTimeAdd}
import org.apache.spark.sql.rapids.zorder.ZOrderRules
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
Expand Down Expand Up @@ -3722,8 +3722,8 @@ object GpuOverrides extends Logging {
ExprChecks.unaryProject(TypeSig.INT, TypeSig.INT, TypeSig.STRING, TypeSig.STRING),
(a, conf, p, r) => new UnaryExprMeta[Ascii](a, conf, p, r) {
override def convertToGpu(child: Expression): GpuExpression = GpuAscii(child)
}).disabledByDefault("it only supports strings starting with ASCII or Latin-1 characters." +
" Otherwise the results will not match the CPU."),
}).disabledByDefault("it only supports strings starting with ASCII or Latin-1 characters " +
"after Spark 3.2.3, 3.3.1 and 3.4.0. Otherwise the results will not match the CPU."),
expr[GetArrayStructFields](
"Extracts the `ordinal`-th fields of all array elements for the data with the type of" +
" array of struct",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,89 +92,6 @@ case class GpuOctetLength(child: Expression) extends GpuUnaryExpression with Exp
input.getBase.getByteCount
}

case class GpuAscii(child: Expression) extends GpuUnaryExpression with ImplicitCastInputTypes
with NullIntolerant {

override def dataType: DataType = IntegerType
override def inputTypes: Seq[AbstractDataType] = Seq(StringType)

private def utf8CodePointsToAscii(codePoints: ColumnVector): ColumnVector = {
// Currently we only support ASCII characters, so we need to convert the UTF8 code points
// to ASCII code points. Results for code points outside range [0, 255] are undefined.
// seg A: 0 <= codePoints < 128, already ASCII
// seg B: 49792 <= codePoints < 49856, ASCII = codePoints - 49664
// seg C: 50048 <= codePoints < 50112, ASCII = codePoints - 49856
//
// To reduce cuDF API calling, following algorithm will be performed:
// 1. For anything above 49792, we subtract 49664, now seg A and B are correct.
// 2. seg C: 50048 <= current + 49664 < 50112 => 384 <= current < 448, ASCII = current - 192
// So for anything above 384, we subtract 192, now seg C is correct too.
val greaterThan49792 = withResource(Scalar.fromInt(49792)) { segBLeftEnd =>
codePoints.greaterOrEqualTo(segBLeftEnd)
}
val segAB = withResource(greaterThan49792) { _ =>
val sub1 = withResource(Scalar.fromInt(49664)) { segBValue =>
codePoints.sub(segBValue)
}
withResource(sub1) { _ =>
greaterThan49792.ifElse(sub1, codePoints)
}
}
withResource(segAB) { _ =>
val geraterThan384 = withResource(Scalar.fromInt(384)) { segCLeftEnd =>
segAB.greaterOrEqualTo(segCLeftEnd)
}
withResource(geraterThan384) { _ =>
val sub2 = withResource(Scalar.fromInt(192)) { segCValue =>
segAB.sub(segCValue)
}
withResource(sub2) { _ =>
geraterThan384.ifElse(sub2, segAB)
}
}
}
}

override def doColumnar(input: GpuColumnVector): ColumnVector = {
// replace empty strings with 'NUL' (which will convert to ascii 0)
val emptyMask = withResource(Scalar.fromString("")) { emptyScalar =>
input.getBase.equalTo(emptyScalar)
}
val emptyReplaced = withResource(emptyMask) { _ =>
// replace empty strings with 'NUL' (which will convert to ascii 0)
withResource(Scalar.fromString('\u0000'.toString)) { zeroScalar =>
emptyMask.ifElse(zeroScalar, input.getBase)
}
}
// replace nulls with 'n' and save the null mask
val nullMask = closeOnExcept(emptyReplaced) { _ =>
input.getBase.isNull
}
withResource(nullMask) { _ =>
val nullsReplaced = withResource(emptyReplaced) { _ =>
withResource(Scalar.fromString("n")) { nullScalar =>
nullMask.ifElse(nullScalar, emptyReplaced)
}
}
val substr = withResource(nullsReplaced) { _ =>
nullsReplaced.substring(0, 1)
}
val codePoints = withResource(substr) { _ =>
substr.codePoints()
}
val segABC = withResource(codePoints) { _ =>
utf8CodePointsToAscii(codePoints)
}
// replace nulls with null
withResource(segABC) { _ =>
withResource(Scalar.fromNull(DType.INT32)) { nullScalar =>
nullMask.ifElse(nullScalar, segABC)
}
}
}
}
}

case class GpuStringLocate(substr: Expression, col: Expression, start: Expression)
extends GpuTernaryExpressionArgsScalarAnyScalar
with ImplicitCastInputTypes {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*** spark-rapids-shim-json-lines
{"spark": "311"}
{"spark": "312"}
{"spark": "313"}
{"spark": "320"}
{"spark": "321"}
{"spark": "321cdh"}
{"spark": "321db"}
{"spark": "322"}
{"spark": "330"}
{"spark": "330cdh"}
{"spark": "330db"}
spark-rapids-shim-json-lines ***/

package org.apache.spark.sql.rapids.shims

import ai.rapids.cudf.{ColumnVector, DType, Scalar}
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.Arm._

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

case class GpuAscii(child: Expression) extends GpuUnaryExpression with ImplicitCastInputTypes
with NullIntolerant {

override def dataType: DataType = IntegerType
override def inputTypes: Seq[AbstractDataType] = Seq(StringType)

override def doColumnar(input: GpuColumnVector): ColumnVector = {
// convert to byte lists
val firstBytes = withResource(input.getBase.asByteList) { bytes =>
bytes.extractListElement(0)
}
val firstBytesInt = withResource(firstBytes) { _ =>
firstBytes.castTo(DType.INT32)
}
withResource(firstBytesInt) { _ =>
val greaterThan127 = withResource(Scalar.fromInt(127)) { scalar =>
firstBytesInt.greaterThan(scalar)
}
withResource(greaterThan127) { _ =>
val sub256 = withResource(Scalar.fromInt(256)) { scalar =>
firstBytesInt.sub(scalar)
}
withResource(sub256) { _ =>
greaterThan127.ifElse(sub256, firstBytesInt)
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*** spark-rapids-shim-json-lines
{"spark": "323"}
{"spark": "324"}
{"spark": "331"}
{"spark": "332"}
{"spark": "332cdh"}
{"spark": "332db"}
{"spark": "333"}
{"spark": "340"}
{"spark": "341"}
{"spark": "341db"}
{"spark": "350"}
spark-rapids-shim-json-lines ***/

package org.apache.spark.sql.rapids.shims

import ai.rapids.cudf.{ColumnVector, DType, Scalar}
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.Arm._

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

case class GpuAscii(child: Expression) extends GpuUnaryExpression with ImplicitCastInputTypes
with NullIntolerant {

override def dataType: DataType = IntegerType
override def inputTypes: Seq[AbstractDataType] = Seq(StringType)

private def utf8CodePointsToAscii(codePoints: ColumnVector): ColumnVector = {
// Currently we only support ASCII characters, so we need to convert the UTF8 code points
// to ASCII code points. Results for code points outside range [0, 255] are undefined.
// seg A: 0 <= codePoints < 128, already ASCII
// seg B: 49792 <= codePoints < 49856, ASCII = codePoints - 49664
// seg C: 50048 <= codePoints < 50112, ASCII = codePoints - 49856
//
// To reduce cuDF API calling, following algorithm will be performed:
// 1. For anything above 49792, we subtract 49664, now seg A and B are correct.
// 2. seg C: 50048 <= current + 49664 < 50112 => 384 <= current < 448, ASCII = current - 192
// So for anything above 384, we subtract 192, now seg C is correct too.
val greaterThan49792 = withResource(Scalar.fromInt(49792)) { segBLeftEnd =>
codePoints.greaterOrEqualTo(segBLeftEnd)
}
val segAB = withResource(greaterThan49792) { _ =>
val sub1 = withResource(Scalar.fromInt(49664)) { segBValue =>
codePoints.sub(segBValue)
}
withResource(sub1) { _ =>
greaterThan49792.ifElse(sub1, codePoints)
}
}
withResource(segAB) { _ =>
val geraterThan384 = withResource(Scalar.fromInt(384)) { segCLeftEnd =>
segAB.greaterOrEqualTo(segCLeftEnd)
}
withResource(geraterThan384) { _ =>
val sub2 = withResource(Scalar.fromInt(192)) { segCValue =>
segAB.sub(segCValue)
}
withResource(sub2) { _ =>
geraterThan384.ifElse(sub2, segAB)
}
}
}
}

override def doColumnar(input: GpuColumnVector): ColumnVector = {
// replace empty strings with 'NUL' (which will convert to ascii 0)
val emptyMask = withResource(Scalar.fromString("")) { emptyScalar =>
input.getBase.equalTo(emptyScalar)
}
val emptyReplaced = withResource(emptyMask) { _ =>
// replace empty strings with 'NUL' (which will convert to ascii 0)
withResource(Scalar.fromString('\u0000'.toString)) { zeroScalar =>
emptyMask.ifElse(zeroScalar, input.getBase)
}
}
// replace nulls with 'n' and save the null mask
val nullMask = closeOnExcept(emptyReplaced) { _ =>
input.getBase.isNull
}
withResource(nullMask) { _ =>
val nullsReplaced = withResource(emptyReplaced) { _ =>
withResource(Scalar.fromString("n")) { nullScalar =>
nullMask.ifElse(nullScalar, emptyReplaced)
}
}
val substr = withResource(nullsReplaced) { _ =>
nullsReplaced.substring(0, 1)
}
val codePoints = withResource(substr) { _ =>
substr.codePoints()
}
val segABC = withResource(codePoints) { _ =>
utf8CodePointsToAscii(codePoints)
}
// replace nulls with null
withResource(segABC) { _ =>
withResource(Scalar.fromNull(DType.INT32)) { nullScalar =>
nullMask.ifElse(nullScalar, segABC)
}
}
}
}
}
4 changes: 2 additions & 2 deletions tools/generated_files/supportedExprs.csv
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ ArraysOverlap,S,`arrays_overlap`,This is not 100% compatible with the Spark vers
ArraysOverlap,S,`arrays_overlap`,This is not 100% compatible with the Spark version because the GPU implementation treats -0.0 and 0.0 as equal; but the CPU implementation currently does not (see SPARK-39845). Also; Apache Spark 3.1.3 fixed issue SPARK-36741 where NaNs in these set like operators were not treated as being equal. We have chosen to break with compatibility for the older versions of Spark in this instance and handle NaNs the same as 3.1.3+,project,result,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
ArraysZip,S,`arrays_zip`,None,project,children,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA
ArraysZip,S,`arrays_zip`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA
Ascii,NS,`ascii`,This is disabled by default because it only supports strings starting with ASCII or Latin-1 characters. Otherwise the results will not match the CPU.,project,input,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
Ascii,NS,`ascii`,This is disabled by default because it only supports strings starting with ASCII or Latin-1 characters. Otherwise the results will not match the CPU.,project,result,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
Ascii,NS,`ascii`,This is disabled by default because it only supports strings starting with ASCII or Latin-1 characters after Spark 3.2.3; 3.3.1 and 3.4.0. Otherwise the results will not match the CPU.,project,input,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
Ascii,NS,`ascii`,This is disabled by default because it only supports strings starting with ASCII or Latin-1 characters after Spark 3.2.3; 3.3.1 and 3.4.0. Otherwise the results will not match the CPU.,project,result,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
Asin,S,`asin`,None,project,input,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
Asin,S,`asin`,None,project,result,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
Asin,S,`asin`,None,AST,input,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
Expand Down

0 comments on commit e6d3f97

Please sign in to comment.