diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu
index 74001e5e01a..9cd39038348 100644
--- a/cpp/src/io/json/read_json.cu
+++ b/cpp/src/io/json/read_json.cu
@@ -193,7 +193,8 @@ datasource::owning_buffer> get_record_range_raw_input(
   size_t chunk_size = reader_opts.get_byte_range_size();
 
   CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset,
-               "Invalid offsetting");
+               "Invalid offsetting",
+               std::invalid_argument);
   auto should_load_all_sources = !chunk_size || chunk_size >= total_source_size - chunk_offset;
   chunk_size = should_load_all_sources ? total_source_size - chunk_offset : chunk_size;
diff --git a/cpp/src/io/utilities/datasource.cpp b/cpp/src/io/utilities/datasource.cpp
index c8a438fc40b..91be154e09d 100644
--- a/cpp/src/io/utilities/datasource.cpp
+++ b/cpp/src/io/utilities/datasource.cpp
@@ -217,7 +217,7 @@ class memory_mapped_source : public file_source {
 
   void map(int fd, size_t offset, size_t size)
   {
-    CUDF_EXPECTS(offset < _file.size(), "Offset is past end of file");
+    CUDF_EXPECTS(offset < _file.size(), "Offset is past end of file", std::overflow_error);
 
     // Offset for `mmap()` must be page aligned
     _map_offset = offset & ~(sysconf(_SC_PAGESIZE) - 1);
diff --git a/python/cudf/cudf/_lib/json.pyx b/python/cudf/cudf/_lib/json.pyx
index 9c646e3357b..853dd431099 100644
--- a/python/cudf/cudf/_lib/json.pyx
+++ b/python/cudf/cudf/_lib/json.pyx
@@ -10,6 +10,7 @@ from cudf.core.buffer import acquire_spill_lock
 from libcpp cimport bool
 
 cimport cudf._lib.pylibcudf.libcudf.io.types as cudf_io_types
+from cudf._lib.column cimport Column
 from cudf._lib.io.utils cimport add_df_col_struct_names
 from cudf._lib.pylibcudf.io.types cimport compression_type
 from cudf._lib.pylibcudf.libcudf.io.json cimport json_recovery_mode_t
@@ -17,7 +18,7 @@ from cudf._lib.pylibcudf.libcudf.io.types cimport compression_type
 from cudf._lib.pylibcudf.libcudf.types cimport data_type, type_id
 from cudf._lib.pylibcudf.types cimport DataType
 from cudf._lib.types cimport dtype_to_data_type
-from cudf._lib.utils cimport data_from_pylibcudf_io
+from cudf._lib.utils cimport _data_from_columns, data_from_pylibcudf_io
 
 import cudf._lib.pylibcudf as plc
 
@@ -98,28 +99,48 @@ cpdef read_json(object filepaths_or_buffers,
     else:
         raise TypeError("`dtype` must be 'list like' or 'dict'")
 
-    table_w_meta = plc.io.json.read_json(
-        plc.io.SourceInfo(filepaths_or_buffers),
-        processed_dtypes,
-        c_compression,
-        lines,
-        byte_range_offset = byte_range[0] if byte_range is not None else 0,
-        byte_range_size = byte_range[1] if byte_range is not None else 0,
-        keep_quotes = keep_quotes,
-        mixed_types_as_string = mixed_types_as_string,
-        prune_columns = prune_columns,
-        recovery_mode = _get_json_recovery_mode(on_bad_lines)
-    )
-
-    df = cudf.DataFrame._from_data(
-        *data_from_pylibcudf_io(
-            table_w_meta
+    if cudf.get_option("mode.pandas_compatible") and lines:
+        res_cols, res_col_names, res_child_names = plc.io.json.chunked_read_json(
+            plc.io.SourceInfo(filepaths_or_buffers),
+            processed_dtypes,
+            c_compression,
+            keep_quotes = keep_quotes,
+            mixed_types_as_string = mixed_types_as_string,
+            prune_columns = prune_columns,
+            recovery_mode = _get_json_recovery_mode(on_bad_lines)
+        )
+        df = cudf.DataFrame._from_data(
+            *_data_from_columns(
+                columns=[Column.from_pylibcudf(plc) for plc in res_cols],
+                column_names=res_col_names,
+                index_names=None
+            )
+        )
+        add_df_col_struct_names(df, res_child_names)
+        return df
+    else:
+        table_w_meta = plc.io.json.read_json(
+            plc.io.SourceInfo(filepaths_or_buffers),
+            processed_dtypes,
+            c_compression,
+            lines,
+            byte_range_offset = byte_range[0] if byte_range is not None else 0,
+            byte_range_size = byte_range[1] if byte_range is not None else 0,
+            keep_quotes = keep_quotes,
+            mixed_types_as_string = mixed_types_as_string,
+            prune_columns = prune_columns,
+            recovery_mode = _get_json_recovery_mode(on_bad_lines)
+        )
+
+        df = cudf.DataFrame._from_data(
+            *data_from_pylibcudf_io(
+                table_w_meta
+            )
         )
-    )
 
-    # Post-processing to add in struct column names
-    add_df_col_struct_names(df, table_w_meta.child_names)
-    return df
+        # Post-processing to add in struct column names
+        add_df_col_struct_names(df, table_w_meta.child_names)
+        return df
 
 
 @acquire_spill_lock()
diff --git a/python/cudf/cudf/_lib/pylibcudf/io/json.pxd b/python/cudf/cudf/_lib/pylibcudf/io/json.pxd
index f7f733a493d..2e0e92a054f 100644
--- a/python/cudf/cudf/_lib/pylibcudf/io/json.pxd
+++ b/python/cudf/cudf/_lib/pylibcudf/io/json.pxd
@@ -35,3 +35,14 @@ cpdef void write_json(
     str true_value = *,
     str false_value = *
 )
+
+cpdef tuple chunked_read_json(
+    SourceInfo source_info,
+    list dtypes = *,
+    compression_type compression = *,
+    bool keep_quotes = *,
+    bool mixed_types_as_string = *,
+    bool prune_columns = *,
+    json_recovery_mode_t recovery_mode = *,
+    int chunk_size = *,
+)
diff --git a/python/cudf/cudf/_lib/pylibcudf/io/json.pyx b/python/cudf/cudf/_lib/pylibcudf/io/json.pyx
index 354cb4981de..2710ee60075 100644
--- a/python/cudf/cudf/_lib/pylibcudf/io/json.pyx
+++ b/python/cudf/cudf/_lib/pylibcudf/io/json.pyx
@@ -6,6 +6,7 @@ from libcpp.string cimport string
 from libcpp.utility cimport move
 from libcpp.vector cimport vector
 
+from cudf._lib.pylibcudf.concatenate cimport concatenate
 from cudf._lib.pylibcudf.io.types cimport (
     SinkInfo,
     SourceInfo,
@@ -50,6 +51,144 @@ cdef map[string, schema_element] _generate_schema_map(list dtypes):
     return schema_map
 
 
+cdef json_reader_options _setup_json_reader_options(
+    SourceInfo source_info,
+    list dtypes,
+    compression_type compression,
+    bool lines,
+    size_type byte_range_offset,
+    size_type byte_range_size,
+    bool keep_quotes,
+    bool mixed_types_as_string,
+    bool prune_columns,
+    json_recovery_mode_t recovery_mode):
+
+    cdef vector[data_type] types_vec
+    cdef json_reader_options opts = move(
+        json_reader_options.builder(source_info.c_obj)
+        .compression(compression)
+        .lines(lines)
+        .byte_range_offset(byte_range_offset)
+        .byte_range_size(byte_range_size)
+        .recovery_mode(recovery_mode)
+        .build()
+    )
+
+    if dtypes is not None:
+        if isinstance(dtypes[0], tuple):
+            opts.set_dtypes(move(_generate_schema_map(dtypes)))
+        else:
+            for dtype in dtypes:
+                types_vec.push_back((<DataType>dtype).c_obj)
+            opts.set_dtypes(types_vec)
+
+    opts.enable_keep_quotes(keep_quotes)
+    opts.enable_mixed_types_as_string(mixed_types_as_string)
+    opts.enable_prune_columns(prune_columns)
+    return opts
+
+
+cpdef tuple chunked_read_json(
+    SourceInfo source_info,
+    list dtypes = None,
+    compression_type compression = compression_type.AUTO,
+    bool keep_quotes = False,
+    bool mixed_types_as_string = False,
+    bool prune_columns = False,
+    json_recovery_mode_t recovery_mode = json_recovery_mode_t.FAIL,
+    int chunk_size = 100_000_000,
+):
+    """Reads a JSON file in chunks, returning the resulting columns and metadata.
+
+    Parameters
+    ----------
+    source_info : SourceInfo
+        The SourceInfo object to read the JSON file from.
+    dtypes : list, default None
+        Set data types for the columns in the JSON file.
+
+        Each element of the list has the format
+        (column_name, column_dtype, list of child dtypes), where
+        the list of child dtypes is an empty list if the child is not
+        a nested type (list or struct dtype), and is of format
+        (column_child_name, column_child_type, list of grandchild dtypes).
+    compression : CompressionType, default CompressionType.AUTO
+        The compression format of the JSON source.
+    keep_quotes : bool, default False
+        Whether the reader should keep quotes of string values.
+    mixed_types_as_string : bool, default False
+        If True, mixed type columns are returned as string columns.
+        If `False`, parsing mixed type columns will throw an error.
+    prune_columns : bool, default False
+        Whether to only read columns specified in dtypes.
+    recovery_mode : JSONRecoveryMode, default JSONRecoveryMode.FAIL
+        Whether to raise an error or set corresponding values to null
+        when encountering an invalid JSON line.
+    chunk_size : int, default 100_000_000
+        The number of bytes to read per chunk; it should be at least
+        as large as the largest row in the source.
+
+    Returns
+    -------
+    tuple
+        A tuple of (columns, column_names, child_names).
+    """
+    cdef size_type c_range_size = (
+        chunk_size if chunk_size is not None else 0
+    )
+    cdef json_reader_options opts = _setup_json_reader_options(
+        source_info=source_info,
+        dtypes=dtypes,
+        compression=compression,
+        lines=True,
+        byte_range_offset=0,
+        byte_range_size=0,
+        keep_quotes=keep_quotes,
+        mixed_types_as_string=mixed_types_as_string,
+        prune_columns=prune_columns,
+        recovery_mode=recovery_mode,
+    )
+
+    # Read JSON
+    cdef table_with_metadata c_result
+
+    final_columns = []
+    meta_names = None
+    child_names = None
+    i = 0
+    while True:
+        opts.set_byte_range_offset(c_range_size * i)
+        opts.set_byte_range_size(c_range_size)
+
+        try:
+            with nogil:
+                c_result = move(cpp_read_json(opts))
+        except (ValueError, OverflowError):
+            break
+        if meta_names is None:
+            meta_names = [info.name.decode() for info in c_result.metadata.schema_info]
+        if child_names is None:
+            child_names = TableWithMetadata._parse_col_names(
+                c_result.metadata.schema_info
+            )
+        new_chunk = [
+            col for col in TableWithMetadata.from_libcudf(
+                c_result).columns
+        ]
+
+        if len(final_columns) == 0:
+            final_columns = new_chunk
+        else:
+            for col_idx in range(len(meta_names)):
+                final_columns[col_idx] = concatenate(
+                    [final_columns[col_idx], new_chunk[col_idx]]
+                )
+                # Must drop any residual GPU columns to save memory
+                new_chunk[col_idx] = None
+        i += 1
+    return (final_columns, meta_names, child_names)
+
+
 cpdef TableWithMetadata read_json(
     SourceInfo source_info,
     list dtypes = None,
@@ -76,7 +215,7 @@
         the list of child dtypes is an empty list if the child is not
         a nested type (list or struct dtype), and is of format
         (column_child_name, column_child_type, list of grandchild dtypes).
-    compression_type: CompressionType, default CompressionType.AUTO
+    compression : CompressionType, default CompressionType.AUTO
         The compression format of the JSON source.
     byte_range_offset : size_type, default 0
         Number of bytes to skip from source start.
@@ -84,6 +223,9 @@
         Number of bytes to read. By default, will read all bytes.
     keep_quotes : bool, default False
         Whether the reader should keep quotes of string values.
+    mixed_types_as_string : bool, default False
+        If True, mixed type columns are returned as string columns.
+        If `False`, parsing mixed type columns will throw an error.
     prune_columns : bool, default False
         Whether to only read columns specified in dtypes.
     recover_mode : JSONRecoveryMode, default JSONRecoveryMode.FAIL
@@ -95,29 +237,19 @@
     TableWithMetadata
         The Table and its corresponding metadata (column names)
         that were read in.
     """
-    cdef vector[data_type] types_vec
-    cdef json_reader_options opts = move(
-        json_reader_options.builder(source_info.c_obj)
-        .compression(compression)
-        .lines(lines)
-        .byte_range_offset(byte_range_offset)
-        .byte_range_size(byte_range_size)
-        .recovery_mode(recovery_mode)
-        .build()
+    cdef json_reader_options opts = _setup_json_reader_options(
+        source_info=source_info,
+        dtypes=dtypes,
+        compression=compression,
+        lines=lines,
+        byte_range_offset=byte_range_offset,
+        byte_range_size=byte_range_size,
+        keep_quotes=keep_quotes,
+        mixed_types_as_string=mixed_types_as_string,
+        prune_columns=prune_columns,
+        recovery_mode=recovery_mode,
     )
 
-    if dtypes is not None:
-        if isinstance(dtypes[0], tuple):
-            opts.set_dtypes(move(_generate_schema_map(dtypes)))
-        else:
-            for dtype in dtypes:
-                types_vec.push_back((<DataType>dtype).c_obj)
-            opts.set_dtypes(types_vec)
-
-    opts.enable_keep_quotes(keep_quotes)
-    opts.enable_mixed_types_as_string(mixed_types_as_string)
-    opts.enable_prune_columns(prune_columns)
-
     # Read JSON
     cdef table_with_metadata c_result
diff --git a/python/cudf/cudf/_lib/utils.pxd b/python/cudf/cudf/_lib/utils.pxd
index 99850d549a1..1d55f7218dc 100644
--- a/python/cudf/cudf/_lib/utils.pxd
+++ b/python/cudf/cudf/_lib/utils.pxd
@@ -19,3 +19,4 @@ cdef table_view table_view_from_table(tbl, ignore_index=*) except*
 cdef columns_from_unique_ptr(unique_ptr[table] c_tbl)
 cdef columns_from_table_view(table_view tv, object owners)
 cdef columns_from_pylibcudf_table(tbl)
+cdef _data_from_columns(columns, column_names, index_names=*)
diff --git a/python/cudf/cudf/tests/test_csv.py b/python/cudf/cudf/tests/test_csv.py
index 09617306606..a22a627523f 100644
--- a/python/cudf/cudf/tests/test_csv.py
+++ b/python/cudf/cudf/tests/test_csv.py
@@ -1191,7 +1191,7 @@ def test_csv_reader_byte_range_type_corner_case(tmpdir):
     ).to_csv(fname, chunksize=100000)
 
     byte_range = (2_147_483_648, 0)
-    with pytest.raises(RuntimeError, match="Offset is past end of file"):
+    with pytest.raises(OverflowError, match="Offset is past end of file"):
         cudf.read_csv(fname, byte_range=byte_range, header=None)
 
 
diff --git a/python/cudf/cudf/tests/test_json.py b/python/cudf/cudf/tests/test_json.py
index 9222f6d23db..7771afd692f 100644
--- a/python/cudf/cudf/tests/test_json.py
+++ b/python/cudf/cudf/tests/test_json.py
@@ -1428,3 +1428,19 @@ def test_json_reader_on_bad_lines(on_bad_lines):
         orient="records",
         on_bad_lines=on_bad_lines,
     )
+
+
+def test_chunked_json_reader():
+    df = cudf.DataFrame(
+        {
+            "a": ["aaaa"] * 9_000_000,
+            "b": list(range(0, 9_000_000)),
+        }
+    )
+    buf = BytesIO()
+    df.to_json(buf, lines=True, orient="records", engine="cudf")
+    buf.seek(0)
+    df = df.to_pandas()
+    with cudf.option_context("mode.pandas_compatible", True):
+        gdf = cudf.read_json(buf, lines=True)
+    assert_eq(df, gdf)
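
Usage note (not part of the patch): a minimal sketch of how the code paths added above are exercised, using only the entry points introduced in this diff. The sample data and the tiny `chunk_size` are illustrative, and passing raw bytes to `SourceInfo` is assumed to work the same way as the internal `filepaths_or_buffers` call site.

```python
from io import BytesIO

import cudf
import cudf._lib.pylibcudf as plc

# Build a small JSON-lines source (contents are illustrative).
buf = BytesIO()
cudf.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]}).to_json(
    buf, lines=True, orient="records", engine="cudf"
)

# In pandas-compatible mode, read_json(lines=True) now takes the chunked
# path added above instead of a single plc.io.json.read_json call.
buf.seek(0)
with cudf.option_context("mode.pandas_compatible", True):
    gdf = cudf.read_json(buf, lines=True)

# The pylibcudf entry point can also be driven directly. It returns a
# (columns, column_names, child_names) tuple rather than a TableWithMetadata.
# chunk_size=100 only forces multiple byte-range passes on this tiny input;
# the default is 100_000_000 bytes and must cover the longest row.
cols, names, child_names = plc.io.json.chunked_read_json(
    plc.io.SourceInfo([buf.getvalue()]),
    chunk_size=100,
)
```

The chunked loop leans on the tightened C++ exception types in this patch: once `byte_range_offset` runs past the end of the input, libcudf raises `std::invalid_argument` or `std::overflow_error`, which surface in Cython as `ValueError`/`OverflowError` and cleanly terminate the `while True` loop in `chunked_read_json`.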