Skip to content

Commit

Permalink
feat: support compressed csv reading for pandas and arrow
Browse files Browse the repository at this point in the history
Only lazy CSV reading is not supported.
  • Loading branch information
maartenbreddels committed Sep 23, 2022
1 parent c5ced62 commit c267f64
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 1 deletion.
15 changes: 14 additions & 1 deletion packages/vaex-core/vaex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,20 @@ def from_csv(filename_or_buffer, copy_index=False, chunk_size=None, convert=Fals
def _read_csv_read(filename_or_buffer, copy_index, chunk_size, fs_options={}, fs=None, **kwargs):
import pandas as pd
if not chunk_size:
with vaex.file.open(filename_or_buffer, fs_options=fs_options, fs=fs, for_arrow=True) as f:
with vaex.file.open(filename_or_buffer, fs_options=fs_options, fs=fs, for_arrow=False) as f:
if "compression" not in kwargs:
try:
path = vaex.file.stringyfy(filename_or_buffer)
except:
path = None
if path:
parts = path.rsplit('.', 3)
if len(parts) == 3:
# we need to infer the compression here, because pandas does not look at the fileobj.name
# to infer the compression
extension_to_compression = {"gz": "gzip", "bz2": "bz2", "zip": "zip", "xz": "xz"}
if parts[-1] in extension_to_compression:
kwargs = {"compression": extension_to_compression[parts[-1]], **kwargs}
full_df = pd.read_csv(f, **kwargs)
return from_pandas(full_df, copy_index=copy_index)
else:
Expand Down
12 changes: 12 additions & 0 deletions packages/vaex-core/vaex/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ class DatasetCsvLazy(DatasetFile):
snake_name = "arrow-csv-lazy"
def __init__(self, path, chunk_size=10*MB, newline_readahead=1*MB, row_count=None, read_options=None, parse_options=None, convert_options=None, fs=None, fs_options={}):
super().__init__(path, fs=fs, fs_options=fs_options)
try:
codec = pa.Codec.detect(self.path)
except Exception:
codec = None
if codec:
raise NotImplementedError("We don't support compressed csv files for lazy reading, cannot read file: %s" % self.path)
self._given_row_count = row_count
self._row_count = None
self.chunk_size = parse_bytes(chunk_size)
Expand Down Expand Up @@ -338,6 +344,12 @@ def _read_file(self):
import pyarrow.csv

with vaex.file.open(self.path, fs=self.fs, fs_options=self.fs_options, for_arrow=True) as f:
try:
codec = pa.Codec.detect(self.path)
except Exception:
codec = None
if codec:
f = pa.CompressedInputStream(f, codec.name)
self._arrow_table = pyarrow.csv.read_csv(f, read_options=self.read_options, parse_options=self.parse_options, convert_options=self.convert_options)
self._columns = dict(zip(self._arrow_table.schema.names, self._arrow_table.columns))
self._set_row_count()
Expand Down
14 changes: 14 additions & 0 deletions tests/csv_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,17 @@ def test_chunk_iterator(l1, l2):
ds_dropped = ds.dropped('x')
assert 'x' not in ds_dropped


def test_gz():
    """Gzip-compressed CSV files are readable eagerly; lazy reading raises.

    Covers both eager code paths (arrow-based and pandas-based, the latter
    inferring compression from the ``.gz`` extension) and asserts that the
    lazy arrow reader explicitly refuses compressed input.
    """
    import vaex

    # Arrow-based eager reader should transparently decompress.
    df = vaex.from_csv_arrow(HERE / "data" / "small2.csv.gz")
    assert df.x.tolist() == [1, 3]

    # Pandas-based reader should infer gzip compression from the filename.
    df = vaex.from_csv(HERE / "data" / "small2.csv.gz")
    assert df.x.tolist() == [1, 3]

    # Lazy reading of compressed CSV is unsupported and must raise.
    # NOTE: the original placed an assert after this call inside the
    # raises-block; it was unreachable dead code and has been removed.
    with pytest.raises(NotImplementedError):
        vaex.from_csv_arrow(HERE / "data" / "small2.csv.gz", lazy=True)
Binary file added tests/data/small2.csv.gz
Binary file not shown.

0 comments on commit c267f64

Please sign in to comment.