Skip to content

Commit

Permalink
Start migrating I/O to pylibcudf (#15899)
Browse files Browse the repository at this point in the history
xref #15162 

Begins migrating the cuDF I/O Cython code to the pylibcudf APIs, starting with the Avro reader.

Authors:
  - Thomas Li (https://github.com/lithomas1)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #15899
  • Loading branch information
lithomas1 authored Jun 6, 2024
1 parent 7fd6918 commit 3b734ec
Show file tree
Hide file tree
Showing 21 changed files with 541 additions and 72 deletions.
1 change: 1 addition & 0 deletions docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ This page provides API documentation for pylibcudf.
filling
gpumemoryview
groupby
io/index.rst
join
lists
merge
Expand Down
6 changes: 6 additions & 0 deletions docs/cudf/source/user_guide/api_docs/pylibcudf/io/avro.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
====
Avro
====

.. automodule:: cudf._lib.pylibcudf.io.avro
    :members:
18 changes: 18 additions & 0 deletions docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
===
I/O
===

I/O Utility Classes
===================

.. automodule:: cudf._lib.pylibcudf.io.types
    :members:


I/O Functions
=============

.. toctree::
    :maxdepth: 1

    avro
50 changes: 14 additions & 36 deletions python/cudf/cudf/_lib/avro.pyx
Original file line number Diff line number Diff line change
@@ -1,20 +1,12 @@
# Copyright (c) 2020-2024, NVIDIA CORPORATION.

from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector
from cudf._lib.utils cimport data_from_pylibcudf_io

from cudf._lib.io.utils cimport make_source_info
from cudf._lib.pylibcudf.libcudf.io.avro cimport (
avro_reader_options,
read_avro as libcudf_read_avro,
)
from cudf._lib.pylibcudf.libcudf.io.types cimport table_with_metadata
from cudf._lib.pylibcudf.libcudf.types cimport size_type
from cudf._lib.utils cimport data_from_unique_ptr
import cudf._lib.pylibcudf as plc
from cudf._lib.pylibcudf.io.types import SourceInfo


cpdef read_avro(datasource, columns=None, skip_rows=-1, num_rows=-1):
cpdef read_avro(datasource, columns=None, skip_rows=0, num_rows=-1):
"""
Cython function to call libcudf read_avro, see `read_avro`.
Expand All @@ -28,28 +20,14 @@ cpdef read_avro(datasource, columns=None, skip_rows=-1, num_rows=-1):

if not isinstance(num_rows, int) or num_rows < -1:
raise TypeError("num_rows must be an int >= -1")
if not isinstance(skip_rows, int) or skip_rows < -1:
raise TypeError("skip_rows must be an int >= -1")

cdef vector[string] c_columns
if columns is not None and len(columns) > 0:
c_columns.reserve(len(columns))
for col in columns:
c_columns.push_back(str(col).encode())

cdef avro_reader_options options = move(
avro_reader_options.builder(make_source_info([datasource]))
.columns(c_columns)
.skip_rows(<size_type> skip_rows)
.num_rows(<size_type> num_rows)
.build()
if not isinstance(skip_rows, int) or skip_rows < 0:
raise TypeError("skip_rows must be an int >= 0")

return data_from_pylibcudf_io(
plc.io.avro.read_avro(
SourceInfo([datasource]),
columns,
skip_rows,
num_rows
)
)

cdef table_with_metadata c_result

with nogil:
c_result = move(libcudf_read_avro(options))

names = [info.name.decode() for info in c_result.metadata.schema_info]

return data_from_unique_ptr(move(c_result.tbl), column_names=names)
8 changes: 4 additions & 4 deletions python/cudf/cudf/_lib/csv.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,14 @@ cdef csv_reader_options make_csv_reader_options(
)

if quoting == 1:
c_quoting = quote_style.QUOTE_ALL
c_quoting = quote_style.ALL
elif quoting == 2:
c_quoting = quote_style.QUOTE_NONNUMERIC
c_quoting = quote_style.NONNUMERIC
elif quoting == 3:
c_quoting = quote_style.QUOTE_NONE
c_quoting = quote_style.NONE
else:
# Default value
c_quoting = quote_style.QUOTE_MINIMAL
c_quoting = quote_style.MINIMAL

cdef csv_reader_options csv_reader_options_c = move(
csv_reader_options.builder(c_source_info)
Expand Down
2 changes: 1 addition & 1 deletion python/cudf/cudf/_lib/parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ def write_parquet(
"Valid values are '1.0' and '2.0'"
)

dict_policy = (
cdef cudf_io_types.dictionary_policy dict_policy = (
cudf_io_types.dictionary_policy.ADAPTIVE
if use_dictionary
else cudf_io_types.dictionary_policy.NEVER
Expand Down
1 change: 1 addition & 0 deletions python/cudf/cudf/_lib/pylibcudf/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,4 @@ link_to_pyarrow_headers(pylibcudf_interop)

add_subdirectory(libcudf)
add_subdirectory(strings)
add_subdirectory(io)
25 changes: 25 additions & 0 deletions python/cudf/cudf/_lib/pylibcudf/io/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# =============================================================================
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
# =============================================================================

# Cython sources in this directory that are built as extension modules.
set(cython_sources avro.pyx types.pyx)

# Libraries each generated module is linked against.
set(linked_libraries cudf::cudf)
# Build one module per source file; the module prefix below matches the
# pylibcudf_io_<name> target names referenced further down.
rapids_cython_create_modules(
CXX
SOURCE_FILES "${cython_sources}"
LINKED_LIBRARIES "${linked_libraries}" MODULE_PREFIX pylibcudf_io_ ASSOCIATED_TARGETS cudf
)

# Targets handed to the project's link_to_pyarrow_headers helper (defined
# elsewhere; presumably it adds the pyarrow include directories - confirm).
set(targets_using_arrow_headers pylibcudf_io_avro pylibcudf_io_types)
link_to_pyarrow_headers("${targets_using_arrow_headers}")
4 changes: 4 additions & 0 deletions python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from . cimport avro, types
from .types cimport SourceInfo, TableWithMetadata
4 changes: 4 additions & 0 deletions python/cudf/cudf/_lib/pylibcudf/io/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from . import avro, types
from .types import SourceInfo, TableWithMetadata
12 changes: 12 additions & 0 deletions python/cudf/cudf/_lib/pylibcudf/io/avro.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
from cudf._lib.pylibcudf.io.types cimport SourceInfo, TableWithMetadata
from cudf._lib.pylibcudf.libcudf.io.avro cimport avro_reader_options
from cudf._lib.pylibcudf.libcudf.types cimport size_type


# Declaration of the Avro reader so other Cython modules can cimport it.
# `= *` marks an optional argument; the actual default values are given
# by the implementation in the corresponding .pyx file.
cpdef TableWithMetadata read_avro(
SourceInfo source_info,
list columns = *,
size_type skip_rows = *,
size_type num_rows = *
)
58 changes: 58 additions & 0 deletions python/cudf/cudf/_lib/pylibcudf/io/avro.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector

from cudf._lib.pylibcudf.io.types cimport SourceInfo, TableWithMetadata
from cudf._lib.pylibcudf.libcudf.io.avro cimport (
avro_reader_options,
read_avro as cpp_read_avro,
)
from cudf._lib.pylibcudf.libcudf.types cimport size_type


cpdef TableWithMetadata read_avro(
    SourceInfo source_info,
    list columns = None,
    size_type skip_rows = 0,
    size_type num_rows = -1
):
    """
    Read an Avro dataset and return its table together with its metadata.

    Parameters
    ----------
    source_info : SourceInfo
        Describes the source(s) the Avro data is read from.
    columns : list, default None
        Columns to read. When ``None`` (or an empty list) no column
        selection is passed on, so all columns in the file are read.
    skip_rows : size_type, default 0
        Number of rows to skip before reading.
    num_rows : size_type, default -1
        Number of rows to read once ``skip_rows`` rows have been skipped.
        ``-1`` reads all rows.

    Returns
    -------
    TableWithMetadata
        The table that was read and its corresponding metadata.
    """
    cdef vector[string] selected_names
    if columns:
        selected_names.reserve(len(columns))
        for name in columns:
            # Each entry is stringified before encoding, so labels that are
            # not already ``str`` are accepted as well.
            selected_names.push_back(str(name).encode())

    cdef avro_reader_options reader_opts = move(
        avro_reader_options.builder(source_info.c_obj)
        .columns(selected_names)
        .skip_rows(skip_rows)
        .num_rows(num_rows)
        .build()
    )

    # Release the GIL for the duration of the libcudf read call.
    with nogil:
        c_table = move(cpp_read_avro(reader_opts))

    return TableWithMetadata.from_libcudf(c_table)
29 changes: 29 additions & 0 deletions python/cudf/cudf/_lib/pylibcudf/io/types.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
from cudf._lib.pylibcudf.libcudf.io.types cimport (
column_encoding,
column_in_metadata,
column_name_info,
compression_type,
dictionary_policy,
io_type,
partition_info,
quote_style,
sink_info,
source_info,
statistics_freq,
table_input_metadata,
table_metadata,
table_with_metadata,
)
from cudf._lib.pylibcudf.table cimport Table


# Extension type pairing a pylibcudf Table with libcudf table metadata.
cdef class TableWithMetadata:
# `public` makes this attribute accessible from Python code as well.
cdef public Table tbl
# Not declared public, so only reachable from Cython code.
cdef table_metadata metadata

# Cython-only constructor taking a libcudf table_with_metadata by reference.
@staticmethod
cdef TableWithMetadata from_libcudf(table_with_metadata& tbl)

# Extension type holding a libcudf source_info in `c_obj`.
cdef class SourceInfo:
cdef source_info c_obj
110 changes: 110 additions & 0 deletions python/cudf/cudf/_lib/pylibcudf/io/types.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector

from cudf._lib.pylibcudf.libcudf.io.types cimport (
host_buffer,
source_info,
table_with_metadata,
)

import errno
import io
import os


cdef class TableWithMetadata:
    """Bundle a table with the metadata that accompanies it
    (for example, its column names).

    For details, see :cpp:class:`cudf::io::table_with_metadata`.
    """

    @property
    def columns(self):
        """
        The columns of the wrapped table, as returned by ``tbl.columns()``.
        """
        return self.tbl.columns()

    @property
    def column_names(self):
        """
        The column names stored in the metadata, as a list of ``str``.

        Only flat schemas are supported: an entry with child columns
        trips the assertion below.
        """
        cdef list collected = []
        for info in self.metadata.schema_info:
            # TODO: Handle nesting (columns with child columns)
            assert info.children.size() == 0, "Child column names are not handled!"
            collected.append(info.name.decode())
        return collected

    @staticmethod
    cdef TableWithMetadata from_libcudf(table_with_metadata& tbl_with_meta):
        """Build a TableWithMetadata from a libcudf table_with_metadata.

        The table is moved out of ``tbl_with_meta``; the metadata is
        copied by assignment.
        """
        cdef TableWithMetadata wrapped = TableWithMetadata.__new__(TableWithMetadata)
        wrapped.metadata = tbl_with_meta.metadata
        wrapped.tbl = Table.from_libcudf(move(tbl_with_meta.tbl))
        return wrapped

cdef class SourceInfo:
    """A class containing details on a source to read from.

    For details, see :cpp:class:`cudf::io::source_info`.

    Parameters
    ----------
    sources : List[Union[str, os.PathLike, bytes, io.BytesIO]]
        A homogeneous list of sources (this can be a string filename,
        an os.PathLike, bytes, or an io.BytesIO) to read from.
        Mixing different types of sources will raise a `ValueError`.

    Raises
    ------
    ValueError
        If ``sources`` is empty, mixes different kinds of sources, or
        contains a kind of source that is not supported.
    FileNotFoundError
        If a path-like source does not refer to an existing file.
    """

    def __init__(self, list sources):
        if not sources:
            raise ValueError("Need to pass at least one source")

        cdef vector[string] c_files

        # The first element decides which kind of source the whole list
        # must consist of.
        if isinstance(sources[0], (os.PathLike, str)):
            c_files.reserve(len(sources))

            for src in sources:
                if not isinstance(src, (os.PathLike, str)):
                    raise ValueError("All sources must be of the same type!")
                if not os.path.isfile(src):
                    raise FileNotFoundError(errno.ENOENT,
                                            os.strerror(errno.ENOENT),
                                            src)

                c_files.push_back(<string> str(src).encode())

            self.c_obj = move(source_info(c_files))
            return

        # TODO: host_buffer is deprecated API, use host_span instead
        # NOTE(review): the host_buffer entries store raw pointers into the
        # Python buffers; presumably the caller has to keep the sources
        # alive while this object is in use - confirm.
        cdef vector[host_buffer] c_host_buffers
        cdef const unsigned char[::1] c_buffer
        cdef bint empty_buffer = False
        if isinstance(sources[0], bytes):
            # Stays True only if every bytes object turns out to be empty.
            empty_buffer = True
            for buffer in sources:
                if not isinstance(buffer, bytes):
                    raise ValueError("All sources must be of the same type!")
                if (len(buffer) > 0):
                    c_buffer = buffer
                    c_host_buffers.push_back(host_buffer(<char*>&c_buffer[0],
                                                         c_buffer.shape[0]))
                    empty_buffer = False
        elif isinstance(sources[0], io.BytesIO):
            for bio in sources:
                if not isinstance(bio, io.BytesIO):
                    raise ValueError("All sources must be of the same type!")
                c_buffer = bio.getbuffer()  # check if empty?
                c_host_buffers.push_back(host_buffer(<char*>&c_buffer[0],
                                                     c_buffer.shape[0]))
        else:
            # Previously an unsupported source type fell through and
            # silently produced a source_info without any buffers.
            raise ValueError(
                "Sources must be a list of str/paths, bytes, or io.BytesIO"
            )

        if empty_buffer:
            # Only empty bytes objects were passed: hand libcudf a single
            # zero-length buffer instead of a source_info with no buffers.
            c_host_buffers.push_back(host_buffer(<char*>NULL, 0))

        self.c_obj = source_info(c_host_buffers)
6 changes: 4 additions & 2 deletions python/cudf/cudf/_lib/pylibcudf/libcudf/io/orc.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ cdef extern from "cudf/io/orc.hpp" \
orc_writer_options_builder& compression(
cudf_io_types.compression_type comp
) except +
orc_writer_options_builder& enable_statistics(bool val) except +
orc_writer_options_builder& enable_statistics(
cudf_io_types.statistics_freq val
) except +
orc_writer_options_builder& stripe_size_bytes(size_t val) except +
orc_writer_options_builder& stripe_size_rows(size_type val) except +
orc_writer_options_builder& row_index_stride(size_type val) except +
Expand Down Expand Up @@ -147,7 +149,7 @@ cdef extern from "cudf/io/orc.hpp" \
cudf_io_types.compression_type comp
) except +
chunked_orc_writer_options_builder& enable_statistics(
bool val
cudf_io_types.statistics_freq val
) except +
orc_writer_options_builder& stripe_size_bytes(size_t val) except +
orc_writer_options_builder& stripe_size_rows(size_type val) except +
Expand Down
Loading

0 comments on commit 3b734ec

Please sign in to comment.