diff --git a/docs/configs.md b/docs/configs.md
index edcf1bcc621..1d4a7e69345 100644
--- a/docs/configs.md
+++ b/docs/configs.md
@@ -249,7 +249,6 @@ Name | SQL Function(s) | Description | Default Value | Notes
spark.rapids.sql.expression.IsNaN|`isnan`|Checks if a value is NaN|true|None|
spark.rapids.sql.expression.IsNotNull|`isnotnull`|Checks if a value is not null|true|None|
spark.rapids.sql.expression.IsNull|`isnull`|Checks if a value is null|true|None|
-spark.rapids.sql.expression.JsonToStructs|`from_json`|Returns a struct value with the given `jsonStr` and `schema`|false|This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.|
spark.rapids.sql.expression.KnownFloatingPointNormalized| |Tag to prevent redundant normalization|true|None|
spark.rapids.sql.expression.KnownNotNull| |Tag an expression as known to not be null|true|None|
spark.rapids.sql.expression.Lag|`lag`|Window function that returns N entries behind this one|true|None|
diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index d2c8b5ee370..64f12141e2b 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -7684,53 +7684,6 @@ are limited.
|
-JsonToStructs |
-`from_json` |
-Returns a struct value with the given `jsonStr` and `schema` |
-This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now. |
-project |
-jsonStr |
- |
- |
- |
- |
- |
- |
- |
- |
- |
-S |
- |
- |
- |
- |
- |
- |
- |
- |
-
-
-result |
- |
- |
- |
- |
- |
- |
- |
- |
- |
- |
- |
- |
- |
- |
-NS |
-PS unsupported child types BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, DATE, TIMESTAMP, DECIMAL, NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT |
-NS |
- |
-
-
KnownFloatingPointNormalized |
|
Tag to prevent redundant normalization |
diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py
index 18d1eadbd7c..6fe5e800291 100644
--- a/integration_tests/src/main/python/json_test.py
+++ b/integration_tests/src/main/python/json_test.py
@@ -359,14 +359,3 @@ def test_json_read_count(spark_tmp_path, v1_enabled_list):
assert_gpu_and_cpu_row_counts_equal(
lambda spark : spark.read.schema(schema).json(data_path),
conf=updated_conf)
-
-def test_from_json_map():
- # The test here is working around some inconsistencies in how the keys are parsed for maps
- # on the GPU the keys are dense, but on the CPU they are sparse
- json_string_gen = StringGen("{\"a\": \"[0-9]{0,5}\"(, \"b\": \"[A-Z]{0,5}\")?}")
- assert_gpu_and_cpu_are_equal_collect(
- lambda spark : unary_op_df(spark, json_string_gen)\
- .selectExpr("from_json(a, \"MAP\") as parsed")\
- .selectExpr("parsed[\"a\"] as pa", "parsed[\"b\"] as pb"),
- conf={"spark.rapids.sql.expression.JsonToStructs": "true"})
-
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 8657a707b35..402f31af96b 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -3532,20 +3532,6 @@ object GpuOverrides extends Logging {
GpuGetJsonObject(lhs, rhs)
}
),
- expr[JsonToStructs](
- "Returns a struct value with the given `jsonStr` and `schema`",
- ExprChecks.projectOnly(
- TypeSig.MAP.nested(TypeSig.STRING),
- (TypeSig.STRUCT + TypeSig.MAP + TypeSig.ARRAY).nested(TypeSig.all),
- Seq(ParamCheck("jsonStr", TypeSig.STRING, TypeSig.STRING))),
- (a, conf, p, r) => new UnaryExprMeta[JsonToStructs](a, conf, p, r) {
- override def tagExprForGpu(): Unit =
- GpuJsonScan.tagJsonToStructsSupport(a.options, this)
-
- override def convertToGpu(child: Expression): GpuExpression =
- GpuJsonToStructs(a.schema, a.options, child, a.timeZoneId)
- }).disabledByDefault("parsing JSON from a column has a large number of issues and " +
- "should be considered beta quality right now."),
expr[org.apache.spark.sql.execution.ScalarSubquery](
"Subquery that will return only one row and one column",
ExprChecks.projectOnly(
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala
index 4dabbe52a9f..41a32ffd246 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala
@@ -56,89 +56,71 @@ object GpuJsonScan {
scanMeta)
}
- def tagSupportOptions(
- options: JSONOptionsInRead,
+ def tagSupport(
+ sparkSession: SparkSession,
+ dataSchema: StructType,
+ readSchema: StructType,
+ options: Map[String, String],
meta: RapidsMeta[_, _, _]): Unit = {
- if (options.multiLine) {
+ val parsedOptions = new JSONOptionsInRead(
+ options,
+ sparkSession.sessionState.conf.sessionLocalTimeZone,
+ sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+
+ if (!meta.conf.isJsonEnabled) {
+ meta.willNotWorkOnGpu("JSON input and output has been disabled. To enable set " +
+ s"${RapidsConf.ENABLE_JSON} to true")
+ }
+
+ if (!meta.conf.isJsonReadEnabled) {
+ meta.willNotWorkOnGpu("JSON input has been disabled. To enable set " +
+ s"${RapidsConf.ENABLE_JSON_READ} to true. Please note that, currently json reader does " +
+ s"not support column prune, so user must specify the full schema or just let spark to " +
+ s"infer the schema")
+ }
+
+ if (parsedOptions.multiLine) {
meta.willNotWorkOnGpu("GpuJsonScan does not support multiLine")
}
// {"name": /* hello */ "Reynold Xin"} is not supported by CUDF
- if (options.allowComments) {
+ if (parsedOptions.allowComments) {
meta.willNotWorkOnGpu("GpuJsonScan does not support allowComments")
}
// {name: 'Reynold Xin'} is not supported by CUDF
- if (options.allowUnquotedFieldNames) {
+ if (parsedOptions.allowUnquotedFieldNames) {
meta.willNotWorkOnGpu("GpuJsonScan does not support allowUnquotedFieldNames")
}
// {'name': 'Reynold Xin'} is not supported by CUDF
- // This is different because the default for this is true, but we don't support it so we lie...
- if (options.parameters.get("allowSingleQuotes").map(_.toBoolean).getOrElse(false)) {
+ if (options.get("allowSingleQuotes").map(_.toBoolean).getOrElse(false)) {
meta.willNotWorkOnGpu("GpuJsonScan does not support allowSingleQuotes")
}
// {"name": "Cazen Lee", "price": "\$10"} is not supported by CUDF
- if (options.allowBackslashEscapingAnyCharacter) {
+ if (parsedOptions.allowBackslashEscapingAnyCharacter) {
meta.willNotWorkOnGpu("GpuJsonScan does not support allowBackslashEscapingAnyCharacter")
}
// {"a":null, "b":1, "c":3.0}, Spark will drop column `a` if dropFieldIfAllNull is enabled.
- if (options.dropFieldIfAllNull) {
+ if (parsedOptions.dropFieldIfAllNull) {
meta.willNotWorkOnGpu("GpuJsonScan does not support dropFieldIfAllNull")
}
- if (options.parseMode != PermissiveMode) {
+ if (parsedOptions.parseMode != PermissiveMode) {
meta.willNotWorkOnGpu("GpuJsonScan only supports Permissive JSON parsing")
}
- if (options.lineSeparator.getOrElse("\n") != "\n") {
+ if (parsedOptions.lineSeparator.getOrElse("\n") != "\n") {
meta.willNotWorkOnGpu("GpuJsonScan only supports \"\\n\" as a line separator")
}
- options.encoding.foreach(enc =>
+ parsedOptions.encoding.foreach(enc =>
if (enc != StandardCharsets.UTF_8.name() && enc != StandardCharsets.US_ASCII.name()) {
- meta.willNotWorkOnGpu("GpuJsonScan only supports UTF8 or US-ASCII encoded data")
- })
- }
-
- def tagJsonToStructsSupport(options:Map[String, String],
- meta: RapidsMeta[_, _, _]): Unit = {
- val parsedOptions = new JSONOptionsInRead(
- options,
- SQLConf.get.sessionLocalTimeZone,
- SQLConf.get.columnNameOfCorruptRecord)
-
- tagSupportOptions(parsedOptions, meta)
- }
-
- def tagSupport(
- sparkSession: SparkSession,
- dataSchema: StructType,
- readSchema: StructType,
- options: Map[String, String],
- meta: RapidsMeta[_, _, _]): Unit = {
-
- val parsedOptions = new JSONOptionsInRead(
- options,
- sparkSession.sessionState.conf.sessionLocalTimeZone,
- sparkSession.sessionState.conf.columnNameOfCorruptRecord)
-
- if (!meta.conf.isJsonEnabled) {
- meta.willNotWorkOnGpu("JSON input and output has been disabled. To enable set " +
- s"${RapidsConf.ENABLE_JSON} to true")
- }
-
- if (!meta.conf.isJsonReadEnabled) {
- meta.willNotWorkOnGpu("JSON input has been disabled. To enable set " +
- s"${RapidsConf.ENABLE_JSON_READ} to true. Please note that, currently json reader does " +
- s"not support column prune, so user must specify the full schema or just let spark to " +
- s"infer the schema")
- }
-
- tagSupportOptions(parsedOptions, meta)
+ meta.willNotWorkOnGpu("GpuJsonScan only supports UTF8 or US-ASCII encoded data")
+ })
val types = readSchema.map(_.dataType)
if (types.contains(DateType)) {
@@ -154,17 +136,17 @@ object GpuJsonScan {
if (!meta.conf.isJsonFloatReadEnabled && types.contains(FloatType)) {
meta.willNotWorkOnGpu("JSON reading is not 100% compatible when reading floats. " +
- s"To enable it please set ${RapidsConf.ENABLE_READ_JSON_FLOATS} to true.")
+ s"To enable it please set ${RapidsConf.ENABLE_READ_JSON_FLOATS} to true.")
}
if (!meta.conf.isJsonDoubleReadEnabled && types.contains(DoubleType)) {
meta.willNotWorkOnGpu("JSON reading is not 100% compatible when reading doubles. " +
- s"To enable it please set ${RapidsConf.ENABLE_READ_JSON_DOUBLES} to true.")
+ s"To enable it please set ${RapidsConf.ENABLE_READ_JSON_DOUBLES} to true.")
}
if (!meta.conf.isJsonDecimalReadEnabled && types.exists(_.isInstanceOf[DecimalType])) {
meta.willNotWorkOnGpu("JSON reading is not 100% compatible when reading decimals. " +
- s"To enable it please set ${RapidsConf.ENABLE_READ_JSON_DECIMALS} to true.")
+ s"To enable it please set ${RapidsConf.ENABLE_READ_JSON_DECIMALS} to true.")
}
dataSchema.getFieldIndex(parsedOptions.columnNameOfCorruptRecord).foreach { corruptFieldIndex =>
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala
deleted file mode 100644
index d12f8030c38..00000000000
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Copyright (c) 2022, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.rapids
-
-import ai.rapids.cudf
-import com.nvidia.spark.rapids.{GpuColumnVector, GpuScalar, GpuUnaryExpression}
-import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq
-
-import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, NullIntolerant, TimeZoneAwareExpression}
-import org.apache.spark.sql.types.{AbstractDataType, DataType, StringType}
-
-case class GpuJsonToStructs(
- schema: DataType,
- options: Map[String, String],
- child: Expression,
- timeZoneId: Option[String] = None)
- extends GpuUnaryExpression with TimeZoneAwareExpression with ExpectsInputTypes
- with NullIntolerant {
-
- private def cleanAndConcat(input: cudf.ColumnVector): (cudf.ColumnVector, cudf.ColumnVector) ={
- withResource(cudf.Scalar.fromString("{}")) { emptyRow =>
- val stripped = withResource(cudf.Scalar.fromString(" ")) { space =>
- input.strip(space)
- }
- withResource(stripped) { stripped =>
- val isNullOrEmptyInput = withResource(input.isNull) { isNull =>
- val isEmpty = withResource(stripped.getCharLengths) { lengths =>
- withResource(cudf.Scalar.fromInt(0)) { zero =>
- lengths.lessOrEqualTo(zero)
- }
- }
- withResource(isEmpty) { isEmpty =>
- isNull.binaryOp(cudf.BinaryOp.NULL_LOGICAL_OR, isEmpty, cudf.DType.BOOL8)
- }
- }
- closeOnExcept(isNullOrEmptyInput) { _ =>
- withResource(isNullOrEmptyInput.ifElse(emptyRow, stripped)) { cleaned =>
- withResource(cudf.Scalar.fromString("\n")) { lineSep =>
- withResource(cleaned.stringContains(lineSep)) { inputHas =>
- withResource(inputHas.any()) { anyLineSep =>
- if (anyLineSep.isValid && anyLineSep.getBoolean) {
- throw new IllegalArgumentException("We cannot currently support parsing " +
- "JSON that contains a line separator in it")
- }
- }
- }
- (isNullOrEmptyInput, cleaned.joinStrings(lineSep, emptyRow))
- }
- }
- }
- }
- }
- }
-
- private def castToStrings(rawTable: cudf.Table): Seq[cudf.ColumnVector] = {
- (0 until rawTable.getNumberOfColumns).safeMap { i =>
- val col = rawTable.getColumn(i)
- if (!cudf.DType.STRING.equals(col.getType)) {
- col.castTo(cudf.DType.STRING)
- } else {
- col.incRefCount()
- }
- }
- }
-
- private def makeMap(names: Seq[String], values: Seq[cudf.ColumnVector],
- numRows: Int): cudf.ColumnVector = {
- val nameCols = names.safeMap { name =>
- withResource(cudf.Scalar.fromString(name)) { scalarName =>
- cudf.ColumnVector.fromScalar(scalarName, numRows)
- }
- }
- withResource(nameCols) { nameCols =>
- val structViews = values.zip(nameCols).safeMap {
- case (dataCol, nameCol) => cudf.ColumnView.makeStructView(nameCol, dataCol)
- }
- withResource(structViews) { structViews =>
- cudf.ColumnVector.makeList(numRows, cudf.DType.STRUCT, structViews: _*)
- }
- }
- }
-
- override protected def doColumnar(input: GpuColumnVector): cudf.ColumnVector = {
- // We cannot handle all corner cases with this right now. The parser just isn't
- // good enough, but we will try to handle a few common ones.
- val numRows = input.getRowCount.toInt
-
- // Step 1: verify and preprocess the data to clean it up and normalize a few things.
- // Step 2: Concat the data into a single buffer.
- val (isNullOrEmpty, combined) = cleanAndConcat(input.getBase)
- withResource(isNullOrEmpty) { isNullOrEmpty =>
- // Step 3: copy the data back to the host so we can parse it.
- val combinedHost = withResource(combined) { combined =>
- combined.copyToHost()
- }
- // Step 4: Have cudf parse the JSON data
- val (names, rawTable) = withResource(combinedHost) { combinedHost =>
- val data = combinedHost.getData
- val start = combinedHost.getStartListOffset(0)
- val end = combinedHost.getEndListOffset(0)
- val length = end - start
-
- withResource(cudf.Table.readJSON(cudf.JSONOptions.DEFAULT, data, start,
- length)) { tableWithMeta =>
- val names = tableWithMeta.getColumnNames
- (names, tableWithMeta.releaseTable())
- }
- }
-
- val updatedCols = withResource(rawTable) { rawTable =>
- // Step 5 verify that the data looks correct.
- if (rawTable.getRowCount != numRows) {
- throw new IllegalStateException("The input data didn't parse correctly and we read a " +
- s"different number of rows than was expected. Expected $numRows, " +
- s"but got ${rawTable.getRowCount}")
- }
- if (names.toSet.size != names.size) {
- throw new IllegalStateException("Internal Error: found duplicate key names...")
- }
-
- // Step 6: convert any non-string columns back to strings
- castToStrings(rawTable)
- }
-
- // Step 7: turn the data into a Map
- val mapData = withResource(updatedCols) { updatedCols =>
- makeMap(names, updatedCols, numRows)
- }
-
- // Step 8: put nulls back in for nulls and empty strings
- withResource(mapData) { mapData =>
- withResource(GpuScalar.from(null, dataType)) { nullVal =>
- isNullOrEmpty.ifElse(nullVal, mapData)
- }
- }
- }
- }
-
- override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
- copy(timeZoneId = Option(timeZoneId))
-
- override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
-
- override def dataType: DataType = schema.asNullable
-
- override def nullable: Boolean = true
-}
\ No newline at end of file
diff --git a/tools/src/main/resources/operatorsScore.csv b/tools/src/main/resources/operatorsScore.csv
index ce62bb87fc5..f67f0f396c4 100644
--- a/tools/src/main/resources/operatorsScore.csv
+++ b/tools/src/main/resources/operatorsScore.csv
@@ -129,7 +129,6 @@ IntegralDivide,4
IsNaN,4
IsNotNull,4
IsNull,4
-JsonToStructs,4
KnownFloatingPointNormalized,4
KnownNotNull,4
Lag,4
diff --git a/tools/src/main/resources/supportedExprs.csv b/tools/src/main/resources/supportedExprs.csv
index 7b93812944c..33fb60ec5d7 100644
--- a/tools/src/main/resources/supportedExprs.csv
+++ b/tools/src/main/resources/supportedExprs.csv
@@ -261,8 +261,6 @@ IsNotNull,S,`isnotnull`,None,project,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,P
IsNotNull,S,`isnotnull`,None,project,result,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
IsNull,S,`isnull`,None,project,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS
IsNull,S,`isnull`,None,project,result,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
-JsonToStructs,NS,`from_json`,This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.,project,jsonStr,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
-JsonToStructs,NS,`from_json`,This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NS,PS,NS,NA
KnownFloatingPointNormalized,S, ,None,project,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,S,PS,PS,PS,S
KnownFloatingPointNormalized,S, ,None,project,result,S,S,S,S,S,S,S,S,PS,S,S,S,S,S,PS,PS,PS,S
KnownNotNull,S, ,None,project,input,S,S,S,S,S,S,S,S,PS,S,S,NS,S,S,PS,PS,PS,NS