Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-40876][SQL] Widening type promotions in Parquet readers #44368

Closed

Conversation

johanl-db
Copy link
Contributor

@johanl-db johanl-db commented Dec 15, 2023

What changes were proposed in this pull request?

This change adds the following conversions to the vectorized and non-vectorized Parquet readers corresponding to type promotions that are strictly widening without precision loss:

  • Int -> Long
  • Float -> Double
  • Int -> Double
  • Date -> TimestampNTZ (Timestamp without timezone only as a date has no timezone information)
  • Decimal with higher precision (already supported in non-vectorized reader)

Why are the changes needed?

These type promotions support two similar use cases:

  1. Reading a set of Parquet files with different types, e.g a mix of Int and Long for a given column. If the read schema is Long, the reader should be able to read all files and promote Ints to Longs instead of failing.
  2. Widening the type of a column in a table that already contains Parquet files, e.g. an Int column isn't large enough to accommodate IDs and is changed to Long. Existing Parquet files storing the value as Int should still be correctly read by upcasting values.

The second use case in particular will enable widening the type of columns or fields in existing Delta tables.

Does this PR introduce any user-facing change?

The following fails before this change:

Seq(0).toDF("a").write.parquet(path)
spark.read.schema("a LONG").parquet(path).where(s"a < ${Long.MaxValue}").collect()

With the Int->Long promotion in both the vectorized and non-vectorized parquet readers, it succeeds and produces correct results, without overflow or loss of precision.
The same is true for Float->Double, Int->Double, Decimal with higher precision and Date->TimestampNTZ

How was this patch tested?

  • Added ParquetTypeWideningSuite covering the promotions included in this change, in particular:
    • Non-dictionary encoded values / dictionary encoded values for each promotion
    • Timestamp rebase modes LEGACY and CORRECTED for Date -> TimestampNTZ
    • Promotions between decimal types with different physical storage: INT32, INT64, BINARY, FIXED_LEN_BYTE_ARRAY
    • Reading values written with Parquet V1 and V2 writers.
  • Updated/Removed two tests that expect type promotion to fail.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the SQL label Dec 15, 2023
@johanl-db johanl-db force-pushed the SPARK-40876-parquet-type-promotion branch from 2a640f4 to a4e8048 Compare December 15, 2023 13:12
@johanl-db johanl-db changed the title [WIP][SPARK-40876] Widening type promotions in Parquet readers [SPARK-40876] Widening type promotions in Parquet readers Dec 15, 2023
@johanl-db johanl-db force-pushed the SPARK-40876-parquet-type-promotion branch from 6ddb60b to c113d8e Compare December 15, 2023 14:48
@johanl-db johanl-db force-pushed the SPARK-40876-parquet-type-promotion branch from c113d8e to cb1487e Compare December 18, 2023 08:54
@johanl-db johanl-db force-pushed the SPARK-40876-parquet-type-promotion branch from e1c73e7 to dc8b489 Compare December 18, 2023 13:54
@dongjoon-hyun dongjoon-hyun changed the title [SPARK-40876] Widening type promotions in Parquet readers [SPARK-40876][SQL] Widening type promotions in Parquet readers Dec 18, 2023
} else if (sparkType == DataTypes.DateType) {
if ("CORRECTED".equals(datetimeRebaseMode)) {
return new IntegerUpdater();
} else {
boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
return new IntegerWithRebaseUpdater(failIfRebase);
}
} else if (sparkType == DataTypes.TimestampNTZType) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should support timestamp ltz as well, which is DataTypes.TimestmapType

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but this means the parquet reader needs to know the session timezone.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, how do we know this INT32 is a logical DATE in parquet?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a check to only allow reading INT32 with a date annotation.

I took a stab at Date->TimestampLTZ but it's not trivial and we would need to discuss the expected behavior, I can follow up in a different PR if we decide we need it. It's easy to get wrong and I'd rather disallow it for now.

@@ -1070,17 +1070,6 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

test("SPARK-35640: int as long should throw schema incompatible error") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also cc @sunchao

Will this pr solve the problem described in SPARK-35461 too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johanl-db
Copy link
Contributor Author

