Implement read_csv in cudf-polars using pylibcudf (#16307)
Replace cudf-classic with pylibcudf for CSV reading in cudf-polars
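
At a glance, the switch trades a `cudf.DataFrame` round-trip for a direct pylibcudf table read. A minimal before/after sketch (assuming a local `data.csv`; the argument names follow the diff below, but this is not a verbatim call site from the change):

```python
import cudf
import pylibcudf as plc

# Before: cudf-classic parses the CSV and returns a cudf.DataFrame.
df = cudf.read_csv("data.csv", sep=",", quotechar='"')

# After: pylibcudf returns a TableWithMetadata wrapping a device table.
tbl_w_meta = plc.io.csv.read_csv(
    plc.io.SourceInfo(["data.csv"]),
    delimiter=",",
    quotechar='"',
)
```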

Authors:
  - Thomas Li (https://github.com/lithomas1)
  - Vyas Ramasubramani (https://github.com/vyasr)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #16307
lithomas1 authored Jul 19, 2024
1 parent 5dde41d commit e169e8e
Showing 2 changed files with 64 additions and 24 deletions.
python/cudf_polars/cudf_polars/dsl/ir.py (26 additions, 24 deletions)
```diff
@@ -242,10 +242,6 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         with_columns = options.with_columns
         row_index = options.row_index
         if self.typ == "csv":
-            dtype_map = {
-                name: cudf._lib.types.PYLIBCUDF_TO_SUPPORTED_NUMPY_TYPES[typ.id()]
-                for name, typ in self.schema.items()
-            }
             parse_options = self.reader_options["parse_options"]
             sep = chr(parse_options["separator"])
             quote = chr(parse_options["quote_char"])
@@ -280,31 +276,37 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
             pieces = []
             for p in self.paths:
                 skiprows = self.reader_options["skip_rows"]
-                # TODO: read_csv expands globs which we should not do,
-                # because polars will already have handled them.
                 path = Path(p)
                 with path.open() as f:
                     while f.readline() == "\n":
                         skiprows += 1
-                pieces.append(
-                    cudf.read_csv(
-                        path,
-                        sep=sep,
-                        quotechar=quote,
-                        lineterminator=eol,
-                        names=column_names,
-                        header=header,
-                        usecols=usecols,
-                        na_filter=True,
-                        na_values=null_values,
-                        keep_default_na=False,
-                        skiprows=skiprows,
-                        comment=comment,
-                        decimal=decimal,
-                        dtype=dtype_map,
-                    )
-                )
-            df = DataFrame.from_cudf(cudf.concat(pieces))
+                tbl_w_meta = plc.io.csv.read_csv(
+                    plc.io.SourceInfo([path]),
+                    delimiter=sep,
+                    quotechar=quote,
+                    lineterminator=eol,
+                    col_names=column_names,
+                    header=header,
+                    usecols=usecols,
+                    na_filter=True,
+                    na_values=null_values,
+                    keep_default_na=False,
+                    skiprows=skiprows,
+                    comment=comment,
+                    decimal=decimal,
+                    dtypes=self.schema,
+                )
+                pieces.append(tbl_w_meta)
+            tables, colnames = zip(
+                *(
+                    (piece.tbl, piece.column_names(include_children=False))
+                    for piece in pieces
+                )
+            )
+            df = DataFrame.from_table(
+                plc.concatenate.concatenate(list(tables)),
+                colnames[0],
+            )
         elif self.typ == "parquet":
             cdf = cudf.read_parquet(self.paths, columns=with_columns)
             assert isinstance(cdf, cudf.DataFrame)
```
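The heart of the change is that `plc.io.csv.read_csv` returns a `TableWithMetadata` rather than a `cudf.DataFrame`, so per-file results are now stitched together at the pylibcudf layer instead of via `cudf.concat`. A minimal standalone sketch of that flow, assuming two local CSVs with identical schemas (the file names are illustrative, not from the commit):

```python
import pylibcudf as plc

paths = ["part1.csv", "part2.csv"]  # hypothetical inputs with the same schema

# Read each file into its own TableWithMetadata.
pieces = [plc.io.csv.read_csv(plc.io.SourceInfo([p])) for p in paths]

# Split each piece into its device table and its column names.
tables = [piece.tbl for piece in pieces]
colnames = pieces[0].column_names(include_children=False)

# Concatenate the per-file tables into a single table on the GPU.
combined = plc.concatenate.concatenate(tables)
```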
python/cudf_polars/tests/test_scan.py (38 additions, 0 deletions)
```diff
@@ -2,6 +2,8 @@
 # SPDX-License-Identifier: Apache-2.0
 from __future__ import annotations
 
+import os
+
 import pytest
 
 import polars as pl
@@ -129,6 +131,42 @@ def test_scan_csv_column_renames_projection_schema(tmp_path):
     assert_gpu_result_equal(q)
 
 
+@pytest.mark.parametrize(
+    "filename,glob",
+    [
+        (["test1.csv", "test2.csv"], True),
+        ("test*.csv", True),
+        # Make sure we don't expand glob when
+        # trying to read a file like test*.csv
+        # when glob=False
+        ("test*.csv", False),
+    ],
+)
+def test_scan_csv_multi(tmp_path, filename, glob):
+    with (tmp_path / "test1.csv").open("w") as f:
+        f.write("""foo,bar,baz\n1,2\n3,4,5""")
+    with (tmp_path / "test2.csv").open("w") as f:
+        f.write("""foo,bar,baz\n1,2\n3,4,5""")
+    with (tmp_path / "test*.csv").open("w") as f:
+        f.write("""foo,bar,baz\n1,2\n3,4,5""")
+    os.chdir(tmp_path)
+    q = pl.scan_csv(filename, glob=glob)
+
+    assert_gpu_result_equal(q)
+
+
+def test_scan_csv_multi_differing_colnames(tmp_path):
+    with (tmp_path / "test1.csv").open("w") as f:
+        f.write("""foo,bar,baz\n1,2\n3,4,5""")
+    with (tmp_path / "test2.csv").open("w") as f:
+        f.write("""abc,def,ghi\n1,2\n3,4,5""")
+    q = pl.scan_csv(
+        [tmp_path / "test1.csv", tmp_path / "test2.csv"],
+    )
+    with pytest.raises(pl.exceptions.ComputeError):
+        q.explain()
+
+
 def test_scan_csv_skip_after_header_not_implemented(tmp_path):
     with (tmp_path / "test.csv").open("w") as f:
         f.write("""foo,bar,baz\n1,2,3\n3,4,5""")
```
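The new tests pin down polars' glob semantics, which motivated dropping the removed TODO: `pl.scan_csv` expands globs only when `glob=True` (the default), so a file literally named `test*.csv` is only readable with `glob=False`, and the reader must not re-expand patterns itself. A quick illustration (hypothetical paths, mirroring the test above):

```python
import polars as pl

# With glob=True (the default), "test*.csv" matches test1.csv, test2.csv, ...
q_all = pl.scan_csv("test*.csv")

# With glob=False, the pattern is treated as a literal file name, so only a
# file actually called "test*.csv" is read.
q_literal = pl.scan_csv("test*.csv", glob=False)
```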
