[SPARK-40876][SQL] Widening type promotions in Parquet readers
### What changes were proposed in this pull request?
This change adds the following conversions to the vectorized and non-vectorized Parquet readers, corresponding to type promotions that are strictly widening without precision loss:
- Int -> Long
- Float -> Double
- Int -> Double
- Date -> TimestampNTZ (timestamp without time zone only, since a date carries no time zone information)
- Decimal -> Decimal with higher precision (already supported in the non-vectorized reader)

### Why are the changes needed?
These type promotions support two similar use cases:
1. Reading a set of Parquet files with different types, e.g. a mix of Int and Long for a given column. If the read schema is Long, the reader should be able to read all files and promote Ints to Longs instead of failing.
2. Widening the type of a column in a table that already contains Parquet files, e.g. an Int column isn't large enough to accommodate IDs and is changed to Long. Existing Parquet files storing the value as Int should still be correctly read by upcasting values.

The second use case in particular will enable widening the type of columns or fields in existing Delta tables.
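A minimal sketch of the first use case (assuming a hypothetical directory `dir` and the usual spark-shell implicits):
```
// Two Parquet files with different physical types for column `a`.
Seq(1).toDF("a").write.parquet(s"$dir/ints")    // stored as INT32
Seq(2L).toDF("a").write.parquet(s"$dir/longs")  // stored as INT64
// With this change, the Int values are promoted to Long at read time.
spark.read.schema("a LONG").parquet(s"$dir/ints", s"$dir/longs").collect()
```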

### Does this PR introduce _any_ user-facing change?
The following fails before this change:
```
Seq(0).toDF("a").write.parquet(path)
spark.read.schema("a LONG").parquet(path).where(s"a < ${Long.MaxValue}").collect()
```
With the Int -> Long promotion in both the vectorized and non-vectorized Parquet readers, it succeeds and produces correct results, without overflow or loss of precision.
The same holds for Float -> Double, Int -> Double, decimals widened to higher precision, and Date -> TimestampNTZ.
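As a sketch, with hypothetical paths `pathF` and `pathD`:
```
Seq(1.0f).toDF("f").write.parquet(pathF)
spark.read.schema("f DOUBLE").parquet(pathF).collect()        // Float -> Double

Seq(java.sql.Date.valueOf("2023-12-22")).toDF("d").write.parquet(pathD)
spark.read.schema("d TIMESTAMP_NTZ").parquet(pathD).collect() // Date -> TimestampNTZ
```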

### How was this patch tested?
- Added `ParquetTypeWideningSuite` covering the promotions included in this change (a sketch of the underlying round-trip check follows this list), in particular:
  - Non-dictionary encoded values / dictionary encoded values for each promotion
  - Timestamp rebase modes `LEGACY` and `CORRECTED` for Date -> TimestampNTZ
  - Promotions between decimal types with different physical storage: `INT32`, `INT64`, `BINARY`, `FIXED_LEN_BYTE_ARRAY`
  - Reading values written with Parquet V1 and V2 writers.
- Updated or removed two tests that expected type promotion to fail.
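A sketch of the round-trip check such a suite can build on (assuming the standard `QueryTest`/`SQLTestUtils` helpers `checkAnswer` and `withTempPath`; `checkWidening` is a hypothetical name):
```
// Write with the narrow type, read back with the wider type, and compare
// against an explicit upcast of the original data.
def checkWidening(df: DataFrame, col: String, readType: String): Unit =
  withTempPath { dir =>
    df.write.parquet(dir.getAbsolutePath)
    val expected = df.selectExpr(s"CAST($col AS $readType) AS $col")
    checkAnswer(
      spark.read.schema(s"$col $readType").parquet(dir.getAbsolutePath),
      expected)
  }

checkWidening(Seq(1, 2, 3).toDF("a"), "a", "LONG")
```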

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44368 from johanl-db/SPARK-40876-parquet-type-promotion.

Authored-by: Johan Lasperas <johan.lasperas@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
johanl-db authored and cloud-fan committed Dec 22, 2023
1 parent 43f7932 commit 3361f25
Showing 6 changed files with 596 additions and 36 deletions.
@@ -22,6 +22,7 @@
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
@@ -81,17 +82,30 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType spa
// For unsigned int32, it stores as plain signed int32 in Parquet when dictionary
// fallbacks. We read them as long values.
return new UnsignedIntegerUpdater();
} else if (sparkType == DataTypes.LongType || canReadAsLongDecimal(descriptor, sparkType)) {
return new IntegerToLongUpdater();
} else if (canReadAsBinaryDecimal(descriptor, sparkType)) {
return new IntegerToBinaryUpdater();
} else if (sparkType == DataTypes.ByteType) {
return new ByteUpdater();
} else if (sparkType == DataTypes.ShortType) {
return new ShortUpdater();
} else if (sparkType == DataTypes.DoubleType) {
return new IntegerToDoubleUpdater();
} else if (sparkType == DataTypes.DateType) {
if ("CORRECTED".equals(datetimeRebaseMode)) {
return new IntegerUpdater();
} else {
boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
return new IntegerWithRebaseUpdater(failIfRebase);
}
} else if (sparkType == DataTypes.TimestampNTZType && isDateTypeMatched(descriptor)) {
if ("CORRECTED".equals(datetimeRebaseMode)) {
return new DateToTimestampNTZUpdater();
} else {
boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
return new DateToTimestampNTZWithRebaseUpdater(failIfRebase);
}
} else if (sparkType instanceof YearMonthIntervalType) {
return new IntegerUpdater();
}
@@ -104,6 +118,8 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType spa
} else {
return new LongUpdater();
}
} else if (canReadAsBinaryDecimal(descriptor, sparkType)) {
return new LongToBinaryUpdater();
} else if (isLongDecimal(sparkType) && isUnsignedIntTypeMatched(64)) {
// In `ParquetToSparkSchemaConverter`, we map parquet UINT64 to our Decimal(20, 0).
// For unsigned int64, it stores as plain signed int64 in Parquet when dictionary
@@ -142,6 +158,8 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType spa
case FLOAT -> {
if (sparkType == DataTypes.FloatType) {
return new FloatUpdater();
} else if (sparkType == DataTypes.DoubleType) {
return new FloatToDoubleUpdater();
}
}
case DOUBLE -> {
@@ -288,6 +306,157 @@ public void decodeSingleDictionaryId(
}
}

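// Widens Parquet INT32 values to Spark LongType; Int -> Long is lossless and cannot overflow.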
static class IntegerToLongUpdater implements ParquetVectorUpdater {
@Override
public void readValues(
int total,
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
for (int i = 0; i < total; ++i) {
values.putLong(offset + i, valuesReader.readInteger());
}
}

@Override
public void skipValues(int total, VectorizedValuesReader valuesReader) {
valuesReader.skipIntegers(total);
}

@Override
public void readValue(
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
values.putLong(offset, valuesReader.readInteger());
}

@Override
public void decodeSingleDictionaryId(
int offset,
WritableColumnVector values,
WritableColumnVector dictionaryIds,
Dictionary dictionary) {
values.putLong(offset, dictionary.decodeToInt(dictionaryIds.getDictId(offset)));
}
}

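// Widens Parquet INT32 values to Spark DoubleType; every Int is exactly representable as a Double.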
static class IntegerToDoubleUpdater implements ParquetVectorUpdater {
@Override
public void readValues(
int total,
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
for (int i = 0; i < total; ++i) {
values.putDouble(offset + i, valuesReader.readInteger());
}
}

@Override
public void skipValues(int total, VectorizedValuesReader valuesReader) {
valuesReader.skipIntegers(total);
}

@Override
public void readValue(
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
values.putDouble(offset, valuesReader.readInteger());
}

@Override
public void decodeSingleDictionaryId(
int offset,
WritableColumnVector values,
WritableColumnVector dictionaryIds,
Dictionary dictionary) {
values.putDouble(offset, dictionary.decodeToInt(dictionaryIds.getDictId(offset)));
}
}

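// Converts Parquet DATE values (days since the epoch) to TimestampNTZ microseconds at midnight of that day.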
static class DateToTimestampNTZUpdater implements ParquetVectorUpdater {
@Override
public void readValues(
int total,
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
for (int i = 0; i < total; ++i) {
readValue(offset + i, values, valuesReader);
}
}

@Override
public void skipValues(int total, VectorizedValuesReader valuesReader) {
valuesReader.skipIntegers(total);
}

@Override
public void readValue(
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
long micros = DateTimeUtils.daysToMicros(valuesReader.readInteger(), ZoneOffset.UTC);
values.putLong(offset, micros);
}

@Override
public void decodeSingleDictionaryId(
int offset,
WritableColumnVector values,
WritableColumnVector dictionaryIds,
Dictionary dictionary) {
int days = dictionary.decodeToInt(dictionaryIds.getDictId(offset));
values.putLong(offset, DateTimeUtils.daysToMicros(days, ZoneOffset.UTC));
}
}

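// As DateToTimestampNTZUpdater, but first rebases days from the legacy hybrid Julian calendar
// to the Proleptic Gregorian calendar, failing instead when the rebase mode is EXCEPTION.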
private static class DateToTimestampNTZWithRebaseUpdater implements ParquetVectorUpdater {
private final boolean failIfRebase;

DateToTimestampNTZWithRebaseUpdater(boolean failIfRebase) {
this.failIfRebase = failIfRebase;
}

@Override
public void readValues(
int total,
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
for (int i = 0; i < total; ++i) {
readValue(offset + i, values, valuesReader);
}
}

@Override
public void skipValues(int total, VectorizedValuesReader valuesReader) {
valuesReader.skipIntegers(total);
}

@Override
public void readValue(
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
int rebasedDays = rebaseDays(valuesReader.readInteger(), failIfRebase);
values.putLong(offset, DateTimeUtils.daysToMicros(rebasedDays, ZoneOffset.UTC));
}

@Override
public void decodeSingleDictionaryId(
int offset,
WritableColumnVector values,
WritableColumnVector dictionaryIds,
Dictionary dictionary) {
int rebasedDays =
rebaseDays(dictionary.decodeToInt(dictionaryIds.getDictId(offset)), failIfRebase);
values.putLong(offset, DateTimeUtils.daysToMicros(rebasedDays, ZoneOffset.UTC));
}
}

private static class UnsignedIntegerUpdater implements ParquetVectorUpdater {
@Override
public void readValues(
@@ -691,6 +860,41 @@ public void decodeSingleDictionaryId(
}
}

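// Widens Parquet FLOAT values to Spark DoubleType; Float -> Double is lossless.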
static class FloatToDoubleUpdater implements ParquetVectorUpdater {
@Override
public void readValues(
int total,
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
for (int i = 0; i < total; ++i) {
values.putDouble(offset + i, valuesReader.readFloat());
}
}

@Override
public void skipValues(int total, VectorizedValuesReader valuesReader) {
valuesReader.skipFloats(total);
}

@Override
public void readValue(
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
values.putDouble(offset, valuesReader.readFloat());
}

@Override
public void decodeSingleDictionaryId(
int offset,
WritableColumnVector values,
WritableColumnVector dictionaryIds,
Dictionary dictionary) {
values.putDouble(offset, dictionary.decodeToFloat(dictionaryIds.getDictId(offset)));
}
}

private static class DoubleUpdater implements ParquetVectorUpdater {
@Override
public void readValues(
@@ -758,6 +962,84 @@ public void decodeSingleDictionaryId(
}
}

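// Reads decimals stored as Parquet INT32 into a binary-backed decimal column by
// re-encoding each value as unscaled BigInteger bytes.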
private static class IntegerToBinaryUpdater implements ParquetVectorUpdater {

@Override
public void readValues(
int total,
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
for (int i = 0; i < total; i++) {
readValue(offset + i, values, valuesReader);
}
}

@Override
public void skipValues(int total, VectorizedValuesReader valuesReader) {
valuesReader.skipIntegers(total);
}

@Override
public void readValue(
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
BigInteger value = BigInteger.valueOf(valuesReader.readInteger());
values.putByteArray(offset, value.toByteArray());
}

@Override
public void decodeSingleDictionaryId(
int offset,
WritableColumnVector values,
WritableColumnVector dictionaryIds,
Dictionary dictionary) {
BigInteger value =
BigInteger.valueOf(dictionary.decodeToInt(dictionaryIds.getDictId(offset)));
values.putByteArray(offset, value.toByteArray());
}
}

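// Reads decimals stored as Parquet INT64 into a binary-backed decimal column by
// re-encoding each value as unscaled BigInteger bytes.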
private static class LongToBinaryUpdater implements ParquetVectorUpdater {

@Override
public void readValues(
int total,
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
for (int i = 0; i < total; i++) {
readValue(offset + i, values, valuesReader);
}
}

@Override
public void skipValues(int total, VectorizedValuesReader valuesReader) {
valuesReader.skipLongs(total);
}

@Override
public void readValue(
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
BigInteger value = BigInteger.valueOf(valuesReader.readLong());
values.putByteArray(offset, value.toByteArray());
}

@Override
public void decodeSingleDictionaryId(
int offset,
WritableColumnVector values,
WritableColumnVector dictionaryIds,
Dictionary dictionary) {
BigInteger value =
BigInteger.valueOf(dictionary.decodeToLong(dictionaryIds.getDictId(offset)));
values.putByteArray(offset, value.toByteArray());
}
}

private static class BinaryToSQLTimestampUpdater implements ParquetVectorUpdater {
@Override
public void readValues(
@@ -1156,6 +1438,11 @@ private static boolean isLongDecimal(DataType dt) {
return false;
}

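// Returns true if the Parquet column is annotated with the DATE logical type.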
private static boolean isDateTypeMatched(ColumnDescriptor descriptor) {
LogicalTypeAnnotation typeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation();
return typeAnnotation instanceof DateLogicalTypeAnnotation;
}

private static boolean isDecimalTypeMatched(ColumnDescriptor descriptor, DataType dt) {
DecimalType d = (DecimalType) dt;
LogicalTypeAnnotation typeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation();