[SPARK-45481][SQL] Introduce a mapper for parquet compression codecs
### What changes were proposed in this pull request?
Currently, Spark supports all of Parquet's compression codecs, but the codecs supported by Parquet and those supported by Spark do not map one-to-one, because Spark also exposes a fake codec, `none`.
In addition, many magic strings are copied from the Parquet compression codec names, so developers must manually keep them consistent. This is error-prone and reduces development efficiency.

See Parquet's `CompressionCodecName`: https://github.com/apache/parquet-mr/blob/master/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
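
To make the intent concrete, here is a minimal, hypothetical Scala sketch (not part of this PR's diff; the object name and SparkSession setup are illustrative) of replacing a hard-coded codec string with a name derived from the new enum:

```scala
import java.util.Locale

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec

object CodecConfigExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("parquet-codec-mapper-example")
      .getOrCreate()

    // Derive the lowercase config value from the enum instead of hard-coding "snappy",
    // so the string always matches the Parquet codec the enum wraps.
    val codecName = ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)
    spark.conf.set("spark.sql.parquet.compression.codec", codecName)

    spark.stop()
  }
}
```

Because the string is derived from the enum, it cannot drift out of sync with Parquet's `CompressionCodecName`.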

### Why are the changes needed?
Make it easier for developers to use Parquet compression codecs consistently.

### Does this PR introduce _any_ user-facing change?
'No'.
It only introduces a new class.

### How was this patch tested?
Existing test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #43308 from beliefer/SPARK-45481.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Jiaan Geng <beliefer@163.com>
beliefer committed Oct 27, 2023
1 parent 929405a commit 62a3868
Showing 13 changed files with 155 additions and 65 deletions.
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.parquet;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;

import org.apache.parquet.hadoop.metadata.CompressionCodecName;

/**
 * A mapper from the Parquet compression codecs supported by Spark to Parquet's own
 * compression codec names.
 */
public enum ParquetCompressionCodec {
  NONE(CompressionCodecName.UNCOMPRESSED),
  UNCOMPRESSED(CompressionCodecName.UNCOMPRESSED),
  SNAPPY(CompressionCodecName.SNAPPY),
  GZIP(CompressionCodecName.GZIP),
  LZO(CompressionCodecName.LZO),
  BROTLI(CompressionCodecName.BROTLI),
  LZ4(CompressionCodecName.LZ4),
  LZ4_RAW(CompressionCodecName.LZ4_RAW),
  ZSTD(CompressionCodecName.ZSTD);

  private final CompressionCodecName compressionCodec;

  ParquetCompressionCodec(CompressionCodecName compressionCodec) {
    this.compressionCodec = compressionCodec;
  }

  public CompressionCodecName getCompressionCodec() {
    return this.compressionCodec;
  }

  public static ParquetCompressionCodec fromString(String s) {
    return ParquetCompressionCodec.valueOf(s.toUpperCase(Locale.ROOT));
  }

  public static final List<ParquetCompressionCodec> availableCodecs =
    Arrays.asList(
      ParquetCompressionCodec.UNCOMPRESSED,
      ParquetCompressionCodec.SNAPPY,
      ParquetCompressionCodec.GZIP,
      ParquetCompressionCodec.ZSTD,
      ParquetCompressionCodec.LZ4,
      ParquetCompressionCodec.LZ4_RAW);
}
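
A hedged usage sketch (the demo object below is hypothetical, not part of the commit) showing how Scala callers are expected to use this enum: case-insensitive lookup via `fromString`, mapping to Parquet's `CompressionCodecName` via `getCompressionCodec`, and iterating `availableCodecs` for the codecs usable out of the box:

```scala
import scala.jdk.CollectionConverters._

import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec

object ParquetCompressionCodecDemo {
  def main(args: Array[String]): Unit = {
    // Case-insensitive lookup: "gzip", "GZIP" and "GzIp" all resolve to the same constant.
    val gzip: ParquetCompressionCodec = ParquetCompressionCodec.fromString("GzIp")

    // The Spark-facing name maps onto Parquet's CompressionCodecName; note that NONE and
    // UNCOMPRESSED both resolve to CompressionCodecName.UNCOMPRESSED.
    val parquetCodec: CompressionCodecName = gzip.getCompressionCodec
    println(s"${gzip.name()} -> ${parquetCodec.name()}")

    // availableCodecs is the subset exercised by the test suites: LZO and BROTLI are
    // excluded because their codec jars are not bundled with Hadoop or published on Maven Central.
    ParquetCompressionCodec.availableCodecs.asScala.foreach(codec => println(codec.name()))
  }
}
```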
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.util.Locale

import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.hadoop.metadata.CompressionCodecName

import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -88,16 +87,10 @@ class ParquetOptions(

object ParquetOptions extends DataSourceOptions {
// The parquet compression short names
private val shortParquetCompressionCodecNames = Map(
"none" -> CompressionCodecName.UNCOMPRESSED,
"uncompressed" -> CompressionCodecName.UNCOMPRESSED,
"snappy" -> CompressionCodecName.SNAPPY,
"gzip" -> CompressionCodecName.GZIP,
"lzo" -> CompressionCodecName.LZO,
"brotli" -> CompressionCodecName.BROTLI,
"lz4" -> CompressionCodecName.LZ4,
"lz4_raw" -> CompressionCodecName.LZ4_RAW,
"zstd" -> CompressionCodecName.ZSTD)
private val shortParquetCompressionCodecNames =
ParquetCompressionCodec.values().map {
codec => codec.name().toLowerCase(Locale.ROOT) -> codec.getCompressionCodec
}.toMap

def getParquetCompressionCodecName(name: String): String = {
shortParquetCompressionCodecNames(name).name()
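
For reference, a self-contained sketch of the same derivation used above in `ParquetOptions` (the wrapper object below is hypothetical; its behavior is assumed to match the diff): the hand-written name-to-codec map is replaced by one built from the enum, so a codec added to `ParquetCompressionCodec` is picked up automatically.

```scala
import java.util.Locale

import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec

object ShortCodecNamesSketch {
  // Mirror of the new map construction: every enum value contributes its lowercase name,
  // so the map stays in sync with the enum by construction.
  val shortNames: Map[String, CompressionCodecName] =
    ParquetCompressionCodec.values().map { codec =>
      codec.name().toLowerCase(Locale.ROOT) -> codec.getCompressionCodec
    }.toMap

  def main(args: Array[String]): Unit = {
    // "none" and "uncompressed" collapse to UNCOMPRESSED, matching the old hand-written map.
    println(shortNames("none"))  // UNCOMPRESSED
    println(shortNames("zstd"))  // ZSTD
  }
}
```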
@@ -16,9 +16,12 @@
*/
package org.apache.spark.sql.execution.benchmark

import java.util.Locale

import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetOutputFormat

import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
import org.apache.spark.sql.internal.SQLConf

/**
@@ -51,7 +54,8 @@ object BuiltInDataSourceWriteBenchmark extends DataSourceWriteBenchmark {
mainArgs
}

spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, "snappy")
spark.conf.set(SQLConf.PARQUET_COMPRESSION.key,
ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
spark.conf.set(SQLConf.ORC_COMPRESSION.key, "snappy")

formats.foreach { format =>
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.benchmark

import java.io.File
import java.util.Locale

import scala.jdk.CollectionConverters._
import scala.util.Random
@@ -28,7 +29,7 @@ import org.apache.spark.{SparkConf, TestUtils}
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, VectorizedParquetRecordReader}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnVector
@@ -99,15 +100,17 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
spark.read.json(dir).createOrReplaceTempView("jsonTable")
}

val parquetCodec = ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)

private def saveAsParquetV1Table(df: DataFrameWriter[Row], dir: String): Unit = {
df.mode("overwrite").option("compression", "snappy").parquet(dir)
df.mode("overwrite").option("compression", parquetCodec).parquet(dir)
spark.read.parquet(dir).createOrReplaceTempView("parquetV1Table")
}

private def saveAsParquetV2Table(df: DataFrameWriter[Row], dir: String): Unit = {
withSQLConf(ParquetOutputFormat.WRITER_VERSION ->
ParquetProperties.WriterVersion.PARQUET_2_0.toString) {
df.mode("overwrite").option("compression", "snappy").parquet(dir)
df.mode("overwrite").option("compression", parquetCodec).parquet(dir)
spark.read.parquet(dir).createOrReplaceTempView("parquetV2Table")
}
}
@@ -18,12 +18,14 @@
package org.apache.spark.sql.execution.benchmark

import java.io.File
import java.util.Locale

import scala.util.Random

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
import org.apache.spark.sql.functions.{monotonically_increasing_id, timestamp_seconds}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
@@ -50,7 +52,8 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark {
.setIfMissing("spark.driver.memory", "3g")
.setIfMissing("spark.executor.memory", "3g")
.setIfMissing("orc.compression", "snappy")
.setIfMissing("spark.sql.parquet.compression.codec", "snappy")
.setIfMissing("spark.sql.parquet.compression.codec",
ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))

SparkSession.builder().config(conf).getOrCreate()
}
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.benchmark

import java.util.Locale

import scala.util.Try

import org.apache.spark.SparkConf
@@ -29,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

@@ -51,7 +54,8 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark with Logging {
val conf = new SparkConf()
.setMaster(System.getProperty("spark.sql.test.master", "local[1]"))
.setAppName("test-sql-context")
.set("spark.sql.parquet.compression.codec", "snappy")
.set("spark.sql.parquet.compression.codec",
ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
.set("spark.sql.shuffle.partitions", System.getProperty("spark.sql.shuffle.partitions", "4"))
.set("spark.driver.memory", "3g")
.set("spark.executor.memory", "3g")
@@ -17,7 +17,12 @@

package org.apache.spark.sql.execution.datasources

import java.util.Locale

import scala.jdk.CollectionConverters._

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}

@@ -58,9 +63,10 @@ class ParquetCodecSuite extends FileSourceCodecSuite {
// Exclude "lzo" because it is GPL-licenced so not included in Hadoop.
// Exclude "brotli" because the com.github.rdblue:brotli-codec dependency is not available
// on Maven Central.
override protected def availableCodecs: Seq[String] = {
Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4_raw")
}
override protected def availableCodecs: Seq[String] =
(ParquetCompressionCodec.NONE +:
ParquetCompressionCodec.availableCodecs.asScala.iterator.to(Seq))
.map(_.name().toLowerCase(Locale.ROOT)).iterator.to(Seq)
}

class OrcCodecSuite extends FileSourceCodecSuite {
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.parquet

import java.io.File
import java.util.Locale

import scala.jdk.CollectionConverters._

@@ -29,18 +30,9 @@ import org.apache.spark.sql.test.SharedSparkSession

class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSparkSession {
test("Test `spark.sql.parquet.compression.codec` config") {
Seq(
"NONE",
"UNCOMPRESSED",
"SNAPPY",
"GZIP",
"LZO",
"LZ4",
"BROTLI",
"ZSTD",
"LZ4_RAW").foreach { c =>
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) {
val expected = if (c == "NONE") "UNCOMPRESSED" else c
ParquetCompressionCodec.values().foreach { codec =>
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) {
val expected = codec.getCompressionCodec.name()
val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf)
assert(option.compressionCodecClassName == expected)
}
@@ -49,25 +41,32 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar

test("[SPARK-21786] Test Acquiring 'compressionCodecClassName' for parquet in right order.") {
// When "compression" is configured, it should be the first choice.
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
val props = Map("compression" -> "uncompressed", ParquetOutputFormat.COMPRESSION -> "gzip")
withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
val props = Map(
"compression" -> ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT),
ParquetOutputFormat.COMPRESSION ->
ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT))
val option = new ParquetOptions(props, spark.sessionState.conf)
assert(option.compressionCodecClassName == "UNCOMPRESSED")
assert(option.compressionCodecClassName == ParquetCompressionCodec.UNCOMPRESSED.name)
}

// When "compression" is not configured, "parquet.compression" should be the preferred choice.
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
val props = Map(ParquetOutputFormat.COMPRESSION -> "gzip")
withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
val props = Map(ParquetOutputFormat.COMPRESSION ->
ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT))
val option = new ParquetOptions(props, spark.sessionState.conf)
assert(option.compressionCodecClassName == "GZIP")
assert(option.compressionCodecClassName == ParquetCompressionCodec.GZIP.name)
}

// When both "compression" and "parquet.compression" are not configured,
// spark.sql.parquet.compression.codec should be the right choice.
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
val props = Map.empty[String, String]
val option = new ParquetOptions(props, spark.sessionState.conf)
assert(option.compressionCodecClassName == "SNAPPY")
assert(option.compressionCodecClassName == ParquetCompressionCodec.SNAPPY.name)
}
}

@@ -113,8 +112,8 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar
}

test("Create parquet table with compression") {
val codecs = ParquetCompressionCodec.availableCodecs.asScala.map(_.name())
Seq(true, false).foreach { isPartitioned =>
val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4", "LZ4_RAW")
codecs.foreach { compressionCodec =>
checkCompressionCodec(compressionCodec, isPartitioned)
}
@@ -34,8 +34,6 @@ import org.apache.parquet.example.data.Group
import org.apache.parquet.example.data.simple.{SimpleGroup, SimpleGroupFactory}
import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.example.ExampleParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.{MessageType, MessageTypeParser}

@@ -845,7 +843,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession

val data = (0 until 10).map(i => (i, i.toString))

def checkCompressionCodec(codec: CompressionCodecName): Unit = {
def checkCompressionCodec(codec: ParquetCompressionCodec): Unit = {
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) {
withParquetFile(data) { path =>
assertResult(spark.conf.get(SQLConf.PARQUET_COMPRESSION).toUpperCase(Locale.ROOT)) {
@@ -857,12 +855,9 @@

// Checks default compression codec
checkCompressionCodec(
CompressionCodecName.fromConf(spark.conf.get(SQLConf.PARQUET_COMPRESSION)))
ParquetCompressionCodec.fromString(spark.conf.get(SQLConf.PARQUET_COMPRESSION)))

checkCompressionCodec(CompressionCodecName.UNCOMPRESSED)
checkCompressionCodec(CompressionCodecName.GZIP)
checkCompressionCodec(CompressionCodecName.SNAPPY)
checkCompressionCodec(CompressionCodecName.ZSTD)
ParquetCompressionCodec.availableCodecs.asScala.foreach(checkCompressionCodec(_))
}

private def createParquetWriter(
@@ -878,7 +873,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
.withDictionaryEncoding(dictionaryEnabled)
.withType(schema)
.withWriterVersion(PARQUET_1_0)
.withCompressionCodec(GZIP)
.withCompressionCodec(ParquetCompressionCodec.GZIP.getCompressionCodec)
.withRowGroupSize(1024 * 1024)
.withPageSize(pageSize)
.withDictionaryPageSize(dictionaryPageSize)
@@ -1507,9 +1502,12 @@
}

test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
val option = new ParquetOptions(Map("Compression" -> "uncompressed"), spark.sessionState.conf)
assert(option.compressionCodecClassName == "UNCOMPRESSED")
withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
val option = new ParquetOptions(
Map("Compression" -> ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT)),
spark.sessionState.conf)
assert(option.compressionCodecClassName == ParquetCompressionCodec.UNCOMPRESSED.name)
}
}
