Type promotion in Parquet readers
johanl-db committed Dec 15, 2023
1 parent c0caee7 commit a4e8048
Showing 6 changed files with 513 additions and 32 deletions.
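
This commit teaches the vectorized Parquet readers to promote narrower physical types to wider Spark types: INT32 read as long or a wider decimal, DATE read as timestamp_ntz, INT32/INT64 read as binary-backed decimals, and FLOAT read as double. A minimal usage sketch of the behavior this enables, assuming the rest of the type-widening changes in this patch set are applied; the path and column names are hypothetical and not taken from the commit:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ParquetTypePromotionExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("parquet-type-promotion")
        .master("local[*]")
        .getOrCreate();

    // Hypothetical files under /tmp/events written with an INT32 `id` and a FLOAT `score` column.
    // Reading them back with a wider schema now widens the values in the vectorized reader
    // instead of failing on the physical/requested type mismatch.
    Dataset<Row> df = spark.read()
        .schema("id BIGINT, score DOUBLE")
        .parquet("/tmp/events");
    df.show();

    spark.stop();
  }
}

Previously the vectorized reader rejected such schema mismatches rather than widening the values.
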
@@ -81,6 +81,10 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType spa
// For unsigned int32, it stores as plain signed int32 in Parquet when dictionary
// fallbacks. We read them as long values.
return new UnsignedIntegerUpdater();
} else if (sparkType == DataTypes.LongType || canReadAsLongDecimal(descriptor, sparkType)) {
return new IntegerToLongUpdater();
} else if (canReadAsBinaryDecimal(descriptor, sparkType)) {
return new IntegerToBinaryUpdater();
} else if (sparkType == DataTypes.ByteType) {
return new ByteUpdater();
} else if (sparkType == DataTypes.ShortType) {
@@ -92,6 +96,13 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType spa
boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
return new IntegerWithRebaseUpdater(failIfRebase);
}
} else if (sparkType == DataTypes.TimestampNTZType) {
if ("CORRECTED".equals(datetimeRebaseMode)) {
return new DateToTimestampNTZUpdater();
} else {
boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
return new DateToTimestampNTZWithRebaseUpdater(failIfRebase);
}
} else if (sparkType instanceof YearMonthIntervalType) {
return new IntegerUpdater();
}
@@ -104,6 +115,8 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType spa
} else {
return new LongUpdater();
}
} else if (canReadAsBinaryDecimal(descriptor, sparkType)) {
return new LongToBinaryUpdater();
} else if (isLongDecimal(sparkType) && isUnsignedIntTypeMatched(64)) {
// In `ParquetToSparkSchemaConverter`, we map parquet UINT64 to our Decimal(20, 0).
// For unsigned int64, it stores as plain signed int64 in Parquet when dictionary
@@ -134,6 +147,8 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType spa
case FLOAT -> {
if (sparkType == DataTypes.FloatType) {
return new FloatUpdater();
} else if (sparkType == DataTypes.DoubleType) {
return new FloatToDoubleUpdater();
}
}
case DOUBLE -> {
@@ -281,6 +296,121 @@ public void decodeSingleDictionaryId(
}
}

static class IntegerToLongUpdater implements ParquetVectorUpdater {
@Override
public void readValues(
int total,
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
for (int i = 0; i < total; ++i) {
values.putLong(offset + i, valuesReader.readInteger());
}
}

@Override
public void skipValues(int total, VectorizedValuesReader valuesReader) {
valuesReader.skipIntegers(total);
}

@Override
public void readValue(
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
values.putLong(offset, valuesReader.readInteger());
}

@Override
public void decodeSingleDictionaryId(
int offset,
WritableColumnVector values,
WritableColumnVector dictionaryIds,
Dictionary dictionary) {
values.putLong(offset, dictionary.decodeToInt(dictionaryIds.getDictId(offset)));
}
}

static class DateToTimestampNTZUpdater implements ParquetVectorUpdater {
@Override
public void readValues(
int total,
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
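// DATE values count days since the Unix epoch; widen each to a TimestampNTZ by converting to
// microseconds at midnight UTC of that day.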
for (int i = 0; i < total; ++i) {
values.putLong(offset + i, DateTimeUtils.daysToMicros(valuesReader.readInteger(), ZoneOffset.UTC));
}
}

@Override
public void skipValues(int total, VectorizedValuesReader valuesReader) {
valuesReader.skipIntegers(total);
}

@Override
public void readValue(
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
values.putLong(offset, DateTimeUtils.daysToMicros(valuesReader.readInteger(), ZoneOffset.UTC));
}

@Override
public void decodeSingleDictionaryId(
int offset,
WritableColumnVector values,
WritableColumnVector dictionaryIds,
Dictionary dictionary) {
int days = dictionary.decodeToInt(dictionaryIds.getDictId(offset));
values.putLong(offset, DateTimeUtils.daysToMicros(days, ZoneOffset.UTC));
}
}

private static class DateToTimestampNTZWithRebaseUpdater implements ParquetVectorUpdater {
private final boolean failIfRebase;

DateToTimestampNTZWithRebaseUpdater(boolean failIfRebase) {
this.failIfRebase = failIfRebase;
}

@Override
public void readValues(
int total,
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
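// Days stored with the legacy hybrid (Julian) calendar are first rebased to the proleptic
// Gregorian calendar (or an exception is thrown), then widened to TimestampNTZ microseconds.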
for (int i = 0; i < total; ++i) {
int rebasedDays = rebaseDays(valuesReader.readInteger(), failIfRebase);
values.putLong(offset + i, DateTimeUtils.daysToMicros(rebasedDays, ZoneOffset.UTC));
}
}

@Override
public void skipValues(int total, VectorizedValuesReader valuesReader) {
valuesReader.skipIntegers(total);
}

@Override
public void readValue(
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
int rebasedDays = rebaseDays(valuesReader.readInteger(), failIfRebase);
values.putLong(offset, DateTimeUtils.daysToMicros(rebasedDays, ZoneOffset.UTC));
}

@Override
public void decodeSingleDictionaryId(
int offset,
WritableColumnVector values,
WritableColumnVector dictionaryIds,
Dictionary dictionary) {
int rebasedDays = rebaseDays(dictionary.decodeToInt(dictionaryIds.getDictId(offset)), failIfRebase);
values.putLong(offset, DateTimeUtils.daysToMicros(rebasedDays, ZoneOffset.UTC));
}
}

private static class UnsignedIntegerUpdater implements ParquetVectorUpdater {
@Override
public void readValues(
@@ -684,6 +814,41 @@ public void decodeSingleDictionaryId(
}
}

static class FloatToDoubleUpdater implements ParquetVectorUpdater {
@Override
public void readValues(
int total,
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
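// Widening float to double is exact, so each value can be stored directly in the double vector.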
for (int i = 0; i < total; ++i) {
values.putDouble(offset + i, valuesReader.readFloat());
}
}

@Override
public void skipValues(int total, VectorizedValuesReader valuesReader) {
valuesReader.skipFloats(total);
}

@Override
public void readValue(
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
values.putDouble(offset, valuesReader.readFloat());
}

@Override
public void decodeSingleDictionaryId(
int offset,
WritableColumnVector values,
WritableColumnVector dictionaryIds,
Dictionary dictionary) {
values.putDouble(offset, dictionary.decodeToFloat(dictionaryIds.getDictId(offset)));
}
}

private static class DoubleUpdater implements ParquetVectorUpdater {
@Override
public void readValues(
@@ -751,6 +916,82 @@ public void decodeSingleDictionaryId(
}
}

private static class IntegerToBinaryUpdater implements ParquetVectorUpdater {

@Override
public void readValues(
int total,
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
for (int i = 0; i < total; i++) {
readValue(offset + i, values, valuesReader);
}
}

@Override
public void skipValues(int total, VectorizedValuesReader valuesReader) {
valuesReader.skipIntegers(total);
}

@Override
public void readValue(
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
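// Write the value as the unscaled form of a binary-backed decimal: BigInteger.toByteArray()
// produces the big-endian two's-complement bytes that are later decoded back into the decimal.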
BigInteger value = BigInteger.valueOf(valuesReader.readInteger());
values.putByteArray(offset, value.toByteArray());
}

@Override
public void decodeSingleDictionaryId(
int offset,
WritableColumnVector values,
WritableColumnVector dictionaryIds,
Dictionary dictionary) {
BigInteger value = BigInteger.valueOf(dictionary.decodeToInt(dictionaryIds.getDictId(offset)));
values.putByteArray(offset, value.toByteArray());
}
}

private static class LongToBinaryUpdater implements ParquetVectorUpdater {

@Override
public void readValues(
int total,
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
for (int i = 0; i < total; i++) {
readValue(offset + i, values, valuesReader);
}
}

@Override
public void skipValues(int total, VectorizedValuesReader valuesReader) {
valuesReader.skipLongs(total);
}

@Override
public void readValue(
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
BigInteger value = BigInteger.valueOf(valuesReader.readLong());
values.putByteArray(offset, value.toByteArray());
}

@Override
public void decodeSingleDictionaryId(
int offset,
WritableColumnVector values,
WritableColumnVector dictionaryIds,
Dictionary dictionary) {
BigInteger value = BigInteger.valueOf(dictionary.decodeToLong(dictionaryIds.getDictId(offset)));
values.putByteArray(offset, value.toByteArray());
}
}

private static class BinaryToSQLTimestampUpdater implements ParquetVectorUpdater {
@Override
public void readValues(
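
A side note on the IntegerToBinaryUpdater and LongToBinaryUpdater classes above: the byte array passed to putByteArray is the big-endian two's-complement encoding of the decimal's unscaled value, which round-trips losslessly through java.math.BigInteger. A small standalone sketch, illustrative only and not part of this commit:

import java.math.BigInteger;

public class UnscaledBytesRoundTrip {
  public static void main(String[] args) {
    long unscaled = -123456789L;                                // e.g. an INT64 value promoted to a binary decimal
    byte[] bytes = BigInteger.valueOf(unscaled).toByteArray();  // big-endian two's complement
    BigInteger back = new BigInteger(bytes);                    // what the column vector decodes later
    System.out.println(back.longValueExact() == unscaled);      // prints: true
  }
}
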
@@ -38,10 +38,13 @@
import org.apache.parquet.schema.PrimitiveType;

import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.DecimalType;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static org.apache.spark.sql.types.DataTypes.*;

/**
* Decoder to return values from a single column.
@@ -140,23 +143,42 @@ public VectorizedColumnReader(
this.writerVersion = writerVersion;
}

private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName) {
private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName,
DataType sparkType) {
// Don't use lazy dictionary decoding if the column needs extra processing: upcasting, date
// rebasing, or reading decimals with a different scale.
return switch (typeName) {
case INT32 ->
!(logicalTypeAnnotation instanceof DateLogicalTypeAnnotation) ||
"CORRECTED".equals(datetimeRebaseMode);
case INT32 -> {
boolean isDecimal = sparkType instanceof DecimalType;
boolean needsUpcast = sparkType == LongType || sparkType == TimestampNTZType ||
(isDecimal && !DecimalType.is32BitDecimalType(sparkType));
boolean needsRebase = logicalTypeAnnotation instanceof DateLogicalTypeAnnotation &&
!"CORRECTED".equals(datetimeRebaseMode);
yield !needsUpcast && !needsRebase && !needsDecimalScaleRebase(sparkType);
}
case INT64 -> {
if (updaterFactory.isTimestampTypeMatched(TimeUnit.MICROS)) {
yield "CORRECTED".equals(datetimeRebaseMode);
} else {
yield !updaterFactory.isTimestampTypeMatched(TimeUnit.MILLIS);
}
boolean isDecimal = sparkType instanceof DecimalType;
boolean needsUpcast = (isDecimal && !DecimalType.is64BitDecimalType(sparkType)) ||
updaterFactory.isTimestampTypeMatched(TimeUnit.MILLIS);
boolean needsRebase = updaterFactory.isTimestampTypeMatched(TimeUnit.MICROS) &&
!"CORRECTED".equals(datetimeRebaseMode);
yield !needsUpcast && !needsRebase && !needsDecimalScaleRebase(sparkType);
}
case FLOAT, DOUBLE, BINARY -> true;
case FLOAT -> sparkType == FloatType;
case DOUBLE, BINARY -> !needsDecimalScaleRebase(sparkType);
default -> false;
};
}

/**
* Returns whether the Parquet type of this column and the given spark type are two decimal types
* with different scales.
*/
private boolean needsDecimalScaleRebase(DataType sparkType) {
LogicalTypeAnnotation typeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation();
if (!(typeAnnotation instanceof DecimalLogicalTypeAnnotation)) return false;
if (!(sparkType instanceof DecimalType)) return false;
DecimalLogicalTypeAnnotation parquetDecimal = (DecimalLogicalTypeAnnotation) typeAnnotation;
DecimalType sparkDecimal = (DecimalType) sparkType;
return parquetDecimal.getScale() != sparkDecimal.scale();
}

/**
* Reads `total` rows from this columnReader into column.
*/
@@ -205,7 +227,7 @@ void readBatch(
// TIMESTAMP_MILLIS encoded as INT64 can't be lazily decoded as we need to post process
// the values to add microseconds precision.
if (column.hasDictionary() || (startRowId == pageFirstRowIndex &&
isLazyDecodingSupported(typeName))) {
isLazyDecodingSupported(typeName, column.dataType()))) {
// Column vector supports lazy decoding of dictionary values so just set the dictionary.
// We can't do this if startRowId is not the first row index in the page AND the column
// doesn't have a dictionary (i.e. some non-dictionary encoded values have already been
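
To make the new check concrete: needsDecimalScaleRebase only flags a scale mismatch between the Parquet decimal and the requested Spark decimal; precision widening is covered by the separate needsUpcast flags above. An illustrative snippet, not part of the commit, using hypothetical types for a Parquet decimal(9, 2) column:

import org.apache.spark.sql.types.DecimalType;

public class ScaleRebaseIllustration {
  public static void main(String[] args) {
    int parquetScale = 2;                                  // the Parquet column's declared scale
    DecimalType sameScale = new DecimalType(18, 2);        // wider precision, same scale
    DecimalType differentScale = new DecimalType(18, 4);   // wider precision, different scale
    System.out.println(parquetScale != sameScale.scale());      // false: no scale rebase needed
    System.out.println(parquetScale != differentScale.scale()); // true: lazy dictionary decoding is skipped
  }
}
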