From cb1487edc57ab47827db245d741be408d300682a Mon Sep 17 00:00:00 2001
From: Johan Lasperas
Date: Fri, 15 Dec 2023 15:11:56 +0100
Subject: [PATCH] Fix formatting

---
 .../parquet/ParquetVectorUpdaterFactory.java | 15 ++++++++++-----
 .../parquet/VectorizedColumnReader.java      |  6 ++++--
 .../parquet/ParquetTypeWideningSuite.scala   |  7 ++++---
 3 files changed, 18 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 0ece8fefac09a..729812252b102 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -339,7 +339,8 @@ public void readValues(
         WritableColumnVector values,
         VectorizedValuesReader valuesReader) {
       for (int i = 0; i < total; ++i) {
-        values.putLong(offset + i, DateTimeUtils.daysToMicros(valuesReader.readInteger(), ZoneOffset.UTC));
+        long days = DateTimeUtils.daysToMicros(valuesReader.readInteger(), ZoneOffset.UTC);
+        values.putLong(offset + i, days);
       }
     }
 
@@ -353,7 +354,8 @@ public void readValue(
         int offset,
         WritableColumnVector values,
         VectorizedValuesReader valuesReader) {
-      values.putLong(offset, DateTimeUtils.daysToMicros(valuesReader.readInteger(), ZoneOffset.UTC));
+      long days = DateTimeUtils.daysToMicros(valuesReader.readInteger(), ZoneOffset.UTC);
+      values.putLong(offset, days);
     }
 
     @Override
@@ -406,7 +408,8 @@ public void decodeSingleDictionaryId(
         WritableColumnVector values,
         WritableColumnVector dictionaryIds,
         Dictionary dictionary) {
-      int rebasedDays = rebaseDays(dictionary.decodeToInt(dictionaryIds.getDictId(offset)), failIfRebase);
+      int rebasedDays =
+        rebaseDays(dictionary.decodeToInt(dictionaryIds.getDictId(offset)), failIfRebase);
       values.putLong(offset, DateTimeUtils.daysToMicros(rebasedDays, ZoneOffset.UTC));
     }
   }
@@ -949,7 +952,8 @@ public void decodeSingleDictionaryId(
         WritableColumnVector values,
         WritableColumnVector dictionaryIds,
         Dictionary dictionary) {
-      BigInteger value = BigInteger.valueOf(dictionary.decodeToInt(dictionaryIds.getDictId(offset)));
+      BigInteger value =
+        BigInteger.valueOf(dictionary.decodeToInt(dictionaryIds.getDictId(offset)));
       values.putByteArray(offset, value.toByteArray());
     }
   }
@@ -987,7 +991,8 @@ public void decodeSingleDictionaryId(
         WritableColumnVector values,
         WritableColumnVector dictionaryIds,
         Dictionary dictionary) {
-      BigInteger value = BigInteger.valueOf(dictionary.decodeToLong(dictionaryIds.getDictId(offset)));
+      BigInteger value =
+        BigInteger.valueOf(dictionary.decodeToLong(dictionaryIds.getDictId(offset)));
       values.putByteArray(offset, value.toByteArray());
     }
   }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 33b9412c37663..ba4667d709791 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -151,13 +151,15 @@ private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName
       case INT32 -> {
         boolean needsUpcast = sparkType == LongType || sparkType == TimestampNTZType ||
           !DecimalType.is32BitDecimalType(sparkType);
-        boolean needsRebase = logicalTypeAnnotation instanceof DateLogicalTypeAnnotation && !"CORRECTED".equals(datetimeRebaseMode);
+        boolean needsRebase = logicalTypeAnnotation instanceof DateLogicalTypeAnnotation &&
+          !"CORRECTED".equals(datetimeRebaseMode);
         yield !needsUpcast && !needsRebase && !needsDecimalScaleRebase(sparkType);
       }
       case INT64 -> {
         boolean needsUpcast = !DecimalType.is64BitDecimalType(sparkType) ||
           updaterFactory.isTimestampTypeMatched(TimeUnit.MILLIS);
-        boolean needsRebase = updaterFactory.isTimestampTypeMatched(TimeUnit.MICROS) && !"CORRECTED".equals(datetimeRebaseMode);
+        boolean needsRebase = updaterFactory.isTimestampTypeMatched(TimeUnit.MICROS) &&
+          !"CORRECTED".equals(datetimeRebaseMode);
         yield !needsUpcast && !needsRebase && !needsDecimalScaleRebase(sparkType);
       }
       case FLOAT -> sparkType == FloatType;
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
index e612ada038f38..b862c3385592b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
@@ -21,11 +21,12 @@ import java.io.File
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.format.converter.ParquetMetadataConverter
 import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
+
 import org.apache.spark.SparkException
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
 import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -139,7 +140,7 @@ class ParquetTypeWideningSuite
   }
 
   for {
-    case (values: Seq[String], fromType: DataType, toType: DataType) <- Seq(
+    (values: Seq[String], fromType: DataType, toType: DataType) <- Seq(
       (Seq("1", "2", Short.MinValue.toString), ShortType, IntegerType),
       // Int->Short isn't a widening conversion but Parquet stores both as INT32 so it just works.
       (Seq("1", "2", Short.MinValue.toString), IntegerType, ShortType),
@@ -152,7 +153,7 @@ class ParquetTypeWideningSuite
   }
 
   for {
-    case (values: Seq[String], fromType: DataType, toType: DataType) <- Seq(
+    (values: Seq[String], fromType: DataType, toType: DataType) <- Seq(
       (Seq("1", "2", Int.MinValue.toString), LongType, IntegerType),
       // Test different timestamp types
      (Seq("2020-01-01", "2020-01-02", "1312-02-27"), TimestampNTZType, DateType),