
[FEA] support json to struct function #8174

Merged · 10 commits · May 1, 2023
4 changes: 2 additions & 2 deletions docs/supported_ops.md
@@ -7890,8 +7890,8 @@ are limited.
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td><em>PS<br/>unsupported child types BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, DATE, TIMESTAMP, DECIMAL, NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT</em></td>
<td><b>NS</b></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP</em></td>
<td> </td>
</tr>
<tr>
13 changes: 13 additions & 0 deletions integration_tests/src/main/python/json_test.py
@@ -368,3 +368,16 @@ def test_from_json_map():
lambda spark : unary_op_df(spark, json_string_gen) \
.select(f.from_json(f.col('a'), 'MAP<STRING,STRING>')),
conf={"spark.rapids.sql.expression.JsonToStructs": "true"})

@pytest.mark.parametrize('data_gen', [StringGen(r'{"a": "[0-9]{0,5}", "b": "[A-Z]{0,5}", "c": 1234}')])
@pytest.mark.parametrize('schema', [StructType([StructField("a", StringType())]),
StructType([StructField("d", StringType())]),
StructType([StructField("a", StringType()), StructField("b", StringType())]),
StructType([StructField("c", LongType()), StructField("a", StringType())]),
StructType([StructField("a", StringType()), StructField("a", StringType())])
Collaborator:
I only see tests for String, Long, and Struct. If we say that we support the other types, we really should have tests for them. This needs to include things like STRUCTs of STRUCTs and STRUCTs of LISTS (a sketch of such a test follows the test function below).

Collaborator:
If we say that we support all of the types in our meta object, then we need tests for all of the data types that JSON supports in Spark

https://github.com/apache/spark/blob/4a238cd9d8e80eed06732fc52b1456cb5ece6652/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala#L193-L385

I personally would rather see us start with a few simple types and add more as we add tests for them. So if we have tests for String, Int, Array, and Struct, then we should only say that we support those types. We can add support for Boolean, Byte, Short, Long, Decimal (which needs to include multiple precision and scale combinations), Float, Double, Timestamp, TimestampNTZ, Date, Binary, CalendarInterval, YearMonthInterval, DayTimeInterval, UDT, and NullType when a customer/management asks for them, or when we have tests that show that they are working correctly.

])
def test_from_json_struct(data_gen, schema):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen) \
.select(f.from_json(f.col('a'), schema)),
conf={"spark.rapids.sql.expression.JsonToStructs": "true"})
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -3373,14 +3373,15 @@ object GpuOverrides extends Logging {
expr[JsonToStructs](
"Returns a struct value with the given `jsonStr` and `schema`",
ExprChecks.projectOnly(
TypeSig.MAP.nested(TypeSig.STRING),
(TypeSig.STRUCT + TypeSig.MAP).nested(TypeSig.all),
Collaborator:
We need a way to say that the MAP type is only supported when it is a MAP<STRING,STRING> and only when it is at the top level. Some of this can be done with a change to this line, but we need more than that, and ideally some tests to verify that we fall back properly.
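One possible shape for the TypeSig part of that restriction, reusing the combinators already on this line (a sketch only; the top-level-only check would still need to live somewhere like tagExprForGpu):

TypeSig.STRUCT.nested(TypeSig.all) + TypeSig.MAP.nested(TypeSig.STRING)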

(TypeSig.STRUCT + TypeSig.MAP + TypeSig.ARRAY).nested(TypeSig.all),
Seq(ParamCheck("jsonStr", TypeSig.STRING, TypeSig.STRING))),
(a, conf, p, r) => new UnaryExprMeta[JsonToStructs](a, conf, p, r) {
override def tagExprForGpu(): Unit =
GpuJsonScan.tagJsonToStructsSupport(a.options, this)

override def convertToGpu(child: Expression): GpuExpression =
// GPU implementation currently does not support duplicated json key names in input
GpuJsonToStructs(a.schema, a.options, child, a.timeZoneId)
}).disabledByDefault("parsing JSON from a column has a large number of issues and " +
"should be considered beta quality right now."),
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala
@@ -16,12 +16,16 @@

package org.apache.spark.sql.rapids

import scala.collection.mutable.Set

import ai.rapids.cudf
import com.nvidia.spark.rapids.{GpuColumnVector, GpuUnaryExpression}
import com.nvidia.spark.rapids.{GpuColumnVector, GpuScalar, GpuUnaryExpression}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq
import com.nvidia.spark.rapids.jni.MapUtils

import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, NullIntolerant, TimeZoneAwareExpression}
import org.apache.spark.sql.types.{AbstractDataType, DataType, StringType}
import org.apache.spark.sql.types.{AbstractDataType, DataType, MapType, StringType, StructType}

case class GpuJsonToStructs(
schema: DataType,
@@ -30,8 +34,118 @@
timeZoneId: Option[String] = None)
extends GpuUnaryExpression with TimeZoneAwareExpression with ExpectsInputTypes
with NullIntolerant {

private def cleanAndConcat(input: cudf.ColumnVector): (cudf.ColumnVector, cudf.ColumnVector) = {
withResource(cudf.Scalar.fromString("{}")) { emptyRow =>
val stripped = withResource(cudf.Scalar.fromString(" ")) { space =>
input.strip(space)
}
withResource(stripped) { stripped =>
val isNullOrEmptyInput = withResource(input.isNull) { isNull =>
val isEmpty = withResource(stripped.getCharLengths) { lengths =>
withResource(cudf.Scalar.fromInt(0)) { zero =>
lengths.lessOrEqualTo(zero)
}
}
withResource(isEmpty) { isEmpty =>
isNull.binaryOp(cudf.BinaryOp.NULL_LOGICAL_OR, isEmpty, cudf.DType.BOOL8)
}
}
closeOnExcept(isNullOrEmptyInput) { _ =>
withResource(isNullOrEmptyInput.ifElse(emptyRow, stripped)) { cleaned =>
withResource(cudf.Scalar.fromString("\n")) { lineSep =>
withResource(cleaned.stringContains(lineSep)) { inputHas =>
withResource(inputHas.any()) { anyLineSep =>
if (anyLineSep.isValid && anyLineSep.getBoolean) {
throw new IllegalArgumentException("We cannot currently support parsing " +
"JSON that contains a line separator in it")
}
}
}
(isNullOrEmptyInput, cleaned.joinStrings(lineSep, emptyRow))
}
}
}
}
}
}
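// A hypothetical illustration (not from this PR) of what cleanAndConcat returns:
//   input rows:         ["{\"a\": 1}", null, "", "  {\"b\": 2}  "]
//   isNullOrEmptyInput: [false, true, true, false]
//   joined buffer:      {"a": 1}\n{}\n{}\n{"b": 2}
// Every input row maps to exactly one JSON line; nulls and empty strings become
// "{}" so the row count is preserved for the check in Step 5 below.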

private def processFieldNames(names: Seq[(String, DataType)]): Seq[(String, DataType)] = {
val existingNames = Set[String]()
// for duplicated field names, only keep the one with the largest index
names.foldRight(Seq[(String, DataType)]())((elem, acc) => {
val (name, dtype) = elem
if (existingNames(name)) (null, dtype)+:acc else {existingNames += name; (name, dtype)+:acc}})
Collaborator:
nit: could we make the formatting less dense so it is simpler to read?

names.foldRight(Seq.empty[(String, DataType)]) { (elem, acc) =>
  val (name, dtype) = elem
  if (existingNames(name)) {
    (null, dtype) +: acc
  } else {
    existingNames += name
    (name, dtype) +: acc
  }
}

}
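// A hypothetical illustration (not from this PR) of the rule above:
//   processFieldNames(Seq(("a", StringType), ("b", LongType), ("a", LongType)))
//     == Seq((null, StringType), ("b", LongType), ("a", LongType))
// The null-named entry matches no parsed column name, so Step 6 below fills it
// with an all-null column of the requested type.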

override protected def doColumnar(input: GpuColumnVector): cudf.ColumnVector = {
MapUtils.extractRawMapFromJsonString(input.getBase)
schema match {
case _: MapType =>
MapUtils.extractRawMapFromJsonString(input.getBase)
case struct: StructType => {
// We cannot handle all corner cases with this right now. The parser just isn't
// good enough, but we will try to handle a few common ones.
val numRows = input.getRowCount.toInt

// Step 1: verify and preprocess the data to clean it up and normalize a few things
// Step 2: Concat the data into a single buffer
val (isNullOrEmpty, combined) = cleanAndConcat(input.getBase)
withResource(isNullOrEmpty) { isNullOrEmpty =>
// Step 3: copy the data back to the host so we can parse it.
val combinedHost = withResource(combined) { combined =>
combined.copyToHost()
}
// Step 4: Have cudf parse the JSON data
val (names, rawTable) = withResource(combinedHost) { combinedHost =>
val data = combinedHost.getData
val start = combinedHost.getStartListOffset(0)
val end = combinedHost.getEndListOffset(0)
val length = end - start

withResource(cudf.Table.readJSON(cudf.JSONOptions.DEFAULT, data, start,
Collaborator:
This is having CUDF do name and type inference. Is that really what we want? Should we do this like we do for regular JSON parsing? (Never mind, it turns out we do the same thing in the JSON reader??? Why are we doing that? It is a huge waste of memory.) Can we please file a follow-on issue to fix it both here and in the JSON reader? Bonus points if we can combine the reader code. One possible shape is sketched below.
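A sketch of that follow-on, assuming the Schema-taking readJSON overload that the plugin already uses for file-based JSON reads (the hard-coded column is purely illustrative; the list would be derived from the Spark schema):

val cudfSchema = cudf.Schema.builder()
  .column(cudf.DType.STRING, "a") // one entry per field of the requested Spark schema
  .build()
cudf.Table.readJSON(cudfSchema, cudf.JSONOptions.DEFAULT, data, start, length)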

length)) { tableWithMeta =>
val names = tableWithMeta.getColumnNames
(names, tableWithMeta.releaseTable())
}
}

// process duplicated field names in input struct schema
val fieldNames = processFieldNames(struct.fields.map { field =>
(field.name, field.dataType)})

withResource(rawTable) { rawTable =>
// Step 5: verify that the data looks correct
if (rawTable.getRowCount != numRows) {
throw new IllegalStateException("The input data didn't parse correctly and we read " +
s"a different number of rows than was expected. Expected $numRows, " +
s"but got ${rawTable.getRowCount}")
}

// Step 6: get the data based on input struct schema
val columns = fieldNames.safeMap { case (name, dtype) =>
val i = names.indexOf(name)
if (i == -1) {
GpuColumnVector.columnVectorFromNull(numRows, dtype)
} else {
rawTable.getColumn(i).incRefCount
}
}

// Step 7: turn the data into a Struct
withResource(columns) { columns =>
withResource(cudf.ColumnVector.makeStruct(columns: _*)) { structData =>
// Step 8: put nulls back in for nulls and empty strings
withResource(GpuScalar.from(null, struct)) { nullVal =>
isNullOrEmpty.ifElse(nullVal, structData)
}
}
}
}
}
}
case _ => throw new IllegalArgumentException(
s"GpuJsonToStructs currently does not support schema of type ${schema}.")
}
}

override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
2 changes: 1 addition & 1 deletion tools/generated_files/supportedExprs.csv
@@ -265,7 +265,7 @@ IsNotNull,S,`isnotnull`,None,project,result,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,N
IsNull,S,`isnull`,None,project,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS
IsNull,S,`isnull`,None,project,result,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
JsonToStructs,NS,`from_json`,This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.,project,jsonStr,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
JsonToStructs,NS,`from_json`,This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NS,PS,NS,NA
JsonToStructs,NS,`from_json`,This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NS,PS,PS,NA
JsonTuple,S,`json_tuple`,None,project,json,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
JsonTuple,S,`json_tuple`,None,project,field,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA
JsonTuple,S,`json_tuple`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA