
[FEA] Proposal for sub-rowgroup reading in the parquet reader. #14270

Closed
nvdbaranec opened this issue Oct 11, 2023 · 2 comments · Fixed by #14360
Labels
2 - In Progress (Currently a work in progress), cuIO (cuIO issue), feature request (New feature or request), improvement (Improvement / enhancement to an existing function), libcudf (Affects libcudf (C++/CUDA) code), no-oom (Reducing memory footprint of cudf algorithms)

Comments

@nvdbaranec (Contributor)

The Spark RAPIDS plugin team has customers with parquet files that have proven tricky to load with the existing reader. There have been several categories of problems:

  • Output sizes that are larger than the 32 bit size limits of cuDF.
  • Files that are large enough such that their compressed and decompressed intermediate data does not fit on the GPU all at once.
  • Files where individual row groups are large enough such that their compressed and decompressed intermediate data does not fit on the GPU all at once.

Up to this point, we have handled the first two items. The first pass at the chunked reader takes care of output sizes by breaking decoded output up into "chunks". The second item is handled with more functionality in the chunked reader that divides the input stage up into "passes" that attempt to limit how much memory is used for the temporary decompression data.

However we have run into situations where this is not enough. We have at least one case where there is a file containing a single row group with a single column in it. The compressed size is ~120 MB, but the decompressed size is ~19 GB. This causes out of memory issues immediately.

So the solution is to change things such that our input passes happen at the sub-rowgroup level. A quick overview of the way things are arranged in Parquet:

Column A                          Column B                      ...
   Row Group 1, Chunk A             Row Group 1, Chunk B
      Page 0                            Page 0
      Page 1                            Page 1                    
      ...                               ...

The file is broken up into "row groups", which are the format's mechanism for allowing the user to select what rows to load out of the file. Within each row group, each column contains a single "chunk". Each chunk is the unit of IO, containing page header information and the compressed data for the pages themselves. Each chunk contains a variable number of pages, and the pages themselves are individually compressed/decompressed.

So while we have to read whole rowgroup chunks in one shot, we do not have to decompress them in one shot.
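To make that distinction concrete, here is a minimal sketch of planning page-level decompression batches within an already-loaded column chunk. The `page_desc` struct and the batch planner are hypothetical illustrations, not libcudf's actual internals, and they assume the per-page uncompressed sizes are already known from the page headers.

   // Hypothetical sketch: split the pages of an already-loaded column chunk into
   // batches whose total uncompressed size stays within a temporary-memory budget,
   // so decompression can run batch by batch instead of all at once.
   #include <cstddef>
   #include <utility>
   #include <vector>

   struct page_desc {
     std::size_t compressed_size;    // size of the compressed page data within the chunk
     std::size_t uncompressed_size;  // size after decompression (from the page header)
   };

   // Returns [begin, end) index ranges over `pages`; each range's total
   // uncompressed size fits within `decompress_budget` where possible.
   std::vector<std::pair<std::size_t, std::size_t>> plan_decompress_batches(
     std::vector<page_desc> const& pages, std::size_t decompress_budget)
   {
     std::vector<std::pair<std::size_t, std::size_t>> batches;
     std::size_t i = 0;
     while (i < pages.size()) {
       std::size_t end   = i;
       std::size_t bytes = 0;
       // Grow the batch until the next page would exceed the budget.
       while (end < pages.size() &&
              bytes + pages[end].uncompressed_size <= decompress_budget) {
         bytes += pages[end].uncompressed_size;
         ++end;
       }
       if (end == i) { ++end; }  // always make progress, even for an oversized page
       batches.emplace_back(i, end);
       i = end;
     }
     return batches;
   }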

At the input level, the reader currently does the following:

L = user requested limit on temporary memory usage for io and decompression
while(rowgroups left to read){
   R = select_more_row_groups_that_will_fit_in_memory(L);
   D = load_and_decompress_rowgroups(R);
   O = decode_decompressed_data(D);   
}

Additionally, the last step may produce more than one "chunk" of actual output but that detail is left out here. The issue we face is when R contains just 1 rowgroup that is too large by itself to process. So the proposal here is to add another layer which processes groups of pages below the rowgroups themselves.

L = user requested limit on temporary memory usage for io and decompression
while(rowgroups left to read){
   R = select_more_row_groups(L);
   while(there_are_pages_left_to_decode(R)){
      P = load_and_decompress_more_pages(R, L);
      O = decode_decompressed_data(P);   
   }
}

At a high level this is a reasonably straightforward change. The reader's load_and_decompress_data, read_and_decompress_column_chunks, and decompress_page_data functions:

   void reader::impl::load_and_decompress_data()
   std::pair<bool, std::vector<std::future<void>>> reader::impl::read_and_decompress_column_chunks()
   [[nodiscard]] rmm::device_buffer decompress_page_data(...)

need to be broken up and rearranged so that their work happens iteratively as part of the overall chunking/input pass loop.

There are a couple of fuzzy details though. At the point where we call select_more_row_groups(L) we will not know the number of pages or their individual compressed and decompressed sizes. All we will know is the total compressed and decompressed size for an entire column chunk. In our bad case here, this is 120 MB and 19 GB, respectively. We expect to see limit values (L) of maybe 2 GB or 4 GB. It is generally safe to expect that individual pages are roughly 1 MB of uncompressed data (For our example data, the 19 GB of decompressed data is contained in 15,000 pages so it holds roughly true). So how do we choose the granularity of chunks to load?

Option 1 : Only consider compressed size and fill the limit L up to some hardcoded ratio of compressed/decompressed data size. Say, 25% of L. So if the user has specified a 4 GB limit, we will read up to 1 GB of compressed data and use the remaining 3 GB for decompression. In our example case, we have 120 MB of compressed data, which would leave > 3.8 GB of space for decompressing and processing the pages in batches. But if we had a different file structure where we had 100 of these extremely large chunks, we might buffer up ~6 of them (120 MB x 6) before starting the decompression passes.

Option 2 : Consider (compressed + decompressed size). Read until the sum goes over the user-specified limit L. The idea here is that when we have huge disparities between compressed and decompressed size, we will prefer to read as little as we can and leave maximum space for batches of decompression.

Option 3 : ?

For our case, there would be no difference between options 1 and 2, since a single chunk blows through any limit by itself. But for other cases, option 1 would bias things towards doing larger IO reads at the cost of having less decompression space to work with, and option 2 would bias things towards maximizing decompression space for extreme cases.
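For illustration, here is a sketch of what the two selection policies might look like as admission checks while accumulating column chunks for a pass. The `chunk_sizes` struct, the function names, and the placement of the 25% ratio are hypothetical, not the reader's actual code.

   // Hypothetical admission checks for the two options described above.
   #include <cstddef>

   struct chunk_sizes {
     std::size_t compressed;
     std::size_t uncompressed;
   };

   // Option 1: fill up to a fixed fraction of L with compressed data only.
   bool fits_option1(std::size_t compressed_so_far, chunk_sizes next, std::size_t L)
   {
     constexpr double compressed_fraction = 0.25;  // the hardcoded ratio from the proposal
     return compressed_so_far + next.compressed <=
            static_cast<std::size_t>(L * compressed_fraction);
   }

   // Option 2: count compressed + decompressed size against L, so chunks with
   // extreme compression ratios stop the read earlier and leave room to decompress.
   // `total_so_far` is the compressed + decompressed size of chunks already admitted.
   bool fits_option2(std::size_t total_so_far, chunk_sizes next, std::size_t L)
   {
     return total_so_far + next.compressed + next.uncompressed <= L;
   }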

It is worth mentioning that depending on the compression codec used, nvcomp itself may require a very large amount of temporary memory. For example using ZSTD, it may require as much as 2.5x the size of the decompressed data as purely temporary memory. Whereas SNAPPY requires no scratch space at all. And unfortunately, we will not know the compression types (they can be mixed within the file) until we have actually performed the reads and gotten the page headers.
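As a rough illustration of how that uncertainty might be budgeted for, the sketch below simply plugs in the worst-case figures quoted above (roughly 2.5x of the decompressed size for ZSTD, none for SNAPPY). In practice the real requirement would only be known after the page headers are read and nvcomp is queried; the enum and multipliers here are purely illustrative.

   // Illustrative worst-case scratch estimate per codec, using the ratios above.
   #include <cstddef>

   enum class codec { snappy, zstd, unknown };

   std::size_t estimated_temp_bytes(codec c, std::size_t decompressed_bytes)
   {
     switch (c) {
       case codec::snappy: return 0;  // no scratch space needed
       case codec::zstd:   return static_cast<std::size_t>(2.5 * decompressed_bytes);
       // Codec not known yet (headers not read): assume the worst case.
       default:            return static_cast<std::size_t>(2.5 * decompressed_bytes);
     }
   }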


Note that all of these chunked reader behaviors are optional and not default. The default behavior is to just attempt to read everything at once, requiring no user tweaking. Any implementation here would leave that as is and would be done in such a way that there should be no performance impact when "no limit" is chosen.
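For reference, here is a sketch of how a caller might opt in to these limits from the libcudf C++ API. It assumes the `cudf::io::chunked_parquet_reader` overload that takes both an output chunk limit and an input pass limit; exact signatures and defaults may differ between cudf versions.

   #include <cudf/io/parquet.hpp>

   #include <cstddef>
   #include <string>

   // Opt-in usage sketch: a limit of 0 means "no limit", preserving the default
   // read-everything-at-once behavior. Overload details are assumptions here.
   void read_in_chunks(std::string const& path)
   {
     auto const options =
       cudf::io::parquet_reader_options::builder(cudf::io::source_info{path}).build();

     std::size_t const output_chunk_limit = 1ull * 1024 * 1024 * 1024;  // ~1 GB decoded output per chunk
     std::size_t const input_pass_limit   = 4ull * 1024 * 1024 * 1024;  // ~4 GB for IO + decompression

     cudf::io::chunked_parquet_reader reader(output_chunk_limit, input_pass_limit, options);
     while (reader.has_next()) {
       auto chunk = reader.read_chunk();  // table_with_metadata holding the next slice of rows
       // ... hand chunk.tbl off for processing ...
     }
   }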

@nvdbaranec added the feature request, Needs Triage, cuIO, and improvement labels on Oct 11, 2023
@nvdbaranec self-assigned this on Oct 11, 2023
@GregoryKimball added the 2 - In Progress and libcudf labels and removed the Needs Triage label on Oct 11, 2023
@etseidl (Contributor) commented Oct 12, 2023

A couple of thoughts. First, I'll point out that #14000 would go a long way towards fixing this mess. But that's in the hands of the parquet-format devs, so of no use right now.

> Each chunk is the unit of IO, containing page header information and the compressed data for the pages themselves. Each chunk contains a variable number of pages, and the pages themselves are individually compressed/decompressed.
> So while we have to read whole rowgroup chunks in one shot, we do not have to decompress them in one shot.

Possible Option 3:
I would argue that with the advent of the column and offset indexes the page is now the smallest unit of I/O. Of course this requires that the file is written with page indexes enabled. The offset index will give you page starting offsets and compressed sizes, and a readv like operation could be done to read all the page headers in a chunk to get the uncompressed sizes. Armed with this, you can calculate page ranges that will at least fit in RAM uncompressed (but not yet fully decoded), and then do a second readv to read in groups of pages, rather than whole column chunks. This approach would dovetail nicely with using the column index to do pushdown predicates. One wrinkle is dictionary pages...they would have to be read in with the first batch of pages from the row group, and ideally cached for later use by subsequent batches.
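To sketch that idea, the page locations from the offset index could be grouped into readv-style batches. The structs below are hypothetical stand-ins for Parquet's OffsetIndex/PageLocation metadata, and the planner groups by compressed size only for brevity; the real approach would also fold in the uncompressed sizes recovered from the page headers.

   // Hypothetical sketch: group contiguous pages (as listed in the offset index)
   // into read batches that stay under a compressed-byte budget.
   #include <cstddef>
   #include <cstdint>
   #include <vector>

   struct page_location {      // stand-in for Parquet's PageLocation
     int64_t offset;           // file offset of the page
     int32_t compressed_page_size;
     int64_t first_row_index;
   };

   struct read_batch {
     std::size_t first_page;   // index into the page_location list
     std::size_t num_pages;
     std::size_t total_bytes;  // compressed bytes covered by the batch
   };

   std::vector<read_batch> plan_page_reads(std::vector<page_location> const& pages,
                                           std::size_t byte_budget)
   {
     std::vector<read_batch> batches;
     std::size_t i = 0;
     while (i < pages.size()) {
       read_batch b{i, 0, 0};
       // Take at least one page, then keep adding while the budget allows.
       while (i < pages.size() &&
              (b.num_pages == 0 ||
               b.total_bytes + static_cast<std::size_t>(pages[i].compressed_page_size) <=
                 byte_budget)) {
         b.total_bytes += static_cast<std::size_t>(pages[i].compressed_page_size);
         ++b.num_pages;
         ++i;
       }
       batches.push_back(b);
     }
     return batches;
   }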

@gaohao95 (Contributor)

Thanks for the writeup! I have some questions.

> It is worth mentioning that depending on the compression codec used, nvcomp itself may require a very large amount of temporary memory. For example using ZSTD, it may require as much as 2.5x the size of the decompressed data as purely temporary memory. Whereas SNAPPY requires no scratch space at all. And unfortunately, we will not know the compression types (they can be mixed within the file) until we have actually performed the reads and gotten the page headers.

Optionally, a Parquet file can store the compression codec in the ColumnMetaData within the file footer. Is this generally available in the customer data?

If it is available, we could use it to know the codec of a column chunk before loading its content, and have a better understanding of how much temporary space nvComp is going to use.

> Option 2 : Consider (compressed + decompressed size). Read until the sum goes over the user-specified limit L. The idea here is that when we have huge disparities between compressed and decompressed size, we will prefer to read as little as we can and leave maximum space for batches of decompression.

IIUC, here we basically replace the hardcoded 25% of L in Option 1 with a value that takes the compression ratio into account. For example, we could load 25% of L if the compression ratio is less than 3, 12.5% of L if the compression ratio is 6, 6.25% of L if the compression ratio is 12, etc. The benefit is to avoid OOM when the compression ratio is so extreme that we don't have enough memory left for decompression. Is that the right understanding? If so, how could this be worse than Option 1? When the compression ratio is low, it performs exactly the same as Option 1. When the compression ratio is high, Option 1 likely runs out of memory and Option 2 could still succeed.
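A tiny illustrative helper for the scaling described in this question (the function and constants are hypothetical; the numbers simply reproduce the 25% / 12.5% / 6.25% example above):

   // Shrink the compressed-read budget in proportion to the compression ratio,
   // keeping the full 25% fraction up to a ratio of 3.
   double compressed_read_fraction(double compression_ratio)
   {
     constexpr double base_fraction = 0.25;
     constexpr double base_ratio    = 3.0;
     if (compression_ratio <= base_ratio) { return base_fraction; }
     return base_fraction * (base_ratio / compression_ratio);  // ratio 6 -> 12.5%, 12 -> 6.25%
   }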

@wence- added the no-oom label on Nov 22, 2023
rapids-bot pushed a commit that referenced this issue on Jan 25, 2024
closes #14270

Implementation of sub-rowgroup reading of Parquet files.  This PR implements an additional layer on top of the existing chunking system.  Currently, the reader takes two parameters:  `input_pass_read_limit` which specifies a limit on temporary memory usage when reading and decompressing file data;  and `output_pass_read_limit` which specifies a limit on how large an output chunk (a table) can be.

Currently when the user specifies a limit via `input_pass_read_limit`, the reader will perform multiple `passes` over the file at row-group granularity.  That is, it will control how many row groups it will read at once to conform to the specified limit.

However, there are cases where this is not sufficient.  So this PR changes things so that we now have `subpasses` below the top level `passes`.  It works as follows:

- We read a set of input chunks based on the `input_pass_read_limit` but we do not decompress them immediately. This constitutes a `pass`.
- Within each pass of compressed data, we progressively decompress batches of pages as `subpasses`.
- Within each `subpass` we apply the output limit to produce `chunks`.

So the overall structure of the reader is:  (read) `pass` -> (decompress) `subpass` -> (decode) `chunk`
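In the same pseudocode style as the issue description, the nesting looks roughly like this (the function names are illustrative, not the reader's actual internals):

while(row groups left to read){
   pass = read_next_pass(input_pass_read_limit);                      // IO only; data stays compressed
   while(pass has pages left to decompress){
      subpass = decompress_next_subpass(pass, input_pass_read_limit);
      while(subpass has rows left to decode){
         chunk = decode_next_chunk(subpass, output_pass_read_limit);  // emits one output table
      }
   }
}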

Major sections of code changes:

- Previously the incoming page data in the file was unsorted. To handle this we later produced a `page_index` that could be applied to the page array to get the pages into schema-sorted order. This was getting very unwieldy, so I now sort the pages up front and the `page_index` array has gone away.

- There are now two sets of pages to be aware of in the code. Within each `pass_intermediate_data` there is the set of all pages within the current set of loaded row groups. And within the `subpass_intermediate_data` struct there is a separate array of pages representing the current batch of decompressed data we are processing. To keep the confusion down, I changed a good amount of code to always reference the array through its associated struct, i.e. `pass.pages` or `subpass.pages`. In addition, I removed the `page_info` from `ColumnChunkDesc` to help prevent the kernels from getting confused. `ColumnChunkDesc` now only has a `dict_page` field, which is constant across all subpasses.

- The primary entry point for the chunking mechanism is in `handle_chunking`. Here we iterate through passes, subpasses and output chunks.  Successive subpasses are computed and preprocessed through here. 

- The volume of diffs you'll see in `reader_impl_chunking.cu` is a little deceptive. A lot of this is just functions (or pieces of functions) that have been moved over from either `reader_impl_preprocess.cu` or `reader_impl_helpers.cpp`. The most relevant actual changes are in: `handle_chunking`, `compute_input_passes`, `compute_next_subpass`, and `compute_chunks_for_subpass`.

Note on tests:   I renamed `parquet_chunked_reader_tests.cpp` to `parquet_chunked_reader_test.cu` as I needed to use thrust. The only actual changes in the file are the addition of the `ParquetChunkedReaderInputLimitConstrainedTest` and `ParquetChunkedReaderInputLimitTest` test suites at the bottom.

Authors:
  - https://github.com/nvdbaranec
  - Nghia Truong (https://github.com/ttnghia)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Nghia Truong (https://github.com/ttnghia)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #14360