
[feat] optimize read, pushdown limit to file level for to_arrow #1038

Closed

kevinjqliu opened this issue Aug 11, 2024 · 3 comments

@kevinjqliu (Contributor)

Feature Request / Improvement

As of now, the limit is only checked after an entire parquet file has been read:

executor = ExecutorFactory.get_or_create()
futures = [
    executor.submit(
        _task_to_table,
        fs,
        task,
        bound_row_filter,
        projected_schema,
        projected_field_ids,
        deletes_per_file.get(task.file.file_path),
        case_sensitive,
        table_metadata.name_mapping(),
        use_large_types,
    )
    for task in tasks
]
total_row_count = 0
# for consistent ordering, we need to maintain future order
futures_index = {f: i for i, f in enumerate(futures)}
completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
for future in concurrent.futures.as_completed(futures):
    completed_futures.add(future)
    if table_result := future.result():
        total_row_count += len(table_result)
    # stop early if limit is satisfied
    if limit is not None and total_row_count >= limit:
        break
# by now, we've either completed all tasks or satisfied the limit
if limit is not None:
    _ = [f.cancel() for f in futures if not f.done()]

Optimization: push the limit down to the parquet reading level, so a scan can stop producing rows as soon as the limit is satisfied instead of reading each file in full.

For more details, see this comment
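A minimal, hypothetical sketch of what stopping a parquet read early could look like with plain PyArrow, assuming ParquetFile.iter_batches is used to stream record batches instead of materializing the whole file. This is not the PyIceberg internals; read_with_limit is an illustrative helper name only.

import pyarrow as pa
import pyarrow.parquet as pq


def read_with_limit(path: str, limit: int) -> pa.Table:
    """Read at most `limit` rows from a parquet file, stopping early."""
    parquet_file = pq.ParquetFile(path)
    batches = []
    rows_read = 0
    for batch in parquet_file.iter_batches():
        if rows_read + len(batch) > limit:
            # trim the final batch so exactly `limit` rows are returned
            batch = batch.slice(0, limit - rows_read)
        batches.append(batch)
        rows_read += len(batch)
        if rows_read >= limit:
            break
    return pa.Table.from_batches(batches, schema=parquet_file.schema_arrow)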

@soumya-ghosh (Contributor)

@kevinjqliu I would like to work on this one.

@kevinjqliu (Contributor, Author)

sure @soumya-ghosh, assigned to you

The solution might look similar to what is already done for project_batches in #1042:

for task in tasks:
    # stop early if limit is satisfied
    if limit is not None and total_row_count >= limit:
        break
    batches = _task_to_record_batches(
        fs,
        task,
        bound_row_filter,
        projected_schema,
        projected_field_ids,
        deletes_per_file.get(task.file.file_path),
        case_sensitive,
        table_metadata.name_mapping(),
        use_large_types,
    )
    for batch in batches:
        if limit is not None:
            if total_row_count >= limit:
                break
            elif total_row_count + len(batch) >= limit:
                batch = batch.slice(0, limit - total_row_count)
        yield batch
        total_row_count += len(batch)
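A tiny standalone check of the slicing step above, just to illustrate how the final batch is trimmed so the total row count lands exactly on the limit (values are made up):

import pyarrow as pa

# suppose 5 rows have already been yielded and the limit is 7
batch = pa.RecordBatch.from_pydict({"id": list(range(10))})
limit, total_row_count = 7, 5
if total_row_count + len(batch) >= limit:
    batch = batch.slice(0, limit - total_row_count)
assert len(batch) == 2  # only the rows needed to reach the limit are kept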

@kevinjqliu (Contributor, Author)

Closed by #1043 (see comment)
