Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the mixedType config for JSON as it has no downsides any longer #10716

Merged
merged 2 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/additional-functionality/advanced_configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ Name | Description | Default Value | Applicable at
<a name="sql.json.read.decimal.enabled"></a>spark.rapids.sql.json.read.decimal.enabled|When reading a quoted string as a decimal Spark supports reading non-ascii unicode digits, and the RAPIDS Accelerator does not.|true|Runtime
<a name="sql.json.read.double.enabled"></a>spark.rapids.sql.json.read.double.enabled|JSON reading is not 100% compatible when reading doubles.|true|Runtime
<a name="sql.json.read.float.enabled"></a>spark.rapids.sql.json.read.float.enabled|JSON reading is not 100% compatible when reading floats.|true|Runtime
<a name="sql.json.read.mixedTypesAsString.enabled"></a>spark.rapids.sql.json.read.mixedTypesAsString.enabled|JSON reading is not 100% compatible when reading mixed types as string.|false|Runtime
<a name="sql.mode"></a>spark.rapids.sql.mode|Set the mode for the Rapids Accelerator. The supported modes are explainOnly and executeOnGPU. This config can not be changed at runtime, you must restart the application for it to take affect. The default mode is executeOnGPU, which means the RAPIDS Accelerator plugin convert the Spark operations and execute them on the GPU when possible. The explainOnly mode allows running queries on the CPU and the RAPIDS Accelerator will evaluate the queries as if it was going to run on the GPU. The explanations of what would have run on the GPU and why are output in log messages. When using explainOnly mode, the default explain output is ALL, this can be changed by setting spark.rapids.sql.explain. See that config for more details.|executeongpu|Startup
<a name="sql.optimizer.joinReorder.enabled"></a>spark.rapids.sql.optimizer.joinReorder.enabled|When enabled, joins may be reordered for improved query performance|true|Runtime
<a name="sql.python.gpu.enabled"></a>spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false|Runtime
Expand Down
6 changes: 2 additions & 4 deletions docs/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -368,10 +368,8 @@ In versions of Spark before 3.5.0 there is no maximum to how deeply nested JSON
no matter what version of Spark is used. If the nesting level is over this the JSON is considered
invalid and all values will be returned as nulls.

Only structs are supported for nested types. There are also some issues with arrays of structs. If
your data includes this, even if you are not reading it, you might get an exception. You can
try to set `spark.rapids.sql.json.read.mixedTypesAsString.enabled` to true to work around this,
but it also has some issues with it.
Mixed types can have some problems. If an item being read could have some lines that are arrays
and others that are structs/dictionaries it is possible an error will be thrown.

Dates and Timestamps have some issues and may return values for technically invalid inputs.

Expand Down
6 changes: 2 additions & 4 deletions integration_tests/src/main/python/json_matrix_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,15 @@ def read_json_as_text(spark, data_path, column_name):
'spark.rapids.sql.format.json.read.enabled': 'true',
'spark.rapids.sql.json.read.float.enabled': 'true',
'spark.rapids.sql.json.read.double.enabled': 'true',
'spark.rapids.sql.json.read.decimal.enabled': 'true',
'spark.rapids.sql.json.read.mixedTypesAsString.enabled': 'true'
'spark.rapids.sql.json.read.decimal.enabled': 'true'
}

_enable_json_to_structs_conf = {
'spark.rapids.sql.expression.JsonToStructs': 'true',
'spark.rapids.sql.json.read.float.enabled': 'true',
'spark.rapids.sql.json.read.double.enabled': 'true',
'spark.rapids.sql.json.read.decimal.enabled': 'true',
'spark.rapids.sql.json.read.decimal.enabled': 'true',
'spark.rapids.sql.json.read.mixedTypesAsString.enabled': 'true'
'spark.rapids.sql.json.read.decimal.enabled': 'true'
}

