[SPARK-40876][SQL] Widening type promotion for decimals with larger scale in Parquet readers #44513

Conversation

johanl-db (Contributor):

What changes were proposed in this pull request?

This is a follow-up from #44368, implementing an additional type promotion to decimals with larger precision and scale.
As long as the precision increases by at least as much as the scale, the decimal values can be promoted without loss of precision, e.g. Decimal(6, 2) -> Decimal(8, 4): 1234.56 -> 1234.5600.

The non-vectorized reader (parquet-mr) can already do this type promotion; this PR implements it for the vectorized reader.
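
To make the rule concrete, here is a minimal sketch of the rescaling involved (illustrative names, not the reader's actual code):

```scala
// Minimal sketch, assuming decimals are carried as unscaled integers (as in
// Parquet's INT32/INT64 decimal encodings): widening Decimal(6, 2) ->
// Decimal(8, 4) multiplies the unscaled value by 10^(newScale - oldScale),
// which is lossless as long as precision grows by at least as much as scale.
def pow10(n: Int): Long = (0 until n).foldLeft(1L)((acc, _) => acc * 10)

def widenUnscaled(unscaled: Long, oldScale: Int, newScale: Int): Long = {
  require(newScale >= oldScale, "cannot shrink the scale losslessly")
  unscaled * pow10(newScale - oldScale)
}

// 1234.56 is unscaled 123456 at scale 2; at scale 4 it becomes 12345600,
// i.e. 1234.5600.
assert(widenUnscaled(123456L, 2, 4) == 12345600L)
```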

Why are the changes needed?

This allows reading multiple Parquet files that contain decimals with different precisions and scales.

Does this PR introduce any user-facing change?

Yes, the following now succeeds when using the vectorized Parquet reader:

```scala
Seq(20).toDF("a").select($"a".cast(DecimalType(4, 2)).as("a")).write.parquet(path)
spark.read.schema("a decimal(6, 4)").parquet(path).collect()
```

It failed before with the vectorized reader and succeeded with the non-vectorized reader.
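
For instance, the motivating scenario of mixed-scale files now works end to end (a hedged sketch; `path` and the values are illustrative):

```scala
import org.apache.spark.sql.types._
import spark.implicits._  // assumes a spark-shell / SparkSession in scope

val path = "/tmp/decimal-widening-demo"  // illustrative location
Seq("1.5").toDF("a").select($"a".cast(DecimalType(4, 2)).as("a"))
  .write.parquet(s"$path/f1")
Seq("2.25").toDF("a").select($"a".cast(DecimalType(6, 4)).as("a"))
  .write.parquet(s"$path/f2")
// Both files are read back under the single wider schema decimal(6, 4).
spark.read.schema("a decimal(6, 4)").parquet(s"$path/f1", s"$path/f2").collect()
```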

How was this patch tested?

  • Tests added to ParquetTypeWideningSuite to cover decimal promotion between decimals with different physical types: INT32, INT64 and FIXED_LEN_BYTE_ARRAY.

Was this patch authored or co-authored using generative AI tooling?

No

github-actions bot added the SQL label on Dec 27, 2023
Comment on lines +156 to +158:

```java
boolean needsUpcast = sparkType == LongType || sparkType == DoubleType ||
  (isDate && sparkType == TimestampNTZType) ||
  (isDecimal && !DecimalType.is32BitDecimalType(sparkType));
```
johanl-db (Contributor, author):

This fixes an issue from #44368: we were incorrectly disabling lazy dictionary decoding for all non-decimal INT32 types.
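
To restate the fixed condition, a hedged Scala paraphrase of the Java snippet above, with the internal `is32BitDecimalType` helper inlined as a precision check:

```scala
import org.apache.spark.sql.types._

// A decimal fits Parquet's INT32 encoding when its precision is at most 9.
def is32BitDecimal(dt: DataType): Boolean = dt match {
  case d: DecimalType => d.precision <= 9
  case _ => false
}

// Upcasting (and hence disabling lazy dictionary decoding) is needed only
// when the INT32 column is read as a wider Spark type; plain INT32 reads
// keep lazy decoding.
def needsUpcast(sparkType: DataType, isDate: Boolean, isDecimal: Boolean): Boolean =
  sparkType == LongType || sparkType == DoubleType ||
  (isDate && sparkType == TimestampNTZType) ||
  (isDecimal && !is32BitDecimal(sparkType))
```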

johanl-db force-pushed the SPARK-40876-parquet-type-promotion-decimal-scale branch from be33b03 to 2966027 on December 27, 2023
LuciferYang (Contributor):

```
- SPARK-34212 Parquet should read decimals correctly *** FAILED *** (364 milliseconds)
[info]   Expected exception org.apache.spark.SparkException to be thrown, but no exception was thrown (ParquetQuerySuite.scala:1055)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Assertions.intercept(Assertions.scala:766)
[info]   at org.scalatest.Assertions.intercept$(Assertions.scala:746)
[info]   at org.scalatest.funsuite.AnyFunSuite.intercept(AnyFunSuite.scala:1564)
[info]   at org.apache.spark.sql.execution.datasources.parquet.ParquetQuerySuite.$anonfun$new$213(ParquetQuerySuite.scala:1055)
[info]   at scala.collection.immutable.List.foreach(List.scala:333)
[info]   at org.apache.spark.sql.execution.datasources.parquet.ParquetQuerySuite.$anonfun$new$211(ParquetQuerySuite.scala:1054)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info]   at org.apache.spark.sql.catalyst.SQLConfHelper.withSQLConf(SQLConfHelper.scala:56)
[info]   at org.apache.spark.sql.catalyst.SQLConfHelper.withSQLConf$(SQLConfHelper.scala:38)
[info]   at org.apache.spark.sql.execution.datasources.parquet.ParquetQuerySuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(ParquetQuerySuite.scala:47)
```

The failed test case seems to be related to this PR; could you check it? @johanl-db

johanl-db (Contributor, author):

> - SPARK-34212 Parquet should read decimals correctly *** FAILED *** (364 milliseconds)
> […]
> The failed test case seems to be related to this PR; could you check it? @johanl-db

I fixed it; the last CI check timed out on another, unrelated test though.

johanl-db (Contributor, author):

@LuciferYang or @cloud-fan since you approved the previous similar change #44368, could you take a look at this PR?

```java
@@ -1444,14 +1641,29 @@ private static boolean isDateTypeMatched(ColumnDescriptor descriptor) {
}

private static boolean isDecimalTypeMatched(ColumnDescriptor descriptor, DataType dt) {
```
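
For context, this is the lossless-widening rule such a check encodes (a hedged sketch, not the method's actual body):

```scala
// A file decimal (p1, s1) can be read as (p2, s2) without precision loss when
// neither the scale nor the number of integral digits shrinks.
def isDecimalCompatible(p1: Int, s1: Int, p2: Int, s2: Int): Boolean =
  s2 >= s1 && (p2 - s2) >= (p1 - s1)

assert(isDecimalCompatible(6, 2, 8, 4))   // Decimal(6, 2) -> Decimal(8, 4): OK
assert(!isDecimalCompatible(6, 2, 7, 4))  // 3 integral digits < 4: lossy
```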
Contributor:

I think the name should be `isDecimalTypeCompatible`?


```java
@Override
public void skipValues(int total, VectorizedValuesReader valuesReader) {
  valuesReader.skipIntegers(total);
```
Contributor:

Suggested change:

```suggestion
valuesReader.skipIntegers(total);
```

cloud-fan (Contributor):

thanks, merging to master!

cloud-fan closed this in d439e34 on Jan 8, 2024
dongjoon-hyun pushed a commit that referenced this pull request on Jan 9, 2024:
… when ANSI mode is on

### What changes were proposed in this pull request?

This PR is a followup of #44513 that excludes `Decimal(5, 4)` for the value `10.34`, which cannot be represented when ANSI mode is on.

### Why are the changes needed?

ANSI build is broken (https://github.com/apache/spark/actions/runs/7455394893/job/20284415710):

```
org.apache.spark.SparkArithmeticException: [NUMERIC_VALUE_OUT_OF_RANGE] 10.34 cannot be represented as Decimal(5, 4). If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error, and return NULL instead. SQLSTATE: 22003
== DataFrame ==
"cast" was called from
org.apache.spark.sql.execution.datasources.parquet.ParquetTypeWideningSuite.writeParquetFiles(ParquetTypeWideningSuite.scala:113)

	at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotChangeDecimalPrecisionError(QueryExecutionErrors.scala:116)
	at org.apache.spark.sql.errors.QueryExecutionErrors.cannotChangeDecimalPrecisionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
```
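
The overflow is expected: `Decimal(5, 4)` keeps 4 fractional digits and therefore only one integral digit, so `10.34` cannot fit. A hedged repro sketch (assumes a spark-shell session):

```scala
import org.apache.spark.sql.types._
import spark.implicits._

spark.conf.set("spark.sql.ansi.enabled", "true")
// Decimal(5, 4) leaves 5 - 4 = 1 integral digit; 10.34 needs two, so the
// ANSI cast throws NUMERIC_VALUE_OUT_OF_RANGE instead of returning null.
Seq("10.34").toDF("a").select($"a".cast(DecimalType(5, 4))).collect()
```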

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing test cases should cover this.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44632 from HyukjinKwon/SPARK-40876-followup.

Lead-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
dongjoon-hyun pushed a commit that referenced this pull request on Jan 23, 2024:
…n Parquet vectorized reader

### What changes were proposed in this pull request?
This is a follow-up from #44368 and #44513, implementing an additional type promotion from integers to decimals in the Parquet vectorized reader, bringing it to parity with the non-vectorized reader in that regard.

### Why are the changes needed?
This allows reading Parquet files that have different schemas and mix decimals and integers - e.g. reading files containing either `Decimal(15, 2)` or `INT32` values as `Decimal(15, 2)` - as long as the requested decimal type is large enough to accommodate the integer values without precision loss. A hedged sketch of what "large enough" means for `INT32` follows (illustrative, not Spark's actual check).
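
```scala
// Int.MaxValue = 2147483647 has 10 digits, so any INT32 fits losslessly in a
// decimal that keeps at least 10 integral digits; an INT64 similarly needs 19.
def int32FitsLosslessly(precision: Int, scale: Int): Boolean =
  precision - scale >= 10

assert(int32FitsLosslessly(15, 2))   // Decimal(15, 2) can hold any INT32
assert(!int32FitsLosslessly(12, 4))  // only 8 integral digits: may overflow
```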

### Does this PR introduce _any_ user-facing change?
Yes, the following now succeeds when using the vectorized Parquet reader:
```
  Seq(20).toDF("a").select($"a".cast(IntegerType).as("a")).write.parquet(path)
  spark.read.schema("a decimal(12, 0)").parquet(path).collect()
```
It failed before with the vectorized reader and succeeded with the non-vectorized reader.

### How was this patch tested?
- Tests added to `ParquetTypeWideningSuite`
- Updated relevant `ParquetQuerySuite` test.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44803 from johanl-db/SPARK-40876-widening-promotion-int-to-decimal.

Authored-by: Johan Lasperas <johan.lasperas@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>