feat: Out of core CSV support using Apache Arrow CSV reader (fast 🔥!)
maartenbreddels committed Nov 18, 2021
1 parent 98e570d commit 47b7e48
Showing 6 changed files with 446 additions and 0 deletions.
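The user-facing entry point this commit adds is vaex.from_csv_arrow (see the api.rst and __init__.py changes below). A minimal usage sketch, assuming the extension builds as in this diff; data.csv is a hypothetical file and the options shown are just the defaults:

import vaex

# Lazily open a CSV file out of core; only one batch is parsed up front to infer the schema.
# chunk_size and newline_readahead accept byte-size strings (parsed with dask.utils.parse_bytes).
df = vaex.from_csv_arrow('data.csv', chunk_size='10MiB', newline_readahead='64kiB')
print(len(df))     # row count, obtained by counting newlines per chunk
print(df.head(5))  # parses only the chunks needed for these rows

pyarrow.csv ReadOptions, ParseOptions and ConvertOptions instances can be passed through via the read_options, parse_options and convert_options arguments.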
1 change: 1 addition & 0 deletions docs/source/api.rst
@@ -15,6 +15,7 @@ Opening/reading in your data.
vaex.from_arrays
vaex.from_dict
vaex.from_csv
vaex.from_csv_arrow
vaex.from_ascii
vaex.from_pandas
vaex.from_astropy_table
31 changes: 31 additions & 0 deletions packages/vaex-core/src/superutils.cpp
@@ -153,6 +153,35 @@ std::size_t hash_func(T v) {
return h(v);
}

int64_t find_byte(py::buffer buffer, unsigned char needle) {
py::buffer_info info = buffer.request();
if (info.ndim != 1) {
throw std::runtime_error("Expected a 1d byte buffer");
}
// if(info.format != "O") {
// throw std::runtime_error("Expected an object array");
// }
py::gil_scoped_release release;
unsigned char* begin = (unsigned char*)info.ptr;
unsigned char* end = begin + info.shape[0];
unsigned char* i = std::find(begin, end, needle);
return i == end ? -1 : i - begin;
}

int64_t count_byte(py::buffer buffer, unsigned char needle) {
py::buffer_info info = buffer.request();
if (info.ndim != 1) {
throw std::runtime_error("Expected a 1d byte buffer");
}
// if(info.format != "O") {
// throw std::runtime_error("Expected an object array");
// }
py::gil_scoped_release release;
unsigned char* begin = (unsigned char*)info.ptr;
unsigned char* end = begin + info.shape[0];
return std::count(begin, end, needle);
}

PYBIND11_MODULE(superutils, m) {
_import_array();

@@ -195,4 +224,6 @@ PYBIND11_MODULE(superutils, m) {
vaex::init_hash_primitives_prime(m);
vaex::init_hash_string(m);
vaex::init_hash_object(m);
m.def("find_byte", find_byte);
m.def("count_byte", count_byte);
}
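For illustration, a small sketch of how the two new helpers behave when called from Python (csv.py below uses them for newline scanning); this assumes the extension builds as in this diff, and the sample bytes are made up:

import numpy as np
import vaex.superutils

data = np.frombuffer(b'a,b\n1,2\n3,4\n', dtype=np.uint8)

# index of the first occurrence of the byte, or -1 when it is absent
print(vaex.superutils.find_byte(data, ord(b'\n')))   # 3

# total number of occurrences, which csv.py uses to count rows per chunk
print(vaex.superutils.count_byte(data, ord(b'\n')))  # 3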
7 changes: 7 additions & 0 deletions packages/vaex-core/vaex/__init__.py
@@ -499,6 +499,13 @@ def from_json(path_or_buffer, orient=None, precise_float=False, lines=False, cop
copy_index=copy_index)


def from_csv_arrow(file, chunk_size="10MiB", newline_readahead="64kiB", read_options=None, parse_options=None, convert_options=None):
    '''Experimental lazy (out-of-core) CSV reader using the Apache Arrow CSV reader.'''
import vaex.csv
ds = vaex.csv.DatasetCsv(file, chunk_size=chunk_size, newline_readahead=newline_readahead, read_options=read_options, parse_options=parse_options, convert_options=convert_options)
return vaex.from_dataset(ds)


def from_csv(filename_or_buffer, copy_index=False, chunk_size=None, convert=False, fs_options={}, fs=None, **kwargs):
"""
Read a CSV file as a DataFrame, and optionally convert to an hdf5 file.
277 changes: 277 additions & 0 deletions packages/vaex-core/vaex/csv.py
@@ -0,0 +1,277 @@
import mmap
import numpy as np
import io

import pyarrow as pa
import pyarrow.csv
from dask.utils import parse_bytes

import vaex.dataset
import vaex.file
import vaex.superutils
import vaex.utils
from vaex.dataset import Dataset
from .itertools import pmap, pwait, buffer, consume, filter_none
from .multithreading import thread_count_default_io, get_main_io_pool


MB = 1024**2


def file_chunks(file, chunk_size, newline_readahead):
"""Bytes chunks, split by chunk_size bytes, on newline boundaries"""
offset = 0
with open(file, 'rb') as file:
file.seek(0, 2)
file_size = file.tell()
file.seek(0)
begin_offset = 0

done = False
while not done:
# find the next newline boundary
end_offset = min(file_size, begin_offset + chunk_size)
done = end_offset == file_size
if end_offset < file_size:
file.seek(end_offset)
sample = file.read(newline_readahead)
offset = vaex.superutils.find_byte(sample, ord(b'\n'))
if offset != -1:
end_offset += offset + 1 # include the newline
else:
raise ValueError('expected newline')

def reader(file_offset=begin_offset, length=end_offset - begin_offset):
file_threadsafe = vaex.file.dup(file)
file_threadsafe.seek(file_offset)
return file_threadsafe.read(length)
yield reader
begin_offset = end_offset


def file_chunks_mmap(file, chunk_size, newline_readahead):
"""Bytes chunks, split by chunk_size bytes, on newline boundaries
Using memory mapping (which avoids a memcpy)
"""
offset = 0
with open(file, 'rb') as file:
file.seek(0, 2)
file_size = file.tell()
file.seek(0)
begin_offset = 0
kwargs = {}
if vaex.utils.osname == "windows":
kwargs["access"] = mmap.ACCESS_READ
else:
kwargs["prot"] = mmap.PROT_READ

file_map = mmap.mmap(file.fileno(), file_size, **kwargs)
data = memoryview(file_map)

done = False
while not done:
# find the next newline boundary
end_offset = min(file_size, begin_offset + chunk_size)
if end_offset < file_size:
sample = data[end_offset:end_offset+newline_readahead]
offset = vaex.superutils.find_byte(sample, ord(b'\n'))
if offset != -1:
end_offset += offset + 1 # include the newline
else:
raise ValueError('expected newline')
done = end_offset == file_size

length = end_offset - begin_offset
assert length > 0
def reader(file_offset=begin_offset, length=length):
return data[file_offset:file_offset+length]
yield reader
begin_offset = end_offset


def _row_count(chunk):
ar = np.frombuffer(chunk, dtype=np.uint8)
lines = vaex.superutils.count_byte(ar, ord(b'\n'))
if ar[-1] != ord(b'\n'):
lines += 1
return lines


def _copy_or_create(cls, obj, **kwargs):
if obj is not None:
        # take defaults from obj, but kwargs take precedence
kwargs = kwargs.copy()
for name in dir(obj):
if not name.startswith('__'):
if name not in kwargs:
kwargs[name] = getattr(obj, name)
return cls(**kwargs)


class DatasetCsv(Dataset):
def __init__(self, path, chunk_size=10*MB, newline_readahead=1*MB, row_count=None, read_options=None, parse_options=None, convert_options=None):
self.path = path
self._given_row_count = row_count
self.chunk_size = parse_bytes(chunk_size)
self.newline_readahead = parse_bytes(newline_readahead)
self.read_options = read_options
self.parse_options = parse_options
self.convert_options = convert_options
reader = pyarrow.csv.open_csv(path, read_options=read_options, parse_options=parse_options, convert_options=convert_options)
self._arrow_schema = reader.read_next_batch().schema
self._schema = dict(zip(self._arrow_schema.names, self._arrow_schema.types))
self._create_columns()

def _create_columns(self):
self._columns = {name: vaex.dataset.ColumnProxy(self, name, type) for name, type in
zip(self._arrow_schema.names, self._arrow_schema.types)}
pool = get_main_io_pool()
workers = pool._max_workers
self._fragment_info = {}
if self._given_row_count is None:
chunks = file_chunks_mmap(self.path, self.chunk_size, self.newline_readahead)
def process(i, chunk_reader):
row_count = _row_count(chunk_reader())
if i == 0:
row_count -= 1 # we counted the header (TODO: depends on ReadOptions)
self._fragment_info[i] = dict(row_count=row_count)
consume(pwait(buffer(pmap(process, enumerate(chunks), pool=pool), workers+3)))
row_start = 0
for i in range(len(self._fragment_info)):
row_end = row_start + self._fragment_info[i]['row_count']
self._fragment_info[i]['row_start'] = row_start
self._fragment_info[i]['row_end'] = row_end
row_start = row_end
self._row_count = row_start
else:
            self._row_count = self._given_row_count
self._ids = {}

def hashed(self):
raise NotImplementedError

def slice(self, start, end):
# TODO: we can be smarter here, and trim off some fragments
if start == 0 and end == self.row_count:
return self
return vaex.dataset.DatasetSliced(self, start=start, end=end)

def is_masked(self, column):
return False

def shape(self, column):
return tuple()

def __getitem__(self, item):
if isinstance(item, slice):
assert item.step in [1, None]
return vaex.dataset.DatasetSliced(self, item.start or 0, item.stop or self.row_count)
return self._columns[item]

def close(self):
        # no need to close it, it seems
pass

def _chunk_producer(self, columns, chunk_size=None, reverse=False, start=0, end=None):
pool = get_main_io_pool()

first = True
previous = None
for i, reader in enumerate(file_chunks_mmap(self.path, self.chunk_size, self.newline_readahead)):
fragment_info = self._fragment_info.get(i)
# bail out/continue early
if fragment_info:
if start >= fragment_info['row_end']: # we didn't find the beginning yet
continue
# TODO, not triggered, should be <=
if end < fragment_info['row_start']: # we are past the end
# assert False
break

def chunk_reader(reader=reader, first=first, previous=previous, fragment_info=fragment_info, i=i):
bytes = reader()
file_like = pa.input_stream(bytes)
use_threads = True
block_size = len(bytes)
if i == 0:
read_options = _copy_or_create(pyarrow.csv.ReadOptions, self.read_options, use_threads=use_threads, block_size=block_size)
else:
read_options = pyarrow.csv.ReadOptions(use_threads=use_threads, column_names=list(self._schema.keys()), block_size=block_size)
convert_options = _copy_or_create(pyarrow.csv.ConvertOptions, self.convert_options, column_types=self._arrow_schema, include_columns=columns)
table = pyarrow.csv.read_csv(file_like, read_options=read_options, convert_options=convert_options)


row_count = len(table)
row_start = 0
if i not in self._fragment_info:
if previous:
row_start, row_end, chunks = previous.result()
row_start = row_end
row_end = row_start + len(table)
self._fragment_info[i] = dict(
# begin_offset=begin_offset,
# end_offset=end_offset,
row_count=row_count,
row_start=row_start,
row_end=row_end,
)
else:
row_start = self._fragment_info[i]['row_start']
row_end = self._fragment_info[i]['row_end']

# this is the bail out when we didn't have the fragments info cached yet
fragment_info = self._fragment_info[i]
if start >= fragment_info['row_end']: # we didn't find the beginning yet
return None
if end <= fragment_info['row_start']: # we are past the end
return None
# print(start, end, fragment_info, row_start, row_end)


if start > row_start:
# this means we have to cut off a piece of the beginning
if end < row_end:
# AND the end
length = end - row_start # without the start cut off
length -= start - row_start # correcting for the start cut off
# print(start, end, length, row_start, row_end)
table = table.slice(start - row_start, length)
else:
table = table.slice(start - row_start)
else:
if end < row_end:
# we only need to cut off a piece of the end
length = end - row_start
table = table.slice(0, length)

# table = table.combine_chunks()
assert len(table)
chunks = dict(zip(table.column_names, table.columns))
return row_start, row_end, chunks


previous = pool.submit(chunk_reader)
yield previous
first = False

def chunk_iterator(self, columns, chunk_size=None, reverse=False, start=0, end=None):
chunk_size = chunk_size or 1024*1024
chunks_ready_list = []
i1 = i2 = 0

        # although arrow uses threading, we still manage to get some extra performance out of it with more threads
chunk_generator = self._chunk_producer(columns, chunk_size, start=start, end=end or self._row_count)
for c1, c2, chunks in filter_none(pwait(buffer(chunk_generator, thread_count_default_io//2+3))):
chunks_ready_list.append(chunks)
total_row_count = sum([len(list(k.values())[0]) for k in chunks_ready_list])
if total_row_count > chunk_size:
chunks_current_list, current_row_count = vaex.dataset._slice_of_chunks(chunks_ready_list, chunk_size)
i2 += current_row_count
yield i1, i2, vaex.dataset._concat_chunk_list(chunks_current_list)
i1 = i2

while chunks_ready_list:
chunks_current_list, current_row_count = vaex.dataset._slice_of_chunks(chunks_ready_list, chunk_size)
i2 += current_row_count
yield i1, i2, vaex.dataset._concat_chunk_list(chunks_current_list)
i1 = i2
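To make the chunking strategy above concrete: file_chunks and file_chunks_mmap cut the file into pieces of roughly chunk_size bytes and extend each piece up to and including the first newline at or after the boundary (searching at most newline_readahead bytes ahead), so every chunk holds only whole CSV lines and can be parsed independently by pyarrow.csv.read_csv. A simplified, dependency-free sketch of that boundary logic, not part of the commit, using bytes.find on in-memory data instead of vaex.superutils.find_byte on a file:

def split_on_newlines(data: bytes, chunk_size: int):
    """Yield (begin, end) offsets of chunks that end on a newline boundary."""
    begin = 0
    size = len(data)
    while begin < size:
        end = min(size, begin + chunk_size)
        if end < size:
            # extend the chunk up to and including the next newline
            offset = data.find(b'\n', end)
            if offset == -1:
                raise ValueError('expected newline')
            end = offset + 1
        yield begin, end
        begin = end

csv_bytes = b'x,y\n1,2\n3,4\n5,6\n'
print(list(split_on_newlines(csv_bytes, chunk_size=3)))
# [(0, 4), (4, 8), (8, 12), (12, 16)]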
6 changes: 6 additions & 0 deletions packages/vaex-core/vaex/itertools.py
@@ -28,6 +28,12 @@ def consume(i):
pass


def filter_none(i):
for item in i:
if item is not None:
yield item


def chunked(i, count):
'''Yield list 'subslices' of iterator i with max length count.
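A quick note on filter_none: in csv.py above, a chunk reader returns None when its fragment falls entirely outside the requested start/end range, and filter_none drops those entries from the result pipeline. A trivial usage example:

from vaex.itertools import filter_none

print(list(filter_none([1, None, 2, None, 3])))  # [1, 2, 3]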