From aba390e2909fc9e753a128095d97bc46e774e8ec Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Wed, 8 Jun 2022 15:42:14 -0500 Subject: [PATCH] Fix auto merge conflict 5789 (#5790) * Correct the error message for test_mod_pmod_by_zero (#5781) Signed-off-by: Firestarman * Update the error string for test_cast_neg_to_decimal_err on 330[databricks] (#5784) * Update the error string for test_cast_neg_to_decimal_err on 330 Signed-off-by: Firestarman * address comments Signed-off-by: Firestarman * Throw an exception when attempting to read columnar encrypted Parquet files on the GPU [databricks] (#5761) * Throw useful message when parquet columnar encryption enabled * update message * fix message * handle native encrypted * move variable * cleanup * fix native check * cleanup imports * fix import order * Sign off Signed-off-by: Thomas Graves * Shim the parquet crypto exception check Signed-off-by: Thomas Graves * shim 320cdh * Add test for parquet encryption Signed-off-by: Thomas Graves * fix rounds over decimal in Spark 330+ (#5786) Passes the datatype of round-like functions directly to GPU overrides, so as to adapt different Spark versions. Signed-off-by: sperlingxx * Fix the overflow of container type when casting floats to decimal (#5766) Fixes #5765 Fix the potential overflow when casting float/double to decimal. The overflow occurs on the container decimal for HALF_UP round. Signed-off-by: sperlingxx Co-authored-by: Liangcai Li Co-authored-by: Alfred Xu --- integration_tests/pom.xml | 32 ++++++++ integration_tests/run_pyspark_from_build.sh | 8 +- .../src/main/python/arithmetic_ops_test.py | 22 +++++- .../src/main/python/parquet_test.py | 37 ++++++++- .../src/main/python/spark_session.py | 3 + pom.xml | 13 ++++ .../spark/rapids/shims/GpuParquetCrypto.scala | 23 ++++++ .../spark/rapids/shims/GpuParquetCrypto.scala | 30 ++++++++ .../spark/rapids/shims/GpuParquetCrypto.scala | 23 ++++++ .../com/nvidia/spark/rapids/GpuCast.scala | 14 +++- .../nvidia/spark/rapids/GpuOverrides.scala | 4 +- .../nvidia/spark/rapids/GpuParquetScan.scala | 77 ++++++++++++------- .../spark/sql/rapids/mathExpressions.scala | 51 ++++++------ .../com/nvidia/spark/rapids/CastOpSuite.scala | 16 ++++ 14 files changed, 287 insertions(+), 66 deletions(-) create mode 100644 sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/GpuParquetCrypto.scala create mode 100644 sql-plugin/src/main/320+-noncdh/scala/com/nvidia/spark/rapids/shims/GpuParquetCrypto.scala create mode 100644 sql-plugin/src/main/321cdh/scala/com/nvidia/spark/rapids/shims/GpuParquetCrypto.scala diff --git a/integration_tests/pom.xml b/integration_tests/pom.xml index 6006a9eb39b..edb5d127e5b 100644 --- a/integration_tests/pom.xml +++ b/integration_tests/pom.xml @@ -270,6 +270,32 @@ org.scalatest scalatest-maven-plugin + + org.apache.maven.plugins + maven-clean-plugin + ${maven.clean.plugin.version} + + + clean-copy + package + + clean + + + true + + + target/dependency + + parquet-hadoop*.jar + spark-avro*.jar + + + + + + + org.apache.maven.plugins maven-dependency-plugin @@ -288,6 +314,12 @@ spark-avro_${scala.binary.version} ${spark.version} + + org.apache.parquet + parquet-hadoop + ${parquet.hadoop.version} + tests + diff --git a/integration_tests/run_pyspark_from_build.sh b/integration_tests/run_pyspark_from_build.sh index 39295a1abb2..c3cd092d565 100755 --- a/integration_tests/run_pyspark_from_build.sh +++ b/integration_tests/run_pyspark_from_build.sh @@ -48,10 +48,16 @@ else if [ -d "$LOCAL_JAR_PATH" ]; then AVRO_JARS=$(echo 
"$LOCAL_JAR_PATH"/spark-avro*.jar) PLUGIN_JARS=$(echo "$LOCAL_JAR_PATH"/rapids-4-spark_*.jar) + # TODO - need to update jenkins scripts to upload this jar + # https://github.com/NVIDIA/spark-rapids/issues/5771 + export INCLUDE_PARQUET_HADOOP_TEST_JAR=false + PARQUET_HADOOP_TESTS= # the integration-test-spark3xx.jar, should not include the integration-test-spark3xxtest.jar TEST_JARS=$(echo "$LOCAL_JAR_PATH"/rapids-4-spark-integration-tests*-$INTEGRATION_TEST_VERSION.jar) else AVRO_JARS=$(echo "$SCRIPTPATH"/target/dependency/spark-avro*.jar) + PARQUET_HADOOP_TESTS=$(echo "$SCRIPTPATH"/target/dependency/parquet-hadoop*.jar) + export INCLUDE_PARQUET_HADOOP_TEST_JAR=true PLUGIN_JARS=$(echo "$SCRIPTPATH"/../dist/target/rapids-4-spark_*.jar) # the integration-test-spark3xx.jar, should not include the integration-test-spark3xxtest.jar TEST_JARS=$(echo "$SCRIPTPATH"/target/rapids-4-spark-integration-tests*-$INTEGRATION_TEST_VERSION.jar) @@ -70,7 +76,7 @@ else fi # Only 3 jars: dist.jar integration-test.jar avro.jar - ALL_JARS="$PLUGIN_JARS $TEST_JARS $AVRO_JARS" + ALL_JARS="$PLUGIN_JARS $TEST_JARS $AVRO_JARS $PARQUET_HADOOP_TESTS" echo "AND PLUGIN JARS: $ALL_JARS" if [[ "${TEST}" != "" ]]; then diff --git a/integration_tests/src/main/python/arithmetic_ops_test.py b/integration_tests/src/main/python/arithmetic_ops_test.py index 3010dc93354..3f469bb3f09 100644 --- a/integration_tests/src/main/python/arithmetic_ops_test.py +++ b/integration_tests/src/main/python/arithmetic_ops_test.py @@ -285,8 +285,12 @@ def test_mod_pmod_long_min_value(): 'cast(-12 as {}) % cast(0 as {})'], ids=idfn) def test_mod_pmod_by_zero(data_gen, overflow_exp): string_type = to_cast_string(data_gen.data_type) - exception_str = "java.lang.ArithmeticException: divide by zero" if is_before_spark_320() else \ - "org.apache.spark.SparkArithmeticException: divide by zero" + if is_before_spark_320(): + exception_str = 'java.lang.ArithmeticException: divide by zero' + elif is_before_spark_330(): + exception_str = 'SparkArithmeticException: divide by zero' + else: + exception_str = 'SparkArithmeticException: Division by zero' assert_gpu_and_cpu_error( lambda spark : unary_op_df(spark, data_gen).selectExpr( @@ -483,9 +487,19 @@ def test_shift_right_unsigned(data_gen): 'shiftrightunsigned(a, cast(null as INT))', 'shiftrightunsigned(a, b)')) +_arith_data_gens_for_round = numeric_gens + _arith_decimal_gens_no_neg_scale + [ + decimal_gen_32bit_neg_scale, + DecimalGen(precision=15, scale=-8), + DecimalGen(precision=30, scale=-5), + pytest.param(_decimal_gen_36_neg5, marks=pytest.mark.skipif( + is_spark_330_or_later(), reason='This case overflows in Spark 3.3.0+')), + pytest.param(_decimal_gen_38_neg10, marks=pytest.mark.skipif( + is_spark_330_or_later(), reason='This case overflows in Spark 3.3.0+')) +] + @incompat @approximate_float -@pytest.mark.parametrize('data_gen', _arith_data_gens, ids=idfn) +@pytest.mark.parametrize('data_gen', _arith_data_gens_for_round, ids=idfn) def test_decimal_bround(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark: unary_op_df(spark, data_gen).selectExpr( @@ -497,7 +511,7 @@ def test_decimal_bround(data_gen): @incompat @approximate_float -@pytest.mark.parametrize('data_gen', _arith_data_gens, ids=idfn) +@pytest.mark.parametrize('data_gen', _arith_data_gens_for_round, ids=idfn) def test_decimal_round(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark: unary_op_df(spark, data_gen).selectExpr( diff --git a/integration_tests/src/main/python/parquet_test.py 
b/integration_tests/src/main/python/parquet_test.py index ec7426d7814..0fc4bde51de 100644 --- a/integration_tests/src/main/python/parquet_test.py +++ b/integration_tests/src/main/python/parquet_test.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import os import pytest @@ -20,7 +21,7 @@ from marks import * from pyspark.sql.types import * from pyspark.sql.functions import * -from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330 +from spark_session import with_cpu_session, with_gpu_session, is_before_spark_320, is_before_spark_330, is_spark_321cdh from conftest import is_databricks_runtime @@ -969,3 +970,37 @@ def test_parquet_check_schema_compatibility_nested_types(spark_tmp_path): lambda: with_gpu_session( lambda spark: spark.read.schema(read_map_str_str_as_str_int).parquet(data_path).collect()), error_message='Parquet column cannot be converted') + +@pytest.mark.skipif(is_before_spark_320() or is_spark_321cdh(), reason='Encryption is not supported before Spark 3.2.0 or Parquet < 1.12') +@pytest.mark.skipif(os.environ.get('INCLUDE_PARQUET_HADOOP_TEST_JAR', 'false') == 'false', reason='INCLUDE_PARQUET_HADOOP_TEST_JAR is disabled') +@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) +@pytest.mark.parametrize('reader_confs', reader_opt_confs) +def test_parquet_read_encryption(spark_tmp_path, reader_confs, v1_enabled_list): + + data_path = spark_tmp_path + '/PARQUET_DATA' + gen_list = [('one', int_gen), ('two', byte_gen), ('THREE', boolean_gen)] + + encryption_confs = { + 'parquet.encryption.kms.client.class': 'org.apache.parquet.crypto.keytools.mocks.InMemoryKMS', + 'parquet.encryption.key.list': 'keyA:AAECAwQFBgcICQoLDA0ODw== , keyB:AAECAAECAAECAAECAAECAA==', + 'parquet.crypto.factory.class': 'org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory' + } + + conf = copy_and_update(reader_confs, encryption_confs) + + with_cpu_session( + lambda spark : gen_df(spark, gen_list).write. + option("parquet.encryption.column.keys" , "keyA:one"). + option("parquet.encryption.footer.key" , "keyB"). 
+ parquet(data_path), conf=encryption_confs) + + # test with missing encryption conf reading encrypted file + assert_py4j_exception( + lambda: with_gpu_session( + lambda spark: spark.read.parquet(data_path).collect()), + error_message='Could not read footer for file') + + assert_py4j_exception( + lambda: with_gpu_session( + lambda spark: spark.read.parquet(data_path).collect(), conf=conf), + error_message='The GPU does not support reading encrypted Parquet files') diff --git a/integration_tests/src/main/python/spark_session.py b/integration_tests/src/main/python/spark_session.py index e7830804edc..ac271be2f2e 100644 --- a/integration_tests/src/main/python/spark_session.py +++ b/integration_tests/src/main/python/spark_session.py @@ -151,6 +151,9 @@ def is_before_spark_340(): def is_spark_330_or_later(): return spark_version() >= "3.3.0" +def is_spark_321cdh(): + return "3.2.1.3.2.717" in spark_version() + def is_databricks_version_or_later(major, minor): spark = get_spark_i_know_what_i_am_doing() version = spark.conf.get("spark.databricks.clusterUsageTags.sparkVersion", "0.0") diff --git a/pom.xml b/pom.xml index b5394391db1..5df1a886450 100644 --- a/pom.xml +++ b/pom.xml @@ -100,6 +100,7 @@ ${spark311.version} ${spark311.version} + 1.10.1 @@ -166,6 +167,7 @@ ${spark312db.version} 2.7.4 true + 1.10.1 @@ -216,6 +218,7 @@ ${spark312.version} ${spark312.version} + 1.10.1 @@ -270,6 +273,7 @@ ${spark313.version} ${spark313.version} + 1.10.1 @@ -323,6 +327,7 @@ ${spark314.version} ${spark314.version} + 1.10.1 @@ -386,6 +391,7 @@ ${spark320.version} ${spark320.version} + 1.12.1 @@ -452,6 +458,7 @@ ${spark321.version} ${spark321.version} + 1.12.2 @@ -519,6 +526,7 @@ ${spark321cdh.version} ${spark321cdh.version} + 1.10.1 @@ -590,6 +598,7 @@ ${spark322.version} ${spark322.version} + 1.12.2 @@ -670,6 +679,7 @@ ${spark321db.version} 3.3.1 true + 1.12.0 @@ -723,6 +733,7 @@ ${spark330.version} ${spark330.version} + 1.12.2 @@ -785,6 +796,7 @@ ${spark340.version} ${spark340.version} + 1.12.3 @@ -887,6 +899,7 @@ 8 ${spark311.version} ${spark.version} + 1.10.1 spark${buildver} cuda11 22.08.0-SNAPSHOT diff --git a/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/GpuParquetCrypto.scala b/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/GpuParquetCrypto.scala new file mode 100644 index 00000000000..ec208bba7e7 --- /dev/null +++ b/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/GpuParquetCrypto.scala @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package com.nvidia.spark.rapids.shims
+
+object GpuParquetCrypto {
+  /**
+   * Columnar encryption was added in Spark 3.2.0
+   */
+  def isColumnarCryptoException(e: Throwable): Boolean = false
+}
diff --git a/sql-plugin/src/main/320+-noncdh/scala/com/nvidia/spark/rapids/shims/GpuParquetCrypto.scala b/sql-plugin/src/main/320+-noncdh/scala/com/nvidia/spark/rapids/shims/GpuParquetCrypto.scala
new file mode 100644
index 00000000000..bf7a67ff485
--- /dev/null
+++ b/sql-plugin/src/main/320+-noncdh/scala/com/nvidia/spark/rapids/shims/GpuParquetCrypto.scala
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2022, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.nvidia.spark.rapids.shims
+
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException
+
+object GpuParquetCrypto {
+  /**
+   * Columnar encryption was added in Spark 3.2.0
+   */
+  def isColumnarCryptoException(e: Throwable): Boolean = {
+    e match {
+      case crypto: ParquetCryptoRuntimeException => true
+      case _ => false
+    }
+  }
+}
diff --git a/sql-plugin/src/main/321cdh/scala/com/nvidia/spark/rapids/shims/GpuParquetCrypto.scala b/sql-plugin/src/main/321cdh/scala/com/nvidia/spark/rapids/shims/GpuParquetCrypto.scala
new file mode 100644
index 00000000000..869966fcbf8
--- /dev/null
+++ b/sql-plugin/src/main/321cdh/scala/com/nvidia/spark/rapids/shims/GpuParquetCrypto.scala
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2022, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.nvidia.spark.rapids.shims
+
+object GpuParquetCrypto {
+  /**
+   * Columnar encryption was added in Spark 3.2.0 but CDH doesn't have Parquet 1.12.
+   */
+  def isColumnarCryptoException(e: Throwable): Boolean = false
+}
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
index eaeca2001ab..f11c9649510 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
@@ -1464,12 +1464,20 @@ object GpuCast extends Arm {
     val targetType = DecimalUtil.createCudfDecimal(dt)
     // If target scale reaches DECIMAL128_MAX_PRECISION, container DECIMAL can not
    // be created because of precision overflow. In this case, we perform casting op directly.
- val casted = if (targetType.getDecimalMaxPrecision == dt.scale) { + val casted = if (DType.DECIMAL128_MAX_PRECISION == dt.scale) { checked.castTo(targetType) } else { - val containerType = DecimalUtils.createDecimalType(dt.precision, dt.scale + 1) + // Increase precision by one along with scale in case of overflow, which may lead to + // the upcast of cuDF decimal type. If precision already hits the max precision, it is safe + // to increase the scale solely because we have checked and replaced out of range values. + val containerType = DecimalUtils.createDecimalType( + dt.precision + 1 min DType.DECIMAL128_MAX_PRECISION, dt.scale + 1) withResource(checked.castTo(containerType)) { container => - container.round(dt.scale, cudf.RoundMode.HALF_UP) + withResource(container.round(dt.scale, cudf.RoundMode.HALF_UP)) { rd => + // The cast here is for cases that cuDF decimal type got promoted as precision + 1. + // Need to convert back to original cuDF type, to keep align with the precision. + rd.castTo(targetType) + } } } // Cast NaN values to nulls diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 6c897dbc29b..21641c48e4b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -2401,7 +2401,7 @@ object GpuOverrides extends Logging { } } override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = - GpuBRound(lhs, rhs) + GpuBRound(lhs, rhs, a.dataType) }), expr[Round]( "Round an expression to d decimal places using HALF_UP rounding mode", @@ -2422,7 +2422,7 @@ object GpuOverrides extends Logging { } } override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = - GpuRound(lhs, rhs) + GpuRound(lhs, rhs, a.dataType) }), expr[PythonUDF]( "UDF run in an external python process. 
Does not actually run on the GPU, but " + diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index c8b7f522a4d..43b40bf2a21 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -37,7 +37,7 @@ import com.nvidia.spark.rapids.ParquetPartitionReader.CopyRange import com.nvidia.spark.rapids.RapidsConf.ParquetFooterReaderType import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.jni.ParquetFooter -import com.nvidia.spark.rapids.shims.{GpuTypeShims, ParquetFieldIdShims, ParquetSchemaClipShims, ParquetStringPredShims, ShimFilePartitionReaderFactory, SparkShimImpl} +import com.nvidia.spark.rapids.shims.{GpuParquetCrypto, GpuTypeShims, ParquetFieldIdShims, ParquetSchemaClipShims, ParquetStringPredShims, ShimFilePartitionReaderFactory, SparkShimImpl} import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FSDataInputStream, Path} @@ -474,6 +474,10 @@ private case class GpuParquetFileFilterHandler(@transient sqlConf: SQLConf) exte private val useFieldId = ParquetSchemaClipShims.useFieldId(sqlConf) private val timestampNTZEnabled = ParquetSchemaClipShims.timestampNTZEnabled(sqlConf) + private val PARQUET_ENCRYPTION_CONFS = Seq("parquet.encryption.kms.client.class", + "parquet.encryption.kms.client.class", "parquet.crypto.factory.class") + private val PARQUET_MAGIC_ENCRYPTED = "PARE".getBytes(StandardCharsets.US_ASCII) + def isParquetTimeInInt96(parquetType: Type): Boolean = { parquetType match { case p:PrimitiveType => @@ -561,9 +565,15 @@ private case class GpuParquetFileFilterHandler(@transient sqlConf: SQLConf) exte val magic = new Array[Byte](MAGIC.length) inputStream.readFully(magic) if (!util.Arrays.equals(MAGIC, magic)) { - throw new RuntimeException(s"$filePath is not a Parquet file. " + + if (util.Arrays.equals(PARQUET_MAGIC_ENCRYPTED, magic)) { + throw new RuntimeException("The GPU does not support reading encrypted Parquet " + + "files. To read encrypted or columnar encrypted files, disable the GPU Parquet " + + s"reader via ${RapidsConf.ENABLE_PARQUET_READ.key}.") + } else { + throw new RuntimeException(s"$filePath is not a Parquet file. " + s"Expected magic number at tail ${util.Arrays.toString(MAGIC)} " + s"but found ${util.Arrays.toString(magic)}") + } } val footerIndex = footerLengthIndex - footerLength if (footerIndex < MAGIC.length || footerIndex >= footerLengthIndex) { @@ -619,32 +629,47 @@ private case class GpuParquetFileFilterHandler(@transient sqlConf: SQLConf) exte readDataSchema: StructType): ParquetFileInfoWithBlockMeta = { withResource(new NvtxRange("filterBlocks", NvtxColor.PURPLE)) { _ => val filePath = new Path(new URI(file.filePath)) - val footer = footerReader match { - case ParquetFooterReaderType.NATIVE => - val serialized = withResource(readAndFilterFooter(file, conf, readDataSchema, filePath)) { - tableFooter => - if (tableFooter.getNumColumns <= 0) { - // Special case because java parquet reader does not like having 0 columns. 
- val numRows = tableFooter.getNumRows - val block = new BlockMetaData() - block.setRowCount(numRows) - val schema = new MessageType("root") - return ParquetFileInfoWithBlockMeta(filePath, Seq(block), file.partitionValues, - schema, false, false, false) - } - - tableFooter.serializeThriftFile() - } - withResource(serialized) { serialized => - withResource(new NvtxRange("readFilteredFooter", NvtxColor.YELLOW)) { _ => - val inputFile = new HMBInputFile(serialized) + // Make sure we aren't trying to read encrypted files. For now, remove the related + // parquet confs from the hadoop configuration and try to catch the resulting + // exception and print a useful message + PARQUET_ENCRYPTION_CONFS.foreach { encryptConf => + if (conf.get(encryptConf) != null) { + conf.unset(encryptConf) + } + } + val footer = try { + footerReader match { + case ParquetFooterReaderType.NATIVE => + val serialized = withResource(readAndFilterFooter(file, conf, + readDataSchema, filePath)) { tableFooter => + if (tableFooter.getNumColumns <= 0) { + // Special case because java parquet reader does not like having 0 columns. + val numRows = tableFooter.getNumRows + val block = new BlockMetaData() + block.setRowCount(numRows) + val schema = new MessageType("root") + return ParquetFileInfoWithBlockMeta(filePath, Seq(block), file.partitionValues, + schema, false, false, false) + } + + tableFooter.serializeThriftFile() + } + withResource(serialized) { serialized => + withResource(new NvtxRange("readFilteredFooter", NvtxColor.YELLOW)) { _ => + val inputFile = new HMBInputFile(serialized) - // We already filtered the ranges so no need to do more here... - ParquetFileReader.readFooter(inputFile, ParquetMetadataConverter.NO_FILTER) + // We already filtered the ranges so no need to do more here... + ParquetFileReader.readFooter(inputFile, ParquetMetadataConverter.NO_FILTER) + } } - } - case _ => - readAndSimpleFilterFooter(file, conf, filePath) + case _ => + readAndSimpleFilterFooter(file, conf, filePath) + } + } catch { + case e if GpuParquetCrypto.isColumnarCryptoException(e) => + throw new RuntimeException("The GPU does not support reading encrypted Parquet " + + "files. 
To read encrypted or columnar encrypted files, disable the GPU Parquet " + + s"reader via ${RapidsConf.ENABLE_PARQUET_READ.key}.", e) } val fileSchema = footer.getFileMetaData.getSchema diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/mathExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/mathExpressions.scala index c444e72090e..2e2d1686162 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/mathExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/mathExpressions.scala @@ -21,9 +21,8 @@ import java.io.Serializable import ai.rapids.cudf._ import ai.rapids.cudf.ast.BinaryOperator import com.nvidia.spark.rapids._ -import com.nvidia.spark.rapids.RapidsPluginImplicits.ReallyAGpuExpression -import org.apache.spark.sql.catalyst.expressions.{EmptyRow, Expression, ImplicitCastInputTypes} +import org.apache.spark.sql.catalyst.expressions.{Expression, ImplicitCastInputTypes} import org.apache.spark.sql.rapids.shims.RapidsFloorCeilUtils import org.apache.spark.sql.types._ @@ -556,32 +555,16 @@ abstract class CudfBinaryMathExpression(name: String) extends CudfBinaryExpressi override def dataType: DataType = DoubleType } -abstract class GpuRoundBase(child: Expression, scale: Expression) extends GpuBinaryExpression - with Serializable with ImplicitCastInputTypes { +// Due to SPARK-39226, the dataType of round-like functions differs by Spark versions. +abstract class GpuRoundBase(child: Expression, scale: Expression, outputType: DataType) + extends GpuBinaryExpression with Serializable with ImplicitCastInputTypes { override def left: Expression = child override def right: Expression = scale def roundMode: RoundMode - override lazy val dataType: DataType = child.dataType match { - // if the new scale is bigger which means we are scaling up, - // keep the original scale as `Decimal` does - case DecimalType.Fixed(p, s) => DecimalType(p, if (_scale > s) s else _scale) - case t => t - } - - // Avoid repeated evaluation since `scale` is a constant int, - // avoid unnecessary `child` evaluation in both codegen and non-codegen eval - // by checking if scaleV == null as well. 
-  private lazy val scaleV: Any = scale match {
-    case _: GpuExpression =>
-      withResource(scale.columnarEval(null).asInstanceOf[GpuScalar]) { s =>
-        s.getValue
-      }
-    case _ => scale.eval(EmptyRow)
-  }
-  private lazy val _scale: Int = scaleV.asInstanceOf[Int]
+  override def dataType: DataType = outputType

   override def inputTypes: Seq[AbstractDataType] = Seq(NumericType, IntegerType)

@@ -590,9 +573,19 @@ abstract class GpuRoundBase(child: Expression, scale: Expression) extends GpuBin
     val lhsValue = value.getBase
     val scaleVal = scale.getValue.asInstanceOf[Int]

-    dataType match {
-      case DecimalType.Fixed(_, scaleVal) =>
-        lhsValue.round(scaleVal, roundMode)
+    child.dataType match {
+      case DecimalType.Fixed(_, s) =>
+        // Only needs to perform round when required scale < input scale
+        val rounded = if (scaleVal < s) {
+          lhsValue.round(scaleVal, roundMode)
+        } else {
+          lhsValue.incRefCount()
+        }
+        withResource(rounded) { _ =>
+          // Fit the output datatype
+          rounded.castTo(
+            DecimalUtil.createCudfDecimal(dataType.asInstanceOf[DecimalType]))
+        }
       case ByteType =>
         fixUpOverflowInts(() => Scalar.fromByte(0.toByte), scaleVal, lhsValue)
       case ShortType =>
@@ -766,13 +759,13 @@ abstract class GpuRoundBase(child: Expression, scale: Expression) extends GpuBin
   }
 }

-case class GpuBRound(child: Expression, scale: Expression) extends
-  GpuRoundBase(child, scale) {
+case class GpuBRound(child: Expression, scale: Expression, outputType: DataType) extends
+  GpuRoundBase(child, scale, outputType) {
   override def roundMode: RoundMode = RoundMode.HALF_EVEN
 }

-case class GpuRound(child: Expression, scale: Expression) extends
-  GpuRoundBase(child, scale) {
+case class GpuRound(child: Expression, scale: Expression, outputType: DataType) extends
+  GpuRoundBase(child, scale, outputType) {
   override def roundMode: RoundMode = RoundMode.HALF_UP
 }
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala
index 23cbc961763..b34dfc18821 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala
@@ -702,6 +702,22 @@ class CastOpSuite extends GpuExpressionTestSuite {
     }
   }

+  test("cast float/double to decimal (include upcast of cuDF decimal type)") {
+    val genFloats: SparkSession => DataFrame = (ss: SparkSession) => {
+      ss.createDataFrame(List(Tuple1(459.288333f), Tuple1(-123.456789f), Tuple1(789.100001f)))
+        .selectExpr("_1 AS col")
+    }
+    testCastToDecimal(DataTypes.FloatType, precision = 9, scale = 6,
+      customDataGenerator = Option(genFloats))
+
+    val genDoubles: SparkSession => DataFrame = (ss: SparkSession) => {
+      ss.createDataFrame(List(Tuple1(459.288333), Tuple1(-123.456789), Tuple1(789.100001)))
+        .selectExpr("_1 AS col")
+    }
+    testCastToDecimal(DataTypes.DoubleType, precision = 9, scale = 6,
+      customDataGenerator = Option(genDoubles))
+  }
+
   test("cast decimal to decimal") {
     // fromScale == toScale
     testCastToDecimal(DataTypes.createDecimalType(18, 0),