Adds in some support for the array sql function #1489

Merged · 2 commits · Jan 12, 2021
docs/configs.md (3 changes: 2 additions & 1 deletion)
@@ -130,7 +130,8 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.Cos"></a>spark.rapids.sql.expression.Cos|`cos`|Cosine|true|None|
<a name="sql.expression.Cosh"></a>spark.rapids.sql.expression.Cosh|`cosh`|Hyperbolic cosine|true|None|
<a name="sql.expression.Cot"></a>spark.rapids.sql.expression.Cot|`cot`|Cotangent|true|None|
<a name="sql.expression.CreateNamedStruct"></a>spark.rapids.sql.expression.CreateNamedStruct|`named_struct`, `struct`|Creates a struct with the given field names and values.|true|None|
<a name="sql.expression.CreateArray"></a>spark.rapids.sql.expression.CreateArray|`array`| Returns an array with the given elements|true|None|
<a name="sql.expression.CreateNamedStruct"></a>spark.rapids.sql.expression.CreateNamedStruct|`named_struct`, `struct`|Creates a struct with the given field names and values|true|None|
<a name="sql.expression.CurrentRow$"></a>spark.rapids.sql.expression.CurrentRow$| |Special boundary for a window frame, indicating stopping at the current row|true|None|
<a name="sql.expression.DateAdd"></a>spark.rapids.sql.expression.DateAdd|`date_add`|Returns the date that is num_days after start_date|true|None|
<a name="sql.expression.DateDiff"></a>spark.rapids.sql.expression.DateDiff|`datediff`|Returns the number of days from startDate to endDate|true|None|
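The new `spark.rapids.sql.expression.CreateArray` row follows the same pattern as the other expression configs: on by default, and it can be turned off to force the expression back onto the CPU. A minimal spark-shell sketch of the function this flag gates (assumes the RAPIDS plugin is installed; the DataFrame and column names are illustrative):

    // The SQL function gated by spark.rapids.sql.expression.CreateArray:
    val df = spark.range(5).selectExpr("id AS a", "id + 1 AS b")
    df.selectExpr("array(a, b)").show()

    // Like the other expression flags documented above, it can be disabled,
    // e.g. as a workaround if a problem is found in the GPU implementation:
    spark.conf.set("spark.rapids.sql.expression.CreateArray", "false")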
docs/supported_ops.md (104 changes: 97 additions & 7 deletions)
@@ -509,7 +509,7 @@ Accelerator supports are described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -555,7 +555,7 @@ Accelerator supports are described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -578,7 +578,7 @@ Accelerator supports are described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -601,7 +601,7 @@ Accelerator supports are described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -624,7 +624,7 @@ Accelerator supports are described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -647,7 +647,7 @@ Accelerator supports are described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -3415,9 +3415,99 @@ Accelerator support is described below.
<td> </td>
</tr>
<tr>
<td rowSpan="4">CreateArray</td>
<td rowSpan="4">`array`</td>
<td rowSpan="4"> Returns an array with the given elements</td>
<td rowSpan="4">None</td>
<td rowSpan="2">project</td>
<td>arg</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
</tr>
<tr>
<td>result</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<td rowSpan="2">lambda</td>
<td>arg</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
</tr>
<tr>
<td>result</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<td rowSpan="6">CreateNamedStruct</td>
<td rowSpan="6">`named_struct`, `struct`</td>
<td rowSpan="6">Creates a struct with the given field names and values.</td>
<td rowSpan="6">Creates a struct with the given field names and values</td>
<td rowSpan="6">None</td>
<td rowSpan="3">project</td>
<td>name</td>
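The PS* entry in the CreateArray result row above is the main caveat: arrays of the basic types are produced on the GPU, but elements that are themselves BINARY, CALENDAR, ARRAY, MAP, STRUCT, or UDT are not yet supported. A hedged spark-shell sketch of the practical effect (plugin enabled; exact plan wording may differ):

    val df = spark.range(10).selectExpr("id AS a", "id + 1 AS b")
    df.selectExpr("array(a, b)").explain()
    // expected: the plan shows a GpuProject containing a GpuCreateArray expression
    df.selectExpr("array(array(a), array(b))").explain()
    // nested ARRAY elements are on the "missing" list above, so this
    // expression is expected to fall back to the CPU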
integration_tests/src/main/python/array_test.py (11 changes: 10 additions & 1 deletion)
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -46,3 +46,12 @@ def test_nested_array_index(data_gen):
            'a[1]',
            'a[3]',
            'a[50]'))


@pytest.mark.parametrize('data_gen', all_basic_gens + [decimal_gen_default, decimal_gen_scale_precision], ids=idfn)
def test_make_array(data_gen):
    (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : binary_op_df(spark, data_gen).selectExpr(
            'array(a, b)',
            'array(b, a, null, {}, {})'.format(s1, s2)))
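Roughly, `binary_op_df` builds a two-column DataFrame (`a`, `b`) from the generator, and `assert_gpu_and_cpu_are_equal_collect` runs the query once on the CPU and once on the GPU and compares the collected results. For readers without the Python harness, a spark-shell sketch of the query shape under test, with literal scalars standing in for the generated `s1`/`s2` (the CPU-vs-GPU comparison itself is not reproduced):

    val df = Seq((1, 2), (3, 4)).toDF("a", "b")
    // Columns, a literal NULL, and scalar literals mixed in one call,
    // mirroring the test's format string:
    df.selectExpr("array(a, b)", "array(b, a, null, 7, 8)").show()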
@@ -1912,12 +1912,26 @@ object GpuOverrides {
("key", TypeSig.lit(TypeEnum.STRING), TypeSig.all)),
(in, conf, p, r) => new GpuGetMapValueMeta(in, conf, p, r)),
expr[CreateNamedStruct](
"Creates a struct with the given field names and values.",
"Creates a struct with the given field names and values",
CreateNamedStructCheck,
(in, conf, p, r) => new ExprMeta[CreateNamedStruct](in, conf, p, r) {
override def convertToGpu(): GpuExpression =
GpuCreateNamedStruct(childExprs.map(_.convertToGpu()))
}),
expr[CreateArray](
" Returns an array with the given elements",
ExprChecks.projectNotLambda(
TypeSig.ARRAY.nested(TypeSig.numeric + TypeSig.NULL + TypeSig.STRING +
TypeSig.BOOLEAN + TypeSig.DATE + TypeSig.TIMESTAMP),
TypeSig.ARRAY.nested(TypeSig.all),
repeatingParamCheck = Some(RepeatingParamCheck("arg",
TypeSig.numeric + TypeSig.NULL + TypeSig.STRING +
TypeSig.BOOLEAN + TypeSig.DATE + TypeSig.TIMESTAMP,
TypeSig.all))),
(in, conf, p, r) => new ExprMeta[CreateArray](in, conf, p, r) {
override def convertToGpu(): GpuExpression =
GpuCreateArray(childExprs.map(_.convertToGpu()), wrapped.useStringTypeWhenEmpty)
}),
expr[StringLocate](
"Substring search operator",
ExprChecks.projectNotLambda(TypeSig.INT, TypeSig.INT,
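In each check above, the first TypeSig is what the plugin accepts and the second (the `TypeSig.all` side) records what Spark itself allows; the supported_ops.md table earlier in this diff is generated from these signatures. When an argument falls outside the supported set, the whole expression stays on the CPU. A sketch of how to surface that decision at runtime (a real plugin config, but the log wording shown is approximate):

    // Ask the plugin to report operators it could not place on the GPU:
    spark.conf.set("spark.rapids.sql.explain", "NOT_ON_GPU")
    // Subsequent queries then log lines along the lines of:
    //   !Expression <CreateArray> array(a, b) cannot run on GPU because ...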
@@ -18,14 +18,71 @@ package org.apache.spark.sql.rapids

import ai.rapids.cudf.ColumnVector
import com.nvidia.spark.rapids.{GpuColumnVector, GpuExpression, GpuScalar}
import com.nvidia.spark.rapids.RapidsPluginImplicits.{AutoCloseableArray, ReallyAGpuExpression}
import com.nvidia.spark.rapids.RapidsPluginImplicits.ReallyAGpuExpression

import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FUNC_ALIAS
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.{EmptyRow, Expression, NamedExpression}
import org.apache.spark.sql.types.{Metadata, StringType, StructField, StructType}
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, DataType, Metadata, NullType, StringType, StructField, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

case class GpuCreateArray(children: Seq[Expression], useStringTypeWhenEmpty: Boolean)
    extends GpuExpression {

  def this(children: Seq[Expression]) = {
    this(children, SQLConf.get.getConf(SQLConf.LEGACY_CREATE_EMPTY_COLLECTION_USING_STRING_TYPE))
  }

  override def foldable: Boolean = children.forall(_.foldable)

  override def stringArgs: Iterator[Any] = super.stringArgs.take(1)

  override def checkInputDataTypes(): TypeCheckResult = {
    TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), s"function $prettyName")
  }

  private val defaultElementType: DataType = {
    if (useStringTypeWhenEmpty) {
      StringType
    } else {
      NullType
    }
  }

  override def dataType: ArrayType = {
    ArrayType(
      TypeCoercion.findCommonTypeDifferentOnlyInNullFlags(children.map(_.dataType))
        .getOrElse(defaultElementType),
      containsNull = children.exists(_.nullable))
  }

  override def nullable: Boolean = false

  override def prettyName: String = "array"

  override def columnarEval(batch: ColumnarBatch): Any = {
    withResource(new Array[ColumnVector](children.size)) { columns =>
      val numRows = batch.numRows()
      children.indices.foreach { index =>
        children(index).columnarEval(batch) match {
          case cv: GpuColumnVector =>
            columns(index) = cv.getBase
          case other =>
            val dt = dataType.elementType
            withResource(GpuScalar.from(other, dt)) { scalar =>
              columns(index) = ColumnVector.fromScalar(scalar, numRows)
            }
        }
      }
      GpuColumnVector.from(ColumnVector.makeList(numRows,
        GpuColumnVector.getNonNestedRapidsType(dataType.elementType),
        columns: _*), dataType)
    }
  }
}

case class GpuCreateNamedStruct(children: Seq[Expression]) extends GpuExpression {
  lazy val (nameExprs, valExprs) = children.grouped(2).map {
    case Seq(name, value) => (name, value)
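Three behaviors in GpuCreateArray deliberately mirror Spark's CPU CreateArray: the result column itself is never null (`nullable = false`), the element type is the common type of the children with `containsNull` set if any child is nullable, and the element type of a literal empty `array()` depends on the legacy flag captured by `useStringTypeWhenEmpty`. A sketch against stock Spark showing the contract the GPU version has to match (standard Spark behavior; schema output abbreviated and worth re-checking):

    val df = Seq((1, 2)).toDF("a", "b")
    df.selectExpr("array(a, b, null)").printSchema()
    // root
    //  |-- array(a, b, NULL): array (nullable = false)
    //  |    |-- element: integer (containsNull = true)

    // The legacy flag read by the auxiliary constructor above
    // (SQLConf.LEGACY_CREATE_EMPTY_COLLECTION_USING_STRING_TYPE):
    spark.conf.set("spark.sql.legacy.createEmptyCollectionUsingStringType", "true")
    spark.sql("SELECT array()").printSchema()
    // with the flag on, array() is array<string>; with it off, array<null/void>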