From 76d78fa6734403c047129d38b05e09815e597620 Mon Sep 17 00:00:00 2001
From: "Robert (Bobby) Evans"
Date: Tue, 6 Sep 2022 13:38:07 -0500
Subject: [PATCH] Revert "Added in very specific support for from_json to a Map (#6211)"

This reverts commit 8b497c5b25ca80d410dee64d45d5592eba1ba307.
---
 docs/configs.md                              |   1 -
 docs/supported_ops.md                        |  47 -----
 .../src/main/python/json_test.py             |  11 --
 .../nvidia/spark/rapids/GpuOverrides.scala   |  14 --
 .../catalyst/json/rapids/GpuJsonScan.scala   |  90 ++++------
 .../spark/sql/rapids/GpuJsonToStructs.scala  | 161 ------------------
 tools/src/main/resources/operatorsScore.csv  |   1 -
 tools/src/main/resources/supportedExprs.csv  |   2 -
 8 files changed, 36 insertions(+), 291 deletions(-)
 delete mode 100644 sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala

diff --git a/docs/configs.md b/docs/configs.md
index edcf1bcc621..1d4a7e69345 100644
--- a/docs/configs.md
+++ b/docs/configs.md
@@ -249,7 +249,6 @@ Name | SQL Function(s) | Description | Default Value | Notes
 spark.rapids.sql.expression.IsNaN|`isnan`|Checks if a value is NaN|true|None|
 spark.rapids.sql.expression.IsNotNull|`isnotnull`|Checks if a value is not null|true|None|
 spark.rapids.sql.expression.IsNull|`isnull`|Checks if a value is null|true|None|
-spark.rapids.sql.expression.JsonToStructs|`from_json`|Returns a struct value with the given `jsonStr` and `schema`|false|This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.|
 spark.rapids.sql.expression.KnownFloatingPointNormalized| |Tag to prevent redundant normalization|true|None|
 spark.rapids.sql.expression.KnownNotNull| |Tag an expression as known to not be null|true|None|
 spark.rapids.sql.expression.Lag|`lag`|Window function that returns N entries behind this one|true|None|
diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index d2c8b5ee370..64f12141e2b 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -7684,53 +7684,6 @@ are limited.
-JsonToStructs
-`from_json`
-Returns a struct value with the given `jsonStr` and `schema`
-This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.
-project
-jsonStr
-
-
-
-
-
-
-
-
-
-S
-
-
-
-
-
-
-
-
-
-
-result
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-NS
-PS unsupported child types BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, DATE, TIMESTAMP, DECIMAL, NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT
-NS
-
-
-
 KnownFloatingPointNormalized
 Tag to prevent redundant normalization
diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py
index 18d1eadbd7c..6fe5e800291 100644
--- a/integration_tests/src/main/python/json_test.py
+++ b/integration_tests/src/main/python/json_test.py
@@ -359,14 +359,3 @@ def test_json_read_count(spark_tmp_path, v1_enabled_list):
     assert_gpu_and_cpu_row_counts_equal(
         lambda spark : spark.read.schema(schema).json(data_path),
         conf=updated_conf)
-
-def test_from_json_map():
-    # The test here is working around some inconsistencies in how the keys are parsed for maps
-    # on the GPU the keys are dense, but on the CPU they are sparse
-    json_string_gen = StringGen("{\"a\": \"[0-9]{0,5}\"(, \"b\": \"[A-Z]{0,5}\")?}")
-    assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : unary_op_df(spark, json_string_gen)\
-            .selectExpr("from_json(a, \"MAP<STRING,STRING>\") as parsed")\
-            .selectExpr("parsed[\"a\"] as pa", "parsed[\"b\"] as pb"),
-        conf={"spark.rapids.sql.expression.JsonToStructs": "true"})
-
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 8657a707b35..402f31af96b 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -3532,20 +3532,6 @@ object GpuOverrides extends Logging {
         GpuGetJsonObject(lhs, rhs)
       }
     ),
-    expr[JsonToStructs](
-      "Returns a struct value with the given `jsonStr` and `schema`",
-      ExprChecks.projectOnly(
-        TypeSig.MAP.nested(TypeSig.STRING),
-        (TypeSig.STRUCT + TypeSig.MAP + TypeSig.ARRAY).nested(TypeSig.all),
-        Seq(ParamCheck("jsonStr", TypeSig.STRING, TypeSig.STRING))),
-      (a, conf, p, r) => new UnaryExprMeta[JsonToStructs](a, conf, p, r) {
-        override def tagExprForGpu(): Unit =
-          GpuJsonScan.tagJsonToStructsSupport(a.options, this)
-
-        override def convertToGpu(child: Expression): GpuExpression =
-          GpuJsonToStructs(a.schema, a.options, child, a.timeZoneId)
-      }).disabledByDefault("parsing JSON from a column has a large number of issues and " +
-      "should be considered beta quality right now."),
     expr[org.apache.spark.sql.execution.ScalarSubquery](
       "Subquery that will return only one row and one column",
       ExprChecks.projectOnly(
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala
index 4dabbe52a9f..41a32ffd246 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala
@@ -56,89 +56,71 @@ object GpuJsonScan {
       scanMeta)
   }
 
-  def tagSupportOptions(
-      options: JSONOptionsInRead,
+  def tagSupport(
+      sparkSession: SparkSession,
+      dataSchema: StructType,
+      readSchema: StructType,
+      options: Map[String, String],
       meta: RapidsMeta[_, _, _]): Unit = {
-    if (options.multiLine) {
+    val parsedOptions = new JSONOptionsInRead(
+      options,
+      sparkSession.sessionState.conf.sessionLocalTimeZone,
+      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+
+    if (!meta.conf.isJsonEnabled) {
+      meta.willNotWorkOnGpu("JSON input and output has been disabled. To enable set " +
+        s"${RapidsConf.ENABLE_JSON} to true")
+    }
+
+    if (!meta.conf.isJsonReadEnabled) {
+      meta.willNotWorkOnGpu("JSON input has been disabled. To enable set " +
+        s"${RapidsConf.ENABLE_JSON_READ} to true. Please note that, currently json reader does " +
+        s"not support column prune, so user must specify the full schema or just let spark to " +
+        s"infer the schema")
+    }
+
+    if (parsedOptions.multiLine) {
       meta.willNotWorkOnGpu("GpuJsonScan does not support multiLine")
     }
 
     // {"name": /* hello */ "Reynold Xin"} is not supported by CUDF
-    if (options.allowComments) {
+    if (parsedOptions.allowComments) {
       meta.willNotWorkOnGpu("GpuJsonScan does not support allowComments")
     }
 
     // {name: 'Reynold Xin'} is not supported by CUDF
-    if (options.allowUnquotedFieldNames) {
+    if (parsedOptions.allowUnquotedFieldNames) {
       meta.willNotWorkOnGpu("GpuJsonScan does not support allowUnquotedFieldNames")
     }
 
     // {'name': 'Reynold Xin'} is not supported by CUDF
-    // This is different because the default for this is true, but we don't support it so we lie...
-    if (options.parameters.get("allowSingleQuotes").map(_.toBoolean).getOrElse(false)) {
+    if (options.get("allowSingleQuotes").map(_.toBoolean).getOrElse(false)) {
       meta.willNotWorkOnGpu("GpuJsonScan does not support allowSingleQuotes")
     }
 
     // {"name": "Cazen Lee", "price": "\$10"} is not supported by CUDF
-    if (options.allowBackslashEscapingAnyCharacter) {
+    if (parsedOptions.allowBackslashEscapingAnyCharacter) {
       meta.willNotWorkOnGpu("GpuJsonScan does not support allowBackslashEscapingAnyCharacter")
     }
 
     // {"a":null, "b":1, "c":3.0}, Spark will drop column `a` if dropFieldIfAllNull is enabled.
-    if (options.dropFieldIfAllNull) {
+    if (parsedOptions.dropFieldIfAllNull) {
       meta.willNotWorkOnGpu("GpuJsonScan does not support dropFieldIfAllNull")
     }
 
-    if (options.parseMode != PermissiveMode) {
+    if (parsedOptions.parseMode != PermissiveMode) {
       meta.willNotWorkOnGpu("GpuJsonScan only supports Permissive JSON parsing")
     }
 
-    if (options.lineSeparator.getOrElse("\n") != "\n") {
+    if (parsedOptions.lineSeparator.getOrElse("\n") != "\n") {
       meta.willNotWorkOnGpu("GpuJsonScan only supports \"\\n\" as a line separator")
     }
 
-    options.encoding.foreach(enc =>
+    parsedOptions.encoding.foreach(enc =>
       if (enc != StandardCharsets.UTF_8.name() && enc != StandardCharsets.US_ASCII.name()) {
-      meta.willNotWorkOnGpu("GpuJsonScan only supports UTF8 or US-ASCII encoded data")
-    })
-  }
-
-  def tagJsonToStructsSupport(options:Map[String, String],
-      meta: RapidsMeta[_, _, _]): Unit = {
-    val parsedOptions = new JSONOptionsInRead(
-      options,
-      SQLConf.get.sessionLocalTimeZone,
-      SQLConf.get.columnNameOfCorruptRecord)
-
-    tagSupportOptions(parsedOptions, meta)
-  }
-
-  def tagSupport(
-      sparkSession: SparkSession,
-      dataSchema: StructType,
-      readSchema: StructType,
-      options: Map[String, String],
-      meta: RapidsMeta[_, _, _]): Unit = {
-
-    val parsedOptions = new JSONOptionsInRead(
-      options,
-      sparkSession.sessionState.conf.sessionLocalTimeZone,
-      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
-
-    if (!meta.conf.isJsonEnabled) {
-      meta.willNotWorkOnGpu("JSON input and output has been disabled. To enable set " +
-        s"${RapidsConf.ENABLE_JSON} to true")
-    }
-
-    if (!meta.conf.isJsonReadEnabled) {
-      meta.willNotWorkOnGpu("JSON input has been disabled. To enable set " +
-        s"${RapidsConf.ENABLE_JSON_READ} to true. Please note that, currently json reader does " +
-        s"not support column prune, so user must specify the full schema or just let spark to " +
-        s"infer the schema")
-    }
-
-    tagSupportOptions(parsedOptions, meta)
+        meta.willNotWorkOnGpu("GpuJsonScan only supports UTF8 or US-ASCII encoded data")
+      })
 
     val types = readSchema.map(_.dataType)
     if (types.contains(DateType)) {
@@ -154,17 +136,17 @@ object GpuJsonScan {
 
     if (!meta.conf.isJsonFloatReadEnabled && types.contains(FloatType)) {
       meta.willNotWorkOnGpu("JSON reading is not 100% compatible when reading floats. " +
-          s"To enable it please set ${RapidsConf.ENABLE_READ_JSON_FLOATS} to true.")
+        s"To enable it please set ${RapidsConf.ENABLE_READ_JSON_FLOATS} to true.")
     }
 
     if (!meta.conf.isJsonDoubleReadEnabled && types.contains(DoubleType)) {
       meta.willNotWorkOnGpu("JSON reading is not 100% compatible when reading doubles. " +
-          s"To enable it please set ${RapidsConf.ENABLE_READ_JSON_DOUBLES} to true.")
+        s"To enable it please set ${RapidsConf.ENABLE_READ_JSON_DOUBLES} to true.")
     }
 
     if (!meta.conf.isJsonDecimalReadEnabled && types.exists(_.isInstanceOf[DecimalType])) {
       meta.willNotWorkOnGpu("JSON reading is not 100% compatible when reading decimals. " +
-          s"To enable it please set ${RapidsConf.ENABLE_READ_JSON_DECIMALS} to true.")
+        s"To enable it please set ${RapidsConf.ENABLE_READ_JSON_DECIMALS} to true.")
     }
 
     dataSchema.getFieldIndex(parsedOptions.columnNameOfCorruptRecord).foreach { corruptFieldIndex =>
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala
deleted file mode 100644
index d12f8030c38..00000000000
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Copyright (c) 2022, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.rapids
-
-import ai.rapids.cudf
-import com.nvidia.spark.rapids.{GpuColumnVector, GpuScalar, GpuUnaryExpression}
-import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq
-
-import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, NullIntolerant, TimeZoneAwareExpression}
-import org.apache.spark.sql.types.{AbstractDataType, DataType, StringType}
-
-case class GpuJsonToStructs(
-    schema: DataType,
-    options: Map[String, String],
-    child: Expression,
-    timeZoneId: Option[String] = None)
-  extends GpuUnaryExpression with TimeZoneAwareExpression with ExpectsInputTypes
-    with NullIntolerant {
-
-  private def cleanAndConcat(input: cudf.ColumnVector): (cudf.ColumnVector, cudf.ColumnVector) ={
-    withResource(cudf.Scalar.fromString("{}")) { emptyRow =>
-      val stripped = withResource(cudf.Scalar.fromString(" ")) { space =>
-        input.strip(space)
-      }
-      withResource(stripped) { stripped =>
-        val isNullOrEmptyInput = withResource(input.isNull) { isNull =>
-          val isEmpty = withResource(stripped.getCharLengths) { lengths =>
-            withResource(cudf.Scalar.fromInt(0)) { zero =>
-              lengths.lessOrEqualTo(zero)
-            }
-          }
-          withResource(isEmpty) { isEmpty =>
-            isNull.binaryOp(cudf.BinaryOp.NULL_LOGICAL_OR, isEmpty, cudf.DType.BOOL8)
-          }
-        }
-        closeOnExcept(isNullOrEmptyInput) { _ =>
-          withResource(isNullOrEmptyInput.ifElse(emptyRow, stripped)) { cleaned =>
-            withResource(cudf.Scalar.fromString("\n")) { lineSep =>
-              withResource(cleaned.stringContains(lineSep)) { inputHas =>
-                withResource(inputHas.any()) { anyLineSep =>
-                  if (anyLineSep.isValid && anyLineSep.getBoolean) {
-                    throw new IllegalArgumentException("We cannot currently support parsing " +
-                        "JSON that contains a line separator in it")
-                  }
-                }
-              }
-              (isNullOrEmptyInput, cleaned.joinStrings(lineSep, emptyRow))
-            }
-          }
-        }
-      }
-    }
-  }
-
-  private def castToStrings(rawTable: cudf.Table): Seq[cudf.ColumnVector] = {
-    (0 until rawTable.getNumberOfColumns).safeMap { i =>
-      val col = rawTable.getColumn(i)
-      if (!cudf.DType.STRING.equals(col.getType)) {
-        col.castTo(cudf.DType.STRING)
-      } else {
-        col.incRefCount()
-      }
-    }
-  }
-
-  private def makeMap(names: Seq[String], values: Seq[cudf.ColumnVector],
-      numRows: Int): cudf.ColumnVector = {
-    val nameCols = names.safeMap { name =>
-      withResource(cudf.Scalar.fromString(name)) { scalarName =>
-        cudf.ColumnVector.fromScalar(scalarName, numRows)
-      }
-    }
-    withResource(nameCols) { nameCols =>
-      val structViews = values.zip(nameCols).safeMap {
-        case (dataCol, nameCol) => cudf.ColumnView.makeStructView(nameCol, dataCol)
-      }
-      withResource(structViews) { structViews =>
-        cudf.ColumnVector.makeList(numRows, cudf.DType.STRUCT, structViews: _*)
-      }
-    }
-  }
-
-  override protected def doColumnar(input: GpuColumnVector): cudf.ColumnVector = {
-    // We cannot handle all corner cases with this right now. The parser just isn't
-    // good enough, but we will try to handle a few common ones.
-    val numRows = input.getRowCount.toInt
-
-    // Step 1: verify and preprocess the data to clean it up and normalize a few things.
-    // Step 2: Concat the data into a single buffer.
-    val (isNullOrEmpty, combined) = cleanAndConcat(input.getBase)
-    withResource(isNullOrEmpty) { isNullOrEmpty =>
-      // Step 3: copy the data back to the host so we can parse it.
-      val combinedHost = withResource(combined) { combined =>
-        combined.copyToHost()
-      }
-      // Step 4: Have cudf parse the JSON data
-      val (names, rawTable) = withResource(combinedHost) { combinedHost =>
-        val data = combinedHost.getData
-        val start = combinedHost.getStartListOffset(0)
-        val end = combinedHost.getEndListOffset(0)
-        val length = end - start
-
-        withResource(cudf.Table.readJSON(cudf.JSONOptions.DEFAULT, data, start,
-            length)) { tableWithMeta =>
-          val names = tableWithMeta.getColumnNames
-          (names, tableWithMeta.releaseTable())
-        }
-      }
-
-      val updatedCols = withResource(rawTable) { rawTable =>
-        // Step 5 verify that the data looks correct.
-        if (rawTable.getRowCount != numRows) {
-          throw new IllegalStateException("The input data didn't parse correctly and we read a " +
-              s"different number of rows than was expected. Expected $numRows, " +
-              s"but got ${rawTable.getRowCount}")
-        }
-        if (names.toSet.size != names.size) {
-          throw new IllegalStateException("Internal Error: found duplicate key names...")
-        }
-
-        // Step 6: convert any non-string columns back to strings
-        castToStrings(rawTable)
-      }
-
-      // Step 7: turn the data into a Map
-      val mapData = withResource(updatedCols) { updatedCols =>
-        makeMap(names, updatedCols, numRows)
-      }
-
-      // Step 8: put nulls back in for nulls and empty strings
-      withResource(mapData) { mapData =>
-        withResource(GpuScalar.from(null, dataType)) { nullVal =>
-          isNullOrEmpty.ifElse(nullVal, mapData)
-        }
-      }
-    }
-  }
-
-  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
-    copy(timeZoneId = Option(timeZoneId))
-
-  override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
-
-  override def dataType: DataType = schema.asNullable
-
-  override def nullable: Boolean = true
-}
\ No newline at end of file
diff --git a/tools/src/main/resources/operatorsScore.csv b/tools/src/main/resources/operatorsScore.csv
index ce62bb87fc5..f67f0f396c4 100644
--- a/tools/src/main/resources/operatorsScore.csv
+++ b/tools/src/main/resources/operatorsScore.csv
@@ -129,7 +129,6 @@ IntegralDivide,4
 IsNaN,4
 IsNotNull,4
 IsNull,4
-JsonToStructs,4
 KnownFloatingPointNormalized,4
 KnownNotNull,4
 Lag,4
diff --git a/tools/src/main/resources/supportedExprs.csv b/tools/src/main/resources/supportedExprs.csv
index 7b93812944c..33fb60ec5d7 100644
--- a/tools/src/main/resources/supportedExprs.csv
+++ b/tools/src/main/resources/supportedExprs.csv
@@ -261,8 +261,6 @@ IsNotNull,S,`isnotnull`,None,project,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,P
 IsNotNull,S,`isnotnull`,None,project,result,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
 IsNull,S,`isnull`,None,project,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS
 IsNull,S,`isnull`,None,project,result,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
-JsonToStructs,NS,`from_json`,This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.,project,jsonStr,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
-JsonToStructs,NS,`from_json`,This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NS,PS,NS,NA
 KnownFloatingPointNormalized,S, ,None,project,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,S,PS,PS,PS,S
 KnownFloatingPointNormalized,S, ,None,project,result,S,S,S,S,S,S,S,S,PS,S,S,S,S,S,PS,PS,PS,S
 KnownNotNull,S, ,None,project,input,S,S,S,S,S,S,S,S,PS,S,S,NS,S,S,PS,PS,PS,NS
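
For context on what this revert takes away: the removed test_from_json_map test exercised
from_json parsing a JSON string column directly into a MAP<STRING,STRING> on the GPU. A
minimal Scala sketch of that usage (the data, app name, and session setup here are
hypothetical; it assumes the plugin jar is on the classpath and only runs on the GPU while
the JsonToStructs override exists and is enabled):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, from_json}
    import org.apache.spark.sql.types.{MapType, StringType}

    val spark = SparkSession.builder()
      .appName("from-json-map-sketch")
      .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
      // The override was off by default and had to be turned on explicitly.
      .config("spark.rapids.sql.expression.JsonToStructs", "true")
      .getOrCreate()
    import spark.implicits._

    // Same shape of data as the removed test generated: "b" is optional, which
    // is where the CPU produced sparse keys and the GPU produced dense ones.
    val df = Seq("""{"a": "12345", "b": "ABC"}""", """{"a": "7"}""").toDF("a")

    val parsed = df
      .select(from_json(col("a"), MapType(StringType, StringType)).as("parsed"))
      .selectExpr("parsed['a'] AS pa", "parsed['b'] AS pb")
    parsed.show()

Without the override, the same query still runs; JsonToStructs is simply evaluated on the CPU.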
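
Both before and after this revert, tagSupport falls back to the CPU for reader options cuDF
cannot honor: multiLine, allowComments, allowUnquotedFieldNames, allowSingleQuotes,
allowBackslashEscapingAnyCharacter, dropFieldIfAllNull, any non-PERMISSIVE parse mode, any
line separator other than "\n", and any encoding other than UTF-8 or US-ASCII. A read that
stays inside that envelope might look like this sketch (the path and schema are hypothetical,
and a SparkSession named spark is assumed to be in scope):

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // Give the full schema up front: as the fallback message notes, the GPU
    // reader does not prune columns.
    val eventSchema = StructType(Seq(
      StructField("name", StringType),
      StructField("price", StringType)))

    val events = spark.read
      .schema(eventSchema)
      .option("mode", "PERMISSIVE")         // the only parse mode GpuJsonScan supports
      .option("allowSingleQuotes", "false") // single-quoted fields are not supported
      .json("/data/events.jsonl")           // single-line, UTF-8 encoded records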
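
The deleted GpuJsonToStructs implementation routes every native cudf handle through
withResource and closeOnExcept from the plugin's Arm trait. A rough sketch of the idea
behind withResource (simplified; the real helper has many more overloads):

    // Run the body with the resource and close the resource on every exit path,
    // so device memory is released even when the body throws.
    def withResource[R <: AutoCloseable, T](resource: R)(body: R => T): T = {
      try {
        body(resource)
      } finally {
        resource.close()
      }
    }

closeOnExcept is the complement: it closes the resource only if the body throws, which is
what the deleted code uses for isNullOrEmptyInput while ownership is still being passed
along to the caller.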
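
The heart of the deleted kernel is cleanAndConcat: null or blank rows are replaced with the
empty object {} so every input row still occupies one slot, all rows are joined with "\n",
and the single buffer is parsed as JSON Lines by cudf.Table.readJSON; the saved null/empty
mask then puts nulls back in step 8. A plain-Scala, CPU-side sketch of that normalization
(a hypothetical helper for illustration, not the GPU code itself):

    def cleanAndConcat(rows: Seq[String]): (Seq[Boolean], String) = {
      // Remember which rows must come back as null after parsing.
      val stripped = rows.map(r => Option(r).map(_.trim).getOrElse(""))
      val isNullOrEmpty = stripped.map(_.isEmpty)
      // "\n" doubles as the record delimiter, so it may not appear inside a row.
      if (stripped.exists(_.contains("\n"))) {
        throw new IllegalArgumentException("We cannot currently support parsing " +
          "JSON that contains a line separator in it")
      }
      val cleaned = stripped.zip(isNullOrEmpty).map { case (s, empty) =>
        if (empty) "{}" else s
      }
      (isNullOrEmpty, cleaned.mkString("\n"))
    }

This positional scheme is also why step 5 checks the parsed row count: if a malformed row
produced no record, every later value would shift into the wrong row.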