@cloud-fan Can you merge this PR?

} else if (sparkType == DataTypes.DateType) {
if ("CORRECTED".equals(datetimeRebaseMode)) {
return new IntegerUpdater();
} else {
boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
return new IntegerWithRebaseUpdater(failIfRebase);
}
} else if (sparkType == DataTypes.TimestampNTZType && isDateTypeMatched(descriptor)) {
if ("CORRECTED".equals(datetimeRebaseMode)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #44428 , ntz should never rebase.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, this is date, and we need rebase

boolean needsUpcast = sparkType == LongType || (isDate && sparkType == TimestampNTZType) ||
!DecimalType.is32BitDecimalType(sparkType);
boolean needsRebase = logicalTypeAnnotation instanceof DateLogicalTypeAnnotation &&
!"CORRECTED".equals(datetimeRebaseMode);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

parquetType.getLogicalTypeAnnotation.isInstanceOf[DateLogicalTypeAnnotation] =>
new ParquetPrimitiveConverter(updater) {
override def addInt(value: Int): Unit = {
this.updater.set(DateTimeUtils.daysToMicros(dateRebaseFunc(value), ZoneOffset.UTC))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 3361f25 Dec 22, 2023
@LuciferYang
Copy link
Contributor

There are 3 test failed in ParquetTypeWideningSuite with spark.sql.ansi.enabled=true, cloud you take a look? @johanl-db

[info] - unsupported parquet conversion IntegerType -> TimestampType *** FAILED *** (68 milliseconds)
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 261.0 failed 1 times, most recent failure: Lost task 1.0 in stage 261.0 (TID 523) (localhost executor driver): org.apache.spark.SparkNumberFormatException: [CAST_INVALID_INPUT] The value '1.23' of the type "STRING" cannot be cast to "INT" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. SQLSTATE: 22018
[info] == DataFrame ==
[info] "cast" was called from
[info] org.apache.spark.sql.execution.datasources.parquet.ParquetTypeWideningSuite.writeParquetFiles(ParquetTypeWideningSuite.scala:113)
[info] 
[info] 	at org.apache.spark.sql.errors.QueryExecutionErrors$.invalidInputInCastToNumberError(QueryExecutionErrors.scala:145)
[info] 	at org.apache.spark.sql.catalyst.util.UTF8StringUtils$.withException(UTF8StringUtils.scala:51)
[info] 	at org.apache.spark.sql.catalyst.util.UTF8StringUtils$.toIntExact(UTF8StringUtils.scala:34)
[info] 	at org.apache.spark.sql.catalyst.util.UTF8StringUtils.toIntExact(UTF8StringUtils.scala)
[info] 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
[info] 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
[info] 	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
[info] 	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:388)
[info] 	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:101)
[info] 	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:891)
[info] 	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:891)
[info] 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
[info] 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
[info] 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
[info] 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
[info] 	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
[info] 	at org.apache.spark.scheduler.Task.run(Task.scala:141)
[info] 	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:628)
[info] 	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
[info] 	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
[info] 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
[info] 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:631)
[info] 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] 	at java.base/java.lang.Thread.run(Thread.java:840)

@cloud-fan
Copy link
Contributor

@LuciferYang thanks for catching it! Does it block PR merging? We may need to wait for a few days as it's the holiday season. If you can fix it then it's even better. We can revert it first if it affects other PRs.

@LuciferYang
Copy link
Contributor

@cloud-fan This is not a serious issue, just ansi mode daily test failed, I don't think we need to revert.

@LuciferYang
Copy link
Contributor

@LuciferYang thanks for catching it! Does it block PR merging? We may need to wait for a few days as it's the holiday season. If you can fix it then it's even better. We can revert it first if it affects other PRs.

Just marking this, the issue has been fixed by #44481

cloud-fan pushed a commit that referenced this pull request Jan 8, 2024
…cale in Parquet readers

### What changes were proposed in this pull request?
This is a follow-up from #44368 implementing an additional type promotion to decimals with larger precision and scale.
As long as the precision increases by at least as much as the scale, the decimal values can be promoted without loss of precision: Decimal(6, 2) -> Decimal(8, 4):  1234.56 -> 1234.5600.

The non-vectorized reader (parquet-mr) is already able to do this type promotion, this PR implements it for the vectorized reader.

### Why are the changes needed?
This allows reading multiple parquet files that contain decimal with different precision/scales

### Does this PR introduce _any_ user-facing change?
Yes, the following now succeeds when using the vectorized Parquet reader:
```
  Seq(20).toDF($"a".cast(DecimalType(4, 2))).write.parquet(path)
  spark.read.schema("a decimal(6, 4)").parquet(path).collect()
```
It failed before with the vectorized reader and succeeded with the non-vectorized reader.

### How was this patch tested?
- Tests added to `ParquetWideningTypeSuite` to cover decimal promotion between decimals with different physical types: INT32, INT64, FIXED_LEN_BYTE_ARRAY.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44513 from johanl-db/SPARK-40876-parquet-type-promotion-decimal-scale.

Authored-by: Johan Lasperas <johan.lasperas@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
dongjoon-hyun pushed a commit that referenced this pull request Jan 23, 2024
…n Parquet vectorized reader

### What changes were proposed in this pull request?
This is a follow-up from #44368 and #44513, implementing an additional type promotion from integers to decimals in the parquet vectorized reader, bringing it at parity with the non-vectorized reader in that regard.

### Why are the changes needed?
This allows reading parquet files that have different schemas and mix decimals and integers - e.g reading files containing either `Decimal(15, 2)` and `INT32` as `Decimal(15, 2)` - as long as the requested decimal type is large enough to accommodate the integer values without precision loss.

### Does this PR introduce _any_ user-facing change?
Yes, the following now succeeds when using the vectorized Parquet reader:
```
  Seq(20).toDF($"a".cast(IntegerType)).write.parquet(path)
  spark.read.schema("a decimal(12, 0)").parquet(path).collect()
```
It failed before with the vectorized reader and succeeded with the non-vectorized reader.

### How was this patch tested?
- Tests added to `ParquetWideningTypeSuite`
- Updated relevant `ParquetQuerySuite` test.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44803 from johanl-db/SPARK-40876-widening-promotion-int-to-decimal.

Authored-by: Johan Lasperas <johan.lasperas@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
cloud-fan pushed a commit that referenced this pull request Aug 7, 2024
### What changes were proposed in this pull request?

This PR aims to widen type promotions in `AvroDeserializer`. Supported as following(Avro Type -> Spark Type):

- Int -> Long ;
- Int -> Double ;
- Float -> Double;

### Why are the changes needed?

Similar to PR #44368 for `Parquet` reader, we'd better to enable type promotion/widening for `Avro` deserializer.

### Does this PR introduce _any_ user-facing change?

Yes, but more convenient for users.

### How was this patch tested?

Pass GA and add a new test case.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47582 from wayneguow/SPARK-49082.

Authored-by: Wei Guo <guow93@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants