Commit
Adds in some support for the array sql function (NVIDIA#1489)
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
revans2 authored Jan 12, 2021
1 parent 8fe38dc commit 5ace7a2
Showing 5 changed files with 184 additions and 13 deletions.
3 changes: 2 additions & 1 deletion docs/configs.md
@@ -131,7 +131,8 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.Cos"></a>spark.rapids.sql.expression.Cos|`cos`|Cosine|true|None|
<a name="sql.expression.Cosh"></a>spark.rapids.sql.expression.Cosh|`cosh`|Hyperbolic cosine|true|None|
<a name="sql.expression.Cot"></a>spark.rapids.sql.expression.Cot|`cot`|Cotangent|true|None|
<a name="sql.expression.CreateNamedStruct"></a>spark.rapids.sql.expression.CreateNamedStruct|`named_struct`, `struct`|Creates a struct with the given field names and values.|true|None|
<a name="sql.expression.CreateArray"></a>spark.rapids.sql.expression.CreateArray|`array`| Returns an array with the given elements|true|None|
<a name="sql.expression.CreateNamedStruct"></a>spark.rapids.sql.expression.CreateNamedStruct|`named_struct`, `struct`|Creates a struct with the given field names and values|true|None|
<a name="sql.expression.CurrentRow$"></a>spark.rapids.sql.expression.CurrentRow$| |Special boundary for a window frame, indicating stopping at the current row|true|None|
<a name="sql.expression.DateAdd"></a>spark.rapids.sql.expression.DateAdd|`date_add`|Returns the date that is num_days after start_date|true|None|
<a name="sql.expression.DateDiff"></a>spark.rapids.sql.expression.DateDiff|`datediff`|Returns the number of days from startDate to endDate|true|None|
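For context, the new entry above means the `array` SQL function is controlled by `spark.rapids.sql.expression.CreateArray` just like the other expression configs. A minimal usage sketch in Spark Scala, not part of this commit, assuming a session with the RAPIDS Accelerator loaded and `spark.rapids.sql.enabled=true` (the column names are only illustrative):

// The config name comes from the table above; it defaults to true.
spark.conf.set("spark.rapids.sql.expression.CreateArray", "true")

// array() builds an ARRAY column from its arguments, e.g. rows [0, 0], [1, 2], [2, 4], ...
val df = spark.range(5).selectExpr("id AS a", "id * 2 AS b")
df.selectExpr("array(a, b) AS pair").show()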
104 changes: 97 additions & 7 deletions docs/supported_ops.md
Expand Up @@ -511,7 +511,7 @@ Accelerator supports are described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -557,7 +557,7 @@ Accelerator supports are described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -580,7 +580,7 @@ Accelerator supports are described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -603,7 +603,7 @@ Accelerator supports are described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -626,7 +626,7 @@ Accelerator supports are described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -649,7 +649,7 @@ Accelerator supports are described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -3417,9 +3417,99 @@ Accelerator support is described below.
<td> </td>
</tr>
<tr>
<td rowSpan="4">CreateArray</td>
<td rowSpan="4">`array`</td>
<td rowSpan="4"> Returns an array with the given elements</td>
<td rowSpan="4">None</td>
<td rowSpan="2">project</td>
<td>arg</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
</tr>
<tr>
<td>result</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<td rowSpan="2">lambda</td>
<td>arg</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
</tr>
<tr>
<td>result</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<td rowSpan="6">CreateNamedStruct</td>
<td rowSpan="6">`named_struct`, `struct`</td>
<td rowSpan="6">Creates a struct with the given field names and values.</td>
<td rowSpan="6">Creates a struct with the given field names and values</td>
<td rowSpan="6">None</td>
<td rowSpan="3">project</td>
<td>name</td>
11 changes: 10 additions & 1 deletion integration_tests/src/main/python/array_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -46,3 +46,12 @@ def test_nested_array_index(data_gen):
                'a[1]',
                'a[3]',
                'a[50]'))


@pytest.mark.parametrize('data_gen', all_basic_gens + [decimal_gen_default, decimal_gen_scale_precision], ids=idfn)
def test_make_array(data_gen):
    (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))
    assert_gpu_and_cpu_are_equal_collect(
            lambda spark : binary_op_df(spark, data_gen).selectExpr(
                'array(a, b)',
                'array(b, a, null, {}, {})'.format(s1, s2)))
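As a rough illustration of what the new test exercises (not part of the commit): the same expressions can be run interactively and the physical plan inspected; with the RAPIDS Accelerator active, operators placed on the GPU appear with a `Gpu` prefix. A hedged Scala sketch, assuming a session with the plugin enabled and two numeric columns `a` and `b`:

val df = spark.range(10).selectExpr("id AS a", "id * 2 AS b")
val arrays = df.selectExpr("array(a, b)", "array(b, a, null)")
// If CreateArray was accepted by the plugin, the plan is expected to show a
// Gpu-prefixed project (for example GpuProjectExec) instead of ProjectExec.
arrays.explain()
arrays.collect()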
@@ -1912,12 +1912,26 @@ object GpuOverrides {
("key", TypeSig.lit(TypeEnum.STRING), TypeSig.all)),
(in, conf, p, r) => new GpuGetMapValueMeta(in, conf, p, r)),
expr[CreateNamedStruct](
"Creates a struct with the given field names and values.",
"Creates a struct with the given field names and values",
CreateNamedStructCheck,
(in, conf, p, r) => new ExprMeta[CreateNamedStruct](in, conf, p, r) {
override def convertToGpu(): GpuExpression =
GpuCreateNamedStruct(childExprs.map(_.convertToGpu()))
}),
expr[CreateArray](
" Returns an array with the given elements",
ExprChecks.projectNotLambda(
TypeSig.ARRAY.nested(TypeSig.numeric + TypeSig.NULL + TypeSig.STRING +
TypeSig.BOOLEAN + TypeSig.DATE + TypeSig.TIMESTAMP),
TypeSig.ARRAY.nested(TypeSig.all),
repeatingParamCheck = Some(RepeatingParamCheck("arg",
TypeSig.numeric + TypeSig.NULL + TypeSig.STRING +
TypeSig.BOOLEAN + TypeSig.DATE + TypeSig.TIMESTAMP,
TypeSig.all))),
(in, conf, p, r) => new ExprMeta[CreateArray](in, conf, p, r) {
override def convertToGpu(): GpuExpression =
GpuCreateArray(childExprs.map(_.convertToGpu()), wrapped.useStringTypeWhenEmpty)
}),
expr[StringLocate](
"Substring search operator",
ExprChecks.projectNotLambda(TypeSig.INT, TypeSig.INT,
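The checks registered above only admit arrays whose children are numeric, null, string, boolean, date or timestamp values; inputs outside that signature should make the expression fall back to the CPU rather than fail. A rough Scala sketch of the distinction, assuming the plugin is enabled (the queries are illustrative only):

// Element types named in the TypeSig above: eligible to run on the GPU.
spark.sql("SELECT array(id, id + 1) FROM range(5)").collect()

// Nested arrays are not in the signature, so this is expected to stay on the CPU.
spark.sql("SELECT array(array(id), array(id + 1)) FROM range(5)").collect()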
@@ -18,14 +18,71 @@ package org.apache.spark.sql.rapids

import ai.rapids.cudf.ColumnVector
import com.nvidia.spark.rapids.{GpuColumnVector, GpuExpression, GpuScalar}
import com.nvidia.spark.rapids.RapidsPluginImplicits.{AutoCloseableArray, ReallyAGpuExpression}
import com.nvidia.spark.rapids.RapidsPluginImplicits.ReallyAGpuExpression

import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FUNC_ALIAS
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.{EmptyRow, Expression, NamedExpression}
import org.apache.spark.sql.types.{Metadata, StringType, StructField, StructType}
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, DataType, Metadata, NullType, StringType, StructField, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

case class GpuCreateArray(children: Seq[Expression], useStringTypeWhenEmpty: Boolean)
  extends GpuExpression {

  def this(children: Seq[Expression]) = {
    this(children, SQLConf.get.getConf(SQLConf.LEGACY_CREATE_EMPTY_COLLECTION_USING_STRING_TYPE))
  }

  override def foldable: Boolean = children.forall(_.foldable)

  override def stringArgs: Iterator[Any] = super.stringArgs.take(1)

  override def checkInputDataTypes(): TypeCheckResult = {
    TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), s"function $prettyName")
  }

  private val defaultElementType: DataType = {
    if (useStringTypeWhenEmpty) {
      StringType
    } else {
      NullType
    }
  }

  override def dataType: ArrayType = {
    ArrayType(
      TypeCoercion.findCommonTypeDifferentOnlyInNullFlags(children.map(_.dataType))
        .getOrElse(defaultElementType),
      containsNull = children.exists(_.nullable))
  }

  override def nullable: Boolean = false

  override def prettyName: String = "array"

  override def columnarEval(batch: ColumnarBatch): Any = {
    withResource(new Array[ColumnVector](children.size)) { columns =>
      val numRows = batch.numRows()
      children.indices.foreach { index =>
        children(index).columnarEval(batch) match {
          case cv: GpuColumnVector =>
            columns(index) = cv.getBase
          case other =>
            val dt = dataType.elementType
            withResource(GpuScalar.from(other, dt)) { scalar =>
              columns(index) = ColumnVector.fromScalar(scalar, numRows)
            }
        }
      }
      GpuColumnVector.from(ColumnVector.makeList(numRows,
        GpuColumnVector.getNonNestedRapidsType(dataType.elementType),
        columns: _*), dataType)
    }
  }
}

case class GpuCreateNamedStruct(children: Seq[Expression]) extends GpuExpression {
  lazy val (nameExprs, valExprs) = children.grouped(2).map {
    case Seq(name, value) => (name, value)
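To make the columnar path in `GpuCreateArray.columnarEval` above a little more concrete: each child is evaluated to a cuDF column (scalar results are broadcast with `ColumnVector.fromScalar`), and `ColumnVector.makeList` then zips those columns row by row into a single LIST column. A standalone sketch that mirrors the same cuDF Java calls, assuming a CUDA-capable GPU and the cudf Java bindings on the classpath:

import ai.rapids.cudf.{ColumnVector, DType}

// Two "child" columns with three rows each.
val a = ColumnVector.fromInts(1, 2, 3)
val b = ColumnVector.fromInts(10, 20, 30)
// makeList interleaves the children per row: [[1, 10], [2, 20], [3, 30]].
val list = ColumnVector.makeList(3, DType.INT32, a, b)
// cuDF columns own GPU memory, so close them when no longer needed.
list.close(); a.close(); b.close()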