Disable JSON and CSV floating-point reads by default (#5078)
* Disable floating-point types in CSV and JSON by default

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Update docs

* Update tests

* Fix decimal check

* Formatting

* Fix typo in decimal config names
andygrove authored Mar 29, 2022
1 parent 29db478 commit 3139f54
Showing 9 changed files with 103 additions and 8 deletions.
6 changes: 6 additions & 0 deletions docs/configs.md
@@ -67,6 +67,9 @@ Name | Description | Default Value
<a name="sql.castStringToFloat.enabled"></a>spark.rapids.sql.castStringToFloat.enabled|When set to true, enables casting from strings to float types (float, double) on the GPU. Currently hex values aren't supported on the GPU. Also note that casting from string to float types on the GPU returns incorrect results when the string represents any number "1.7976931348623158E308" <= x < "1.7976931348623159E308" and "-1.7976931348623158E308" >= x > "-1.7976931348623159E308" in both these cases the GPU returns Double.MaxValue while CPU returns "+Infinity" and "-Infinity" respectively|false
<a name="sql.castStringToTimestamp.enabled"></a>spark.rapids.sql.castStringToTimestamp.enabled|When set to true, casting from string to timestamp is supported on the GPU. The GPU only supports a subset of formats when casting strings to timestamps. Refer to the CAST documentation for more details.|false
<a name="sql.concurrentGpuTasks"></a>spark.rapids.sql.concurrentGpuTasks|Set the number of tasks that can execute concurrently per GPU. Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors.|1
<a name="sql.csv.read.decimal.enabled"></a>spark.rapids.sql.csv.read.decimal.enabled|CSV reading is not 100% compatible when reading decimals.|false
<a name="sql.csv.read.double.enabled"></a>spark.rapids.sql.csv.read.double.enabled|CSV reading is not 100% compatible when reading doubles.|false
<a name="sql.csv.read.float.enabled"></a>spark.rapids.sql.csv.read.float.enabled|CSV reading is not 100% compatible when reading floats.|false
<a name="sql.decimalOverflowGuarantees"></a>spark.rapids.sql.decimalOverflowGuarantees|FOR TESTING ONLY. DO NOT USE IN PRODUCTION. Please see the decimal section of the compatibility documents for more information on this config.|true
<a name="sql.enabled"></a>spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
<a name="sql.explain"></a>spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NONE
@@ -105,6 +108,9 @@ Name | Description | Default Value
<a name="sql.join.leftOuter.enabled"></a>spark.rapids.sql.join.leftOuter.enabled|When set to true left outer joins are enabled on the GPU|true
<a name="sql.join.leftSemi.enabled"></a>spark.rapids.sql.join.leftSemi.enabled|When set to true left semi joins are enabled on the GPU|true
<a name="sql.join.rightOuter.enabled"></a>spark.rapids.sql.join.rightOuter.enabled|When set to true right outer joins are enabled on the GPU|true
<a name="sql.json.read.decimal.enabled"></a>spark.rapids.sql.json.read.decimal.enabled|JSON reading is not 100% compatible when reading decimals.|false
<a name="sql.json.read.double.enabled"></a>spark.rapids.sql.json.read.double.enabled|JSON reading is not 100% compatible when reading doubles.|false
<a name="sql.json.read.float.enabled"></a>spark.rapids.sql.json.read.float.enabled|JSON reading is not 100% compatible when reading floats.|false
<a name="sql.metrics.level"></a>spark.rapids.sql.metrics.level|GPU plans can produce a lot more metrics than CPU plans do. In very large queries this can sometimes result in going over the max result size limit for the driver. Supported values include DEBUG which will enable all metrics supported and typically only needs to be enabled when debugging the plugin. MODERATE which should output enough metrics to understand how long each part of the query is taking and how much data is going to each part of the query. ESSENTIAL which disables most metrics except those Apache Spark CPU plans will also report or their equivalents.|MODERATE
<a name="sql.mode"></a>spark.rapids.sql.mode|Set the mode for the Rapids Accelerator. The supported modes are explainOnly and executeOnGPU. This config can not be changed at runtime, you must restart the application for it to take affect. The default mode is executeOnGPU, which means the RAPIDS Accelerator plugin convert the Spark operations and execute them on the GPU when possible. The explainOnly mode allows running queries on the CPU and the RAPIDS Accelerator will evaluate the queries as if it was going to run on the GPU. The explanations of what would have run on the GPU and why are output in log messages. When using explainOnly mode, the default explain output is ALL, this can be changed by setting spark.rapids.sql.explain. See that config for more details.|executeongpu
<a name="sql.python.gpu.enabled"></a>spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false
5 changes: 4 additions & 1 deletion integration_tests/src/main/python/csv_test.py
@@ -167,7 +167,10 @@
StructField('number', DoubleType()),
StructField('ignored_b', StringType())])

_enable_all_types_conf = {'spark.sql.legacy.timeParserPolicy': 'CORRECTED'}
_enable_all_types_conf = {'spark.rapids.sql.csv.read.float.enabled': 'true',
'spark.rapids.sql.csv.read.double.enabled': 'true',
'spark.rapids.sql.csv.read.decimal.enabled': 'true',
'spark.sql.legacy.timeParserPolicy': 'CORRECTED'}

def read_csv_df(data_path, schema, spark_tmp_table_factory_ignored, options = {}):
def read_impl(spark):
6 changes: 5 additions & 1 deletion integration_tests/src/main/python/json_test.py
@@ -37,7 +37,11 @@

_enable_all_types_conf = {
'spark.rapids.sql.format.json.enabled': 'true',
'spark.rapids.sql.format.json.read.enabled': 'true'}
'spark.rapids.sql.format.json.read.enabled': 'true',
'spark.rapids.sql.json.read.float.enabled': 'true',
'spark.rapids.sql.json.read.double.enabled': 'true',
'spark.rapids.sql.json.read.decimal.enabled': 'true'
}

_bool_schema = StructType([
StructField('number', BooleanType())])
@@ -210,6 +210,21 @@ object GpuCSVScan {
}
// TODO parsedOptions.emptyValueInRead

if (!meta.conf.isCsvFloatReadEnabled && types.contains(FloatType)) {
meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading floats. " +
s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_FLOATS} to true.")
}

if (!meta.conf.isCsvDoubleReadEnabled && types.contains(DoubleType)) {
meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading doubles. " +
s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_DOUBLES} to true.")
}

if (!meta.conf.isCsvDecimalReadEnabled && types.exists(_.isInstanceOf[DecimalType])) {
meta.willNotWorkOnGpu("CSV reading is not 100% compatible when reading decimals. " +
s"To enable it please set ${RapidsConf.ENABLE_READ_CSV_DECIMALS} to true.")
}

FileFormatChecks.tag(meta, readSchema, CsvFormatType, ReadFileOp)
}
}
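As an illustration of the effect of these checks (the file path and column name below are assumptions, not taken from this change): with the new defaults, a CSV read schema containing DoubleType tags the scan as willNotWorkOnGpu and the read runs on the CPU until the matching config is set. A hedged sketch:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

val spark = SparkSession.builder().appName("csv-double-fallback").getOrCreate()
val schema = StructType(Seq(StructField("price", DoubleType)))  // hypothetical column

// Default: spark.rapids.sql.csv.read.double.enabled is false, so this scan stays
// on the CPU; setting spark.rapids.sql.explain=ALL would log the reason above.
val cpuRead = spark.read.schema(schema).csv("/tmp/prices.csv")   // hypothetical path

// Opting back in allows the GPU CSV reader to handle the double column.
spark.conf.set("spark.rapids.sql.csv.read.double.enabled", "true")
val gpuRead = spark.read.schema(schema).csv("/tmp/prices.csv")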
42 changes: 42 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -889,6 +889,21 @@ object RapidsConf {
.booleanConf
.createWithDefault(true)

val ENABLE_READ_CSV_FLOATS = conf("spark.rapids.sql.csv.read.float.enabled")
.doc("CSV reading is not 100% compatible when reading floats.")
.booleanConf
.createWithDefault(false)

val ENABLE_READ_CSV_DOUBLES = conf("spark.rapids.sql.csv.read.double.enabled")
.doc("CSV reading is not 100% compatible when reading doubles.")
.booleanConf
.createWithDefault(false)

val ENABLE_READ_CSV_DECIMALS = conf("spark.rapids.sql.csv.read.decimal.enabled")
.doc("CSV reading is not 100% compatible when reading decimals.")
.booleanConf
.createWithDefault(false)

val ENABLE_JSON = conf("spark.rapids.sql.format.json.enabled")
.doc("When set to true enables all json input and output acceleration. " +
"(only input is currently supported anyways)")
@@ -900,6 +915,21 @@
.booleanConf
.createWithDefault(false)

val ENABLE_READ_JSON_FLOATS = conf("spark.rapids.sql.json.read.float.enabled")
.doc("JSON reading is not 100% compatible when reading floats.")
.booleanConf
.createWithDefault(false)

val ENABLE_READ_JSON_DOUBLES = conf("spark.rapids.sql.json.read.double.enabled")
.doc("JSON reading is not 100% compatible when reading doubles.")
.booleanConf
.createWithDefault(false)

val ENABLE_READ_JSON_DECIMALS = conf("spark.rapids.sql.json.read.decimal.enabled")
.doc("JSON reading is not 100% compatible when reading decimals.")
.booleanConf
.createWithDefault(false)

val ENABLE_AVRO = conf("spark.rapids.sql.format.avro.enabled")
.doc("When set to true enables all avro input and output acceleration. " +
"(only input is currently supported anyways)")
@@ -1655,10 +1685,22 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isCsvReadEnabled: Boolean = get(ENABLE_CSV_READ)

lazy val isCsvFloatReadEnabled: Boolean = get(ENABLE_READ_CSV_FLOATS)

lazy val isCsvDoubleReadEnabled: Boolean = get(ENABLE_READ_CSV_DOUBLES)

lazy val isCsvDecimalReadEnabled: Boolean = get(ENABLE_READ_CSV_DECIMALS)

lazy val isJsonEnabled: Boolean = get(ENABLE_JSON)

lazy val isJsonReadEnabled: Boolean = get(ENABLE_JSON_READ)

lazy val isJsonFloatReadEnabled: Boolean = get(ENABLE_READ_JSON_FLOATS)

lazy val isJsonDoubleReadEnabled: Boolean = get(ENABLE_READ_JSON_DOUBLES)

lazy val isJsonDecimalReadEnabled: Boolean = get(ENABLE_READ_JSON_DECIMALS)

lazy val isAvroEnabled: Boolean = get(ENABLE_AVRO)

lazy val isAvroReadEnabled: Boolean = get(ENABLE_AVRO_READ)
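A small sketch (in the spirit of the unit tests, not code from this commit) of how the new entries and accessors fit together; RapidsConf is constructed from a plain key/value map, as in the class signature above:

import com.nvidia.spark.rapids.RapidsConf

// With no overrides, the new readers default to disabled.
val defaults = new RapidsConf(Map.empty[String, String])
assert(!defaults.isCsvFloatReadEnabled)
assert(!defaults.isJsonDecimalReadEnabled)

// Setting the documented keys flips the corresponding accessors.
val optIn = new RapidsConf(Map(
  RapidsConf.ENABLE_READ_CSV_FLOATS.key -> "true",
  RapidsConf.ENABLE_READ_JSON_DECIMALS.key -> "true"))
assert(optIn.isCsvFloatReadEnabled && optIn.isJsonDecimalReadEnabled)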
@@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.datasources.{PartitionedFile, Partitioning
import org.apache.spark.sql.execution.datasources.v2.{FilePartitionReaderFactory, FileScan, TextBasedFileScan}
import org.apache.spark.sql.execution.datasources.v2.json.JsonScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DateType, DecimalType, StringType, StructType, TimestampType}
import org.apache.spark.sql.types.{DateType, DecimalType, DoubleType, FloatType, StringType, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration
@@ -121,19 +121,35 @@ object GpuJsonScan {
meta.willNotWorkOnGpu("GpuJsonScan only supports UTF8 or US-ASCII encoded data")
})

if (readSchema.map(_.dataType).contains(DateType)) {
val types = readSchema.map(_.dataType)
if (types.contains(DateType)) {
GpuTextBasedDateUtils.tagCudfFormat(meta,
GpuJsonUtils.dateFormatInRead(parsedOptions), parseString = true)
}

if (readSchema.map(_.dataType).contains(TimestampType)) {
if (types.contains(TimestampType)) {
if (!TypeChecks.areTimestampsSupported(parsedOptions.zoneId)) {
meta.willNotWorkOnGpu("Only UTC zone id is supported")
}
GpuTextBasedDateUtils.tagCudfFormat(meta,
GpuJsonUtils.timestampFormatInRead(parsedOptions), parseString = true)
}

if (!meta.conf.isJsonFloatReadEnabled && types.contains(FloatType)) {
meta.willNotWorkOnGpu("JSON reading is not 100% compatible when reading floats. " +
s"To enable it please set ${RapidsConf.ENABLE_READ_JSON_FLOATS} to true.")
}

if (!meta.conf.isJsonDoubleReadEnabled && types.contains(DoubleType)) {
meta.willNotWorkOnGpu("JSON reading is not 100% compatible when reading doubles. " +
s"To enable it please set ${RapidsConf.ENABLE_READ_JSON_DOUBLES} to true.")
}

if (!meta.conf.isJsonDecimalReadEnabled && types.exists(_.isInstanceOf[DecimalType])) {
meta.willNotWorkOnGpu("JSON reading is not 100% compatible when reading decimals. " +
s"To enable it please set ${RapidsConf.ENABLE_READ_JSON_DECIMALS} to true.")
}

dataSchema.getFieldIndex(parsedOptions.columnNameOfCorruptRecord).foreach { corruptFieldIndex =>
val f = dataSchema(corruptFieldIndex)
if (f.dataType != StringType || !f.nullable) {
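The JSON side mirrors the CSV checks. For illustration (the schema, path, and app name are assumptions): a read schema containing DecimalType keeps the JSON scan on the CPU unless the new config is enabled, in addition to the pre-existing format-level JSON configs:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DecimalType, StructField, StructType}

val spark = SparkSession.builder().appName("json-decimal-fallback").getOrCreate()
val schema = StructType(Seq(StructField("amount", DecimalType(10, 2))))  // hypothetical column

// JSON acceleration itself still has to be turned on (these configs pre-date this change).
spark.conf.set("spark.rapids.sql.format.json.enabled", "true")
spark.conf.set("spark.rapids.sql.format.json.read.enabled", "true")

// Default: the decimal column tags the scan as willNotWorkOnGpu, so it runs on the CPU.
val cpuRead = spark.read.schema(schema).json("/tmp/amounts.json")  // hypothetical path

// Opt in to the not-100%-compatible GPU decimal parsing for JSON.
spark.conf.set("spark.rapids.sql.json.read.decimal.enabled", "true")
val gpuRead = spark.read.schema(schema).json("/tmp/amounts.json")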
@@ -29,7 +29,8 @@ class CsvScanSuite extends SparkQueryCompareTestSuite {

testSparkResultsAreEqual("Test CSV splits with chunks", floatCsvDf,
conf = new SparkConf()
.set(RapidsConf.MAX_READER_BATCH_SIZE_ROWS.key, "1")) {
.set(RapidsConf.MAX_READER_BATCH_SIZE_ROWS.key, "1")
.set(RapidsConf.ENABLE_READ_CSV_FLOATS.key, "true")) {
frame => frame.select(col("floats"))
}

@@ -150,6 +150,12 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm {

def enableCsvConf(): SparkConf = {
new SparkConf()
.set(RapidsConf.ENABLE_READ_CSV_FLOATS.key, "true")
.set(RapidsConf.ENABLE_READ_CSV_DOUBLES.key, "true")
.set(RapidsConf.ENABLE_READ_CSV_DECIMALS.key, "true")
.set(RapidsConf.ENABLE_READ_JSON_FLOATS.key, "true")
.set(RapidsConf.ENABLE_READ_JSON_DOUBLES.key, "true")
.set(RapidsConf.ENABLE_READ_JSON_DECIMALS.key, "true")
}

// @see java.lang.Float#intBitsToFloat
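As a usage illustration of the broadened helper (a hypothetical suite in the style of CsvScanSuite above; floatCsvDf and testSparkResultsAreEqual are assumed to come from SparkQueryCompareTestSuite):

import com.nvidia.spark.rapids.RapidsConf
import org.apache.spark.sql.functions.col

// Hypothetical test: enableCsvConf() now opts the run in to GPU float, double,
// and decimal reads for both CSV and JSON, so only test-specific settings remain.
class CsvFloatOptInSuite extends SparkQueryCompareTestSuite {
  testSparkResultsAreEqual("Test CSV floats with reader opt-in", floatCsvDf,
      conf = enableCsvConf()
        .set(RapidsConf.MAX_READER_BATCH_SIZE_ROWS.key, "1")) {
    frame => frame.select(col("floats"))
  }
}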
@@ -26,6 +26,8 @@ import com.nvidia.spark.rapids.{GpuAlias, GpuBatchScanExec, GpuColumnVector, Gpu
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.GpuFileSourceScanExec
import org.apache.spark.sql.types.{Decimal, DecimalType, IntegerType, LongType, StructField, StructType}
@@ -255,7 +257,7 @@ class DecimalUnitTest extends GpuUnitTests {
var rootPlan = frameFromOrc("decimal-test.orc")(ss).queryExecution.executedPlan
assert(rootPlan.map(p => p).exists(_.isInstanceOf[GpuFileSourceScanExec]))
rootPlan = fromCsvDf("decimal-test.csv", decimalCsvStruct)(ss).queryExecution.executedPlan
assert(rootPlan.map(p => p).exists(_.isInstanceOf[GpuFileSourceScanExec]))
assert(rootPlan.map(p => p).exists(_.isInstanceOf[FileSourceScanExec]))
rootPlan = frameFromParquet("decimal-test.parquet")(ss).queryExecution.executedPlan
assert(rootPlan.map(p => p).exists(_.isInstanceOf[GpuFileSourceScanExec]))
}, conf)
Expand All @@ -264,7 +266,7 @@ class DecimalUnitTest extends GpuUnitTests {
var rootPlan = frameFromOrc("decimal-test.orc")(ss).queryExecution.executedPlan
assert(rootPlan.map(p => p).exists(_.isInstanceOf[GpuBatchScanExec]))
rootPlan = fromCsvDf("decimal-test.csv", decimalCsvStruct)(ss).queryExecution.executedPlan
assert(rootPlan.map(p => p).exists(_.isInstanceOf[GpuBatchScanExec]))
assert(rootPlan.map(p => p).exists(_.isInstanceOf[BatchScanExec]))
rootPlan = frameFromParquet("decimal-test.parquet")(ss).queryExecution.executedPlan
assert(rootPlan.map(p => p).exists(_.isInstanceOf[GpuBatchScanExec]))
}, conf.set(SQLConf.USE_V1_SOURCE_LIST.key, ""))
