Refactor Parquet readers [databricks] #9631

Merged 32 commits into branch-23.12 from refactor_parquet_scan on Nov 7, 2023.

Commits (32)
e368aa6 Add check for nested types (ttnghia, Aug 28, 2023)
7da416b Recursively check for rebasing (ttnghia, Nov 2, 2023)
df8f861 Extract common code (ttnghia, Nov 2, 2023)
95d19ee Allow nested type in rebase check (ttnghia, Nov 2, 2023)
b426610 Enable nested timestamp in roundtrip test (ttnghia, Nov 2, 2023)
7343b17 Fix another test (ttnghia, Nov 2, 2023)
0d48f57 Merge branch 'check_rebase_nested' into rebase_datatime (ttnghia, Nov 2, 2023)
024e6c9 Enable `LEGACY` rebase in read (ttnghia, Nov 2, 2023)
9a39628 Remove comment (ttnghia, Nov 2, 2023)
e686bb0 Change function/class signatures (ttnghia, Nov 2, 2023)
b49963e Merge branch 'branch-23.12' into rebase_datatime (ttnghia, Nov 3, 2023)
2c232f8 Complete modification (ttnghia, Nov 3, 2023)
ac0f3e4 Misc (ttnghia, Nov 3, 2023)
c773794 Add explicit type (ttnghia, Nov 3, 2023)
29df7cd Rename file and add some stuff in DateTimeRebaseHelpers.scala (ttnghia, Nov 3, 2023)
1b5112d Move file and rename class (ttnghia, Nov 4, 2023)
63342a9 Adopt new enum type (ttnghia, Nov 4, 2023)
6b2d795 Add name for the enum classes (ttnghia, Nov 4, 2023)
37aa40b Change exception messages (ttnghia, Nov 4, 2023)
d4cdc1b Merge branch 'branch-23.12' into refactor_parquet_scan (ttnghia, Nov 4, 2023)
03f681e Does not yet support legacy rebase in read (ttnghia, Nov 5, 2023)
14f230f Change legacy to corrected mode (ttnghia, Nov 5, 2023)
1b464ec Extract common code (ttnghia, Nov 5, 2023)
0d26d97 Rename functions (ttnghia, Nov 5, 2023)
c2504fd Reformat (ttnghia, Nov 5, 2023)
edb6c81 Make classes serializable (ttnghia, Nov 5, 2023)
54e959f Merge branch 'branch-23.12' into refactor_parquet_scan (ttnghia, Nov 6, 2023)
3f01690 Change comment (ttnghia, Nov 6, 2023)
74fe84a Various changes (ttnghia, Nov 6, 2023)
a455a90 Fix compile errors (ttnghia, Nov 6, 2023)
b87493c Fix comments (ttnghia, Nov 6, 2023)
321e516 Fix indentations (ttnghia, Nov 6, 2023)
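
The thrust of the refactor, visible throughout the diffs below, is replacing the stringly-typed rebase flags ("EXCEPTION"/"CORRECTED"/"LEGACY" strings and the isCorrected*RebaseMode booleans) with a dedicated DateTimeRebaseMode type. A minimal sketch of the shape the diffs suggest; the exact definition lives in the PR's rebase helper file (renamed during the PR) and may differ in detail:

// Sealed hierarchy so pattern matches are exhaustively checked by the compiler.
// Serializable so a mode can travel inside Spark task closures
// (see the "Make classes serializable" commit above).
sealed trait DateTimeRebaseMode extends Serializable

case object DateTimeRebaseException extends DateTimeRebaseMode
case object DateTimeRebaseCorrected extends DateTimeRebaseMode
case object DateTimeRebaseLegacy extends DateTimeRebaseMode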
@@ -25,6 +25,7 @@
 import scala.collection.Seq;

+import com.nvidia.spark.rapids.DateTimeRebaseCorrected$;
 import com.nvidia.spark.rapids.GpuMetric;
 import com.nvidia.spark.rapids.GpuParquetUtils;
 import com.nvidia.spark.rapids.ParquetPartitionReader;
@@ -139,8 +140,8 @@ public org.apache.iceberg.io.CloseableIterator<ColumnarBatch> iterator() {
         new Path(input.location()), clippedBlocks, fileReadSchema, caseSensitive,
         partReaderSparkSchema, debugDumpPrefix, debugDumpAlways,
         maxBatchSizeRows, maxBatchSizeBytes, targetBatchSizeBytes, useChunkedReader, metrics,
-        true, // isCorrectedInt96RebaseMode
-        true, // isCorrectedRebaseMode
+        DateTimeRebaseCorrected$.MODULE$, // dateRebaseMode
+        DateTimeRebaseCorrected$.MODULE$, // timestampRebaseMode
         true, // hasInt96Timestamps
         false // useFieldId
     );
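
A note on the Java call sites above: a Scala object such as DateTimeRebaseCorrected is exposed to Java as a class named DateTimeRebaseCorrected$ with a static singleton field MODULE$. That is why the Java-based Iceberg reader passes DateTimeRebaseCorrected$.MODULE$ where Scala code would simply write DateTimeRebaseCorrected.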
@@ -220,10 +220,12 @@ static class IcebergParquetExtraInfo extends ParquetExtraInfo {
     private final Schema expectedSchema;
     private final PartitionSpec partitionSpec;

-    IcebergParquetExtraInfo(boolean isCorrectedRebaseMode,
-        boolean isCorrectedInt96RebaseMode, boolean hasInt96Timestamps,
-        Map<Integer, ?> idToConstant, Schema expectedSchema, PartitionSpec partitionSpec) {
-      super(isCorrectedRebaseMode, isCorrectedInt96RebaseMode, hasInt96Timestamps);
+    IcebergParquetExtraInfo(DateTimeRebaseMode dateRebaseMode,
+        DateTimeRebaseMode timestampRebaseMode,
+        boolean hasInt96Timestamps,
+        Map<Integer, ?> idToConstant, Schema expectedSchema,
+        PartitionSpec partitionSpec) {
+      super(dateRebaseMode, timestampRebaseMode, hasInt96Timestamps);
       this.idToConstant = idToConstant;
       this.expectedSchema = expectedSchema;
       this.partitionSpec = partitionSpec;
@@ -309,8 +311,8 @@ protected FilteredParquetFileInfo filterParquetBlocks(FileScanTask fst) {
     ParquetFileInfoWithBlockMeta parquetBlockMeta = ParquetFileInfoWithBlockMeta.apply(
         new Path(new URI(fst.file().path().toString())), clippedBlocks,
         InternalRow.empty(), fileReadSchema, partReaderSparkSchema,
-        true, // isCorrectedInt96RebaseMode
-        true, // isCorrectedRebaseMode
+        DateTimeRebaseCorrected$.MODULE$, // dateRebaseMode
+        DateTimeRebaseCorrected$.MODULE$, // timestampRebaseMode
         true // hasInt96Timestamps
     );
     return new FilteredParquetFileInfo(parquetBlockMeta, updatedConstants, updatedSchema);
@@ -397,8 +399,8 @@ protected FilePartitionReaderBase createRapidsReader(PartitionedFile[] pFiles,
         ParquetSchemaWrapper.apply(filteredInfo.parquetBlockMeta.schema()),
         filteredInfo.parquetBlockMeta.readSchema(),
         new IcebergParquetExtraInfo(
-            filteredInfo.parquetBlockMeta.isCorrectedRebaseMode(),
-            filteredInfo.parquetBlockMeta.isCorrectedInt96RebaseMode(),
+            filteredInfo.parquetBlockMeta.dateRebaseMode(),
+            filteredInfo.parquetBlockMeta.timestampRebaseMode(),
             filteredInfo.parquetBlockMeta.hasInt96Timestamps(),
             filteredInfo.idToConstant(),
             filteredInfo.expectedSchema(),
@@ -19,7 +19,6 @@ package com.nvidia.spark.rapids
 import java.time.ZoneId

 import ai.rapids.cudf._
-import com.nvidia.spark.RebaseHelper
 import com.nvidia.spark.rapids.Arm.withResource
 import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingArray
 import com.nvidia.spark.rapids.jni.DateTimeRebase
@@ -121,28 +120,31 @@ object GpuParquetFileFormat {
       }
     }

-    SparkShimImpl.int96ParquetRebaseWrite(sqlConf) match {
-      case "EXCEPTION" =>
-      case "CORRECTED" =>
-      case "LEGACY" =>
+    DateTimeRebaseMode.fromName(SparkShimImpl.int96ParquetRebaseWrite(sqlConf)) match {
+      case DateTimeRebaseException | DateTimeRebaseCorrected => // Good
+      case DateTimeRebaseLegacy =>
         if (schemaHasTimestamps) {
           meta.willNotWorkOnGpu("LEGACY rebase mode for int96 timestamps is not supported")
         }
-      case other =>
-        meta.willNotWorkOnGpu(s"$other is not a supported rebase mode for int96")
+      // This should never be reached, since an invalid mode is already handled in
+      // `DateTimeRebaseMode.fromName`.
+      case other => meta.willNotWorkOnGpu(
+        DateTimeRebaseUtils.invalidRebaseModeMessage(other.getClass.getName))
     }

-    SparkShimImpl.parquetRebaseWrite(sqlConf) match {
-      case "EXCEPTION" | "CORRECTED" => // Good
-      case "LEGACY" =>
+    DateTimeRebaseMode.fromName(SparkShimImpl.parquetRebaseWrite(sqlConf)) match {
+      case DateTimeRebaseException | DateTimeRebaseCorrected => // Good
+      case DateTimeRebaseLegacy =>
         if (!TypeChecks.areTimestampsSupported()) {
           meta.willNotWorkOnGpu("Only UTC timezone is supported in LEGACY rebase mode. " +
             s"Current timezone settings: (JVM : ${ZoneId.systemDefault()}, " +
             s"session: ${SQLConf.get.sessionLocalTimeZone}). " +
             " Set both of the timezones to UTC to enable LEGACY rebase support.")
         }
-      case other =>
-        meta.willNotWorkOnGpu(s"$other is not a supported rebase mode")
+      // This should never be reached, since an invalid mode is already handled in
+      // `DateTimeRebaseMode.fromName`.
+      case other => meta.willNotWorkOnGpu(
+        DateTimeRebaseUtils.invalidRebaseModeMessage(other.getClass.getName))
     }

     if (meta.canThisBeReplaced) {
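
The defensive `case other` branches above should be unreachable, because `DateTimeRebaseMode.fromName` already rejects unknown conf values. As a hypothetical illustration of that contract (not the PR's exact code), assuming the case objects sketched earlier:

object DateTimeRebaseMode {
  // Maps the Spark conf strings onto the sealed hierarchy, failing fast on
  // anything unrecognized so downstream matches never see an invalid mode.
  def fromName(name: String): DateTimeRebaseMode = name match {
    case "EXCEPTION" => DateTimeRebaseException
    case "CORRECTED" => DateTimeRebaseCorrected
    case "LEGACY"    => DateTimeRebaseLegacy
    case other =>
      throw new IllegalArgumentException(s"Invalid datetime rebase mode: $other")
  }
}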
@@ -193,9 +195,11 @@ class GpuParquetFileFormat extends ColumnarFileFormat with Logging {
     val conf = ContextUtil.getConfiguration(job)

     val outputTimestampType = sqlConf.parquetOutputTimestampType
-    val dateTimeRebaseMode = sparkSession.sqlContext.getConf(SparkShimImpl.parquetRebaseWriteKey)
+    val dateTimeRebaseMode = DateTimeRebaseMode.fromName(
+      sparkSession.sqlContext.getConf(SparkShimImpl.parquetRebaseWriteKey))
     val timestampRebaseMode = if (outputTimestampType.equals(ParquetOutputTimestampType.INT96)) {
-      sparkSession.sqlContext.getConf(SparkShimImpl.int96ParquetRebaseWriteKey)
+      DateTimeRebaseMode.fromName(
+        sparkSession.sqlContext.getConf(SparkShimImpl.int96ParquetRebaseWriteKey))
     } else {
       dateTimeRebaseMode
     }
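
For context, the two conf lookups above resolve, on stock Spark 3.x, to spark.sql.parquet.datetimeRebaseModeInWrite and spark.sql.parquet.int96RebaseModeInWrite (the shim indirection exists because key names vary across Spark versions). A usage sketch under that assumption:

// INT96 output gets its own rebase mode; other timestamp types reuse the
// date/time mode, exactly as the writer code above selects it.
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED")
spark.conf.set("spark.sql.parquet.int96RebaseModeInWrite", "LEGACY")
spark.range(10).selectExpr("timestamp_micros(id) AS ts").write.parquet("/tmp/rebase_test")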
@@ -302,19 +306,20 @@ class GpuParquetWriter(
     dataSchema: StructType,
     compressionType: CompressionType,
     outputTimestampType: String,
-    dateRebaseMode: String,
-    timestampRebaseMode: String,
+    dateRebaseMode: DateTimeRebaseMode,
+    timestampRebaseMode: DateTimeRebaseMode,
     context: TaskAttemptContext,
     parquetFieldIdEnabled: Boolean)
   extends ColumnarOutputWriter(context, dataSchema, "Parquet", true) {
   override def throwIfRebaseNeededInExceptionMode(batch: ColumnarBatch): Unit = {
     val cols = GpuColumnVector.extractBases(batch)
     cols.foreach { col =>
-      if (dateRebaseMode.equals("EXCEPTION") && RebaseHelper.isDateRebaseNeededInWrite(col)) {
+      if (dateRebaseMode == DateTimeRebaseException &&
+          DateTimeRebaseUtils.isDateRebaseNeededInWrite(col)) {
         throw DataSourceUtils.newRebaseExceptionInWrite("Parquet")
       }
-      else if (timestampRebaseMode.equals("EXCEPTION") &&
-          RebaseHelper.isTimeRebaseNeededInWrite(col)) {
+      else if (timestampRebaseMode == DateTimeRebaseException &&
+          DateTimeRebaseUtils.isTimeRebaseNeededInWrite(col)) {
         throw DataSourceUtils.newRebaseExceptionInWrite("Parquet")
       }
     }
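
For intuition on what isDateRebaseNeededInWrite guards against: values before the Gregorian cutover (1582-10-15) mean different instants in the proleptic Gregorian and hybrid Julian calendars, so EXCEPTION mode must refuse them. A simplified CPU-side sketch; the real checks in this PR run on the GPU over cudf columns, and the constant shown is the cutover day as days since the Unix epoch:

// 1582-10-15 expressed as days since 1970-01-01.
val lastSwitchGregorianDay: Int = -141427

// A DATE column needs rebasing if any value predates the cutover.
def isDateRebaseNeeded(daysSinceEpoch: Array[Int]): Boolean =
  daysSinceEpoch.exists(_ < lastSwitchGregorianDay)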
@@ -334,14 +339,14 @@
       ColumnCastUtil.deepTransform(cv, Some(dt)) {
         case (cv, _) if cv.getType.isTimestampType =>
           if(cv.getType == DType.TIMESTAMP_DAYS) {
-            if (dateRebaseMode.equals("LEGACY")) {
+            if (dateRebaseMode == DateTimeRebaseLegacy) {
               DateTimeRebase.rebaseGregorianToJulian(cv)
             } else {
               cv.copyToColumnVector()
             }
           } else { /* timestamp */
             val typeMillis = ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString
-            if (timestampRebaseMode.equals("LEGACY")) {
+            if (timestampRebaseMode == DateTimeRebaseLegacy) {
               val rebasedTimestampAsMicros = if(cv.getType == DType.TIMESTAMP_MICROSECONDS) {
                 DateTimeRebase.rebaseGregorianToJulian(cv)
               } else {