Add UDT support to ParquetCachedBatchSerializer (CPU) #4955

Merged · 6 commits · Mar 21, 2022
16 changes: 15 additions & 1 deletion integration_tests/src/main/python/cache_test.py
@@ -20,6 +20,8 @@
from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330
from join_test import create_df
from marks import incompat, allow_non_gpu, ignore_order
import pyspark.mllib.linalg as mllib
import pyspark.ml.linalg as ml

enable_vectorized_confs = [{"spark.sql.inMemoryColumnarStorage.enableVectorizedReader": "true"},
                           {"spark.sql.inMemoryColumnarStorage.enableVectorizedReader": "false"}]
@@ -286,6 +288,19 @@ def helper(spark):

    assert_gpu_and_cpu_are_equal_collect(helper)

def test_cache_udt():
    def fun(spark):
        df = spark.sparkContext.parallelize([
            (mllib.DenseVector([1, ]), ml.DenseVector([1, ])),
            (mllib.SparseVector(1, [0, ], [1, ]), ml.SparseVector(1, [0, ], [1, ]))
        ]).toDF(["mllib_v", "ml_v"])
        df.cache().count()
        return df.selectExpr("mllib_v", "ml_v").collect()
    cpu_result = with_cpu_session(fun)
    gpu_result = with_gpu_session(fun)
    # assert_gpu_and_cpu_are_equal_collect doesn't handle UDTs, so collect both
    # results and compare them directly
    assert cpu_result == gpu_result, "CPU and GPU cached UDT results differ"

@pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Spark3.3.0')
@pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn)
@@ -296,4 +311,3 @@ def test_func(spark):
        df.cache().count()
        return df.selectExpr("b", "a")
    assert_gpu_and_cpu_are_equal_collect(test_func, enable_vectorized_conf)

@@ -16,7 +16,7 @@

package com.nvidia.spark

-import com.nvidia.spark.rapids.{DataFromReplacementRule, GpuExec, RapidsConf, RapidsMeta, ShimLoader, SparkPlanMeta}
+import com.nvidia.spark.rapids.{DataFromReplacementRule, ExecChecks, GpuExec, RapidsConf, RapidsMeta, ShimLoader, SparkPlanMeta}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -25,7 +25,7 @@ import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.rapids.shims.GpuInMemoryTableScanExec
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.storage.StorageLevel

@@ -46,6 +46,22 @@ class InMemoryTableScanMeta(
    extends SparkPlanMeta[InMemoryTableScanExec](imts, conf, parent, rule) {

  override def tagPlanForGpu(): Unit = {
    def stringifyTypeAttributeMap(groupedByType: Map[DataType, Set[String]]): String = {
      groupedByType.map { case (dataType, nameSet) =>
        dataType + " " + nameSet.mkString("[", ", ", "]")
      }.mkString(", ")
    }

    val supportedTypeSig = rule.getChecks.get.asInstanceOf[ExecChecks]
    val unsupportedTypes: Map[DataType, Set[String]] = imts.relation.output
      .filterNot(attr => supportedTypeSig.check.isSupportedByPlugin(attr.dataType))
      .groupBy(_.dataType)
      .mapValues(_.map(_.name).toSet)

    val msgFormat = "unsupported data types in output: %s"
    if (unsupportedTypes.nonEmpty) {
      willNotWorkOnGpu(msgFormat.format(stringifyTypeAttributeMap(unsupportedTypes)))
    }
    if (!imts.relation.cacheBuilder.serializer
        .isInstanceOf[com.nvidia.spark.ParquetCachedBatchSerializer]) {
      willNotWorkOnGpu("ParquetCachedBatchSerializer is not being used")
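The new tagging logic above groups any output attributes whose types the plugin cannot handle by `DataType`, then formats them into the fallback message. A self-contained sketch of that grouping and formatting, using hypothetical column names and types standing in for whatever `isSupportedByPlugin` rejected:

```scala
import org.apache.spark.sql.types.{BinaryType, DataType, StringType}

// Hypothetical result of filtering imts.relation.output and grouping the
// rejected attributes by data type, as tagPlanForGpu does above.
val unsupportedTypes: Map[DataType, Set[String]] =
  Map(StringType -> Set("name"), BinaryType -> Set("payload", "blob"))

// Same formatting as stringifyTypeAttributeMap: one "Type [col, ...]"
// fragment per data type, joined with commas.
val msg = unsupportedTypes.map { case (dataType, nameSet) =>
  dataType + " " + nameSet.mkString("[", ", ", "]")
}.mkString(", ")

// Prints: unsupported data types in output: StringType [name], BinaryType [payload, blob]
println("unsupported data types in output: " + msg)
```

Grouping by type keeps the `willNotWorkOnGpu` reason short even when many columns share an unsupported type, which is the shape the plugin's fallback explanations typically take.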