From 1a4c2aa38c6e7de8c6937b787a1263a4ccddadea Mon Sep 17 00:00:00 2001 From: Thomas Li <47963215+lithomas1@users.noreply.github.com> Date: Tue, 2 Jul 2024 07:38:18 -0700 Subject: [PATCH] Start migrating I/O writers to pylibcudf (starting with JSON) (#15952) Switches the JSON writer to use pylibcudf. xref #15162 Authors: - Thomas Li (https://github.com/lithomas1) - Vyas Ramasubramani (https://github.com/vyasr) Approvers: - Lawrence Mitchell (https://github.com/wence-) - Vyas Ramasubramani (https://github.com/vyasr) URL: https://github.com/rapidsai/cudf/pull/15952 --- .../api_docs/pylibcudf/io/index.rst | 1 + .../user_guide/api_docs/pylibcudf/io/json.rst | 6 + python/cudf/cudf/_lib/json.pyx | 98 +++----- .../cudf/_lib/pylibcudf/io/CMakeLists.txt | 6 +- .../cudf/cudf/_lib/pylibcudf/io/__init__.pxd | 2 +- .../cudf/cudf/_lib/pylibcudf/io/__init__.py | 4 +- python/cudf/cudf/_lib/pylibcudf/io/avro.pyx | 4 +- python/cudf/cudf/_lib/pylibcudf/io/json.pxd | 18 ++ python/cudf/cudf/_lib/pylibcudf/io/json.pyx | 68 ++++++ python/cudf/cudf/_lib/pylibcudf/io/types.pxd | 11 + python/cudf/cudf/_lib/pylibcudf/io/types.pyx | 125 +++++++++- .../cudf/cudf/pylibcudf_tests/common/utils.py | 122 ++++++++-- python/cudf/cudf/pylibcudf_tests/conftest.py | 104 ++++++-- .../pylibcudf_tests/{ => io}/test_avro.py | 0 .../cudf/cudf/pylibcudf_tests/io/test_json.py | 116 +++++++++ .../test_source_sink_info.py} | 34 ++- .../cudf/cudf/pylibcudf_tests/test_copying.py | 226 +++++++++++++----- 17 files changed, 768 insertions(+), 177 deletions(-) create mode 100644 docs/cudf/source/user_guide/api_docs/pylibcudf/io/json.rst create mode 100644 python/cudf/cudf/_lib/pylibcudf/io/json.pxd create mode 100644 python/cudf/cudf/_lib/pylibcudf/io/json.pyx rename python/cudf/cudf/pylibcudf_tests/{ => io}/test_avro.py (100%) create mode 100644 python/cudf/cudf/pylibcudf_tests/io/test_json.py rename python/cudf/cudf/pylibcudf_tests/{test_source_info.py => io/test_source_sink_info.py} (72%) diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst index 0d53ac92db9..bde6d8094ce 100644 --- a/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst +++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst @@ -16,3 +16,4 @@ I/O Functions :maxdepth: 1 avro + json diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/io/json.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/json.rst new file mode 100644 index 00000000000..6aeae1f322a --- /dev/null +++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/json.rst @@ -0,0 +1,6 @@ +==== +JSON +==== + +.. 
automodule:: cudf._lib.pylibcudf.io.json + :members: diff --git a/python/cudf/cudf/_lib/json.pyx b/python/cudf/cudf/_lib/json.pyx index a8fef907bad..22e34feb547 100644 --- a/python/cudf/cudf/_lib/json.pyx +++ b/python/cudf/cudf/_lib/json.pyx @@ -9,38 +9,27 @@ from cudf.core.buffer import acquire_spill_lock from libcpp cimport bool from libcpp.map cimport map -from libcpp.memory cimport unique_ptr from libcpp.string cimport string from libcpp.utility cimport move from libcpp.vector cimport vector cimport cudf._lib.pylibcudf.libcudf.io.types as cudf_io_types -from cudf._lib.column cimport Column -from cudf._lib.io.utils cimport ( - make_sink_info, - make_source_info, - update_struct_field_names, -) -from cudf._lib.pylibcudf.libcudf.io.data_sink cimport data_sink +from cudf._lib.io.utils cimport make_source_info, update_struct_field_names from cudf._lib.pylibcudf.libcudf.io.json cimport ( json_reader_options, json_recovery_mode_t, - json_writer_options, read_json as libcudf_read_json, schema_element, - write_json as libcudf_write_json, ) from cudf._lib.pylibcudf.libcudf.io.types cimport ( - column_name_info, compression_type, - sink_info, - table_metadata, table_with_metadata, ) -from cudf._lib.pylibcudf.libcudf.table.table_view cimport table_view from cudf._lib.pylibcudf.libcudf.types cimport data_type, size_type from cudf._lib.types cimport dtype_to_data_type -from cudf._lib.utils cimport data_from_unique_ptr, table_view_from_table +from cudf._lib.utils cimport data_from_unique_ptr + +import cudf._lib.pylibcudf as plc cdef json_recovery_mode_t _get_json_recovery_mode(object on_bad_lines): @@ -175,45 +164,27 @@ def write_json( -------- cudf.to_json """ - cdef table_view input_table_view = table_view_from_table( - table, ignore_index=True - ) - - cdef unique_ptr[data_sink] data_sink_c - cdef sink_info sink_info_c = make_sink_info(path_or_buf, data_sink_c) - cdef string na_c = na_rep.encode() - cdef bool include_nulls_c = include_nulls - cdef bool lines_c = lines - cdef int rows_per_chunk_c = rows_per_chunk - cdef string true_value_c = 'true'.encode() - cdef string false_value_c = 'false'.encode() - cdef table_metadata tbl_meta - - num_index_cols_meta = 0 - cdef column_name_info child_info - for i, name in enumerate(table._column_names, num_index_cols_meta): - child_info.name = name.encode() - tbl_meta.schema_info.push_back(child_info) - _set_col_children_metadata( - table[name]._column, - tbl_meta.schema_info[i] - ) + cdef list colnames = [] - cdef json_writer_options options = move( - json_writer_options.builder(sink_info_c, input_table_view) - .metadata(tbl_meta) - .na_rep(na_c) - .include_nulls(include_nulls_c) - .lines(lines_c) - .rows_per_chunk(rows_per_chunk_c) - .true_value(true_value_c) - .false_value(false_value_c) - .build() - ) + for name in table._column_names: + colnames.append((name, _dtype_to_names_list(table[name]._column))) try: - with nogil: - libcudf_write_json(options) + plc.io.json.write_json( + plc.io.SinkInfo([path_or_buf]), + plc.io.TableWithMetadata( + plc.Table([ + c.to_pylibcudf(mode="read") for c in table._columns + ]), + colnames + ), + na_rep, + include_nulls, + lines, + rows_per_chunk, + true_value="true", + false_value="false" + ) except OverflowError: raise OverflowError( f"Writing JSON file with rows_per_chunk={rows_per_chunk} failed. 
" @@ -254,23 +225,12 @@ cdef data_type _get_cudf_data_type_from_dtype(object dtype) except *: ) return dtype_to_data_type(dtype) -cdef _set_col_children_metadata(Column col, - column_name_info& col_meta): - cdef column_name_info child_info + +def _dtype_to_names_list(col): if isinstance(col.dtype, cudf.StructDtype): - for i, (child_col, name) in enumerate( - zip(col.children, list(col.dtype.fields)) - ): - child_info.name = name.encode() - col_meta.children.push_back(child_info) - _set_col_children_metadata( - child_col, col_meta.children[i] - ) + return [(name, _dtype_to_names_list(child)) + for name, child in zip(col.dtype.fields, col.children)] elif isinstance(col.dtype, cudf.ListDtype): - for i, child_col in enumerate(col.children): - col_meta.children.push_back(child_info) - _set_col_children_metadata( - child_col, col_meta.children[i] - ) - else: - return + return [("", _dtype_to_names_list(child)) + for child in col.children] + return [] diff --git a/python/cudf/cudf/_lib/pylibcudf/io/CMakeLists.txt b/python/cudf/cudf/_lib/pylibcudf/io/CMakeLists.txt index 32f0f5543e4..084b341ec48 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/CMakeLists.txt +++ b/python/cudf/cudf/_lib/pylibcudf/io/CMakeLists.txt @@ -12,7 +12,7 @@ # the License. # ============================================================================= -set(cython_sources avro.pyx datasource.pyx types.pyx) +set(cython_sources avro.pyx datasource.pyx json.pyx types.pyx) set(linked_libraries cudf::cudf) rapids_cython_create_modules( @@ -21,5 +21,7 @@ rapids_cython_create_modules( LINKED_LIBRARIES "${linked_libraries}" MODULE_PREFIX pylibcudf_io_ ASSOCIATED_TARGETS cudf ) -set(targets_using_arrow_headers pylibcudf_io_avro pylibcudf_io_datasource pylibcudf_io_types) +set(targets_using_arrow_headers pylibcudf_io_avro pylibcudf_io_datasource pylibcudf_io_json + pylibcudf_io_types +) link_to_pyarrow_headers("${targets_using_arrow_headers}") diff --git a/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd b/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd index cfd6d2cd281..ef4c65b277e 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd @@ -1,4 +1,4 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -from . cimport avro, datasource, types +from . cimport avro, datasource, json, types from .types cimport SourceInfo, TableWithMetadata diff --git a/python/cudf/cudf/_lib/pylibcudf/io/__init__.py b/python/cudf/cudf/_lib/pylibcudf/io/__init__.py index a54ba1834dc..fb4e4c7e4bb 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/__init__.py +++ b/python/cudf/cudf/_lib/pylibcudf/io/__init__.py @@ -1,4 +1,4 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -from . import avro, datasource, types -from .types import SourceInfo, TableWithMetadata +from . import avro, datasource, json, types +from .types import SinkInfo, SourceInfo, TableWithMetadata diff --git a/python/cudf/cudf/_lib/pylibcudf/io/avro.pyx b/python/cudf/cudf/_lib/pylibcudf/io/avro.pyx index 946e0896fc8..538bd8aa322 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/avro.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/io/avro.pyx @@ -19,7 +19,7 @@ cpdef TableWithMetadata read_avro( size_type num_rows = -1 ): """ - Reads an Avro dataset into a set of columns. + Reads an Avro dataset into a :py:class:`~.types.TableWithMetadata`. Parameters ---------- @@ -36,7 +36,7 @@ cpdef TableWithMetadata read_avro( Returns ------- TableWithMetadata - The Table and its corresponding metadata that was read in. 
+ The Table and its corresponding metadata (column names) that were read in. """ cdef vector[string] c_columns if columns is not None and len(columns) > 0: diff --git a/python/cudf/cudf/_lib/pylibcudf/io/json.pxd b/python/cudf/cudf/_lib/pylibcudf/io/json.pxd new file mode 100644 index 00000000000..a91d574131f --- /dev/null +++ b/python/cudf/cudf/_lib/pylibcudf/io/json.pxd @@ -0,0 +1,18 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from libcpp cimport bool + +from cudf._lib.pylibcudf.io.types cimport SinkInfo, TableWithMetadata +from cudf._lib.pylibcudf.libcudf.types cimport size_type + + +cpdef void write_json( + SinkInfo sink_info, + TableWithMetadata tbl, + str na_rep = *, + bool include_nulls = *, + bool lines = *, + size_type rows_per_chunk = *, + str true_value = *, + str false_value = * +) diff --git a/python/cudf/cudf/_lib/pylibcudf/io/json.pyx b/python/cudf/cudf/_lib/pylibcudf/io/json.pyx new file mode 100644 index 00000000000..7530eba3803 --- /dev/null +++ b/python/cudf/cudf/_lib/pylibcudf/io/json.pyx @@ -0,0 +1,68 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from libcpp cimport bool +from libcpp.limits cimport numeric_limits +from libcpp.string cimport string + +from cudf._lib.pylibcudf.io.types cimport SinkInfo, TableWithMetadata +from cudf._lib.pylibcudf.libcudf.io.json cimport ( + json_writer_options, + write_json as cpp_write_json, +) +from cudf._lib.pylibcudf.libcudf.io.types cimport table_metadata +from cudf._lib.pylibcudf.types cimport size_type + + +cpdef void write_json( + SinkInfo sink_info, + TableWithMetadata table_w_meta, + str na_rep = "", + bool include_nulls = False, + bool lines = False, + size_type rows_per_chunk = numeric_limits[size_type].max(), + str true_value = "true", + str false_value = "false" +): + """ + Writes a :py:class:`~cudf._lib.pylibcudf.table.Table` to JSON format. + + Parameters + ---------- + sink_info: SinkInfo + The SinkInfo object to write the JSON to. + table_w_meta: TableWithMetadata + The TableWithMetadata object containing the Table to write + na_rep: str, default "" + The string representation for null values. + include_nulls: bool, default False + Enables/Disables output of nulls as 'null'. + lines: bool, default False + If `True`, write output in the JSON lines format. + rows_per_chunk: size_type, defaults to length of the input table + The maximum number of rows to write at a time. + true_value: str, default "true" + The string representation for values != 0 in INT8 types. + false_value: str, default "false" + The string representation for values == 0 in INT8 types. + """ + cdef table_metadata tbl_meta = table_w_meta.metadata + cdef string na_rep_c = na_rep.encode() + + cdef json_writer_options options = ( + json_writer_options.builder(sink_info.c_obj, table_w_meta.tbl.view()) + .metadata(tbl_meta) + .na_rep(na_rep_c) + .include_nulls(include_nulls) + .lines(lines) + .build() + ) + + if rows_per_chunk != numeric_limits[size_type].max(): + options.set_rows_per_chunk(rows_per_chunk) + if true_value != "true": + options.set_true_value(true_value.encode()) + if false_value != "false": + options.set_false_value(false_value.encode()) + + with nogil: + cpp_write_json(options) diff --git a/python/cudf/cudf/_lib/pylibcudf/io/types.pxd b/python/cudf/cudf/_lib/pylibcudf/io/types.pxd index aa846a47343..88daf54f33b 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/types.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/io/types.pxd @@ -1,4 +1,8 @@ # Copyright (c) 2024, NVIDIA CORPORATION. 
+from libcpp.memory cimport unique_ptr +from libcpp.vector cimport vector + +from cudf._lib.pylibcudf.libcudf.io.data_sink cimport data_sink from cudf._lib.pylibcudf.libcudf.io.types cimport ( column_encoding, column_in_metadata, @@ -22,8 +26,15 @@ cdef class TableWithMetadata: cdef public Table tbl cdef table_metadata metadata + cdef vector[column_name_info] _make_column_info(self, list column_names) + @staticmethod cdef TableWithMetadata from_libcudf(table_with_metadata& tbl) cdef class SourceInfo: cdef source_info c_obj + +cdef class SinkInfo: + # This vector just exists to keep the unique_ptrs to the sinks alive + cdef vector[unique_ptr[data_sink]] sink_storage + cdef sink_info c_obj diff --git a/python/cudf/cudf/_lib/pylibcudf/io/types.pyx b/python/cudf/cudf/_lib/pylibcudf/io/types.pyx index ab3375da662..f94e20970a4 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/types.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/io/types.pyx @@ -1,17 +1,23 @@ # Copyright (c) 2024, NVIDIA CORPORATION. +from cpython.buffer cimport PyBUF_READ +from cpython.memoryview cimport PyMemoryView_FromMemory +from libcpp.memory cimport unique_ptr from libcpp.string cimport string from libcpp.utility cimport move from libcpp.vector cimport vector from cudf._lib.pylibcudf.io.datasource cimport Datasource +from cudf._lib.pylibcudf.libcudf.io.data_sink cimport data_sink from cudf._lib.pylibcudf.libcudf.io.datasource cimport datasource from cudf._lib.pylibcudf.libcudf.io.types cimport ( + column_name_info, host_buffer, source_info, table_with_metadata, ) +import codecs import errno import io import os @@ -22,7 +28,39 @@ cdef class TableWithMetadata: (e.g. column names) For details, see :cpp:class:`cudf::io::table_with_metadata`. + + Parameters + ---------- + tbl : Table + The input table. + column_names : list + A list of tuples each containing the name of each column + and the names of its child columns (in the same format). + e.g. + [("id", []), ("name", [("first", []), ("last", [])])] + """ + def __init__(self, Table tbl, list column_names): + self.tbl = tbl + + self.metadata.schema_info = self._make_column_info(column_names) + + cdef vector[column_name_info] _make_column_info(self, list column_names): + cdef vector[column_name_info] col_name_infos + cdef column_name_info info + + col_name_infos.reserve(len(column_names)) + + for name, child_names in column_names: + if not isinstance(name, str): + raise ValueError("Column name must be a string!") + + info.name = name.encode() + info.children = self._make_column_info(child_names) + + col_name_infos.push_back(info) + + return col_name_infos @property def columns(self): @@ -51,6 +89,7 @@ cdef class TableWithMetadata: out.metadata = tbl_with_meta.metadata return out + cdef class SourceInfo: """A class containing details on a source to read from. @@ -119,7 +158,87 @@ cdef class SourceInfo: raise ValueError("Sources must be a list of str/paths, " "bytes, io.BytesIO, or a Datasource") - if empty_buffer is True: - c_host_buffers.push_back(host_buffer(NULL, 0)) + self.c_obj = source_info(c_host_buffers) + + +# Adapts a python io.IOBase object as a libcudf IO data_sink. 
This lets you
+# write from cudf to any python file-like object (File/BytesIO/SocketIO etc)
+cdef cppclass iobase_data_sink(data_sink):
+    object buf
+
+    iobase_data_sink(object buf_):
+        this.buf = buf_
+
+    void host_write(const void * data, size_t size) with gil:
+        if isinstance(buf, io.TextIOBase):
+            buf.write(PyMemoryView_FromMemory(data, size, PyBUF_READ)
+                      .tobytes().decode())
+        else:
+            buf.write(PyMemoryView_FromMemory(data, size, PyBUF_READ))
+
+    void flush() with gil:
+        buf.flush()
+
+    size_t bytes_written() with gil:
+        return buf.tell()
+
+
+cdef class SinkInfo:
+    """A class containing details about a sink to write to.
+
+    For details, see :cpp:class:`cudf::io::sink_info`.
+
+    Parameters
+    ----------
+    sinks : list of str, PathLike, BytesIO, StringIO
+
+        A homogeneous list of sinks (this can be a string filename,
+        a path, or one of the Python I/O classes) to write to.
+
+        Mixing different types of sinks will raise a `ValueError`.
+    """
+
+    def __init__(self, list sinks):
+        cdef vector[data_sink *] data_sinks
+        cdef vector[string] paths
+
+        if not sinks:
+            raise ValueError("Need to pass at least one sink")
+
+        if isinstance(sinks[0], os.PathLike):
+            sinks = [os.path.expanduser(s) for s in sinks]
+
+        cdef object initial_sink_cls = type(sinks[0])
+
+        if not all(isinstance(s, initial_sink_cls) for s in sinks):
+            raise ValueError("All sinks must be of the same type!")
+
+        if initial_sink_cls in {io.StringIO, io.BytesIO, io.TextIOBase}:
+            data_sinks.reserve(len(sinks))
+            if isinstance(sinks[0], (io.StringIO, io.BytesIO)):
+                for s in sinks:
+                    self.sink_storage.push_back(
+                        unique_ptr[data_sink](new iobase_data_sink(s))
+                    )
+                    data_sinks.push_back(self.sink_storage.back().get())
+            elif isinstance(sinks[0], io.TextIOBase):
+                for s in sinks:
+                    if codecs.lookup(s.encoding).name not in ('utf-8', 'ascii'):
+                        raise NotImplementedError(f"Unsupported encoding {s.encoding}")
+                    self.sink_storage.push_back(
+                        unique_ptr[data_sink](new iobase_data_sink(s.buffer))
+                    )
+                    data_sinks.push_back(self.sink_storage.back().get())
+        elif initial_sink_cls is str:
+            paths.reserve(len(sinks))
+            for s in sinks:
+                paths.push_back(<string> s.encode())
+        else:
+            raise TypeError(
+                "Unrecognized input type: {}".format(type(sinks[0]))
+            )
-        self.c_obj = move(source_info(c_host_buffers))
+        if data_sinks.size() > 0:
+            self.c_obj = sink_info(data_sinks)
+        else:
+            # we don't have sinks, so we must have paths to sinks
+            self.c_obj = sink_info(paths)
diff --git a/python/cudf/cudf/pylibcudf_tests/common/utils.py b/python/cudf/cudf/pylibcudf_tests/common/utils.py
index bf927e661fe..f8bfe340ae5 100644
--- a/python/cudf/cudf/pylibcudf_tests/common/utils.py
+++ b/python/cudf/cudf/pylibcudf_tests/common/utils.py
@@ -1,24 +1,39 @@
 # Copyright (c) 2024, NVIDIA CORPORATION.
from __future__ import annotations

+import io
+import os
+
 import pyarrow as pa
 import pytest

 from cudf._lib import pylibcudf as plc


-def metadata_from_arrow_array(
-    pa_array: pa.Array,
+def metadata_from_arrow_type(
+    pa_type: pa.DataType,
+    name: str = "",
-) -> plc.interop.ColumnMetadata | None:
+) -> plc.interop.ColumnMetadata:
-    metadata = None
-    if pa.types.is_list(dtype := pa_array.type) or pa.types.is_struct(dtype):
+    metadata = plc.interop.ColumnMetadata(name)
+    if pa.types.is_list(pa_type):
+        child_meta = [plc.interop.ColumnMetadata("offsets")]
+        for i in range(pa_type.num_fields):
+            field_meta = metadata_from_arrow_type(
+                pa_type.field(i).type, pa_type.field(i).name
+            )
+            child_meta.append(field_meta)
+        metadata = plc.interop.ColumnMetadata(name, child_meta)
+    elif pa.types.is_struct(pa_type):
+        child_meta = []
+        for i in range(pa_type.num_fields):
+            field_meta = metadata_from_arrow_type(
+                pa_type.field(i).type, pa_type.field(i).name
+            )
+            child_meta.append(field_meta)
         metadata = plc.interop.ColumnMetadata(
-            "",
+            name,
             # libcudf does not store field names, so just match pyarrow's.
-            [
-                plc.interop.ColumnMetadata(pa_array.type.field(i).name)
-                for i in range(pa_array.type.num_fields)
-            ],
+            child_meta,
         )
     return metadata
@@ -32,13 +47,13 @@ def assert_column_eq(
         rhs, plc.Column
     ):
         rhs = plc.interop.to_arrow(
-            rhs, metadata=metadata_from_arrow_array(lhs)
+            rhs, metadata=metadata_from_arrow_type(lhs.type)
        )
     elif isinstance(lhs, plc.Column) and isinstance(
         rhs, (pa.Array, pa.ChunkedArray)
     ):
         lhs = plc.interop.to_arrow(
-            lhs, metadata=metadata_from_arrow_array(rhs)
+            lhs, metadata=metadata_from_arrow_type(rhs.type)
         )
     else:
         raise ValueError(
@@ -94,21 +109,16 @@ def is_signed_integer(plc_dtype: plc.DataType):
     )


-def is_unsigned_integer(plc_dtype: plc.DataType):
-    return plc_dtype.id() in (
-        plc.TypeId.UINT8,
-        plc.TypeId.UINT16,
-        plc.TypeId.UINT32,
-        plc.TypeId.UINT64,
-    )
-
-
 def is_integer(plc_dtype: plc.DataType):
     return plc_dtype.id() in (
         plc.TypeId.INT8,
         plc.TypeId.INT16,
         plc.TypeId.INT32,
         plc.TypeId.INT64,
+        plc.TypeId.UINT8,
+        plc.TypeId.UINT16,
+        plc.TypeId.UINT32,
+        plc.TypeId.UINT64,
     )


@@ -135,8 +145,80 @@ def is_fixed_width(plc_dtype: plc.DataType):
     )


+def nesting_level(typ) -> tuple[int, int]:
+    """Return the list and struct nesting levels of a pyarrow type."""
+    if isinstance(typ, pa.ListType):
+        list_, struct = nesting_level(typ.value_type)
+        return list_ + 1, struct
+    elif isinstance(typ, pa.StructType):
+        lists, structs = map(max, zip(*(nesting_level(t.type) for t in typ)))
+        return lists, structs + 1
+    else:
+        return 0, 0
+
+
+def is_nested_struct(typ):
+    return nesting_level(typ)[1] > 1
+
+
+def is_nested_list(typ):
+    return nesting_level(typ)[0] > 1
+
+
+def sink_to_str(sink):
+    """
+    Read back the contents of a sink (e.g. StringIO/BytesIO,
+    filepath, etc.) as a str (not bytes) for comparison.
+    """
+    if isinstance(sink, (str, os.PathLike)):
+        with open(sink, "r") as f:
+            str_result = f.read()
+    elif isinstance(sink, io.BytesIO):
+        sink.seek(0)
+        str_result = sink.read().decode()
+    else:
+        sink.seek(0)
+        str_result = sink.read()
+    return str_result
+
+
+NUMERIC_PA_TYPES = [pa.int64(), pa.float64(), pa.uint64()]
+STRING_PA_TYPES = [pa.string()]
+BOOL_PA_TYPES = [pa.bool_()]
+LIST_PA_TYPES = [
+    pa.list_(pa.int64()),
+    # Nested case
+    pa.list_(pa.list_(pa.int64())),
+]
+
 # We must explicitly specify this type via a field to ensure we don't include
 # nullability accidentally.
DEFAULT_STRUCT_TESTING_TYPE = pa.struct( [pa.field("v", pa.int64(), nullable=False)] ) +NESTED_STRUCT_TESTING_TYPE = pa.struct( + [ + pa.field("a", pa.int64(), nullable=False), + pa.field( + "b_struct", + pa.struct([pa.field("b", pa.float64(), nullable=False)]), + nullable=False, + ), + ] +) + +DEFAULT_PA_STRUCT_TESTING_TYPES = [ + DEFAULT_STRUCT_TESTING_TYPE, + NESTED_STRUCT_TESTING_TYPE, +] + +DEFAULT_PA_TYPES = ( + NUMERIC_PA_TYPES + + STRING_PA_TYPES + + BOOL_PA_TYPES + + LIST_PA_TYPES + + DEFAULT_PA_STRUCT_TESTING_TYPES +) + +ALL_PA_TYPES = DEFAULT_PA_TYPES diff --git a/python/cudf/cudf/pylibcudf_tests/conftest.py b/python/cudf/cudf/pylibcudf_tests/conftest.py index b169bbdee5b..e4760ea7ac8 100644 --- a/python/cudf/cudf/pylibcudf_tests/conftest.py +++ b/python/cudf/cudf/pylibcudf_tests/conftest.py @@ -1,9 +1,12 @@ # Copyright (c) 2024, NVIDIA CORPORATION. # Tell ruff it's OK that some imports occur after the sys.path.insert # ruff: noqa: E402 +import io import os +import pathlib import sys +import numpy as np import pyarrow as pa import pytest @@ -11,7 +14,7 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "common")) -from utils import DEFAULT_STRUCT_TESTING_TYPE +from utils import ALL_PA_TYPES, DEFAULT_PA_TYPES, NUMERIC_PA_TYPES # This fixture defines the standard set of types that all tests should default to @@ -20,14 +23,7 @@ # across modules. Otherwise it may be defined on a per-module basis. @pytest.fixture( scope="session", - params=[ - pa.int64(), - pa.float64(), - pa.string(), - pa.bool_(), - pa.list_(pa.int64()), - DEFAULT_STRUCT_TESTING_TYPE, - ], + params=DEFAULT_PA_TYPES, ) def pa_type(request): return request.param @@ -35,16 +31,96 @@ def pa_type(request): @pytest.fixture( scope="session", - params=[ - pa.int64(), - pa.float64(), - pa.uint64(), - ], + params=NUMERIC_PA_TYPES, ) def numeric_pa_type(request): return request.param +# TODO: Consider adding another fixture/adapting this +# fixture to consider nullability +@pytest.fixture(scope="session", params=[0, 100]) +def table_data(request): + """ + Returns (TableWithMetadata, pa_table). + + This is the default fixture you should be using for testing + pylibcudf I/O writers. + + Contains one of each category (e.g. int, bool, list, struct) + of dtypes. 
+ """ + nrows = request.param + + table_dict = {} + # Colnames in the format expected by + # plc.io.TableWithMetadata + colnames = [] + + np.random.seed(42) + + for typ in ALL_PA_TYPES: + rand_vals = np.random.randint(0, nrows, nrows) + child_colnames = [] + + def _generate_nested_data(typ): + child_colnames = [] + + # recurse to get vals for children + rand_arrs = [] + for i in range(typ.num_fields): + rand_arr, grandchild_colnames = _generate_nested_data( + typ.field(i).type + ) + rand_arrs.append(rand_arr) + child_colnames.append((typ.field(i).name, grandchild_colnames)) + + if isinstance(typ, pa.StructType): + pa_array = pa.StructArray.from_arrays( + [rand_arr for rand_arr in rand_arrs], + names=[typ.field(i).name for i in range(typ.num_fields)], + ) + elif isinstance(typ, pa.ListType): + pa_array = pa.array( + [list(row_vals) for row_vals in zip(rand_arrs[0])], + type=typ, + ) + child_colnames.append(("", grandchild_colnames)) + else: + # typ is scalar type + pa_array = pa.array(rand_vals).cast(typ) + return pa_array, child_colnames + + if isinstance(typ, (pa.ListType, pa.StructType)): + rand_arr, child_colnames = _generate_nested_data(typ) + else: + rand_arr = pa.array(rand_vals).cast(typ) + + table_dict[f"col_{typ}"] = rand_arr + colnames.append((f"col_{typ}", child_colnames)) + + pa_table = pa.Table.from_pydict(table_dict) + + return plc.io.TableWithMetadata( + plc.interop.from_arrow(pa_table), column_names=colnames + ), pa_table + + +@pytest.fixture( + params=["a.txt", pathlib.Path("a.txt"), io.BytesIO, io.StringIO], +) +def source_or_sink(request, tmp_path): + fp_or_buf = request.param + if isinstance(fp_or_buf, str): + return f"{tmp_path}/{fp_or_buf}" + elif isinstance(fp_or_buf, os.PathLike): + return tmp_path.joinpath(fp_or_buf) + elif issubclass(fp_or_buf, io.IOBase): + # Must construct io.StringIO/io.BytesIO inside + # fixture, or we'll end up re-using it + return fp_or_buf() + + @pytest.fixture( scope="session", params=[opt for opt in plc.types.Interpolation] ) diff --git a/python/cudf/cudf/pylibcudf_tests/test_avro.py b/python/cudf/cudf/pylibcudf_tests/io/test_avro.py similarity index 100% rename from python/cudf/cudf/pylibcudf_tests/test_avro.py rename to python/cudf/cudf/pylibcudf_tests/io/test_avro.py diff --git a/python/cudf/cudf/pylibcudf_tests/io/test_json.py b/python/cudf/cudf/pylibcudf_tests/io/test_json.py new file mode 100644 index 00000000000..d6b8bfa6976 --- /dev/null +++ b/python/cudf/cudf/pylibcudf_tests/io/test_json.py @@ -0,0 +1,116 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. 
+import io
+
+import pyarrow as pa
+import pytest
+from utils import sink_to_str
+
+import cudf._lib.pylibcudf as plc
+
+
+@pytest.mark.parametrize("rows_per_chunk", [8, 100])
+@pytest.mark.parametrize("lines", [True, False])
+def test_write_json_basic(table_data, source_or_sink, lines, rows_per_chunk):
+    plc_table_w_meta, pa_table = table_data
+    sink = source_or_sink
+
+    plc.io.json.write_json(
+        plc.io.SinkInfo([sink]),
+        plc_table_w_meta,
+        lines=lines,
+        rows_per_chunk=rows_per_chunk,
+    )
+
+    exp = pa_table.to_pandas()
+
+    # Convert everything to string to make
+    # comparisons easier
+    str_result = sink_to_str(sink)
+
+    pd_result = exp.to_json(orient="records", lines=lines)
+
+    assert str_result == pd_result
+
+
+@pytest.mark.parametrize("include_nulls", [True, False])
+@pytest.mark.parametrize("na_rep", ["null", "awef", ""])
+def test_write_json_nulls(na_rep, include_nulls):
+    names = ["a", "b"]
+    pa_tbl = pa.Table.from_arrays(
+        [pa.array([1.0, 2.0, None]), pa.array([True, None, False])],
+        names=names,
+    )
+    plc_tbl = plc.interop.from_arrow(pa_tbl)
+    plc_tbl_w_meta = plc.io.types.TableWithMetadata(
+        plc_tbl, column_names=[(name, []) for name in names]
+    )
+
+    sink = io.StringIO()
+
+    plc.io.json.write_json(
+        plc.io.SinkInfo([sink]),
+        plc_tbl_w_meta,
+        na_rep=na_rep,
+        include_nulls=include_nulls,
+    )
+
+    exp = pa_tbl.to_pandas()
+
+    # Convert everything to string to make
+    # comparisons easier
+    str_result = sink_to_str(sink)
+    pd_result = exp.to_json(orient="records")
+
+    if not include_nulls:
+        # No equivalent in pandas, so we just
+        # sanity check by making sure na_rep
+        # doesn't appear in the output
+        # (nulls are written unquoted, so match them unquoted)
+        for name in names:
+            assert f'{{"{name}":{na_rep}}}' not in str_result
+        return
+
+    # pandas doesn't support na_rep,
+    # so just do the equivalent str.replace manually
+    pd_result = pd_result.replace("null", na_rep)
+
+    assert str_result == pd_result
+
+
+@pytest.mark.parametrize("true_value", ["True", "correct"])
+@pytest.mark.parametrize("false_value", ["False", "wrong"])
+def test_write_json_bool_opts(true_value, false_value):
+    names = ["a"]
+    pa_tbl = pa.Table.from_arrays([pa.array([True, None, False])], names=names)
+    plc_tbl = plc.interop.from_arrow(pa_tbl)
+    plc_tbl_w_meta = plc.io.types.TableWithMetadata(
+        plc_tbl, column_names=[(name, []) for name in names]
+    )
+
+    sink = io.StringIO()
+
+    plc.io.json.write_json(
+        plc.io.SinkInfo([sink]),
+        plc_tbl_w_meta,
+        include_nulls=True,
+        na_rep="null",
+        true_value=true_value,
+        false_value=false_value,
+    )
+
+    exp = pa_tbl.to_pandas()
+
+    # Convert everything to string to make
+    # comparisons easier
+    str_result = sink_to_str(sink)
+    pd_result = exp.to_json(orient="records")
+
+    # pandas doesn't support custom true/false values,
+    # so just do the equivalent str.replace manually
+    if true_value != "true":
+        pd_result = pd_result.replace("true", true_value)
+    if false_value != "false":
+        pd_result = pd_result.replace("false", false_value)
+
+    assert str_result == pd_result
diff --git a/python/cudf/cudf/pylibcudf_tests/test_source_info.py b/python/cudf/cudf/pylibcudf_tests/io/test_source_sink_info.py
similarity index 72%
rename from python/cudf/cudf/pylibcudf_tests/test_source_info.py
rename to python/cudf/cudf/pylibcudf_tests/io/test_source_sink_info.py
index 019321b7259..287dd8f21c8 100644
--- a/python/cudf/cudf/pylibcudf_tests/test_source_info.py
+++ b/python/cudf/cudf/pylibcudf_tests/io/test_source_sink_info.py
@@ -9,6 +9,21 @@
 from cudf._lib.pylibcudf.io.datasource import NativeFileDatasource


+@pytest.fixture(params=[plc.io.SourceInfo, 
plc.io.SinkInfo]) +def io_class(request): + return request.param + + +def _skip_invalid_sinks(io_class, sink): + """ + Skip invalid sinks for SinkInfo + """ + if io_class is plc.io.SinkInfo and isinstance( + sink, (bytes, NativeFileDatasource) + ): + pytest.skip(f"{sink} is not a valid input for SinkInfo") + + @pytest.mark.parametrize( "source", [ @@ -18,16 +33,15 @@ NativeFileDatasource(pa.PythonFile(io.BytesIO(), mode="r")), ], ) -def test_source_info_ctor(source, tmp_path): +def test_source_info_ctor(io_class, source, tmp_path): if isinstance(source, str): file = tmp_path / source file.write_bytes("hello world".encode("utf-8")) source = str(file) - plc.io.SourceInfo([source]) + _skip_invalid_sinks(io_class, source) - # TODO: test contents of source_info buffer is correct - # once buffers are exposed on python side + io_class([source]) @pytest.mark.parametrize( @@ -42,7 +56,7 @@ def test_source_info_ctor(source, tmp_path): ], ], ) -def test_source_info_ctor_multiple(sources, tmp_path): +def test_source_info_ctor_multiple(io_class, sources, tmp_path): for i in range(len(sources)): source = sources[i] if isinstance(source, str): @@ -50,10 +64,9 @@ def test_source_info_ctor_multiple(sources, tmp_path): file.write_bytes("hello world".encode("utf-8")) sources[i] = str(file) - plc.io.SourceInfo(sources) + _skip_invalid_sinks(io_class, source) - # TODO: test contents of source_info buffer is correct - # once buffers are exposed on python side + io_class(sources) @pytest.mark.parametrize( @@ -73,7 +86,7 @@ def test_source_info_ctor_multiple(sources, tmp_path): ], ], ) -def test_source_info_ctor_mixing_invalid(sources, tmp_path): +def test_source_info_ctor_mixing_invalid(io_class, sources, tmp_path): # Unlike the previous test # don't create files so that they are missing for i in range(len(sources)): @@ -82,8 +95,9 @@ def test_source_info_ctor_mixing_invalid(sources, tmp_path): file = tmp_path / source file.write_bytes("hello world".encode("utf-8")) sources[i] = str(file) + _skip_invalid_sinks(io_class, source) with pytest.raises(ValueError): - plc.io.SourceInfo(sources) + io_class(sources) def test_source_info_invalid(): diff --git a/python/cudf/cudf/pylibcudf_tests/test_copying.py b/python/cudf/cudf/pylibcudf_tests/test_copying.py index da3ca3a6d1e..0a6df198d46 100644 --- a/python/cudf/cudf/pylibcudf_tests/test_copying.py +++ b/python/cudf/cudf/pylibcudf_tests/test_copying.py @@ -5,19 +5,24 @@ import pytest from utils import ( DEFAULT_STRUCT_TESTING_TYPE, + NESTED_STRUCT_TESTING_TYPE, assert_column_eq, assert_table_eq, cudf_raises, is_fixed_width, is_floating, is_integer, + is_nested_list, + is_nested_struct, is_string, - metadata_from_arrow_array, + metadata_from_arrow_type, ) from cudf._lib import pylibcudf as plc +# TODO: consider moving this to conftest and "pairing" +# it with pa_type, so that they don't get out of sync # TODO: Test nullable data @pytest.fixture(scope="module") def input_column(pa_type): @@ -28,10 +33,27 @@ def input_column(pa_type): elif pa.types.is_boolean(pa_type): pa_array = pa.array([True, True, False], type=pa_type) elif pa.types.is_list(pa_type): - # TODO: Add heterogenous sizes - pa_array = pa.array([[1], [2], [3]], type=pa_type) + if pa_type.value_type == pa.int64(): + pa_array = pa.array([[1], [2, 3], [3]], type=pa_type) + elif ( + isinstance(pa_type.value_type, pa.ListType) + and pa_type.value_type.value_type == pa.int64() + ): + pa_array = pa.array([[[1]], [[2, 3]], [[3]]], type=pa_type) + else: + raise ValueError("Unsupported type " + pa_type.value_type) elif 
pa.types.is_struct(pa_type): - pa_array = pa.array([{"v": 1}, {"v": 2}, {"v": 3}], type=pa_type) + if not is_nested_struct(pa_type): + pa_array = pa.array([{"v": 1}, {"v": 2}, {"v": 3}], type=pa_type) + else: + pa_array = pa.array( + [ + {"a": 1, "b_struct": {"b": 1.0}}, + {"a": 2, "b_struct": {"b": 2.0}}, + {"a": 3, "b_struct": {"b": 3.0}}, + ], + type=pa_type, + ) else: raise ValueError("Unsupported type") return pa_array, plc.interop.from_arrow(pa_array) @@ -55,13 +77,37 @@ def target_column(pa_type): [False, True, True, False, True, False], type=pa_type ) elif pa.types.is_list(pa_type): - # TODO: Add heterogenous sizes - pa_array = pa.array([[4], [5], [6], [7], [8], [9]], type=pa_type) + if pa_type.value_type == pa.int64(): + pa_array = pa.array( + [[4], [5, 6], [7], [8], [9], [10]], type=pa_type + ) + elif ( + isinstance(pa_type.value_type, pa.ListType) + and pa_type.value_type.value_type == pa.int64() + ): + pa_array = pa.array( + [[[4]], [[5, 6]], [[7]], [[8]], [[9]], [[10]]], type=pa_type + ) + else: + raise ValueError("Unsupported type") elif pa.types.is_struct(pa_type): - pa_array = pa.array( - [{"v": 4}, {"v": 5}, {"v": 6}, {"v": 7}, {"v": 8}, {"v": 9}], - type=pa_type, - ) + if not is_nested_struct(pa_type): + pa_array = pa.array( + [{"v": 4}, {"v": 5}, {"v": 6}, {"v": 7}, {"v": 8}, {"v": 9}], + type=pa_type, + ) + else: + pa_array = pa.array( + [ + {"a": 4, "b_struct": {"b": 4.0}}, + {"a": 5, "b_struct": {"b": 5.0}}, + {"a": 6, "b_struct": {"b": 6.0}}, + {"a": 7, "b_struct": {"b": 7.0}}, + {"a": 8, "b_struct": {"b": 8.0}}, + {"a": 9, "b_struct": {"b": 9.0}}, + ], + type=pa_type, + ) else: raise ValueError("Unsupported type") return pa_array, plc.interop.from_arrow(pa_array) @@ -96,10 +142,22 @@ def source_scalar(pa_type): elif pa.types.is_boolean(pa_type): pa_scalar = pa.scalar(False, type=pa_type) elif pa.types.is_list(pa_type): - # TODO: Longer list? 
- pa_scalar = pa.scalar([1], type=pa_type) + if pa_type.value_type == pa.int64(): + pa_scalar = pa.scalar([1, 2, 3, 4], type=pa_type) + elif ( + isinstance(pa_type.value_type, pa.ListType) + and pa_type.value_type.value_type == pa.int64() + ): + pa_scalar = pa.scalar([[1, 2, 3, 4]], type=pa_type) + else: + raise ValueError("Unsupported type") elif pa.types.is_struct(pa_type): - pa_scalar = pa.scalar({"v": 1}, type=pa_type) + if not is_nested_struct(pa_type): + pa_scalar = pa.scalar({"v": 1}, type=pa_type) + else: + pa_scalar = pa.scalar( + {"a": 1, "b_struct": {"b": 1.0}}, type=pa_type + ) else: raise ValueError("Unsupported type") return pa_scalar, plc.interop.from_arrow(pa_scalar) @@ -196,27 +254,54 @@ def test_scatter_table( ) if pa.types.is_list(dtype := pa_target_table[0].type): - expected = pa.table( - [pa.array([[4], [1], [2], [3], [8], [9]])] * 3, [""] * 3 - ) + if is_nested_list(dtype): + expected = pa.table( + [pa.array([[[4]], [[1]], [[2, 3]], [[3]], [[9]], [[10]]])] + * 3, + [""] * 3, + ) + else: + expected = pa.table( + [pa.array([[4], [1], [2, 3], [3], [9], [10]])] * 3, + [""] * 3, + ) elif pa.types.is_struct(dtype): - expected = pa.table( - [ - pa.array( - [ - {"v": 4}, - {"v": 1}, - {"v": 2}, - {"v": 3}, - {"v": 8}, - {"v": 9}, - ], - type=DEFAULT_STRUCT_TESTING_TYPE, - ) - ] - * 3, - [""] * 3, - ) + if is_nested_struct(dtype): + expected = pa.table( + [ + pa.array( + [ + {"a": 4, "b_struct": {"b": 4.0}}, + {"a": 1, "b_struct": {"b": 1.0}}, + {"a": 2, "b_struct": {"b": 2.0}}, + {"a": 3, "b_struct": {"b": 3.0}}, + {"a": 8, "b_struct": {"b": 8.0}}, + {"a": 9, "b_struct": {"b": 9.0}}, + ], + type=NESTED_STRUCT_TESTING_TYPE, + ) + ] + * 3, + [""] * 3, + ) + else: + expected = pa.table( + [ + pa.array( + [ + {"v": 4}, + {"v": 1}, + {"v": 2}, + {"v": 3}, + {"v": 8}, + {"v": 9}, + ], + type=DEFAULT_STRUCT_TESTING_TYPE, + ) + ] + * 3, + [""] * 3, + ) else: expected = _pyarrow_boolean_mask_scatter_table( pa_source_table, @@ -627,6 +712,7 @@ def test_split_column_out_of_bounds(target_column): def test_split_table(target_table): pa_target_table, plc_target_table = target_table + upper_bounds = [1, 3, 5] lower_bounds = [0] + upper_bounds[:-1] result = plc.copying.split(plc_target_table, upper_bounds) @@ -718,6 +804,7 @@ def test_copy_if_else_column_scalar( pa_target_column, plc_target_column = target_column pa_source_scalar, plc_source_scalar = source_scalar pa_mask, plc_mask = mask + args = ( (plc_target_column, plc_source_scalar) if array_left @@ -766,27 +853,58 @@ def test_boolean_mask_scatter_from_table( ) if pa.types.is_list(dtype := pa_target_table[0].type): - expected = pa.table( - [pa.array([[1], [5], [2], [7], [3], [9]])] * 3, [""] * 3 - ) + if is_nested_list(dtype): + expected = pa.table( + [ + pa.array( + [[[1]], [[5, 6]], [[2, 3]], [[8]], [[3]], [[10]]] + ) + ] + * 3, + [""] * 3, + ) + else: + expected = pa.table( + [pa.array([[1], [5, 6], [2, 3], [8], [3], [10]])] * 3, + [""] * 3, + ) elif pa.types.is_struct(dtype): - expected = pa.table( - [ - pa.array( - [ - {"v": 1}, - {"v": 5}, - {"v": 2}, - {"v": 7}, - {"v": 3}, - {"v": 9}, - ], - type=DEFAULT_STRUCT_TESTING_TYPE, - ) - ] - * 3, - [""] * 3, - ) + if is_nested_struct(dtype): + expected = pa.table( + [ + pa.array( + [ + {"a": 1, "b_struct": {"b": 1.0}}, + {"a": 5, "b_struct": {"b": 5.0}}, + {"a": 2, "b_struct": {"b": 2.0}}, + {"a": 7, "b_struct": {"b": 7.0}}, + {"a": 3, "b_struct": {"b": 3.0}}, + {"a": 9, "b_struct": {"b": 9.0}}, + ], + type=NESTED_STRUCT_TESTING_TYPE, + ) + ] + * 3, + [""] * 3, + ) + else: + expected = 
pa.table( + [ + pa.array( + [ + {"v": 1}, + {"v": 5}, + {"v": 2}, + {"v": 7}, + {"v": 3}, + {"v": 9}, + ], + type=DEFAULT_STRUCT_TESTING_TYPE, + ) + ] + * 3, + [""] * 3, + ) else: expected = _pyarrow_boolean_mask_scatter_table( pa_source_table, pa_mask, pa_target_table @@ -887,7 +1005,7 @@ def test_get_element(input_column): assert ( plc.interop.to_arrow( - result, metadata_from_arrow_array(pa_input_column) + result, metadata_from_arrow_type(pa_input_column.type) ).as_py() == pa_input_column[index].as_py() )
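
As a quick end-to-end illustration of the API surface this PR adds, the following is a minimal sketch of the new writer path, pieced together from the tests above. It assumes a cudf build that ships these pylibcudf modules; the table contents are illustrative only.

import io

import pyarrow as pa

import cudf._lib.pylibcudf as plc

# Build a pylibcudf Table plus the (name, child_names) pairs that
# TableWithMetadata expects for each column (flat columns have no children).
pa_tbl = pa.Table.from_arrays(
    [pa.array([1.0, 2.0, None]), pa.array([True, None, False])],
    names=["a", "b"],
)
tbl_w_meta = plc.io.types.TableWithMetadata(
    plc.interop.from_arrow(pa_tbl),
    column_names=[(name, []) for name in pa_tbl.column_names],
)

# Write JSON-lines output into an in-memory sink and inspect it.
sink = io.StringIO()
plc.io.json.write_json(
    plc.io.SinkInfo([sink]),
    tbl_w_meta,
    na_rep="null",
    include_nulls=True,
    lines=True,
)
sink.seek(0)
print(sink.read())

With lines=True each row is emitted as one JSON object per line, which is what the tests compare against pandas' to_json(orient="records", lines=True) output.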