_enable_get_json_object_conf = {
Expand Down
6 changes: 2 additions & 4 deletions integration_tests/src/main/python/json_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,7 @@ def test_read_invalid_json(spark_tmp_table_factory, std_input_path, read_func, f
@pytest.mark.parametrize('v1_enabled_list', ["", "json"])
def test_read_valid_json(spark_tmp_table_factory, std_input_path, read_func, filename, schema, v1_enabled_list):
conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.rapids.sql.json.read.mixedTypesAsString.enabled': True})
{'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
read_func(std_input_path + '/' + filename,
schema,
Expand Down Expand Up @@ -898,11 +897,10 @@ def test_from_json_struct_of_list(schema):
@pytest.mark.xfail(reason = 'https://github.com/NVIDIA/spark-rapids/issues/10351')
def test_from_json_mixed_types_list_struct(schema):
json_string_gen = StringGen(r'{"a": (\[1,2,3\]|{"b":"[a-z]{2}"}) }')
conf = copy_and_update(_enable_all_types_conf, {'spark.rapids.sql.json.read.mixedTypesAsString.enabled': 'true'})
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, json_string_gen) \
.select('a', f.from_json('a', schema)),
conf=conf)
conf=_enable_all_types_conf)

@pytest.mark.parametrize('schema', ['struct<a:string>', 'struct<a:string,b:int>'])
@allow_non_gpu(*non_utc_allow)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3689,8 +3689,7 @@ object GpuOverrides extends Logging {

override def convertToGpu(child: Expression): GpuExpression =
// GPU implementation currently does not support duplicated json key names in input
GpuJsonToStructs(a.schema, a.options, child, conf.isJsonMixedTypesAsStringEnabled,
a.timeZoneId)
GpuJsonToStructs(a.schema, a.options, child, a.timeZoneId)
}).disabledByDefault("it is currently in beta and undergoes continuous enhancements."+
" Please consult the "+
"[compatibility documentation](../compatibility.md#json-supporting-types)"+
Expand Down Expand Up @@ -3883,8 +3882,7 @@ object GpuOverrides extends Logging {
a.dataFilters,
conf.maxReadBatchSizeRows,
conf.maxReadBatchSizeBytes,
conf.maxGpuColumnSizeBytes,
conf.isJsonMixedTypesAsStringEnabled)
conf.maxGpuColumnSizeBytes)
})).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap

val scans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1239,12 +1239,6 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.booleanConf
.createWithDefault(true)

val ENABLE_READ_JSON_MIXED_TYPES_AS_STRING =
conf("spark.rapids.sql.json.read.mixedTypesAsString.enabled")
.doc("JSON reading is not 100% compatible when reading mixed types as string.")
.booleanConf
.createWithDefault(false)

val ENABLE_AVRO = conf("spark.rapids.sql.format.avro.enabled")
.doc("When set to true enables all avro input and output acceleration. " +
"(only input is currently supported anyways)")
Expand Down Expand Up @@ -2686,8 +2680,6 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isJsonDecimalReadEnabled: Boolean = get(ENABLE_READ_JSON_DECIMALS)

lazy val isJsonMixedTypesAsStringEnabled: Boolean = get(ENABLE_READ_JSON_MIXED_TYPES_AS_STRING)

lazy val isAvroEnabled: Boolean = get(ENABLE_AVRO)

lazy val isAvroReadEnabled: Boolean = get(ENABLE_AVRO_READ)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,7 @@ case class GpuJsonScan(
dataFilters: Seq[Expression],
maxReaderBatchSizeRows: Integer,
maxReaderBatchSizeBytes: Long,
maxGpuColumnSizeBytes: Long,
mixedTypesAsStringEnabled: Boolean)
maxGpuColumnSizeBytes: Long)
extends TextBasedFileScan(sparkSession, options) with GpuScan {

private lazy val parsedOptions: JSONOptions = new JSONOptions(
Expand All @@ -270,8 +269,7 @@ case class GpuJsonScan(

GpuJsonPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
dataSchema, readDataSchema, readPartitionSchema, parsedOptions, maxReaderBatchSizeRows,
maxReaderBatchSizeBytes, maxGpuColumnSizeBytes, metrics, options.asScala.toMap,
mixedTypesAsStringEnabled)
maxReaderBatchSizeBytes, maxGpuColumnSizeBytes, metrics, options.asScala.toMap)
}

