From 62a3868b93a51a4dc424e87d6fd06df914158e1b Mon Sep 17 00:00:00 2001
From: Jiaan Geng
Date: Fri, 27 Oct 2023 10:48:44 +0800
Subject: [PATCH] [SPARK-45481][SQL] Introduce a mapper for parquet compression codecs

### What changes were proposed in this pull request?
Currently, Spark supports all the Parquet compression codecs, but the codecs supported by Parquet and the ones supported by Spark are not a one-to-one match, because Spark introduces a fake compression codec `none`. In addition, a lot of magic strings copied from the Parquet compression codec names are scattered around the code base, so developers have to keep them consistent manually, which is error-prone and reduces development efficiency. This PR introduces a mapper between the Spark-side codec names and Parquet's `CompressionCodecName`, refer to:
https://github.com/apache/parquet-mr/blob/master/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java

### Why are the changes needed?
Make it easier for developers to use Parquet compression codecs correctly.

### Does this PR introduce _any_ user-facing change?
'No'.
It only introduces a new class.

### How was this patch tested?
Existing test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #43308 from beliefer/SPARK-45481.

Authored-by: Jiaan Geng
Signed-off-by: Jiaan Geng
---
 .../parquet/ParquetCompressionCodec.java      | 62 +++++++++++++++++++
 .../datasources/parquet/ParquetOptions.scala  | 15 ++---
 .../BuiltInDataSourceWriteBenchmark.scala     |  6 +-
 .../benchmark/DataSourceReadBenchmark.scala   |  9 ++-
 .../benchmark/FilterPushdownBenchmark.scala   |  5 +-
 .../benchmark/TPCDSQueryBenchmark.scala       |  6 +-
 .../datasources/FileSourceCodecSuite.scala    | 12 +++-
 ...rquetCompressionCodecPrecedenceSuite.scala | 41 ++++++------
 .../datasources/parquet/ParquetIOSuite.scala  | 22 +++----
 .../sql/hive/CompressionCodecSuite.scala      | 23 +++++--
 .../spark/sql/hive/HiveParquetSuite.scala     |  6 +-
 .../sql/hive/execution/HiveDDLSuite.scala     | 10 +--
 .../ParquetHadoopFsRelationSuite.scala        |  3 +-
 13 files changed, 155 insertions(+), 65 deletions(-)
 create mode 100644 sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java
new file mode 100644
index 0000000000000..1a37c7a33f20c
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+/**
+ * A mapper class from Spark supported parquet compression codecs to parquet compression codecs.
+ */
+public enum ParquetCompressionCodec {
+  NONE(CompressionCodecName.UNCOMPRESSED),
+  UNCOMPRESSED(CompressionCodecName.UNCOMPRESSED),
+  SNAPPY(CompressionCodecName.SNAPPY),
+  GZIP(CompressionCodecName.GZIP),
+  LZO(CompressionCodecName.LZO),
+  BROTLI(CompressionCodecName.BROTLI),
+  LZ4(CompressionCodecName.LZ4),
+  LZ4_RAW(CompressionCodecName.LZ4_RAW),
+  ZSTD(CompressionCodecName.ZSTD);
+
+  private final CompressionCodecName compressionCodec;
+
+  ParquetCompressionCodec(CompressionCodecName compressionCodec) {
+    this.compressionCodec = compressionCodec;
+  }
+
+  public CompressionCodecName getCompressionCodec() {
+    return this.compressionCodec;
+  }
+
+  public static ParquetCompressionCodec fromString(String s) {
+    return ParquetCompressionCodec.valueOf(s.toUpperCase(Locale.ROOT));
+  }
+
+  public static final List<ParquetCompressionCodec> availableCodecs =
+    Arrays.asList(
+      ParquetCompressionCodec.UNCOMPRESSED,
+      ParquetCompressionCodec.SNAPPY,
+      ParquetCompressionCodec.GZIP,
+      ParquetCompressionCodec.ZSTD,
+      ParquetCompressionCodec.LZ4,
+      ParquetCompressionCodec.LZ4_RAW);
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 559a994319d3e..ae110fdd0d3a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.parquet
 import java.util.Locale
 
 import org.apache.parquet.hadoop.ParquetOutputFormat
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
 
 import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -88,16 +87,10 @@ class ParquetOptions(
 
 object ParquetOptions extends DataSourceOptions {
   // The parquet compression short names
-  private val shortParquetCompressionCodecNames = Map(
-    "none" -> CompressionCodecName.UNCOMPRESSED,
-    "uncompressed" -> CompressionCodecName.UNCOMPRESSED,
-    "snappy" -> CompressionCodecName.SNAPPY,
-    "gzip" -> CompressionCodecName.GZIP,
-    "lzo" -> CompressionCodecName.LZO,
-    "brotli" -> CompressionCodecName.BROTLI,
-    "lz4" -> CompressionCodecName.LZ4,
-    "lz4_raw" -> CompressionCodecName.LZ4_RAW,
-    "zstd" -> CompressionCodecName.ZSTD)
+  private val shortParquetCompressionCodecNames =
+    ParquetCompressionCodec.values().map {
+      codec => codec.name().toLowerCase(Locale.ROOT) -> codec.getCompressionCodec
+    }.toMap
 
   def getParquetCompressionCodecName(name: String): String = {
     shortParquetCompressionCodecNames(name).name()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
index 4752787c501bf..ba3228878ecee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
@@ -16,9 +16,12 @@
*/ package org.apache.spark.sql.execution.benchmark +import java.util.Locale + import org.apache.parquet.column.ParquetProperties import org.apache.parquet.hadoop.ParquetOutputFormat +import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec import org.apache.spark.sql.internal.SQLConf /** @@ -51,7 +54,8 @@ object BuiltInDataSourceWriteBenchmark extends DataSourceWriteBenchmark { mainArgs } - spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, "snappy") + spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, + ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)) spark.conf.set(SQLConf.ORC_COMPRESSION.key, "snappy") formats.foreach { format => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala index 771f944f1f6c5..a8736c041517f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.benchmark import java.io.File +import java.util.Locale import scala.jdk.CollectionConverters._ import scala.util.Random @@ -28,7 +29,7 @@ import org.apache.spark.{SparkConf, TestUtils} import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader +import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, VectorizedParquetRecordReader} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnVector @@ -99,15 +100,17 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark { spark.read.json(dir).createOrReplaceTempView("jsonTable") } + val parquetCodec = ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT) + private def saveAsParquetV1Table(df: DataFrameWriter[Row], dir: String): Unit = { - df.mode("overwrite").option("compression", "snappy").parquet(dir) + df.mode("overwrite").option("compression", parquetCodec).parquet(dir) spark.read.parquet(dir).createOrReplaceTempView("parquetV1Table") } private def saveAsParquetV2Table(df: DataFrameWriter[Row], dir: String): Unit = { withSQLConf(ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString) { - df.mode("overwrite").option("compression", "snappy").parquet(dir) + df.mode("overwrite").option("compression", parquetCodec).parquet(dir) spark.read.parquet(dir).createOrReplaceTempView("parquetV2Table") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala index 4862571b9c1be..10781ec90fa00 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala @@ -18,12 +18,14 @@ package org.apache.spark.sql.execution.benchmark import java.io.File +import java.util.Locale import scala.util.Random import org.apache.spark.SparkConf import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.{DataFrame, SparkSession} +import 
org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec import org.apache.spark.sql.functions.{monotonically_increasing_id, timestamp_seconds} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType @@ -50,7 +52,8 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark { .setIfMissing("spark.driver.memory", "3g") .setIfMissing("spark.executor.memory", "3g") .setIfMissing("orc.compression", "snappy") - .setIfMissing("spark.sql.parquet.compression.codec", "snappy") + .setIfMissing("spark.sql.parquet.compression.codec", + ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)) SparkSession.builder().config(conf).getOrCreate() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala index c26272d1dcd63..f01cfea62a958 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.benchmark +import java.util.Locale + import scala.util.Try import org.apache.spark.SparkConf @@ -29,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -51,7 +54,8 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark with Logging { val conf = new SparkConf() .setMaster(System.getProperty("spark.sql.test.master", "local[1]")) .setAppName("test-sql-context") - .set("spark.sql.parquet.compression.codec", "snappy") + .set("spark.sql.parquet.compression.codec", + ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)) .set("spark.sql.shuffle.partitions", System.getProperty("spark.sql.shuffle.partitions", "4")) .set("spark.driver.memory", "3g") .set("spark.executor.memory", "3g") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala index 11e9f4665a9cf..1f1805a02d765 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala @@ -17,7 +17,12 @@ package org.apache.spark.sql.execution.datasources +import java.util.Locale + +import scala.jdk.CollectionConverters._ + import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} @@ -58,9 +63,10 @@ class ParquetCodecSuite extends FileSourceCodecSuite { // Exclude "lzo" because it is GPL-licenced so not included in Hadoop. // Exclude "brotli" because the com.github.rdblue:brotli-codec dependency is not available // on Maven Central. 
- override protected def availableCodecs: Seq[String] = { - Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4_raw") - } + override protected def availableCodecs: Seq[String] = + (ParquetCompressionCodec.NONE +: + ParquetCompressionCodec.availableCodecs.asScala.iterator.to(Seq)) + .map(_.name().toLowerCase(Locale.ROOT)).iterator.to(Seq) } class OrcCodecSuite extends FileSourceCodecSuite { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala index 1a387b7d2de63..28ea430635a2b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.io.File +import java.util.Locale import scala.jdk.CollectionConverters._ @@ -29,18 +30,9 @@ import org.apache.spark.sql.test.SharedSparkSession class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSparkSession { test("Test `spark.sql.parquet.compression.codec` config") { - Seq( - "NONE", - "UNCOMPRESSED", - "SNAPPY", - "GZIP", - "LZO", - "LZ4", - "BROTLI", - "ZSTD", - "LZ4_RAW").foreach { c => - withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) { - val expected = if (c == "NONE") "UNCOMPRESSED" else c + ParquetCompressionCodec.values().foreach { codec => + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) { + val expected = codec.getCompressionCodec.name() val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf) assert(option.compressionCodecClassName == expected) } @@ -49,25 +41,32 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar test("[SPARK-21786] Test Acquiring 'compressionCodecClassName' for parquet in right order.") { // When "compression" is configured, it should be the first choice. - withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { - val props = Map("compression" -> "uncompressed", ParquetOutputFormat.COMPRESSION -> "gzip") + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> + ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) { + val props = Map( + "compression" -> ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT), + ParquetOutputFormat.COMPRESSION -> + ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT)) val option = new ParquetOptions(props, spark.sessionState.conf) - assert(option.compressionCodecClassName == "UNCOMPRESSED") + assert(option.compressionCodecClassName == ParquetCompressionCodec.UNCOMPRESSED.name) } // When "compression" is not configured, "parquet.compression" should be the preferred choice. 
- withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { - val props = Map(ParquetOutputFormat.COMPRESSION -> "gzip") + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> + ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) { + val props = Map(ParquetOutputFormat.COMPRESSION -> + ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT)) val option = new ParquetOptions(props, spark.sessionState.conf) - assert(option.compressionCodecClassName == "GZIP") + assert(option.compressionCodecClassName == ParquetCompressionCodec.GZIP.name) } // When both "compression" and "parquet.compression" are not configured, // spark.sql.parquet.compression.codec should be the right choice. - withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> + ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) { val props = Map.empty[String, String] val option = new ParquetOptions(props, spark.sessionState.conf) - assert(option.compressionCodecClassName == "SNAPPY") + assert(option.compressionCodecClassName == ParquetCompressionCodec.SNAPPY.name) } } @@ -113,8 +112,8 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar } test("Create parquet table with compression") { + val codecs = ParquetCompressionCodec.availableCodecs.asScala.map(_.name()) Seq(true, false).foreach { isPartitioned => - val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4", "LZ4_RAW") codecs.foreach { compressionCodec => checkCompressionCodec(compressionCodec, isPartitioned) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 95a45e52bfb49..a5d5f8ce30f0c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -34,8 +34,6 @@ import org.apache.parquet.example.data.Group import org.apache.parquet.example.data.simple.{SimpleGroup, SimpleGroupFactory} import org.apache.parquet.hadoop._ import org.apache.parquet.hadoop.example.ExampleParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP import org.apache.parquet.io.api.Binary import org.apache.parquet.schema.{MessageType, MessageTypeParser} @@ -845,7 +843,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession val data = (0 until 10).map(i => (i, i.toString)) - def checkCompressionCodec(codec: CompressionCodecName): Unit = { + def checkCompressionCodec(codec: ParquetCompressionCodec): Unit = { withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) { withParquetFile(data) { path => assertResult(spark.conf.get(SQLConf.PARQUET_COMPRESSION).toUpperCase(Locale.ROOT)) { @@ -857,12 +855,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession // Checks default compression codec checkCompressionCodec( - CompressionCodecName.fromConf(spark.conf.get(SQLConf.PARQUET_COMPRESSION))) + ParquetCompressionCodec.fromString(spark.conf.get(SQLConf.PARQUET_COMPRESSION))) - checkCompressionCodec(CompressionCodecName.UNCOMPRESSED) - checkCompressionCodec(CompressionCodecName.GZIP) - checkCompressionCodec(CompressionCodecName.SNAPPY) - checkCompressionCodec(CompressionCodecName.ZSTD) + 
ParquetCompressionCodec.availableCodecs.asScala.foreach(checkCompressionCodec(_)) } private def createParquetWriter( @@ -878,7 +873,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession .withDictionaryEncoding(dictionaryEnabled) .withType(schema) .withWriterVersion(PARQUET_1_0) - .withCompressionCodec(GZIP) + .withCompressionCodec(ParquetCompressionCodec.GZIP.getCompressionCodec) .withRowGroupSize(1024 * 1024) .withPageSize(pageSize) .withDictionaryPageSize(dictionaryPageSize) @@ -1507,9 +1502,12 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") { - withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { - val option = new ParquetOptions(Map("Compression" -> "uncompressed"), spark.sessionState.conf) - assert(option.compressionCodecClassName == "UNCOMPRESSED") + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> + ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) { + val option = new ParquetOptions( + Map("Compression" -> ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT)), + spark.sessionState.conf) + assert(option.compressionCodecClassName == ParquetCompressionCodec.UNCOMPRESSED.name) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala index a5d11f6e0e14d..df28e7b4485a6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala @@ -29,7 +29,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql.execution.datasources.orc.OrcOptions -import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, ParquetTest} +import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetOptions, ParquetTest} import org.apache.spark.sql.hive.orc.OrcFileOperator import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf @@ -289,8 +289,14 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo test("both table-level and session-level compression are set") { checkForTableWithCompressProp("parquet", - tableCompressCodecs = List("UNCOMPRESSED", "SNAPPY", "GZIP"), - sessionCompressCodecs = List("SNAPPY", "GZIP", "SNAPPY")) + tableCompressCodecs = List( + ParquetCompressionCodec.UNCOMPRESSED.name, + ParquetCompressionCodec.SNAPPY.name, + ParquetCompressionCodec.GZIP.name), + sessionCompressCodecs = List( + ParquetCompressionCodec.SNAPPY.name, + ParquetCompressionCodec.GZIP.name, + ParquetCompressionCodec.SNAPPY.name)) checkForTableWithCompressProp("orc", tableCompressCodecs = List(CompressionKind.NONE.name, CompressionKind.SNAPPY.name, CompressionKind.ZLIB.name), @@ -301,7 +307,10 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo test("table-level compression is not set but session-level compressions is set ") { checkForTableWithCompressProp("parquet", tableCompressCodecs = List.empty, - sessionCompressCodecs = List("UNCOMPRESSED", "SNAPPY", "GZIP")) + sessionCompressCodecs = List( + ParquetCompressionCodec.UNCOMPRESSED.name, + ParquetCompressionCodec.SNAPPY.name, + ParquetCompressionCodec.GZIP.name)) checkForTableWithCompressProp("orc", tableCompressCodecs = List.empty, sessionCompressCodecs = @@ 
-339,7 +348,11 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo } test("test table containing mixed compression codec") { - checkTableWriteWithCompressionCodecs("parquet", List("UNCOMPRESSED", "SNAPPY", "GZIP")) + checkTableWriteWithCompressionCodecs("parquet", + List( + ParquetCompressionCodec.UNCOMPRESSED.name, + ParquetCompressionCodec.SNAPPY.name, + ParquetCompressionCodec.GZIP.name)) checkTableWriteWithCompressionCodecs( "orc", List(CompressionKind.NONE.name, CompressionKind.SNAPPY.name, CompressionKind.ZLIB.name)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala index 2a3c77a56e6db..45dd8da6e0200 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala @@ -19,9 +19,10 @@ package org.apache.spark.sql.hive import java.time.{Duration, Period} import java.time.temporal.ChronoUnit +import java.util.Locale import org.apache.spark.sql.{AnalysisException, QueryTest, Row} -import org.apache.spark.sql.execution.datasources.parquet.ParquetTest +import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetTest} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf @@ -157,7 +158,8 @@ class HiveParquetSuite extends QueryTest test("SPARK-37098: Alter table properties should invalidate cache") { // specify the compression in case we change it in future - withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> + ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)) { withTempPath { dir => withTable("t") { sql(s"CREATE TABLE t (c int) STORED AS PARQUET LOCATION '${dir.getCanonicalPath}'") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 05d2ca1e210f3..78365d25c8984 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils} -import org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader +import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetFooterReader} import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils} import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET} @@ -2709,7 +2709,9 @@ class HiveDDLSuite assert(compression === actualCompression) } - Seq(("orc", "ZLIB"), ("parquet", "GZIP")).foreach { case (fileFormat, compression) => + Seq( + ("orc", "ZLIB"), + ("parquet", ParquetCompressionCodec.GZIP.name)).foreach { case (fileFormat, compression) => test(s"SPARK-22158 convertMetastore should not ignore table property - $fileFormat") { withSQLConf(CONVERT_METASTORE_ORC.key -> "true", CONVERT_METASTORE_PARQUET.key -> "true") { withTable("t") { @@ -2804,14 +2806,14 @@ class HiveDDLSuite assert(DDLUtils.isHiveTable(table)) 
assert(table.storage.serde.get.contains("parquet")) val properties = table.properties - assert(properties.get("parquet.compression") == Some("GZIP")) + assert(properties.get("parquet.compression") == Some(ParquetCompressionCodec.GZIP.name)) assert(spark.table("t").collect().isEmpty) sql("INSERT INTO t SELECT 1") checkAnswer(spark.table("t"), Row(1)) val maybeFile = path.listFiles().find(_.getName.startsWith("part")) - assertCompression(maybeFile, "parquet", "GZIP") + assertCompression(maybeFile, "parquet", ParquetCompressionCodec.GZIP.name) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala index 18e8401ee3d2b..84ee19e62bca2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala @@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.CatalogUtils import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol +import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -199,7 +200,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { } test("SPARK-13543: Support for specifying compression codec for Parquet via option()") { - withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "UNCOMPRESSED") { + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.UNCOMPRESSED.name) { withTempPath { dir => val path = s"${dir.getCanonicalPath}/table1" val df = (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b")
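For context on how the new mapper is meant to replace the magic strings removed throughout this patch, here is a minimal, illustrative Scala sketch. It is not part of the patch; the demo object and its `main` method are made up for demonstration, and only the `ParquetCompressionCodec` API shown in the new Java file is assumed.

```scala
import java.util.Locale

import scala.jdk.CollectionConverters._

import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec

// Hypothetical demo object; only ParquetCompressionCodec comes from the patch.
object ParquetCompressionCodecDemo {
  def main(args: Array[String]): Unit = {
    // Resolve a user-facing codec name (e.g. the value of
    // spark.sql.parquet.compression.codec) to Parquet's CompressionCodecName.
    val snappy = ParquetCompressionCodec.fromString("snappy")
    println(snappy.getCompressionCodec) // SNAPPY

    // The Spark-only pseudo codec `none` maps to Parquet's UNCOMPRESSED.
    println(ParquetCompressionCodec.NONE.getCompressionCodec) // UNCOMPRESSED

    // Lower-cased enum names replace hand-written short-name strings,
    // mirroring what ParquetOptions.shortParquetCompressionCodecNames now does.
    val shortNames = ParquetCompressionCodec.values().map(_.name().toLowerCase(Locale.ROOT))
    println(shortNames.mkString(", "))

    // Codecs exercised by the test suites; LZO and BROTLI are deliberately
    // excluded, as noted in FileSourceCodecSuite.
    val available = ParquetCompressionCodec.availableCodecs.asScala.map(_.name())
    println(available.mkString(", "))
  }
}
```

Keeping the mapping in a single enum means the short names, the Parquet `CompressionCodecName` values, and the test and benchmark code can no longer drift apart.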