Skip to content

Commit

Permalink
Improve JSON and CSV support for boolean values (#4780)
Browse files Browse the repository at this point in the history
* Improve JSON and CSV support for boolean values

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* implement custom boolean parsing logic for CSV and JSON
  • Loading branch information
andygrove authored Feb 15, 2022
1 parent c67eedb commit 5132869
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 7 deletions.
2 changes: 1 addition & 1 deletion integration_tests/src/main/python/csv_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def read_impl(spark):
pytest.param('floats_invalid.csv', _double_schema, {'header': 'true'}),
pytest.param('simple_float_values.csv', _float_schema, {'header': 'true'}),
pytest.param('simple_float_values.csv', _double_schema, {'header': 'true'}),
pytest.param('simple_boolean_values.csv', _bool_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2071')),
pytest.param('simple_boolean_values.csv', _bool_schema, {'header': 'true'}),
pytest.param('ints_with_whitespace.csv', _number_as_string_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2069')),
pytest.param('ints_with_whitespace.csv', _byte_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/130'))
], ids=idfn)
Expand Down
7 changes: 6 additions & 1 deletion integration_tests/src/main/python/json_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
'spark.rapids.sql.format.json.enabled': 'true',
'spark.rapids.sql.format.json.read.enabled': 'true'}

# Single-column schemas (field name 'number') used to parameterize the JSON
# read tests with a boolean target type...
_bool_schema = StructType([
StructField('number', BooleanType())])

# ...and with a 32-bit float target type (a matching _double_schema is
# defined elsewhere in this file).
_float_schema = StructType([
StructField('number', FloatType())])

Expand Down Expand Up @@ -170,14 +173,16 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena

@approximate_float
@pytest.mark.parametrize('filename', [
'boolean.json',
pytest.param('boolean_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4779')),
'nan_and_inf.json',
pytest.param('nan_and_inf_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4646')),
'floats.json',
'floats_leading_zeros.json',
'floats_invalid.json',
pytest.param('floats_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4647')),
])
@pytest.mark.parametrize('schema', [_float_schema, _double_schema])
@pytest.mark.parametrize('schema', [_bool_schema, _float_schema, _double_schema])
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('allow_non_numeric_numbers', ["true", "false"])
@pytest.mark.parametrize('allow_numeric_leading_zeros', ["true"])
Expand Down
11 changes: 11 additions & 0 deletions integration_tests/src/test/resources/boolean.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{ "number": true }
{ "number": True }
{ "number": TRUE }
{ "number": false }
{ "number": False }
{ "number": FALSE }
{ "number": null }
{ "number": y }
{ "number": n }
{ "number": 0 }
{ "number": 1 }
9 changes: 9 additions & 0 deletions integration_tests/src/test/resources/boolean_invalid.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{ "number": "true" }
{ "number": "false" }
{ "number": "null" }
{ "number": "" }
{ "number": "True" }
{ "number": "TRUE" }
{ "number": "False" }
{ "number": "FALSE" }
{ "number": "BAD" }
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,11 @@ False
TRUE
FALSE
BAD
y
n
yes
no
1
0
t
f
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
import scala.collection.JavaConverters._

import ai.rapids.cudf
import ai.rapids.cudf.{HostMemoryBuffer, Schema, Table}
import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, Scalar, Schema, Table}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

