
Performance question for to_arrow, to_pandas, to_duckdb #1032

Closed
jkleinkauff opened this issue Aug 9, 2024 · 12 comments

@jkleinkauff

jkleinkauff commented Aug 9, 2024

Question

Hey, thanks for this very convenient library.

This is not a bug; I just want to better understand something.

I have a question regarding the performance of these methods, i.e. the time it takes to query the table.

from pyiceberg.catalog.sql import SqlCatalog

if __name__ == "__main__":
    catalog = SqlCatalog(
        "default",
        **{
            "uri": "postgresql+psycopg2://postgres:Password1@localhost/postgres",
        },
    )
    table = catalog.load_table("bronze.curitiba_starts_june")
    df = table.scan(limit=100)
    pa_table = df.to_arrow()

The code above runs fine. My question is about the last call: the to_arrow() transformation takes around 60s (+-) to execute. I believe this is mostly because of the network itself?
The execution time stays roughly the same with different row limits (1, 10, 100).

Querying the same table in MotherDuck, using iceberg_scan, is faster:
(screenshot of the MotherDuck query timing)

When running the same query locally, without MotherDuck but still using DuckDB, the execution time is similar to what PyIceberg takes; actually it is a bit slower. That's why I think this is mostly a network "issue". Can you help me understand what's happening? Thank you!

Table Data

The table has two Parquet files (110 MB and 127 MB).

@sungwy
Collaborator

sungwy commented Aug 9, 2024

Hi @jkleinkauff, that's indeed an interesting observation.

I have some follow-up questions to help us understand it better.

  1. Where are your files stored?
  2. Is there a way we can profile your I/O and plot it against your I/O download limit?

As a point of comparison, I just ran a scan using to_arrow against a table composed of 63 Parquet files of about 5.5 MB each. I'd imagine a table with fewer files would take less time to return (although the limit here should ensure that we aren't even reading the other Parquet files past the first one).

It returned in 6 seconds.

Your observation that limits of 1~100 took similar times makes sense to me as well. If you have 100+ MB files, you are going to download the same amount of data regardless, in order to return the limited result.

@kevinjqliu
Contributor

There's a nontrivial cost in reading metadata files in Iceberg.
Can you run this command:

table.inspect.manifests().to_pandas()

This will show the number of manifest files: https://py.iceberg.apache.org/api/#manifests

@jkleinkauff
Author

jkleinkauff commented Aug 9, 2024

Hey, thank you for taking the time to answer me!

  1. My files are in S3.
  2. Sure! Is that something I could do on my end? Do you have any recommendations? (I'm not sure if this is the same thing, but I wrote a small download profiler using psutil, and downloading one file takes around 25s.)

I mean, it's probably just my network?

Yeah, even with limit=1 it seems the scan plans to read both files (just an observation, maybe it's intended):

    df = table.scan(limit=1)
    # pa_table = df.to_arrow()
    for task in df.plan_files():
        print(task.file.file_path)
# s3://xxx/xxx/curitiba_starts_june/data/00000-0-6984da88-fe64-4765-9137-739072becfb1.parquet
# s3://xxx/xxx/curitiba_starts_june/data/00000-0-1de29b8f-2e8c-4543-9663-f769d53b17b7.parquet

Output of table.inspect.manifests().to_pandas()

   content                                               path  length  partition_spec_id  ...  added_delete_files_count  existing_delete_files_count  deleted_delete_files_count  partition_summaries
0        0  s3://xxx/bronze/curitiba_starts_june...   10433                  0  ...                         0                            0                           0                   []
1        0  s3://xxx/bronze/curitiba_starts_june...   10430                  0  ...                         0                            0                           0                   []

[2 rows x 12 columns]

    table = catalog.load_table("bronze.curitiba_starts_june")
    df = table.scan(limit=100)
    # print(table.inspect.manifests().to_pandas())
    pa_table = df.to_arrow()
❯ python pyiceberg_duckdb.py
# the shell prompt reports the run took 48s

I can also share the files or a direct link to my files. Thank you!

@kevinjqliu
Contributor

Okay, this doesn't look like an issue with reading many metadata files.

I wonder if the limit is respected for table scans.
Things I want to compare:

  • reading raw parquet file with pyarrow
  • reading entire iceberg table, without limits
  • reading iceberg table, with limit of 1
  • reading iceberg table with duckdb
  • reading iceberg table with duckdb, with limit of 1

I think this will give us some insight into read performance in PyIceberg.

For reading raw Parquet files, you can do something like this:

import pyarrow.parquet as pq
import time

parquet_file_path = ""  # one of the table's data files, e.g. an s3:// URI

start_time = time.time()
table = pq.read_table(parquet_file_path)
end_time = time.time()

time_taken = end_time - start_time
print(f"Time taken to read the Parquet file: {time_taken} seconds")

@jkleinkauff
Author

Hi @kevinjqliu thank you for your time!

These are my findings:

I've also included the read_parquet method from awswrangler. I don't know why, but it's by far the fastest method.

reading raw parquet file with awswrangler:
read_raw_parquet_awswrangler.py

1st run: Time taken to read the Parquet file: 16.10819697380066 seconds
2nd run: Time taken to read the Parquet file: 16.06696915626526 seconds
3rd run: Time taken to read the Parquet file: 14.28455901145935 seconds

reading raw parquet file with pyarrow
read_raw_parquet_pyarrow.py

1st run: Time taken to read the Parquet file: 39.86264896392822 seconds
2nd run: Time taken to read the Parquet file: 39.484612226486206 seconds
3rd run: Time taken to read the Parquet file: 26.693129062652588 seconds

reading entire iceberg table, without limits
read_iceberg_full.py

1st run: Time taken to read the Iceberg table: 21.632921934127808 seconds
2nd run: Time taken to read the Iceberg table: 36.94430899620056 seconds
3rd run: Time taken to read the Iceberg table: 49.66138482093811 seconds

reading iceberg table, with limit of 1
read_iceberg_limit.py

1st run: Time taken to read the Iceberg table: 45.886711835861206 seconds
2nd run: Time taken to read the Iceberg table: 29.464744091033936 seconds
3rd run: Time taken to read the Iceberg table: 44.78428387641907 seconds

reading iceberg table with duckdb
read_iceberg_duckdb.py

1st run: Time taken to read the Parquet file: 59.5912652015686 seconds
2nd run: Time taken to read the Parquet file: 61.646626710891724 seconds
3rd run: Time taken to read the Parquet file: 58.97534728050232 seconds

Proxying through motherduck (con = duckdb.connect("md:db")):
1st run: Time taken to read the Parquet file: 105.63072204589844 seconds
2nd run: Time taken to read the Parquet file: 144.91437602043152 seconds
3rd run: Time taken to read the Parquet file: 176.27135396003723 seconds

reading iceberg table with duckdb, with limit of 1
read_iceberg_duckdb_limit.py

1st run: Time taken to read the Parquet file: 63.78661298751831 seconds
2nd run: Time taken to read the Parquet file: 79.1733546257019 seconds
3rd run: Time taken to read the Parquet file: 80.755441904068 seconds
Proxying through motherduck:
Why is md faster here? It somehow seems to be pushing the limit down.
1st run: Time taken to read the Parquet file: 3.524472951889038 seconds
2nd run: Time taken to read the Parquet file: 3.4903008937835693 seconds
3rd run: Time taken to read the Parquet file: 3.258246898651123 seconds

@kevinjqliu
Contributor

kevinjqliu commented Aug 11, 2024

Thanks for looking into the different scenarios. It looks like the results vary depending on the engine.

Read Path

I took a deeper look into the read path for PyIceberg and discovered a missing optimization.
For context, reading an Iceberg table is done through the internal TableScan API. For example, catalog.load_table("foo.bar").scan().to_arrow() produces a PyArrow table. Tracing the call stack, here are the main areas to look at:

  • to_arrow gathers the TableScan parameters and materializes a PyArrow Table. It calls project_table with an iterable of FileScanTasks (self.plan_files()) and the specified limit, if any.
  • project_table takes the FileScanTasks, which correspond to Parquet files, and processes them concurrently with futures. The futures stop early if the limit is satisfied.

So, given an Iceberg table with 2 files, scan(limit=1).to_arrow() will produce 2 FileScanTasks (one per Parquet file). Both tasks are submitted simultaneously. After reading the entire Parquet file, one of the tasks completes; the limit is satisfied and the other future is cancelled.

The important note here is that the unit of work is the FileScanTask, which means a whole Parquet file read. The limit is checked only after each future completes, so at least one Parquet file is read entirely before the limit is applied. The limit is never pushed down to the Parquet file read level; even when limit=1 is specified, the entire Parquet file is read. In your example, with or without a limit, an entire ~100 MB data file will be read.

The potential optimization here is to push down the limit to _task_to_table and in turn to _task_to_record_batches, and perhaps check the limit against the number of records when yielding each batch.
In fact, this optimization is already done in project_batches, which is used by to_arrow_batch_reader (an alternative to to_arrow).
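
Conceptually, the pushdown would look something like the sketch below. This is only an illustration of the idea, not the actual PyIceberg internals; read_batches here is a hypothetical per-file batch reader standing in for _task_to_record_batches.

import pyarrow as pa

def read_with_limit(tasks, read_batches, limit=None):
    """Illustrative sketch: stop consuming record batches once `limit` rows have been collected."""
    batches = []
    total_rows = 0
    for task in tasks:                        # one FileScanTask per data file
        for batch in read_batches(task):      # hypothetical per-file batch reader
            batches.append(batch)
            total_rows += batch.num_rows
            if limit is not None and total_rows >= limit:
                # Trim the last batch so exactly `limit` rows are returned
                return pa.Table.from_batches(batches).slice(0, limit)
    return pa.Table.from_batches(batches)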

Unify implementations

As a side note, there might be an opportunity to unify the implementation details of to_arrow and to_arrow_batch_reader.

to_arrow_batch_reader bug

There is also a bug with to_arrow_batch_reader: it does not respect the given limit and returns more records than specified. The bug is in project_batches, specifically in the way yield interacts with the two for-loops. Here's a Jupyter notebook reproducing the issue; see the last cell and compare the number of rows read using to_arrow vs to_arrow_batch_reader.
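
One way this kind of bug can arise with two nested loops and a yield is sketched below; this is only an illustration of the failure mode, not the actual project_batches code (read_batches is again a hypothetical per-file batch reader):

def buggy_batch_reader(tasks, read_batches, limit=None):
    """Illustrative sketch: the limit check runs too late, so extra rows slip through."""
    total_rows = 0
    for task in tasks:
        for batch in read_batches(task):
            yield batch                    # the whole batch is yielded even if it overshoots `limit`
            total_rows += batch.num_rows
        # The check only happens between files, so every batch of the current file is emitted first
        if limit is not None and total_rows >= limit:
            return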

Summary

@jkleinkauff
Author

@kevinjqliu that's awesome! Thank you so much!
I have one more question regarding the read_parquet from awswrangler.
Do you know why it's faster than the other methods? Is there any optimization on their end or something?

@corleyma

@jkleinkauff Without profiling it's hard to say for sure, but I can make a bet based on my experience with object storage clients/fsspec implementations in particular:

This is one thing that will have a big impact, but there are likely other differences too since, as you can see, awswrangler is not using the fsspec s3fs implementation and has its own wrappers for I/O over S3... There's also a lot of stuff I see in awswrangler that is similar to some of the optimizations pyarrow makes in its native blob storage filesystem implementations; maybe the fsspec s3fs implementation supports some of these too, but I'm less familiar with the current state of that project. At least historically, a lot of the defaults were tuned for random access and not really the best for maximizing throughput in columnar workloads; pyiceberg probably should not be using all defaults.

I believe pyiceberg also provides FileIO implementations that use pyarrow native filesystems instead of fsspec. I am curious what you'd find if you benchmarked that. There may also be defaults there that pyiceberg should be tuning, but I suspect they will still be better tuned for the kinds of reads pyiceberg is doing.
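
If you want to try that, something like the following should force a specific FileIO implementation; I believe the property is "py-io-impl", but treat the exact names here as an assumption to verify against the PyIceberg docs:

from pyiceberg.catalog.sql import SqlCatalog

# Same catalog as before, but explicitly selecting the pyarrow-based FileIO.
# Swap in "pyiceberg.io.fsspec.FsspecFileIO" to benchmark the fsspec path instead.
catalog = SqlCatalog(
    "default",
    **{
        "uri": "postgresql+psycopg2://postgres:Password1@localhost/postgres",
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
    },
)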

@kevinjqliu
Contributor

I have one more question regarding the read_parquet from awswrangler.
Do you know why it's faster than the other methods? Is there any optimization on their end or something?

I was also surprised by the performance difference. It's hard for me to say unless I look into the implementation details (in awswrangler/s3/_read_parquet.py). There's definitely room for optimizations on the PyIceberg side.

If you look at another engine like daft, which is optimized for reading parquet on S3, that's a good target for potential performance gains.

On the PyIceberg side, there's a future opportunity to integrate with iceberg-rust, which might speed up reading files.

@kevinjqliu
Contributor

you're benchmarking the fsspec FileIO path in pyiceberg, which if I understand correctly is using fsspec s3fs directly with a lot of defaults. Probably it keeps the default block size (5mb).

There are two FileIO implementations, fsspec and pyarrow. In the case above, I believe pyarrow is used, since it's preferred over fsspec (source).

Looks like the pyarrow default buffer size is 1 MB:

buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
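
So one quick experiment would be to raise that via the buffer-size catalog property; the property name is an assumption based on BUFFER_SIZE in pyiceberg.io, so double-check it:

from pyiceberg.catalog.sql import SqlCatalog

# Same catalog setup as before, but with a 16 MB read buffer instead of the 1 MB default
catalog = SqlCatalog(
    "default",
    **{
        "uri": "postgresql+psycopg2://postgres:Password1@localhost/postgres",
        "buffer-size": str(16 * 1024 * 1024),
    },
)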

@jkleinkauff
Author

@sungwy @kevinjqliu I'm really enjoying this discussion and learning a ton from it. Would love to keep it going but feel free to close it as this is not an issue. Thank you folks!

@kevinjqliu
Contributor

Thanks for reporting this. I learned a lot from exploring this thread, and we have some solid improvements coming up. Please let us know if anything else comes up!
