Skip to content

Commit

Permalink
Add int->double + more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
johanl-db committed Dec 18, 2023
1 parent cb1487e commit dc8b489
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType spa
return new ByteUpdater();
} else if (sparkType == DataTypes.ShortType) {
return new ShortUpdater();
} else if (sparkType == DataTypes.DoubleType) {
return new IntegerToDoubleUpdater();
} else if (sparkType == DataTypes.DateType) {
if ("CORRECTED".equals(datetimeRebaseMode)) {
return new IntegerUpdater();
Expand Down Expand Up @@ -331,6 +333,41 @@ public void decodeSingleDictionaryId(
}
}

/**
 * Vector updater that reads integer values from the Parquet values reader and stores them
 * in a column vector as doubles. The factory's {@code getUpdater} returns it when the
 * requested Spark type is {@code DoubleType}.
 *
 * Every conversion here is Java's implicit int-to-double widening, which is exact for all
 * 32-bit integers, so no precision is lost.
 */
static class IntegerToDoubleUpdater implements ParquetVectorUpdater {

  /**
   * Reads {@code total} integers from {@code valuesReader} and writes each one, widened to
   * double, into consecutive slots of {@code values} starting at {@code offset}.
   */
  @Override
  public void readValues(
      int total,
      int offset,
      WritableColumnVector values,
      VectorizedValuesReader valuesReader) {
    int rowId = offset;
    for (int remaining = total; remaining > 0; remaining--) {
      final int intValue = valuesReader.readInteger();
      values.putDouble(rowId++, intValue);
    }
  }

  /** Skips {@code total} integers in {@code valuesReader} without writing anything. */
  @Override
  public void skipValues(int total, VectorizedValuesReader valuesReader) {
    valuesReader.skipIntegers(total);
  }

  /**
   * Reads a single integer from {@code valuesReader} and writes it, widened to double,
   * into {@code values} at {@code offset}.
   */
  @Override
  public void readValue(
      int offset,
      WritableColumnVector values,
      VectorizedValuesReader valuesReader) {
    final int intValue = valuesReader.readInteger();
    values.putDouble(offset, intValue);
  }

  /**
   * Looks up the dictionary id stored at {@code offset} in {@code dictionaryIds}, decodes it
   * to an int through {@code dictionary}, and writes the widened value into {@code values}
   * at the same offset.
   */
  @Override
  public void decodeSingleDictionaryId(
      int offset,
      WritableColumnVector values,
      WritableColumnVector dictionaryIds,
      Dictionary dictionary) {
    final int dictionaryId = dictionaryIds.getDictId(offset);
    final int decoded = dictionary.decodeToInt(dictionaryId);
    values.putDouble(offset, decoded);
  }
}

static class DateToTimestampNTZUpdater implements ParquetVectorUpdater {
@Override
public void readValues(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,11 @@ private[parquet] class ParquetRowConverter(
override def addInt(value: Int): Unit =
this.updater.setLong(value)
}
case DoubleType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 =>
new ParquetPrimitiveConverter(updater) {
override def addInt(value: Int): Unit =
this.updater.setDouble(value)
}
case DoubleType if parquetType.asPrimitiveType().getPrimitiveTypeName == FLOAT =>
new ParquetPrimitiveConverter(updater) {
override def addFloat(value: Float): Unit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,11 @@ class ParquetTypeWideningSuite
// Int->Short isn't a widening conversion but Parquet stores both as INT32 so it just works.
(Seq("1", "2", Short.MinValue.toString), IntegerType, ShortType),
(Seq("1", "2", Int.MinValue.toString), IntegerType, LongType),
(Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampNTZType),
(Seq("1.23", "10.34"), FloatType, DoubleType))
(Seq("1", "2", Short.MinValue.toString), ShortType, DoubleType),
(Seq("1", "2", Int.MinValue.toString), IntegerType, DoubleType),
(Seq("1.23", "10.34"), FloatType, DoubleType),
(Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampNTZType)
)
}
test(s"parquet widening conversion $fromType -> $toType") {
checkAllParquetReaders(values, fromType, toType, expectError = false)
Expand All @@ -155,9 +158,10 @@ class ParquetTypeWideningSuite
for {
(values: Seq[String], fromType: DataType, toType: DataType) <- Seq(
(Seq("1", "2", Int.MinValue.toString), LongType, IntegerType),
// Test different timestamp types
(Seq("2020-01-01", "2020-01-02", "1312-02-27"), TimestampNTZType, DateType),
(Seq("1.23", "10.34"), DoubleType, FloatType))
(Seq("1.23", "10.34"), DoubleType, FloatType),
(Seq("1.23", "10.34"), FloatType, LongType),
(Seq("2020-01-01", "2020-01-02", "1312-02-27"), TimestampNTZType, DateType)
)
}
test(s"unsupported parquet conversion $fromType -> $toType") {
checkAllParquetReaders(values, fromType, toType, expectError = true)
Expand Down

0 comments on commit dc8b489

Please sign in to comment.