Fix auto merge conflict 5789 (#5790)
* Correct the error message for test_mod_pmod_by_zero (#5781)

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Update the error string for test_cast_neg_to_decimal_err on 330[databricks] (#5784)

* Update the error string for test_cast_neg_to_decimal_err on 330

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* address comments

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Throw an exception when attempting to read columnar encrypted Parquet files on the GPU [databricks] (#5761)

* Throw useful message when parquet columnar encryption enabled

* update message

* fix message

* handle native encrypted

* move variable

* cleanup

* fix native check

* cleanup imports

* fix import order

* Sign off

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* Shim the parquet crypto exception check

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* shim 320cdh

* Add test for parquet encryption

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* fix rounds over decimal in Spark 330+ (#5786)

Passes the data type of round-like functions directly to the GPU overrides, so as to adapt to differences between Spark versions.

Signed-off-by: sperlingxx <lovedreamf@gmail.com>

* Fix the overflow of container type when casting floats to decimal (#5766)

Fixes #5765

Fixes the potential overflow when casting float/double to decimal. The overflow occurs on the container decimal used for HALF_UP rounding.
 
Signed-off-by: sperlingxx <lovedreamf@gmail.com>

Co-authored-by: Liangcai Li <firestarmanllc@gmail.com>
Co-authored-by: Alfred Xu <lovedreamf@gmail.com>
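
A minimal worked sketch of the container overflow described in the last item above (the float/double to decimal cast fix), using plain Python decimals rather than cuDF. The DECIMAL(4, 1) target and the 999.94 input are illustrative assumptions, not values taken from the fix itself:

from decimal import Decimal, ROUND_HALF_UP

# Target type DECIMAL(4, 1) holds values up to 999.9. The old rounding container
# kept the same precision while adding one fractional digit, i.e. DECIMAL(4, 2),
# whose largest value is 99.99, so this input overflowed the container even
# though the final rounded result fits the target type.
value = Decimal("999.94")
assert value > Decimal("99.99")

# With the container widened to DECIMAL(5, 2) the value fits, and HALF_UP
# rounding to one fractional digit lands back inside DECIMAL(4, 1).
rounded = value.quantize(Decimal("0.1"), rounding=ROUND_HALF_UP)
assert rounded == Decimal("999.9")
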
3 people authored Jun 8, 2022
1 parent f546edd commit aba390e
Showing 14 changed files with 287 additions and 66 deletions.
32 changes: 32 additions & 0 deletions integration_tests/pom.xml
@@ -270,6 +270,32 @@
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
<version>${maven.clean.plugin.version}</version>
<executions>
<execution>
<id>clean-copy</id>
<phase>package</phase>
<goals>
<goal>clean</goal>
</goals>
<configuration>
<excludeDefaultDirectories>true</excludeDefaultDirectories>
<filesets>
<fileset>
<directory>target/dependency</directory>
<includes>
<include>parquet-hadoop*.jar</include>
<include>spark-avro*.jar</include>
</includes>
</fileset>
</filesets>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
@@ -288,6 +314,12 @@
<artifactId>spark-avro_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>${parquet.hadoop.version}</version>
<classifier>tests</classifier>
</artifactItem>
</artifactItems>
</configuration>
</execution>
8 changes: 7 additions & 1 deletion integration_tests/run_pyspark_from_build.sh
@@ -48,10 +48,16 @@ else
if [ -d "$LOCAL_JAR_PATH" ]; then
AVRO_JARS=$(echo "$LOCAL_JAR_PATH"/spark-avro*.jar)
PLUGIN_JARS=$(echo "$LOCAL_JAR_PATH"/rapids-4-spark_*.jar)
# TODO - need to update jenkins scripts to upload this jar
# https://github.com/NVIDIA/spark-rapids/issues/5771
export INCLUDE_PARQUET_HADOOP_TEST_JAR=false
PARQUET_HADOOP_TESTS=
# the integration-test-spark3xx.jar, should not include the integration-test-spark3xxtest.jar
TEST_JARS=$(echo "$LOCAL_JAR_PATH"/rapids-4-spark-integration-tests*-$INTEGRATION_TEST_VERSION.jar)
else
AVRO_JARS=$(echo "$SCRIPTPATH"/target/dependency/spark-avro*.jar)
PARQUET_HADOOP_TESTS=$(echo "$SCRIPTPATH"/target/dependency/parquet-hadoop*.jar)
export INCLUDE_PARQUET_HADOOP_TEST_JAR=true
PLUGIN_JARS=$(echo "$SCRIPTPATH"/../dist/target/rapids-4-spark_*.jar)
# the integration-test-spark3xx.jar, should not include the integration-test-spark3xxtest.jar
TEST_JARS=$(echo "$SCRIPTPATH"/target/rapids-4-spark-integration-tests*-$INTEGRATION_TEST_VERSION.jar)
@@ -70,7 +76,7 @@
fi

# Only 3 jars: dist.jar integration-test.jar avro.jar
ALL_JARS="$PLUGIN_JARS $TEST_JARS $AVRO_JARS"
ALL_JARS="$PLUGIN_JARS $TEST_JARS $AVRO_JARS $PARQUET_HADOOP_TESTS"
echo "AND PLUGIN JARS: $ALL_JARS"
if [[ "${TEST}" != "" ]];
then
22 changes: 18 additions & 4 deletions integration_tests/src/main/python/arithmetic_ops_test.py
@@ -285,8 +285,12 @@ def test_mod_pmod_long_min_value():
'cast(-12 as {}) % cast(0 as {})'], ids=idfn)
def test_mod_pmod_by_zero(data_gen, overflow_exp):
string_type = to_cast_string(data_gen.data_type)
exception_str = "java.lang.ArithmeticException: divide by zero" if is_before_spark_320() else \
"org.apache.spark.SparkArithmeticException: divide by zero"
if is_before_spark_320():
exception_str = 'java.lang.ArithmeticException: divide by zero'
elif is_before_spark_330():
exception_str = 'SparkArithmeticException: divide by zero'
else:
exception_str = 'SparkArithmeticException: Division by zero'

assert_gpu_and_cpu_error(
lambda spark : unary_op_df(spark, data_gen).selectExpr(
@@ -483,9 +487,19 @@ def test_shift_right_unsigned(data_gen):
'shiftrightunsigned(a, cast(null as INT))',
'shiftrightunsigned(a, b)'))

_arith_data_gens_for_round = numeric_gens + _arith_decimal_gens_no_neg_scale + [
decimal_gen_32bit_neg_scale,
DecimalGen(precision=15, scale=-8),
DecimalGen(precision=30, scale=-5),
pytest.param(_decimal_gen_36_neg5, marks=pytest.mark.skipif(
is_spark_330_or_later(), reason='This case overflows in Spark 3.3.0+')),
pytest.param(_decimal_gen_38_neg10, marks=pytest.mark.skipif(
is_spark_330_or_later(), reason='This case overflows in Spark 3.3.0+'))
]

@incompat
@approximate_float
@pytest.mark.parametrize('data_gen', _arith_data_gens, ids=idfn)
@pytest.mark.parametrize('data_gen', _arith_data_gens_for_round, ids=idfn)
def test_decimal_bround(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, data_gen).selectExpr(
@@ -497,7 +511,7 @@ def test_decimal_bround(data_gen):

@incompat
@approximate_float
@pytest.mark.parametrize('data_gen', _arith_data_gens, ids=idfn)
@pytest.mark.parametrize('data_gen', _arith_data_gens_for_round, ids=idfn)
def test_decimal_round(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, data_gen).selectExpr(
37 changes: 36 additions & 1 deletion integration_tests/src/main/python/parquet_test.py
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

import pytest

@@ -20,7 +21,7 @@
from marks import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330
from spark_session import with_cpu_session, with_gpu_session, is_before_spark_320, is_before_spark_330, is_spark_321cdh
from conftest import is_databricks_runtime


@@ -969,3 +970,37 @@ def test_parquet_check_schema_compatibility_nested_types(spark_tmp_path):
lambda: with_gpu_session(
lambda spark: spark.read.schema(read_map_str_str_as_str_int).parquet(data_path).collect()),
error_message='Parquet column cannot be converted')

@pytest.mark.skipif(is_before_spark_320() or is_spark_321cdh(), reason='Encryption is not supported before Spark 3.2.0 or Parquet < 1.12')
@pytest.mark.skipif(os.environ.get('INCLUDE_PARQUET_HADOOP_TEST_JAR', 'false') == 'false', reason='INCLUDE_PARQUET_HADOOP_TEST_JAR is disabled')
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
def test_parquet_read_encryption(spark_tmp_path, reader_confs, v1_enabled_list):

data_path = spark_tmp_path + '/PARQUET_DATA'
gen_list = [('one', int_gen), ('two', byte_gen), ('THREE', boolean_gen)]

encryption_confs = {
'parquet.encryption.kms.client.class': 'org.apache.parquet.crypto.keytools.mocks.InMemoryKMS',
'parquet.encryption.key.list': 'keyA:AAECAwQFBgcICQoLDA0ODw== , keyB:AAECAAECAAECAAECAAECAA==',
'parquet.crypto.factory.class': 'org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory'
}

conf = copy_and_update(reader_confs, encryption_confs)

with_cpu_session(
lambda spark : gen_df(spark, gen_list).write.
option("parquet.encryption.column.keys" , "keyA:one").
option("parquet.encryption.footer.key" , "keyB").
parquet(data_path), conf=encryption_confs)

# test with missing encryption conf reading encrypted file
assert_py4j_exception(
lambda: with_gpu_session(
lambda spark: spark.read.parquet(data_path).collect()),
error_message='Could not read footer for file')

assert_py4j_exception(
lambda: with_gpu_session(
lambda spark: spark.read.parquet(data_path).collect(), conf=conf),
error_message='The GPU does not support reading encrypted Parquet files')
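
A possible follow-on usage sketch, not part of this diff: if a job genuinely needs to read the encrypted file while the plugin is loaded, one plausible workaround is to disable only the GPU Parquet read path so the read falls back to the CPU, which still sees the encryption confs. The config key below comes from the plugin's Parquet settings and is an assumption here, not something this commit adds or changes.

# Hedged sketch reusing names defined in the test above (conf, data_path,
# copy_and_update, with_gpu_session); the read runs on the CPU and decrypts
# via the mock KMS configured in encryption_confs.
cpu_fallback_conf = copy_and_update(conf, {
    'spark.rapids.sql.format.parquet.read.enabled': 'false'})
with_gpu_session(
    lambda spark: spark.read.parquet(data_path).collect(),
    conf=cpu_fallback_conf)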
3 changes: 3 additions & 0 deletions integration_tests/src/main/python/spark_session.py
@@ -151,6 +151,9 @@ def is_before_spark_340():
def is_spark_330_or_later():
return spark_version() >= "3.3.0"

def is_spark_321cdh():
return "3.2.1.3.2.717" in spark_version()

def is_databricks_version_or_later(major, minor):
spark = get_spark_i_know_what_i_am_doing()
version = spark.conf.get("spark.databricks.clusterUsageTags.sparkVersion", "0.0")
13 changes: 13 additions & 0 deletions pom.xml
@@ -100,6 +100,7 @@
<properties>
<spark.version>${spark311.version}</spark.version>
<spark.test.version>${spark311.version}</spark.test.version>
<parquet.hadoop.version>1.10.1</parquet.hadoop.version>
</properties>
<build>
<plugins>
@@ -166,6 +167,7 @@
<spark.test.version>${spark312db.version}</spark.test.version>
<hadoop.client.version>2.7.4</hadoop.client.version>
<rat.consoleOutput>true</rat.consoleOutput>
<parquet.hadoop.version>1.10.1</parquet.hadoop.version>
</properties>
<build>
<plugins>
@@ -216,6 +218,7 @@
<properties>
<spark.version>${spark312.version}</spark.version>
<spark.test.version>${spark312.version}</spark.test.version>
<parquet.hadoop.version>1.10.1</parquet.hadoop.version>
</properties>
<build>
<plugins>
@@ -270,6 +273,7 @@
<properties>
<spark.version>${spark313.version}</spark.version>
<spark.test.version>${spark313.version}</spark.test.version>
<parquet.hadoop.version>1.10.1</parquet.hadoop.version>
</properties>
<build>
<plugins>
@@ -323,6 +327,7 @@
<properties>
<spark.version>${spark314.version}</spark.version>
<spark.test.version>${spark314.version}</spark.test.version>
<parquet.hadoop.version>1.10.1</parquet.hadoop.version>
</properties>
<build>
<plugins>
@@ -386,6 +391,7 @@
<properties>
<spark.version>${spark320.version}</spark.version>
<spark.test.version>${spark320.version}</spark.test.version>
<parquet.hadoop.version>1.12.1</parquet.hadoop.version>
</properties>
<build>
<plugins>
@@ -452,6 +458,7 @@
<properties>
<spark.version>${spark321.version}</spark.version>
<spark.test.version>${spark321.version}</spark.test.version>
<parquet.hadoop.version>1.12.2</parquet.hadoop.version>
</properties>
<build>
<plugins>
@@ -519,6 +526,7 @@
<properties>
<spark.version>${spark321cdh.version}</spark.version>
<spark.test.version>${spark321cdh.version}</spark.test.version>
<parquet.hadoop.version>1.10.1</parquet.hadoop.version>
</properties>
<build>
<plugins>
@@ -590,6 +598,7 @@
<properties>
<spark.version>${spark322.version}</spark.version>
<spark.test.version>${spark322.version}</spark.test.version>
<parquet.hadoop.version>1.12.2</parquet.hadoop.version>
</properties>
<build>
<plugins>
@@ -670,6 +679,7 @@
<spark.test.version>${spark321db.version}</spark.test.version>
<hadoop.client.version>3.3.1</hadoop.client.version>
<rat.consoleOutput>true</rat.consoleOutput>
<parquet.hadoop.version>1.12.0</parquet.hadoop.version>
</properties>
<build>
<plugins>
@@ -723,6 +733,7 @@
<properties>
<spark.version>${spark330.version}</spark.version>
<spark.test.version>${spark330.version}</spark.test.version>
<parquet.hadoop.version>1.12.2</parquet.hadoop.version>
</properties>
<build>
<plugins>
@@ -785,6 +796,7 @@
<properties>
<spark.version>${spark340.version}</spark.version>
<spark.test.version>${spark340.version}</spark.test.version>
<parquet.hadoop.version>1.12.3</parquet.hadoop.version>
</properties>
<build>
<plugins>
@@ -887,6 +899,7 @@
<java.major.version>8</java.major.version>
<spark.version>${spark311.version}</spark.version>
<spark.test.version>${spark.version}</spark.test.version>
<parquet.hadoop.version>1.10.1</parquet.hadoop.version>
<spark.version.classifier>spark${buildver}</spark.version.classifier>
<cuda.version>cuda11</cuda.version>
<spark-rapids-jni.version>22.08.0-SNAPSHOT</spark-rapids-jni.version>
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids.shims

object GpuParquetCrypto {
/**
* Columnar encryption was added in Spark 3.2.0
*/
def isColumnarCryptoException(e: Throwable): Boolean = false
}
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids.shims

import org.apache.parquet.crypto.ParquetCryptoRuntimeException

object GpuParquetCrypto {
/**
* Columnar encryption was added in Spark 3.2.0
*/
def isColumnarCryptoException(e: Throwable): Boolean = {
e match {
case crypto: ParquetCryptoRuntimeException => true
case _ => false
}
}
}
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids.shims

object GpuParquetCrypto {
/**
* Columnar encryption was added in Spark 3.2.0 but CDH doesn't have Parquet 1.12.
*/
def isColumnarCryptoException(e: Throwable): Boolean = false
}
14 changes: 11 additions & 3 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
@@ -1464,12 +1464,20 @@ object GpuCast extends Arm {
val targetType = DecimalUtil.createCudfDecimal(dt)
// If target scale reaches DECIMAL128_MAX_PRECISION, container DECIMAL can not
// be created because of precision overflow. In this case, we perform casting op directly.
val casted = if (targetType.getDecimalMaxPrecision == dt.scale) {
val casted = if (DType.DECIMAL128_MAX_PRECISION == dt.scale) {
checked.castTo(targetType)
} else {
val containerType = DecimalUtils.createDecimalType(dt.precision, dt.scale + 1)
// Increase precision by one along with scale in case of overflow, which may lead to
// the upcast of cuDF decimal type. If precision already hits the max precision, it is safe
// to increase the scale solely because we have checked and replaced out of range values.
val containerType = DecimalUtils.createDecimalType(
dt.precision + 1 min DType.DECIMAL128_MAX_PRECISION, dt.scale + 1)
withResource(checked.castTo(containerType)) { container =>
container.round(dt.scale, cudf.RoundMode.HALF_UP)
withResource(container.round(dt.scale, cudf.RoundMode.HALF_UP)) { rd =>
// The cast here is for cases that cuDF decimal type got promoted as precision + 1.
// Need to convert back to original cuDF type, to keep align with the precision.
rd.castTo(targetType)
}
}
}
// Cast NaN values to nulls