override def withInputFile(): GpuScan = this
Expand All @@ -289,8 +287,7 @@ case class GpuJsonPartitionReaderFactory(
maxReaderBatchSizeBytes: Long,
maxGpuColumnSizeBytes: Long,
metrics: Map[String, GpuMetric],
@transient params: Map[String, String],
mixedTypesAsStringEnabled: Boolean) extends ShimFilePartitionReaderFactory(params) {
@transient params: Map[String, String]) extends ShimFilePartitionReaderFactory(params) {

override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = {
throw new IllegalStateException("ROW BASED PARSING IS NOT SUPPORTED ON THE GPU...")
Expand All @@ -300,7 +297,7 @@ case class GpuJsonPartitionReaderFactory(
val conf = broadcastedConf.value.value
val reader = new PartitionReaderWithBytesRead(new JsonPartitionReader(conf, partFile,
dataSchema, readDataSchema, parsedOptions, maxReaderBatchSizeRows, maxReaderBatchSizeBytes,
metrics, mixedTypesAsStringEnabled))
metrics))
ColumnarPartitionReaderWithPartitionValues.newReader(partFile, reader, partitionSchema,
maxGpuColumnSizeBytes)
}
Expand Down Expand Up @@ -346,14 +343,13 @@ class JsonPartitionReader(
parsedOptions: JSONOptions,
maxRowsPerChunk: Integer,
maxBytesPerChunk: Long,
execMetrics: Map[String, GpuMetric],
enableMixedTypesAsString: Boolean)
execMetrics: Map[String, GpuMetric])
extends GpuTextBasedPartitionReader[HostLineBufferer, HostLineBuffererFactory.type](conf,
partFile, dataSchema, readDataSchema, parsedOptions.lineSeparatorInRead, maxRowsPerChunk,
maxBytesPerChunk, execMetrics, HostLineBuffererFactory) {

def buildJsonOptions(parsedOptions: JSONOptions): cudf.JSONOptions =
GpuJsonReadCommon.cudfJsonOptions(parsedOptions, enableMixedTypesAsString)
GpuJsonReadCommon.cudfJsonOptions(parsedOptions)

/**
* Read the host buffer to GPU table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ class GpuReadJsonFileFormat extends JsonFileFormat with GpuReadFileFormatWithMet
rapidsConf.maxReadBatchSizeBytes,
rapidsConf.maxGpuColumnSizeBytes,
metrics,
options,
rapidsConf.isJsonMixedTypesAsStringEnabled)
options)
PartitionReaderIterator.buildReader(factory)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,10 @@ object GpuJsonReadCommon {
}
}

def cudfJsonOptions(options: JSONOptions,
enableMixedTypes: Boolean): ai.rapids.cudf.JSONOptions = {
def cudfJsonOptions(options: JSONOptions): ai.rapids.cudf.JSONOptions = {
ai.rapids.cudf.JSONOptions.builder()
.withRecoverWithNull(true)
.withMixedTypesAsStrings(enableMixedTypes)
.withMixedTypesAsStrings(true)
.withNormalizeWhitespace(true)
.withKeepQuotes(true)
.withNormalizeSingleQuotes(options.allowSingleQuotes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ case class GpuJsonToStructs(
schema: DataType,
options: Map[String, String],
child: Expression,
enableMixedTypesAsString: Boolean,
timeZoneId: Option[String] = None)
extends GpuUnaryExpression with TimeZoneAwareExpression with ExpectsInputTypes
with NullIntolerant {
Expand Down Expand Up @@ -155,7 +154,7 @@ case class GpuJsonToStructs(
SQLConf.get.columnNameOfCorruptRecord)

private lazy val jsonOptions =
GpuJsonReadCommon.cudfJsonOptions(parsedOptions, enableMixedTypesAsString)
GpuJsonReadCommon.cudfJsonOptions(parsedOptions)

override protected def doColumnar(input: GpuColumnVector): cudf.ColumnVector = {
schema match {
Expand Down
Loading