Support DayTimeIntervalType in ParquetCachedBatchSerializer[databricks] #4926

Merged: 5 commits, Mar 15, 2022
14 changes: 13 additions & 1 deletion integration_tests/src/main/python/cache_test.py
@@ -17,7 +17,7 @@
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_equal
from data_gen import *
import pyspark.sql.functions as f
from spark_session import with_cpu_session, with_gpu_session
from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330
from join_test import create_df
from marks import incompat, allow_non_gpu, ignore_order

@@ -285,3 +285,15 @@ def helper(spark):
return df.selectExpr("a")

assert_gpu_and_cpu_are_equal_collect(helper)


@pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Spark3.3.0')
@pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn)
@ignore_order(local=True)
def test_cache_daytimeinterval(enable_vectorized_conf):
def test_func(spark):
df = two_col_df(spark, DayTimeIntervalGen(), int_gen)
df.cache().count()
return df.selectExpr("b", "a")
assert_gpu_and_cpu_are_equal_collect(test_func, enable_vectorized_conf)
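For context (not part of the diff), a minimal end-to-end sketch of the cached-interval scenario this test covers, written in Scala against Spark 3.3.0+. The `spark.plugins` and `spark.sql.cache.serializer` settings follow the documented RAPIDS configuration; the DataFrame shape and interval expression are illustrative only.

// Illustrative sketch only; assumes Spark 3.3.0+ with the RAPIDS Accelerator jar on the classpath.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("daytime-interval-cache-sketch")
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.sql.cache.serializer", "com.nvidia.spark.ParquetCachedBatchSerializer")
  .getOrCreate()

// make_dt_interval produces a DayTimeIntervalType column, stored internally as int64 microseconds.
val df = spark.range(0, 1000)
  .selectExpr("id", "make_dt_interval(0, 0, id, 0.5) AS dt")

df.cache().count()               // materializes the cache through ParquetCachedBatchSerializer
df.selectExpr("dt", "id").show(5)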

@@ -19,6 +19,7 @@ import ai.rapids.cudf.DType
import com.nvidia.spark.rapids.GpuRowToColumnConverter.TypeConverter

import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnVector

object GpuTypeShims {

@@ -46,4 +47,19 @@ object GpuTypeShims {
* @return the cuDF type if the Shim supports
*/
def toRapidsOrNull(t: DataType): DType = null

/** Whether the Shim supports columnar copy for the given type */
def isColumnarCopySupportedForType(colType: DataType): Boolean = false

/**
* Copy a column for computing on GPU.
* Callers should first check whether the type is supported by calling 'isColumnarCopySupportedForType'.
*/
def columnarCopy(cv: ColumnVector,
b: ai.rapids.cudf.HostColumnVector.ColumnBuilder, rows: Int): Unit = {
val t = cv.dataType()
throw new UnsupportedOperationException(s"Converting to GPU for $t is not supported yet")
}

def isParquetColumnarWriterSupportedForType(colType: DataType): Boolean = false
}
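The two hooks above are meant to be used together: callers gate on the support check and only then delegate the copy. A minimal caller-side sketch follows; `copyIfSupported` is a hypothetical helper name, not part of the PR.

import ai.rapids.cudf.HostColumnVector.ColumnBuilder
import org.apache.spark.sql.vectorized.ColumnVector

// Hypothetical helper illustrating the intended calling pattern of the shim hooks.
def copyIfSupported(cv: ColumnVector, b: ColumnBuilder, rows: Int): Boolean = {
  if (GpuTypeShims.isColumnarCopySupportedForType(cv.dataType())) {
    GpuTypeShims.columnarCopy(cv, b, rows)  // shim performs the type-specific host copy
    true
  } else {
    false  // caller falls back to the generic copy paths (e.g. in HostColumnarToGpu)
  }
}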
@@ -276,6 +276,7 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer wi
case TimestampType | StringType | BooleanType | DateType | BinaryType |
DoubleType | FloatType | ByteType | IntegerType | LongType | ShortType => true
case _: DecimalType => true
case other if GpuTypeShims.isParquetColumnarWriterSupportedForType(other) => true
case _ => false
}
}
@@ -334,10 +335,14 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer wi
isSchemaSupportedByCudf(schema)) {
def putOnGpuIfNeeded(batch: ColumnarBatch): ColumnarBatch = {
if (!batch.column(0).isInstanceOf[GpuColumnVector]) {
val s: StructType = structSchema
val gpuCB = new GpuColumnarBatchBuilder(s, batch.numRows()).build(batch.numRows())
batch.close()
gpuCB
// The input batch from CPU must NOT be closed, because the columns inside it
// will be reused, and Spark expects the producer to close its batches.
val numRows = batch.numRows()
val gcbBuilder = new GpuColumnarBatchBuilder(structSchema, numRows)
for (i <- 0 until batch.numCols()) {
Collaborator:
You can use the following to improve performance:

var rowIndex = 0
while (rowIndex < batch.numRows()) {
  ......
  rowIndex += 1
}

A similar PR: #4770

Collaborator Author @firestarman, Mar 14, 2022:
The number of rows is always quite large, so that kind of change can improve performance. Here, however, the loop is over columns, so this suggestion would bring little performance benefit, since the number of columns is usually small.

gcbBuilder.copyColumnar(batch.column(i), i, structSchema(i).nullable, numRows)
}
gcbBuilder.build(numRows)
} else {
batch
}
@@ -1038,7 +1043,7 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer wi
if (!cbIter.hasNext) {
Iterator.empty
} else {
new CurrentBatchIterator(cbIter.next().asInstanceOf[ParquetCachedBatch])
}
}

Expand Down Expand Up @@ -1434,6 +1439,9 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer wi

ParquetWriteSupport.setSchema(requestedSchema, hadoopConf)

// Starting with Spark 3.3.0, Spark checks this field ID config
ParquetFieldIdShims.setupParquetFieldIdWriteConfig(hadoopConf, sqlConf)

hadoopConf
}
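For reference (hedged, not the actual spark-rapids code), a rough sketch of what a hook like `ParquetFieldIdShims.setupParquetFieldIdWriteConfig` could do: propagate Spark 3.3's `spark.sql.parquet.fieldId.write.enabled` flag into the Hadoop configuration used by the Parquet writer. The object and method bodies below are assumptions for illustration.

// Hypothetical sketch; the real ParquetFieldIdShims implementation may differ.
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.internal.SQLConf

object ParquetFieldIdShimsSketch {
  private val FieldIdWriteKey = "spark.sql.parquet.fieldId.write.enabled"

  def setupParquetFieldIdWriteConfig(hadoopConf: Configuration, sqlConf: SQLConf): Unit = {
    // Copy the session-level flag into the Hadoop conf so the cached-batch Parquet writer
    // follows the same field ID behavior as regular Parquet writes on Spark 3.3.0+.
    hadoopConf.set(FieldIdWriteKey, sqlConf.getConfString(FieldIdWriteKey, "true"))
  }
}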

@@ -16,9 +16,11 @@
package com.nvidia.spark.rapids.shims

import ai.rapids.cudf.DType
import com.nvidia.spark.rapids.ColumnarCopyHelper
import com.nvidia.spark.rapids.GpuRowToColumnConverter.{LongConverter, NotNullLongConverter, TypeConverter}

import org.apache.spark.sql.types.{DataType, DayTimeIntervalType}
import org.apache.spark.sql.vectorized.ColumnVector

/**
* Spark stores ANSI YearMonthIntervalType as int32 and ANSI DayTimeIntervalType as int64
@@ -93,4 +95,27 @@ object GpuTypeShims {
null
}
}

/** Whether the Shim supports columnar copy for the given type */
def isColumnarCopySupportedForType(colType: DataType): Boolean = colType match {
case DayTimeIntervalType(_, _) => true
case _ => false
}

/**
* Copy a column for computing on GPU.
* Callers should first check whether the type is supported by calling 'isColumnarCopySupportedForType'.
*/
def columnarCopy(cv: ColumnVector,
b: ai.rapids.cudf.HostColumnVector.ColumnBuilder, rows: Int): Unit = cv.dataType() match {
case DayTimeIntervalType(_, _) =>
ColumnarCopyHelper.longCopy(cv, b, rows)
case t =>
throw new UnsupportedOperationException(s"Converting to GPU for $t is not supported yet")
}

def isParquetColumnarWriterSupportedForType(colType: DataType): Boolean = colType match {
case DayTimeIntervalType(_, _) => true
case _ => false
}
}
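`ColumnarCopyHelper.longCopy` is an existing spark-rapids helper that the shim reuses here because DayTimeIntervalType values are plain 64-bit integers on the CPU side. Below is a simplified, hypothetical sketch of that copy pattern; the real helper's code may differ.

// Hypothetical sketch of a long-valued host copy; not the actual ColumnarCopyHelper code.
import ai.rapids.cudf.HostColumnVector.ColumnBuilder
import org.apache.spark.sql.vectorized.ColumnVector

def longCopySketch(cv: ColumnVector, b: ColumnBuilder, rows: Int): Unit = {
  var i = 0
  while (i < rows) {
    if (cv.isNullAt(i)) {
      b.appendNull()            // keep nulls aligned in the host column
    } else {
      b.append(cv.getLong(i))   // DayTimeIntervalType is stored as int64 microseconds
    }
    i += 1
  }
}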
@@ -16,6 +16,7 @@

package com.nvidia.spark.rapids.shims

import com.nvidia.spark.InMemoryTableScanMeta
import com.nvidia.spark.rapids._
import org.apache.parquet.schema.MessageType

@@ -25,6 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Coalesce, DynamicPruningExpression, Expression, MetadataAttribute, TimeAdd}
import org.apache.spark.sql.catalyst.json.rapids.shims.Spark33XFileOptionsShims
import org.apache.spark.sql.execution.{BaseSubqueryExec, CoalesceExec, FileSourceScanExec, InSubqueryExec, ProjectExec, ReusedSubqueryExec, SparkPlan, SubqueryBroadcastExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FilePartition, FileScanRDD, HadoopFsRelation, PartitionedFile}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
@@ -261,6 +263,11 @@ trait Spark33XShims extends Spark33XFileOptionsShims {
wrapped.disableBucketedScan)(conf)
}
}),
GpuOverrides.exec[InMemoryTableScanExec](
"Implementation of InMemoryTableScanExec to use GPU accelerated Caching",
// NullType is actually supported
ExecChecks(TypeSig.commonCudfTypesWithNested + TypeSig.DAYTIME, TypeSig.all),
(scan, conf, p, r) => new InMemoryTableScanMeta(scan, conf, p, r)),
GpuOverrides.exec[ProjectExec](
"The backend for most select, withColumn and dropColumn statements",
ExecChecks(
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import scala.collection.mutable

import com.nvidia.spark.rapids.shims.{ShimUnaryExecNode, SparkShimImpl}
import com.nvidia.spark.rapids.shims.{GpuTypeShims, ShimUnaryExecNode, SparkShimImpl}
import org.apache.arrow.memory.ReferenceManager
import org.apache.arrow.vector.ValueVector

@@ -148,6 +148,8 @@ object HostColumnarToGpu extends Logging {
ColumnarCopyHelper.decimal128Copy(cv, b, rows, dt.precision, dt.scale)
}
}
case other if GpuTypeShims.isColumnarCopySupportedForType(other) =>
GpuTypeShims.columnarCopy(cv, b, rows)
case t =>
throw new UnsupportedOperationException(
s"Converting to GPU for $t is not currently supported")