Expand Down Expand Up @@ -423,4 +423,28 @@ class CSVPartitionReader(
* @return the file format short name
*/
override def getFileFormatShortName: String = "CSV"

/**
 * CSV supports "true" and "false" (case-insensitive) as valid boolean values.
 *
 * Boolean columns are read from cuDF as strings (the base reader rewrites
 * BooleanType fields to StringType before reading), so this performs the
 * string-to-boolean cast manually: each value is stripped of surrounding
 * whitespace and lower-cased, then compared against the literals "true" and
 * "false". Values matching neither become null in the result.
 *
 * @param input string column holding the raw CSV cell text for a boolean field
 * @return a BOOL8 column with nulls for unrecognized values
 */
override def castStringToBool(input: ColumnVector): ColumnVector = {
withResource(input.strip()) { stripped =>
withResource(stripped.lower()) { lower =>
withResource(Scalar.fromString("true")) { t =>
withResource(Scalar.fromString("false")) { f =>
// isTrue doubles as the output values: true where the (normalized) cell
// equals "true", false everywhere else.
withResource(lower.equalTo(t)) { isTrue =>
withResource(lower.equalTo(f)) { isFalse =>
// A cell is a valid boolean only if it matched one of the two literals.
withResource(isTrue.or(isFalse)) { isValidBool =>
withResource(Scalar.fromNull(DType.BOOL8)) { nullBool =>
// Keep the parsed value where valid; null out everything else.
isValidBool.ifElse(isTrue, nullBool)
}
}
}
}
}
}
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,11 @@ abstract class GpuTextBasedPartitionReader(
readDataSchema
}

// read floating-point columns as strings in cuDF
// read boolean and floating-point columns as strings in cuDF
val dataSchemaWithStrings = StructType(dataSchema.fields
.map(f => {
f.dataType match {
case DataTypes.FloatType | DataTypes.DoubleType =>
case DataTypes.BooleanType | DataTypes.FloatType | DataTypes.DoubleType =>
f.copy(dataType = DataTypes.StringType)
case _ =>
f
Expand All @@ -188,7 +188,7 @@ abstract class GpuTextBasedPartitionReader(
}
maxDeviceMemory = max(GpuColumnVector.getTotalDeviceMemoryUsed(table), maxDeviceMemory)

// parse floating-point columns that were read as strings
// parse boolean and floating-point columns that were read as strings
val castTable = withResource(table) { _ =>
val columns = new ListBuffer[ColumnVector]()
// Table increases the ref counts on the columns so we have
Expand All @@ -198,6 +198,8 @@ abstract class GpuTextBasedPartitionReader(
val ansiEnabled = false
for (i <- 0 until table.getNumberOfColumns) {
val castColumn = dataSchema.fields(i).dataType match {
case DataTypes.BooleanType =>
castStringToBool(table.getColumn(i))
case DataTypes.FloatType =>
GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT32)
case DataTypes.DoubleType =>
Expand All @@ -218,6 +220,8 @@ abstract class GpuTextBasedPartitionReader(
}
}

/**
 * Cast a column of strings to BOOL8. Boolean columns are initially read as
 * strings (see the schema rewrite above), and each format defines its own
 * notion of a valid boolean literal, so subclasses supply the parsing logic.
 * The implementations in this change map unrecognized values to null.
 *
 * @param input string column holding the raw text of a boolean field
 * @return a new BOOL8 column owned by the caller
 */
def castStringToBool(input: ColumnVector): ColumnVector

/**
* Read the host buffer to GPU table
* @param dataBuffer host buffer to be read
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
import scala.collection.JavaConverters._

import ai.rapids.cudf
import ai.rapids.cudf.{HostMemoryBuffer, Schema, Table}
import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, Scalar, Schema, Table}
import com.nvidia.spark.rapids._
import org.apache.hadoop.conf.Configuration

Expand Down Expand Up @@ -334,4 +334,24 @@ class JsonPartitionReader(
Some(new Table(prunedColumnVectors: _*))
}
}

/**
 * JSON only supports unquoted lower-case "true" and "false" as valid boolean values.
 *
 * Boolean columns are read from cuDF as strings, so this performs the
 * string-to-boolean cast manually. Unlike the CSV reader's implementation,
 * the input is deliberately NOT stripped or lower-cased: the raw text must
 * match the JSON literals "true"/"false" exactly. Anything else (e.g. "True",
 * "TRUE", quoted strings) becomes null in the result.
 *
 * @param input string column holding the raw JSON value text for a boolean field
 * @return a BOOL8 column with nulls for unrecognized values
 */
override def castStringToBool(input: ColumnVector): ColumnVector = {
withResource(Scalar.fromString("true")) { t =>
withResource(Scalar.fromString("false")) { f =>
// isTrue doubles as the output values: true where the raw text equals
// "true", false everywhere else.
withResource(input.equalTo(t)) { isTrue =>
withResource(input.equalTo(f)) { isFalse =>
// A value is a valid boolean only if it matched one of the two literals.
withResource(isTrue.or(isFalse)) { isValidBool =>
withResource(Scalar.fromNull(DType.BOOL8)) { nullBool =>
// Keep the parsed value where valid; null out everything else.
isValidBool.ifElse(isTrue, nullBool)
}
}
}
}
}
}
}

}

0 comments on commit 5132869

Please sign in to comment.