Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-40876][SQL] Widening type promotions in Parquet readers #44368

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
Expand Down Expand Up @@ -81,17 +82,30 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType spa
// For unsigned int32, it stores as plain signed int32 in Parquet when dictionary
// fallbacks. We read them as long values.
return new UnsignedIntegerUpdater();
} else if (sparkType == DataTypes.LongType || canReadAsLongDecimal(descriptor, sparkType)) {
return new IntegerToLongUpdater();
} else if (canReadAsBinaryDecimal(descriptor, sparkType)) {
return new IntegerToBinaryUpdater();
} else if (sparkType == DataTypes.ByteType) {
return new ByteUpdater();
} else if (sparkType == DataTypes.ShortType) {
return new ShortUpdater();
} else if (sparkType == DataTypes.DoubleType) {
return new IntegerToDoubleUpdater();
} else if (sparkType == DataTypes.DateType) {
if ("CORRECTED".equals(datetimeRebaseMode)) {
return new IntegerUpdater();
} else {
boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
return new IntegerWithRebaseUpdater(failIfRebase);
}
} else if (sparkType == DataTypes.TimestampNTZType && isDateTypeMatched(descriptor)) {
if ("CORRECTED".equals(datetimeRebaseMode)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #44428; TimestampNTZ should never be rebased.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind — this reads a DATE column, so the rebase is needed.

return new DateToTimestampNTZUpdater();
} else {
boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
return new DateToTimestampNTZWithRebaseUpdater(failIfRebase);
}
} else if (sparkType instanceof YearMonthIntervalType) {
return new IntegerUpdater();
}
Expand All @@ -104,6 +118,8 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType spa
} else {
return new LongUpdater();
}
} else if (canReadAsBinaryDecimal(descriptor, sparkType)) {
return new LongToBinaryUpdater();
} else if (isLongDecimal(sparkType) && isUnsignedIntTypeMatched(64)) {
// In `ParquetToSparkSchemaConverter`, we map parquet UINT64 to our Decimal(20, 0).
// For unsigned int64, it stores as plain signed int64 in Parquet when dictionary
Expand Down Expand Up @@ -142,6 +158,8 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType spa
case FLOAT -> {
if (sparkType == DataTypes.FloatType) {
return new FloatUpdater();
} else if (sparkType == DataTypes.DoubleType) {
return new FloatToDoubleUpdater();
}
}
case DOUBLE -> {
Expand Down Expand Up @@ -288,6 +306,157 @@ public void decodeSingleDictionaryId(
}
}

static class IntegerToLongUpdater implements ParquetVectorUpdater {
  // Widens Parquet INT32 physical values into a Spark long column
  // (used for LongType and long-backed decimal reads).

  @Override
  public void readValue(
      int offset,
      WritableColumnVector values,
      VectorizedValuesReader valuesReader) {
    // The int -> long widening happens implicitly at the putLong call.
    values.putLong(offset, valuesReader.readInteger());
  }

  @Override
  public void readValues(
      int total,
      int offset,
      WritableColumnVector values,
      VectorizedValuesReader valuesReader) {
    // Delegate each slot to readValue to keep the widening logic in one place.
    for (int i = 0; i < total; i++) {
      readValue(offset + i, values, valuesReader);
    }
  }

  @Override
  public void skipValues(int total, VectorizedValuesReader valuesReader) {
    valuesReader.skipIntegers(total);
  }

  @Override
  public void decodeSingleDictionaryId(
      int offset,
      WritableColumnVector values,
      WritableColumnVector dictionaryIds,
      Dictionary dictionary) {
    int dictId = dictionaryIds.getDictId(offset);
    values.putLong(offset, dictionary.decodeToInt(dictId));
  }
}

static class IntegerToDoubleUpdater implements ParquetVectorUpdater {
  // Widens Parquet INT32 physical values into a Spark double column.

  @Override
  public void readValue(
      int offset,
      WritableColumnVector values,
      VectorizedValuesReader valuesReader) {
    // int -> double conversion is exact for all 32-bit values.
    values.putDouble(offset, valuesReader.readInteger());
  }

  @Override
  public void readValues(
      int total,
      int offset,
      WritableColumnVector values,
      VectorizedValuesReader valuesReader) {
    for (int i = 0; i < total; i++) {
      readValue(offset + i, values, valuesReader);
    }
  }

  @Override
  public void skipValues(int total, VectorizedValuesReader valuesReader) {
    valuesReader.skipIntegers(total);
  }

  @Override
  public void decodeSingleDictionaryId(
      int offset,
      WritableColumnVector values,
      WritableColumnVector dictionaryIds,
      Dictionary dictionary) {
    int dictId = dictionaryIds.getDictId(offset);
    values.putDouble(offset, dictionary.decodeToInt(dictId));
  }
}

static class DateToTimestampNTZUpdater implements ParquetVectorUpdater {
  // Promotes Parquet DATE values (days since epoch) to TimestampNTZ
  // (microseconds since epoch), interpreting the day boundary in UTC.
  // No datetime rebase is applied here; see the WithRebase variant.

  @Override
  public void readValue(
      int offset,
      WritableColumnVector values,
      VectorizedValuesReader valuesReader) {
    int days = valuesReader.readInteger();
    values.putLong(offset, DateTimeUtils.daysToMicros(days, ZoneOffset.UTC));
  }

  @Override
  public void readValues(
      int total,
      int offset,
      WritableColumnVector values,
      VectorizedValuesReader valuesReader) {
    for (int i = 0; i < total; i++) {
      readValue(offset + i, values, valuesReader);
    }
  }

  @Override
  public void skipValues(int total, VectorizedValuesReader valuesReader) {
    valuesReader.skipIntegers(total);
  }

  @Override
  public void decodeSingleDictionaryId(
      int offset,
      WritableColumnVector values,
      WritableColumnVector dictionaryIds,
      Dictionary dictionary) {
    int days = dictionary.decodeToInt(dictionaryIds.getDictId(offset));
    values.putLong(offset, DateTimeUtils.daysToMicros(days, ZoneOffset.UTC));
  }
}

private static class DateToTimestampNTZWithRebaseUpdater implements ParquetVectorUpdater {
  // Promotes Parquet DATE values to TimestampNTZ micros, first rebasing
  // day counts from the legacy hybrid (Julian/Gregorian) calendar to the
  // proleptic Gregorian calendar used by Spark.

  // When true, rebaseDays throws instead of silently converting
  // (corresponds to the EXCEPTION rebase mode).
  private final boolean failIfRebase;

  DateToTimestampNTZWithRebaseUpdater(boolean failIfRebase) {
    this.failIfRebase = failIfRebase;
  }

  @Override
  public void readValue(
      int offset,
      WritableColumnVector values,
      VectorizedValuesReader valuesReader) {
    int days = rebaseDays(valuesReader.readInteger(), failIfRebase);
    values.putLong(offset, DateTimeUtils.daysToMicros(days, ZoneOffset.UTC));
  }

  @Override
  public void readValues(
      int total,
      int offset,
      WritableColumnVector values,
      VectorizedValuesReader valuesReader) {
    for (int i = 0; i < total; i++) {
      readValue(offset + i, values, valuesReader);
    }
  }

  @Override
  public void skipValues(int total, VectorizedValuesReader valuesReader) {
    valuesReader.skipIntegers(total);
  }

  @Override
  public void decodeSingleDictionaryId(
      int offset,
      WritableColumnVector values,
      WritableColumnVector dictionaryIds,
      Dictionary dictionary) {
    int rawDays = dictionary.decodeToInt(dictionaryIds.getDictId(offset));
    int days = rebaseDays(rawDays, failIfRebase);
    values.putLong(offset, DateTimeUtils.daysToMicros(days, ZoneOffset.UTC));
  }
}

private static class UnsignedIntegerUpdater implements ParquetVectorUpdater {
@Override
public void readValues(
Expand Down Expand Up @@ -691,6 +860,41 @@ public void decodeSingleDictionaryId(
}
}

static class FloatToDoubleUpdater implements ParquetVectorUpdater {
  // Widens Parquet FLOAT physical values into a Spark double column.

  @Override
  public void readValue(
      int offset,
      WritableColumnVector values,
      VectorizedValuesReader valuesReader) {
    // float -> double widening is exact (every float is representable).
    values.putDouble(offset, valuesReader.readFloat());
  }

  @Override
  public void readValues(
      int total,
      int offset,
      WritableColumnVector values,
      VectorizedValuesReader valuesReader) {
    for (int i = 0; i < total; i++) {
      readValue(offset + i, values, valuesReader);
    }
  }

  @Override
  public void skipValues(int total, VectorizedValuesReader valuesReader) {
    valuesReader.skipFloats(total);
  }

  @Override
  public void decodeSingleDictionaryId(
      int offset,
      WritableColumnVector values,
      WritableColumnVector dictionaryIds,
      Dictionary dictionary) {
    int dictId = dictionaryIds.getDictId(offset);
    values.putDouble(offset, dictionary.decodeToFloat(dictId));
  }
}

private static class DoubleUpdater implements ParquetVectorUpdater {
@Override
public void readValues(
Expand Down Expand Up @@ -758,6 +962,84 @@ public void decodeSingleDictionaryId(
}
}

private static class IntegerToBinaryUpdater implements ParquetVectorUpdater {
  // Promotes Parquet INT32 values into a binary-backed decimal column by
  // storing each value's two's-complement big-endian byte representation.

  @Override
  public void readValue(
      int offset,
      WritableColumnVector values,
      VectorizedValuesReader valuesReader) {
    writeAsBinary(offset, values, valuesReader.readInteger());
  }

  @Override
  public void readValues(
      int total,
      int offset,
      WritableColumnVector values,
      VectorizedValuesReader valuesReader) {
    for (int i = 0; i < total; i++) {
      readValue(offset + i, values, valuesReader);
    }
  }

  @Override
  public void skipValues(int total, VectorizedValuesReader valuesReader) {
    valuesReader.skipIntegers(total);
  }

  @Override
  public void decodeSingleDictionaryId(
      int offset,
      WritableColumnVector values,
      WritableColumnVector dictionaryIds,
      Dictionary dictionary) {
    writeAsBinary(offset, values, dictionary.decodeToInt(dictionaryIds.getDictId(offset)));
  }

  // Converts the int to a minimal big-endian two's-complement byte array and stores it.
  private static void writeAsBinary(int offset, WritableColumnVector values, int value) {
    values.putByteArray(offset, BigInteger.valueOf(value).toByteArray());
  }
}

private static class LongToBinaryUpdater implements ParquetVectorUpdater {
  // Promotes Parquet INT64 values into a binary-backed decimal column by
  // storing each value's two's-complement big-endian byte representation.

  @Override
  public void readValue(
      int offset,
      WritableColumnVector values,
      VectorizedValuesReader valuesReader) {
    writeAsBinary(offset, values, valuesReader.readLong());
  }

  @Override
  public void readValues(
      int total,
      int offset,
      WritableColumnVector values,
      VectorizedValuesReader valuesReader) {
    for (int i = 0; i < total; i++) {
      readValue(offset + i, values, valuesReader);
    }
  }

  @Override
  public void skipValues(int total, VectorizedValuesReader valuesReader) {
    valuesReader.skipLongs(total);
  }

  @Override
  public void decodeSingleDictionaryId(
      int offset,
      WritableColumnVector values,
      WritableColumnVector dictionaryIds,
      Dictionary dictionary) {
    writeAsBinary(offset, values, dictionary.decodeToLong(dictionaryIds.getDictId(offset)));
  }

  // Converts the long to a minimal big-endian two's-complement byte array and stores it.
  private static void writeAsBinary(int offset, WritableColumnVector values, long value) {
    values.putByteArray(offset, BigInteger.valueOf(value).toByteArray());
  }
}

private static class BinaryToSQLTimestampUpdater implements ParquetVectorUpdater {
@Override
public void readValues(
Expand Down Expand Up @@ -1156,6 +1438,11 @@ private static boolean isLongDecimal(DataType dt) {
return false;
}

/** Returns true when the Parquet column carries a DATE logical type annotation. */
private static boolean isDateTypeMatched(ColumnDescriptor descriptor) {
  return descriptor.getPrimitiveType().getLogicalTypeAnnotation()
      instanceof DateLogicalTypeAnnotation;
}

private static boolean isDecimalTypeMatched(ColumnDescriptor descriptor, DataType dt) {
DecimalType d = (DecimalType) dt;
LogicalTypeAnnotation typeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation();
Expand Down
Loading