Add UDT support to ParquetCachedBatchSerializer (CPU) (#4955)
* Add support for UDT

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* Add a test and checks to fall back to the CPU

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* Some refactoring to simplify the code

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* Addressed review comments

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* Removed the mapping

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

Co-authored-by: Raza Jafri <rjafri@nvidia.com>
razajafri authored Mar 21, 2022
1 parent d7c2385 commit c3cf357
Showing 5 changed files with 258 additions and 196 deletions.
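
For context, a minimal sketch (not part of this commit) of how the serializer is enabled, via Spark's spark.sql.cache.serializer static conf:

    # Illustrative only: enable ParquetCachedBatchSerializer so that
    # df.cache() round-trips through Parquet encoding, including the new
    # UDT handling added by this commit.
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .config("spark.sql.cache.serializer",
                     "com.nvidia.spark.ParquetCachedBatchSerializer")
             .getOrCreate())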
16 changes: 15 additions & 1 deletion integration_tests/src/main/python/cache_test.py
@@ -20,6 +20,8 @@
from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330
from join_test import create_df
from marks import incompat, allow_non_gpu, ignore_order
import pyspark.mllib.linalg as mllib
import pyspark.ml.linalg as ml

enable_vectorized_confs = [{"spark.sql.inMemoryColumnarStorage.enableVectorizedReader": "true"},
{"spark.sql.inMemoryColumnarStorage.enableVectorizedReader": "false"}]
@@ -286,6 +288,19 @@ def helper(spark):

    assert_gpu_and_cpu_are_equal_collect(helper)

def test_cache_udt():
    def fun(spark):
        df = spark.sparkContext.parallelize([
            (mllib.DenseVector([1, ]), ml.DenseVector([1, ])),
            (mllib.SparseVector(1, [0, ], [1, ]), ml.SparseVector(1, [0, ], [1, ]))
        ]).toDF(["mllib_v", "ml_v"])
        df.cache().count()
        return df.selectExpr("mllib_v", "ml_v").collect()
    cpu_result = with_cpu_session(fun)
    gpu_result = with_gpu_session(fun)
    # assert_gpu_and_cpu_are_equal_collect doesn't handle UDTs, so compare the
    # collected results directly
    assert cpu_result == gpu_result, "not equal"
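
The test above relies on the fact that every UDT exposes an underlying Catalyst sqlType that the serializer can store in place of the UDT itself. A quick illustrative check (not part of the commit):

    # For ML vectors the UDT maps to a struct of primitive and array fields,
    # which Parquet can encode directly: roughly
    # struct<type: byte, size: int, indices: array<int>, values: array<double>>
    from pyspark.ml.linalg import VectorUDT

    print(VectorUDT().sqlType())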

@pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Spark 3.3.0')
@pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn)
@@ -296,4 +311,3 @@ def test_func(spark):
        df.cache().count()
        return df.selectExpr("b", "a")
    assert_gpu_and_cpu_are_equal_collect(test_func, enable_vectorized_conf)

@@ -16,7 +16,7 @@

package com.nvidia.spark

-import com.nvidia.spark.rapids.{DataFromReplacementRule, GpuExec, RapidsConf, RapidsMeta, ShimLoader, SparkPlanMeta}
+import com.nvidia.spark.rapids.{DataFromReplacementRule, ExecChecks, GpuExec, RapidsConf, RapidsMeta, ShimLoader, SparkPlanMeta}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -25,7 +25,7 @@ import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.rapids.shims.GpuInMemoryTableScanExec
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.storage.StorageLevel

@@ -46,6 +46,22 @@ class InMemoryTableScanMeta(
    extends SparkPlanMeta[InMemoryTableScanExec](imts, conf, parent, rule) {

  override def tagPlanForGpu(): Unit = {
    def stringifyTypeAttributeMap(groupedByType: Map[DataType, Set[String]]): String = {
      groupedByType.map { case (dataType, nameSet) =>
        dataType + " " + nameSet.mkString("[", ", ", "]")
      }.mkString(", ")
    }

    val supportedTypeSig = rule.getChecks.get.asInstanceOf[ExecChecks]
    val unsupportedTypes: Map[DataType, Set[String]] = imts.relation.output
      .filterNot(attr => supportedTypeSig.check.isSupportedByPlugin(attr.dataType))
      .groupBy(_.dataType)
      .mapValues(_.map(_.name).toSet)

    val msgFormat = "unsupported data types in output: %s"
    if (unsupportedTypes.nonEmpty) {
      willNotWorkOnGpu(msgFormat.format(stringifyTypeAttributeMap(unsupportedTypes)))
    }
    if (!imts.relation.cacheBuilder.serializer
        .isInstanceOf[com.nvidia.spark.ParquetCachedBatchSerializer]) {
      willNotWorkOnGpu("ParquetCachedBatchSerializer is not being used")
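
To make the new fallback message concrete, here is a hypothetical Python mirror of stringifyTypeAttributeMap above (the Scala is authoritative; this only shows the shape of the message passed to willNotWorkOnGpu):

    # Hypothetical re-implementation: bucket unsupported column names by data
    # type and render each bucket as "Type [name, name]".
    def stringify_type_attribute_map(grouped_by_type):
        return ", ".join(
            str(data_type) + " [" + ", ".join(sorted(names)) + "]"
            for data_type, names in grouped_by_type.items())

    unsupported = {"BinaryType": {"payload"}, "CalendarIntervalType": {"span"}}
    print("unsupported data types in output: %s"
          % stringify_type_attribute_map(unsupported))
    # -> unsupported data types in output: BinaryType [payload], CalendarIntervalType [span]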
