[FEA] support json to struct function #8174

Merged: 10 commits, May 1, 2023
10 changes: 8 additions & 2 deletions docs/compatibility.md
@@ -313,15 +313,21 @@ The following input is invalid and will cause error:
{"name": Justin", "age":19}
```

Reading input with duplicated JSON key names is also incompatible with CPU Spark.
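For example, an input row like the one below contains a duplicated key name, so it may produce different results on the GPU than on CPU Spark:

```
{"name": "Justin", "age": 19, "age": 20}
```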

### JSON supporting types

In the current version, nested types (array, struct, and map types) are not yet supported in regular JSON parsing.

### `from_json` function

This particular function supports to output a map type with limited functionalities. In particular, the output map is not resulted from a regular JSON parsing but instead it will just contain plain text of key-value pairs extracted directly from the input JSON string.
This particular function supports outputting a map or struct type with limited functionality.

For struct output types, the function only supports fields of struct, array, string, and int types. The output is incompatible if duplicated JSON key names are present in the input strings. For schemas that include IntegerType, if arbitrarily large numbers are specified in the JSON strings, the GPU implementation casts the numbers to IntegerType, whereas CPU Spark returns null.
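As an illustration, here is a minimal, hypothetical Scala sketch of calling `from_json` with a struct schema; the DataFrame, column name, and values are invented for this example, and it assumes a spark-shell session with the RAPIDS Accelerator on the classpath and `spark.rapids.sql.expression.JsonToStructs` enabled:

```scala
import spark.implicits._   // `spark` is the active SparkSession (as in spark-shell)
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// Hypothetical input; the second row's "age" does not fit in IntegerType.
val df = Seq(
  """{"name": "Justin", "age": 19}""",
  """{"name": "Ada", "age": 99999999999999}"""
).toDF("json")

val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("age", IntegerType)))

// As noted above, CPU Spark returns null for the oversized value, while the GPU
// implementation casts it to IntegerType instead.
df.select(from_json(col("json"), schema).alias("parsed")).show(truncate = false)
```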

Due to such limitations, the input JSON schema must be `MAP<STRING,STRING>` and nothing else. Furthermore, there is no validation, no error tolerance, no data conversion as well as string formatting is performed. This may lead to some minor differences in the output if compared to the result of Spark CPU's `from_json`, such as:
In particular, the output map does not result from regular JSON parsing; instead, it just contains the plain text of key-value pairs extracted directly from the input JSON string. Due to these limitations, the input JSON map type schema must be `MAP<STRING,STRING>` and nothing else. Furthermore, no validation, error tolerance, data conversion, or string formatting is performed. This may lead to some minor differences in the output compared to the result of Spark CPU's `from_json`, such as:
* Floating point numbers in the input JSON string such as `1.2000` will not be reformatted to `1.2`. Instead, the output will be the same as the input.
* If the input JSON is given as multiple rows, any row containing an invalid JSON format will lead to an application crash. On the other hand, the Spark CPU version just produces nulls for the invalid rows.
4 changes: 2 additions & 2 deletions docs/supported_ops.md
@@ -7890,8 +7890,8 @@ are limited.
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td><em>PS<br/>unsupported child types BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, DATE, TIMESTAMP, DECIMAL, NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT</em></td>
<td><b>NS</b></td>
<td><em>PS<br/>MAP only supports keys and values that are of STRING type;<br/>unsupported child types BOOLEAN, BYTE, SHORT, LONG, FLOAT, DOUBLE, DATE, TIMESTAMP, DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT</em></td>
<td><em>PS<br/>unsupported child types BOOLEAN, BYTE, SHORT, LONG, FLOAT, DOUBLE, DATE, TIMESTAMP, DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT</em></td>
<td> </td>
</tr>
<tr>
50 changes: 49 additions & 1 deletion integration_tests/src/main/python/json_test.py
@@ -367,4 +367,52 @@ def test_from_json_map():
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : unary_op_df(spark, json_string_gen) \
            .select(f.from_json(f.col('a'), 'MAP<STRING,STRING>')),
        conf={"spark.rapids.sql.expression.JsonToStructs": "true"})
        conf={"spark.rapids.sql.expression.JsonToStructs": True})

@allow_non_gpu('ProjectExec', 'JsonToStructs')
def test_from_json_map_fallback():
    # The test here is working around some inconsistencies in how the keys are parsed for maps
    # on the GPU the keys are dense, but on the CPU they are sparse
    json_string_gen = StringGen(r'{"a": \d\d}')
    assert_gpu_fallback_collect(
        lambda spark : unary_op_df(spark, json_string_gen) \
            .select(f.from_json(f.col('a'), 'MAP<STRING,INT>')),
        'JsonToStructs',
        conf={"spark.rapids.sql.expression.JsonToStructs": True})

@pytest.mark.parametrize('data_gen', [StringGen(r'{"a": "[0-9]{0,5}", "b": "[A-Z]{0,5}", "c": 1234}')])
@pytest.mark.parametrize('schema', [StructType([StructField("a", StringType())]),
                                    StructType([StructField("d", StringType())]),
                                    StructType([StructField("a", StringType()), StructField("b", StringType())]),
                                    StructType([StructField("c", IntegerType()), StructField("a", StringType())]),
                                    StructType([StructField("a", StringType()), StructField("a", StringType())])
                                    ])
def test_from_json_struct(data_gen, schema):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : unary_op_df(spark, data_gen) \
            .select(f.from_json(f.col('a'), schema)),
        conf={"spark.rapids.sql.expression.JsonToStructs": True})

Review comment (Collaborator): I only see tests for String, Long, and Struct. If we say that we support the other types we really should have tests for them. This needs to include things like STRUCTs of STRUCTs and STRUCTs of LISTS.

Review comment (Collaborator): If we say that we support all of the types in our meta object, then we need tests for all of the data types that JSON supports in Spark:

https://github.com/apache/spark/blob/4a238cd9d8e80eed06732fc52b1456cb5ece6652/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala#L193-L385

I personally would rather see us start with a few simple types and add more as we add tests for them. So if we have tests for String, Int, array and struct, then we should only say that we support those types. We can add in support for boolean, byte, short, long, decimal (which needs to include multiple precision and scale types), Float, Double, Timestamp, TimestampNTZ, Date, Binary, CalendarInterval, YearMonthInterval, DayTimeInterval, UDT and NullType when a customer/management asks for them or when we have tests that show that they are working correctly.

@pytest.mark.parametrize('data_gen', [StringGen(r'{"teacher": "Alice", "student": {"name": "Bob", "age": 20}}')])
@pytest.mark.parametrize('schema', [StructType([StructField("teacher", StringType())]),
                                    StructType([StructField("student", StructType([StructField("name", StringType()), \
                                        StructField("age", IntegerType())]))])])
def test_from_json_struct_of_struct(data_gen, schema):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : unary_op_df(spark, data_gen) \
            .select(f.from_json(f.col('a'), schema)),
        conf={"spark.rapids.sql.expression.JsonToStructs": True})

@pytest.mark.parametrize('data_gen', [StringGen(r'{"teacher": "Alice", "student": \[{"name": "Bob", "class": "junior"},' \
                                                r'{"name": "Charlie", "class": "freshman"}\]}')])
@pytest.mark.parametrize('schema', [StructType([StructField("teacher", StringType())]),
                                    StructType([StructField("student", ArrayType(StructType([StructField("name", StringType()), \
                                        StructField("class", StringType())])))]),
                                    StructType([StructField("teacher", StringType()), \
                                        StructField("student", ArrayType(StructType([StructField("name", StringType()), \
                                            StructField("class", StringType())])))])])
def test_from_json_struct_of_list(data_gen, schema):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : unary_op_df(spark, data_gen) \
            .select(f.from_json(f.col('a'), schema)),
        conf={"spark.rapids.sql.expression.JsonToStructs": True})
@@ -3373,14 +3373,25 @@ object GpuOverrides extends Logging {
expr[JsonToStructs](
"Returns a struct value with the given `jsonStr` and `schema`",
ExprChecks.projectOnly(
TypeSig.MAP.nested(TypeSig.STRING),
TypeSig.MAP.nested(TypeSig.STRING).withPsNote(TypeEnum.MAP,
"MAP only supports keys and values that are of STRING type") +
TypeSig.STRUCT.nested(TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.STRING + TypeSig.INT),
(TypeSig.STRUCT + TypeSig.MAP + TypeSig.ARRAY).nested(TypeSig.all),
Seq(ParamCheck("jsonStr", TypeSig.STRING, TypeSig.STRING))),
(a, conf, p, r) => new UnaryExprMeta[JsonToStructs](a, conf, p, r) {
override def tagExprForGpu(): Unit =
a.schema match {
case MapType(_: StringType, _: StringType, _) => ()
case MapType(kt, vt, _) => {
willNotWorkOnGpu("JsonToStructs only supports MapType<StringType, StringType> for " +
s"input MapType schema, but received MapType<$kt, $vt>")
}
case _ => ()
}
GpuJsonScan.tagJsonToStructsSupport(a.options, this)

override def convertToGpu(child: Expression): GpuExpression =
// GPU implementation currently does not support duplicated json key names in input
GpuJsonToStructs(a.schema, a.options, child, a.timeZoneId)
}).disabledByDefault("parsing JSON from a column has a large number of issues and " +
"should be considered beta quality right now."),
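For context, here is a hypothetical usage sketch of what the type check above means at the query level; it is not taken from the PR and assumes a spark-shell session with the RAPIDS Accelerator loaded:

```scala
import spark.implicits._   // `spark` is the active SparkSession (as in spark-shell)
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// The expression is disabled by default (beta quality), so enable it explicitly.
spark.conf.set("spark.rapids.sql.expression.JsonToStructs", "true")

val df = Seq("""{"a": "1", "b": "2"}""").toDF("json")  // hypothetical data

// MAP<STRING,STRING> passes the tagExprForGpu check above and can be planned on the GPU.
df.select(from_json(col("json"), MapType(StringType, StringType))).show(false)

// Any other map value type, for example MAP<STRING,INT>, is tagged willNotWorkOnGpu and
// falls back to CPU Spark (see test_from_json_map_fallback above).
df.select(from_json(col("json"), MapType(StringType, IntegerType))).show(false)
```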
@@ -16,12 +16,18 @@

package org.apache.spark.sql.rapids

import scala.collection.mutable.{ArrayBuffer, Set}

import ai.rapids.cudf
import com.nvidia.spark.rapids.{GpuColumnVector, GpuUnaryExpression}
import com.nvidia.spark.rapids.{GpuColumnVector, GpuScalar, GpuUnaryExpression}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.GpuCast.doCast
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq
import com.nvidia.spark.rapids.jni.MapUtils

import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, NullIntolerant, TimeZoneAwareExpression}
import org.apache.spark.sql.types.{AbstractDataType, DataType, StringType}
// import org.apache.spark.sql.types.{AbstractDataType, DataType, MapType, StringType, StructType}
Review comment (Collaborator): nit: delete commented-out code.

import org.apache.spark.sql.types._

case class GpuJsonToStructs(
schema: DataType,
@@ -30,8 +36,156 @@ case class GpuJsonToStructs(
timeZoneId: Option[String] = None)
extends GpuUnaryExpression with TimeZoneAwareExpression with ExpectsInputTypes
with NullIntolerant {

private def cleanAndConcat(input: cudf.ColumnVector): (cudf.ColumnVector, cudf.ColumnVector) ={
withResource(cudf.Scalar.fromString("{}")) { emptyRow =>
val stripped = withResource(cudf.Scalar.fromString(" ")) { space =>
input.strip(space)
}
withResource(stripped) { stripped =>
val isNullOrEmptyInput = withResource(input.isNull) { isNull =>
val isEmpty = withResource(stripped.getCharLengths) { lengths =>
withResource(cudf.Scalar.fromInt(0)) { zero =>
lengths.lessOrEqualTo(zero)
}
}
withResource(isEmpty) { isEmpty =>
isNull.binaryOp(cudf.BinaryOp.NULL_LOGICAL_OR, isEmpty, cudf.DType.BOOL8)
}
}
closeOnExcept(isNullOrEmptyInput) { _ =>
withResource(isNullOrEmptyInput.ifElse(emptyRow, stripped)) { cleaned =>
withResource(cudf.Scalar.fromString("\n")) { lineSep =>
withResource(cudf.Scalar.fromString("\r")) { returnSep =>
withResource(cleaned.stringContains(lineSep)) { inputHas =>
withResource(inputHas.any()) { anyLineSep =>
if (anyLineSep.isValid && anyLineSep.getBoolean) {
throw new IllegalArgumentException("We cannot currently support parsing " +
"JSON that contains a line separator in it")
}
}
}
withResource(cleaned.stringContains(returnSep)) { inputHas =>
withResource(inputHas.any()) { anyReturnSep =>
if (anyReturnSep.isValid && anyReturnSep.getBoolean) {
throw new IllegalArgumentException("We cannot currently support parsing " +
"JSON that contains a carriage return in it")
}
}
}
}
(isNullOrEmptyInput, cleaned.joinStrings(lineSep, emptyRow))
}
}
}
}
}
}

// Process a sequence of field names. If there are duplicated field names, we only keep the field
// name with the largest index in the sequence, for others, replace the field names with null.
// Example:
// Input = [("a", StringType), ("b", StringType), ("a", IntegerType)]
// Output = [(null, StringType), ("b", StringType), ("a", IntegerType)]
private def processFieldNames(names: Seq[(String, DataType)]): Seq[(String, DataType)] = {
val existingNames = Set[String]()
names.foldRight(Seq[(String, DataType)]())((elem, acc) => {
val (name, dtype) = elem
if (existingNames(name)) (null, dtype)+:acc else {existingNames += name; (name, dtype)+:acc}})
Review comment (Collaborator): nit: could we make the formatting less dense so it is simpler to read? For example:

names.foldRight(Seq.empty[(String, DataType)]) { (elem, acc) =>
  val (name, dtype) = elem
  if (existingNames(name)) {
    (null, dtype) +: acc
  } else {
    existingNames += name
    (name, dtype) +: acc
  }
}

}

private def getSparkType(col: cudf.ColumnView): DataType = {
col.getType match {
case cudf.DType.INT8 | cudf.DType.UINT8 => ByteType
case cudf.DType.INT16 | cudf.DType.UINT16 => ShortType
case cudf.DType.INT32 | cudf.DType.UINT32 => IntegerType
case cudf.DType.INT64 | cudf.DType.UINT64 => LongType
case cudf.DType.FLOAT32 => FloatType
case cudf.DType.FLOAT64 => DoubleType
case cudf.DType.BOOL8 => BooleanType
case cudf.DType.STRING => StringType
case cudf.DType.LIST => ArrayType(getSparkType(col.getChildColumnView(0)))
case cudf.DType.STRUCT =>
val structFields = ArrayBuffer.empty[StructField]
(0 until col.getNumChildren).foreach { i =>
val child = col.getChildColumnView(i)
structFields += StructField("", getSparkType(child))
}
StructType(structFields)
case t => throw new IllegalArgumentException(
s"GpuJsonToStructs currently cannot process CUDF column of type $t.")
}
}

override protected def doColumnar(input: GpuColumnVector): cudf.ColumnVector = {
MapUtils.extractRawMapFromJsonString(input.getBase)
schema match {
case _: MapType =>
MapUtils.extractRawMapFromJsonString(input.getBase)
case struct: StructType => {
// We cannot handle all corner cases with this right now. The parser just isn't
// good enough, but we will try to handle a few common ones.
val numRows = input.getRowCount.toInt

// Step 1: verify and preprocess the data to clean it up and normalize a few things
// Step 2: Concat the data into a single buffer
val (isNullOrEmpty, combined) = cleanAndConcat(input.getBase)
withResource(isNullOrEmpty) { isNullOrEmpty =>
// Step 3: copy the data back to the host so we can parse it.
val combinedHost = withResource(combined) { combined =>
combined.copyToHost()
}
// Step 4: Have cudf parse the JSON data
val (names, rawTable) = withResource(combinedHost) { combinedHost =>
val data = combinedHost.getData
val start = combinedHost.getStartListOffset(0)
val end = combinedHost.getEndListOffset(0)
val length = end - start

withResource(cudf.Table.readJSON(cudf.JSONOptions.DEFAULT, data, start,
length)) { tableWithMeta =>
val names = tableWithMeta.getColumnNames
(names, tableWithMeta.releaseTable())
}
}

Review comment (Collaborator): This is having CUDF do name and type inference. Is that really what we want? Should we do this like we do for regular JSON parsing? (Never mind, it turns out we do the same thing in the JSON reader. Why are we doing that? It is a huge waste of memory.) Can we please file a follow-on issue to fix it both here and in the JSON reader? Bonus points if we can combine the reader code.

// process duplicated field names in input struct schema
val fieldNames = processFieldNames(struct.fields.map { field =>
(field.name, field.dataType)})

withResource(rawTable) { rawTable =>
// Step 5: verify that the data looks correct
if (rawTable.getRowCount != numRows) {
throw new IllegalStateException("The input data didn't parse correctly and we read " +
s"a different number of rows than was expected. Expected $numRows, " +
s"but got ${rawTable.getRowCount}")
}

// Step 6: get the data based on input struct schema
val columns = fieldNames.safeMap { case (name, dtype) =>
val i = names.indexOf(name)
if (i == -1) {
GpuColumnVector.columnVectorFromNull(numRows, dtype)
} else {
val col = rawTable.getColumn(i)
doCast(col, getSparkType(col), dtype, false, false, false)
}
}

// Step 7: turn the data into a Struct
withResource(columns) { columns =>
withResource(cudf.ColumnVector.makeStruct(columns: _*)) { structData =>
// Step 8: put nulls back in for nulls and empty strings
withResource(GpuScalar.from(null, struct)) { nullVal =>
isNullOrEmpty.ifElse(nullVal, structData)
}
}
}
}
}
}
case _ => throw new IllegalArgumentException(
s"GpuJsonToStructs currently does not support schema of type $schema.")
}
}

override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
2 changes: 1 addition & 1 deletion tools/generated_files/supportedExprs.csv
@@ -265,7 +265,7 @@ IsNotNull,S,`isnotnull`,None,project,result,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,N
IsNull,S,`isnull`,None,project,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS
IsNull,S,`isnull`,None,project,result,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
JsonToStructs,NS,`from_json`,This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.,project,jsonStr,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
JsonToStructs,NS,`from_json`,This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NS,PS,NS,NA
JsonToStructs,NS,`from_json`,This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NS,PS,PS,NA
JsonTuple,S,`json_tuple`,None,project,json,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
JsonTuple,S,`json_tuple`,None,project,field,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA
JsonTuple,S,`json_tuple`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA