Added in very specific support for from_json to a Map<String,String> #6211

Merged · Aug 4, 2022 · 5 commits
1 change: 1 addition & 0 deletions docs/configs.md
@@ -247,6 +247,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.IsNaN"></a>spark.rapids.sql.expression.IsNaN|`isnan`|Checks if a value is NaN|true|None|
<a name="sql.expression.IsNotNull"></a>spark.rapids.sql.expression.IsNotNull|`isnotnull`|Checks if a value is not null|true|None|
<a name="sql.expression.IsNull"></a>spark.rapids.sql.expression.IsNull|`isnull`|Checks if a value is null|true|None|
<a name="sql.expression.JsonToStructs"></a>spark.rapids.sql.expression.JsonToStructs|`from_json`|Returns a struct value with the given `jsonStr` and `schema`|false|This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.|
<a name="sql.expression.KnownFloatingPointNormalized"></a>spark.rapids.sql.expression.KnownFloatingPointNormalized| |Tag to prevent redundant normalization|true|None|
<a name="sql.expression.KnownNotNull"></a>spark.rapids.sql.expression.KnownNotNull| |Tag an expression as known to not be null|true|None|
<a name="sql.expression.Lag"></a>spark.rapids.sql.expression.Lag|`lag`|Window function that returns N entries behind this one|true|None|
67 changes: 57 additions & 10 deletions docs/supported_ops.md
@@ -7590,22 +7590,22 @@ are limited.
<td> </td>
</tr>
<tr>
<td rowSpan="2">JsonToStructs</td>
<td rowSpan="2">`from_json`</td>
<td rowSpan="2">Returns a struct value with the given `jsonStr` and `schema`</td>
<td rowSpan="2">This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.</td>
<td rowSpan="2">project</td>
<td>jsonStr</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
@@ -7622,9 +7622,6 @@ are limited.
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
@@ -7634,6 +7631,9 @@ are limited.
<td> </td>
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td><em>PS<br/>unsupported child types BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, DATE, TIMESTAMP, DECIMAL, NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT</em></td>
<td><b>NS</b></td>
<td> </td>
</tr>
<tr>
@@ -7663,6 +7663,53 @@ are limited.
<th>UDT</th>
</tr>
<tr>
<td rowSpan="2">KnownFloatingPointNormalized</td>
<td rowSpan="2"> </td>
<td rowSpan="2">Tag to prevent redundant normalization</td>
<td rowSpan="2">None</td>
<td rowSpan="2">project</td>
<td>input</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td>S</td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<td>result</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td>S</td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<td rowSpan="2">KnownNotNull</td>
<td rowSpan="2"> </td>
<td rowSpan="2">Tag an expression as known to not be null</td>
11 changes: 11 additions & 0 deletions integration_tests/src/main/python/json_test.py
@@ -359,3 +359,14 @@ def test_json_read_count(spark_tmp_path, v1_enabled_list):
assert_gpu_and_cpu_row_counts_equal(
lambda spark : spark.read.schema(schema).json(data_path),
conf=updated_conf)

def test_from_json_map():
# This test works around an inconsistency in how map keys are parsed:
# on the GPU the keys are dense, but on the CPU they are sparse
json_string_gen = StringGen("{\"a\": \"[0-9]{0,5}\"(, \"b\": \"[A-Z]{0,5}\")?}")
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, json_string_gen)\
.selectExpr("from_json(a, \"MAP<STRING,STRING>\") as parsed")\
.selectExpr("parsed[\"a\"] as pa", "parsed[\"b\"] as pb"),
conf={"spark.rapids.sql.expression.JsonToStructs": "true"})
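The key-lookup workaround in the test above can be illustrated in plain Python. This is an illustrative sketch of the `MAP<STRING,STRING>` semantics the test exercises, using the standard `json` module; it is not the plugin's GPU implementation:

```python
import json

def from_json_as_string_map(json_str):
    """Parse a flat JSON object into a str -> str map, loosely mirroring
    Spark's from_json(col, "MAP<STRING,STRING>"). Plain-Python sketch only."""
    parsed = json.loads(json_str)
    return {str(k): str(v) for k, v in parsed.items()}

rows = ['{"a": "12345"}', '{"a": "007", "b": "XYZ"}']
maps = [from_json_as_string_map(r) for r in rows]

# Like the test, look each key up explicitly so a missing key yields None
# regardless of whether the parser produced dense or sparse key sets.
pa = [m.get("a") for m in maps]
pb = [m.get("b") for m in maps]
```

Selecting `parsed["a"]` and `parsed["b"]` in the test plays the same role as the `.get` calls here: it normalizes away the dense-vs-sparse difference before comparing CPU and GPU results.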

@@ -3560,6 +3560,20 @@ object GpuOverrides extends Logging {
GpuGetJsonObject(lhs, rhs)
}
),
expr[JsonToStructs](
"Returns a struct value with the given `jsonStr` and `schema`",
ExprChecks.projectOnly(
TypeSig.MAP.nested(TypeSig.STRING),
(TypeSig.STRUCT + TypeSig.MAP + TypeSig.ARRAY).nested(TypeSig.all),
Seq(ParamCheck("jsonStr", TypeSig.STRING, TypeSig.STRING))),
(a, conf, p, r) => new UnaryExprMeta[JsonToStructs](a, conf, p, r) {
override def tagExprForGpu(): Unit =
GpuJsonScan.tagJsonToStructsSupport(a.options, this)

override def convertToGpu(child: Expression): GpuExpression =
GpuJsonToStructs(a.schema, a.options, child, a.timeZoneId)
}).disabledByDefault("parsing JSON from a column has a large number of issues and " +
"should be considered beta quality right now."),
expr[org.apache.spark.sql.execution.ScalarSubquery](
"Subquery that will return only one row and one column",
ExprChecks.projectOnly(
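The `disabledByDefault` registration above ties the expression to a conf key of the form `spark.rapids.sql.expression.<Name>`, which is why the docs list `JsonToStructs` with a default of `false`. A minimal Python sketch of that enable/disable pattern (the class and method names here are illustrative assumptions, not the plugin's API):

```python
class ExprRule:
    """Sketch of an expression-replacement rule that can be disabled by
    default, mirroring GpuOverrides' disabledByDefault(...) pattern."""

    def __init__(self, name, disabled_by_default=False, reason=None):
        self.name = name
        self.disabled_by_default = disabled_by_default
        self.reason = reason  # surfaced in docs/configs.md in the real plugin

    def conf_key(self):
        # Conf-key naming mirrors spark.rapids.sql.expression.<Name>
        return f"spark.rapids.sql.expression.{self.name}"

    def is_enabled(self, conf):
        # A disabled-by-default rule stays off unless the conf turns it on
        default = "false" if self.disabled_by_default else "true"
        return conf.get(self.conf_key(), default) == "true"

rule = ExprRule("JsonToStructs", disabled_by_default=True,
                reason="parsing JSON from a column has a large number of issues")
```

This is why the integration test passes `{"spark.rapids.sql.expression.JsonToStructs": "true"}`: without it the rule would fall back to the CPU.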
@@ -56,71 +56,89 @@ object GpuJsonScan {
scanMeta)
}

def tagSupportOptions(
options: JSONOptionsInRead,
meta: RapidsMeta[_, _, _]): Unit = {

if (options.multiLine) {
meta.willNotWorkOnGpu("GpuJsonScan does not support multiLine")
}

// {"name": /* hello */ "Reynold Xin"} is not supported by CUDF
if (options.allowComments) {
meta.willNotWorkOnGpu("GpuJsonScan does not support allowComments")
}

// {name: 'Reynold Xin'} is not supported by CUDF
if (options.allowUnquotedFieldNames) {
meta.willNotWorkOnGpu("GpuJsonScan does not support allowUnquotedFieldNames")
}

// {'name': 'Reynold Xin'} is not supported by CUDF
// Unlike the options above, allowSingleQuotes defaults to true in Spark. We cannot
// support it, so we deliberately treat a missing value as false here to let the
// common case still run on the GPU.
if (options.parameters.get("allowSingleQuotes").map(_.toBoolean).getOrElse(false)) {
meta.willNotWorkOnGpu("GpuJsonScan does not support allowSingleQuotes")
}

// {"name": "Cazen Lee", "price": "\$10"} is not supported by CUDF
if (options.allowBackslashEscapingAnyCharacter) {
meta.willNotWorkOnGpu("GpuJsonScan does not support allowBackslashEscapingAnyCharacter")
}

// {"a":null, "b":1, "c":3.0}, Spark will drop column `a` if dropFieldIfAllNull is enabled.
if (options.dropFieldIfAllNull) {
meta.willNotWorkOnGpu("GpuJsonScan does not support dropFieldIfAllNull")
}

if (options.parseMode != PermissiveMode) {
meta.willNotWorkOnGpu("GpuJsonScan only supports Permissive JSON parsing")
}

if (options.lineSeparator.getOrElse("\n") != "\n") {
meta.willNotWorkOnGpu("GpuJsonScan only supports \"\\n\" as a line separator")
}

options.encoding.foreach(enc =>
if (enc != StandardCharsets.UTF_8.name() && enc != StandardCharsets.US_ASCII.name()) {
meta.willNotWorkOnGpu("GpuJsonScan only supports UTF8 or US-ASCII encoded data")
})
}
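The checks in `tagSupportOptions` amount to validating a handful of reader options and recording a reason whenever the plan cannot run on the GPU. A rough Python sketch of that validation logic follows; the option names and defaults are assumptions for illustration, and `willNotWorkOnGpu` is modeled as appending to a list:

```python
def unsupported_json_option_reasons(options):
    """Collect reasons a JSON read cannot run on the GPU, loosely mirroring
    GpuJsonScan.tagSupportOptions. Illustrative sketch, not the plugin's code."""
    reasons = []
    if options.get("multiLine", False):
        reasons.append("multiLine is not supported")
    if options.get("allowComments", False):
        reasons.append("allowComments is not supported")
    if options.get("allowUnquotedFieldNames", False):
        reasons.append("allowUnquotedFieldNames is not supported")
    # Spark's real default for allowSingleQuotes is true; the plugin deliberately
    # treats an unset value as false so the common case is not rejected.
    if options.get("allowSingleQuotes", False):
        reasons.append("allowSingleQuotes is not supported")
    if options.get("mode", "PERMISSIVE") != "PERMISSIVE":
        reasons.append("only PERMISSIVE parse mode is supported")
    if options.get("lineSep", "\n") != "\n":
        reasons.append('only "\\n" is supported as a line separator')
    enc = options.get("encoding")
    if enc not in (None, "UTF-8", "US-ASCII"):
        reasons.append("only UTF-8 or US-ASCII encoded data is supported")
    return reasons
```

In the real code each condition calls `meta.willNotWorkOnGpu(...)` instead of appending to a list, and an empty result corresponds to the expression being allowed on the GPU.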

def tagJsonToStructsSupport(options: Map[String, String],
meta: RapidsMeta[_, _, _]): Unit = {
val parsedOptions = new JSONOptionsInRead(
options,
SQLConf.get.sessionLocalTimeZone,
SQLConf.get.columnNameOfCorruptRecord)

tagSupportOptions(parsedOptions, meta)
}

def tagSupport(
sparkSession: SparkSession,
dataSchema: StructType,
readSchema: StructType,
options: Map[String, String],
meta: RapidsMeta[_, _, _]): Unit = {

val parsedOptions = new JSONOptionsInRead(
options,
sparkSession.sessionState.conf.sessionLocalTimeZone,
sparkSession.sessionState.conf.columnNameOfCorruptRecord)

if (!meta.conf.isJsonEnabled) {
meta.willNotWorkOnGpu("JSON input and output has been disabled. To enable set " +
s"${RapidsConf.ENABLE_JSON} to true")
}

if (!meta.conf.isJsonReadEnabled) {
meta.willNotWorkOnGpu("JSON input has been disabled. To enable set " +
s"${RapidsConf.ENABLE_JSON_READ} to true. Please note that the JSON reader does " +
s"not currently support column pruning, so you must specify the full schema or " +
s"let Spark infer it")
}

tagSupportOptions(parsedOptions, meta)

val types = readSchema.map(_.dataType)
if (types.contains(DateType)) {
@@ -136,17 +154,17 @@

if (!meta.conf.isJsonFloatReadEnabled && types.contains(FloatType)) {
meta.willNotWorkOnGpu("JSON reading is not 100% compatible when reading floats. " +
s"To enable it please set ${RapidsConf.ENABLE_READ_JSON_FLOATS} to true.")
}

if (!meta.conf.isJsonDoubleReadEnabled && types.contains(DoubleType)) {
meta.willNotWorkOnGpu("JSON reading is not 100% compatible when reading doubles. " +
s"To enable it please set ${RapidsConf.ENABLE_READ_JSON_DOUBLES} to true.")
}

if (!meta.conf.isJsonDecimalReadEnabled && types.exists(_.isInstanceOf[DecimalType])) {
meta.willNotWorkOnGpu("JSON reading is not 100% compatible when reading decimals. " +
s"To enable it please set ${RapidsConf.ENABLE_READ_JSON_DECIMALS} to true.")
}

dataSchema.getFieldIndex(parsedOptions.columnNameOfCorruptRecord).foreach { corruptFieldIndex =>