[FEA] Proposal for sub-rowgroup reading in the parquet reader. #14270
Comments
A couple of thoughts. First, I'll point out that #14000 would go a long way towards fixing this mess. But that's in the hands of the parquet-format devs, so of no use right now.
Possible Option 3:
Thanks for the writeup! I have some questions.
Optionally, the Parquet file could store the compression codec in the metadata. If it is available, we could use it to know the codec of a column chunk before loading its content, and have a better understanding of how much temporary space nvComp is going to use.
IIUC, here basically we replace the hardcoded value of
closes #14270

Implementation of sub-rowgroup reading of Parquet files. This PR implements an additional layer on top of the existing chunking system. Currently, the reader takes two parameters: `input_pass_read_limit`, which specifies a limit on temporary memory usage when reading and decompressing file data; and `output_pass_read_limit`, which specifies a limit on how large an output chunk (a table) can be.

Currently, when the user specifies a limit via `input_pass_read_limit`, the reader will perform multiple `passes` over the file at row-group granularity. That is, it will control how many row groups it will read at once to conform to the specified limit. However, there are cases where this is not sufficient. So this PR changes things so that we now have `subpasses` below the top-level `passes`. It works as follows:

- We read a set of input chunks based on the `input_pass_read_limit`, but we do not decompress them immediately. This constitutes a `pass`.
- Within each pass of compressed data, we progressively decompress batches of pages as `subpasses`.
- Within each `subpass` we apply the output limit to produce `chunks`.

So the overall structure of the reader is: (read) `pass` -> (decompress) `subpass` -> (decode) `chunk`

Major sections of code changes:

- Previously the incoming page data in the file was unsorted. To handle this we later produced a `page_index` that could be applied to the array to get the pages in schema-sorted order. This was getting very unwieldy, so I now sort the pages up front and the `page_index` array has gone away.
- There are now two sets of pages to be aware of in the code. Within each `pass_intermediate_data` there is the set of all pages within the current set of loaded row groups. And within the `subpass_intermediate_data` struct there is a separate array of pages representing the current batch of decompressed data we are processing. To keep the confusion down, I changed a good amount of code to always reference its array through its associated struct, i.e., `pass.pages` or `subpass.pages`. In addition, I removed the `page_info` from `ColumnChunkDesc` to help prevent the kernels from getting confused. `ColumnChunkDesc` now only has a `dict_page` field, which is constant across all subpasses.
- The primary entry point for the chunking mechanism is `handle_chunking`. Here we iterate through passes, subpasses and output chunks. Successive subpasses are computed and preprocessed through here.
- The volume of diffs you'll see in `reader_impl_chunking.cu` is a little deceptive. A lot of this is just functions (or pieces of functions) that have been moved over from either `reader_impl_preprocess.cu` or `reader_impl_helpers.cpp`. The most relevant actual changes are in `handle_chunking`, `compute_input_passes`, `compute_next_subpass`, and `compute_chunks_for_subpass`.

Note on tests: I renamed `parquet_chunked_reader_tests.cpp` to `parquet_chunked_reader_test.cu` as I needed to use thrust. The only actual changes in the file are the addition of the `ParquetChunkedReaderInputLimitConstrainedTest` and `ParquetChunkedReaderInputLimitTest` test suites at the bottom.

Authors:
- https://github.com/nvdbaranec
- Nghia Truong (https://github.com/ttnghia)

Approvers:
- Vyas Ramasubramani (https://github.com/vyasr)
- Nghia Truong (https://github.com/ttnghia)
- Vukasin Milovanovic (https://github.com/vuule)

URL: #14360
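For context, here is a minimal sketch of how the two limits surface in libcudf's public chunked-reader API (this is not code from the PR; the parameter names `chunk_read_limit`/`pass_read_limit` and the constructor shape reflect my understanding and may differ between versions):

```cpp
#include <cudf/io/parquet.hpp>

#include <cstddef>
#include <string>

// Read a Parquet file in chunks, capping both the size of each output table
// ("chunk") and the temporary memory used while reading/decompressing a pass.
void read_in_chunks(std::string const& path)
{
  auto const options =
    cudf::io::parquet_reader_options::builder(cudf::io::source_info{path}).build();

  std::size_t const chunk_read_limit = std::size_t{1} << 30;  // ~1 GB output chunks
  std::size_t const pass_read_limit  = std::size_t{2} << 30;  // ~2 GB read/decompress budget

  cudf::io::chunked_parquet_reader reader(chunk_read_limit, pass_read_limit, options);

  while (reader.has_next()) {
    auto chunk = reader.read_chunk();  // cudf::io::table_with_metadata
    // ... consume chunk.tbl ...
  }
}
```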
The Spark RAPIDS plugin team has customers with parquet files that have proven tricky to load with the existing reader. There have been several categories of problems:

- Decoded output that is too large to fit in memory as a single table.
- Temporary decompression memory that is too large.
- Individual row groups whose decompressed data is too large to process even one at a time.
Up to this point, we have handled the first two items. The first pass at the chunked reader takes care of output sizes by breaking decoded output up into "chunks". The second item is handled with more functionality in the chunked reader that divides the input stage up into "passes" that attempt to limit how much memory is used for the temporary decompression data.
However we have run into situations where this is not enough. We have at least one case where there is a file containing a single row group with a single column in it. The compressed size is ~120 MB, but the decompressed size is ~19 GB. This causes out of memory issues immediately.

So the solution is to change things such that our input passes happen at the sub-rowgroup level. A quick overview of the way things are arranged in Parquet:
The file is broken up into "row groups", which are the format's mechanism for allowing the user to select what rows to load out of the file. Within each row group, each column contains a single "chunk". Each chunk is the unit of IO, containing page header information and the compressed data for the pages themselves. Each chunk contains a variable number of pages, and the pages themselves are individually compressed/decompressed.
So while we have to read whole rowgroup chunks in one shot, we do not have to decompress them in one shot.
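To make that hierarchy concrete, here is a simplified, hypothetical model of the layout (illustration only, not the actual Parquet Thrift metadata types):

```cpp
#include <cstddef>
#include <vector>

struct page_t {                        // pages are compressed individually,
  std::size_t compressed_size;         // so they can also be decompressed
  std::size_t decompressed_size;       // individually, in batches
};

struct column_chunk_t {                // the unit of IO: one contiguous byte range
  std::vector<page_t> pages;           // page headers + compressed page data
};

struct row_group_t {                   // the unit of row selection
  std::vector<column_chunk_t> chunks;  // exactly one chunk per column
};
```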
At the input level, the reader currently does the following:
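Roughly, in pseudocode (a hypothetical reconstruction; `select_more_row_groups(L)` is the name used further below, the other helpers are made up for illustration):

```cpp
#include <cstddef>

struct row_group_set { /* the set of row groups R selected for this pass */ };

row_group_set select_more_row_groups(std::size_t L);  // pick row groups fitting the limit L
bool more_row_groups_remain();
void read_chunks(row_group_set const& R);             // read every column chunk in R, whole
void decompress_pages(row_group_set const& R);        // decompress all pages of all chunks in R
void decode_pages(row_group_set const& R);            // decode into the output table(s)

void current_input_loop(std::size_t L)
{
  while (more_row_groups_remain()) {
    auto const R = select_more_row_groups(L);
    read_chunks(R);
    decompress_pages(R);
    decode_pages(R);  // the "last step" referenced below
  }
}
```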
Additionally, the last step may produce more than one "chunk" of actual output but that detail is left out here. The issue we face is when R contains just 1 rowgroup that is too large by itself to process. So the proposal here is to add another layer which processes groups of pages below the rowgroups themselves.
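For instance, the new layer could carve a loaded pass's pages into decompression batches along these lines (a hypothetical sketch only; decompressed page sizes become known once the chunk bytes and page headers have been read):

```cpp
#include <cstddef>
#include <utility>
#include <vector>

struct page_size { std::size_t decompressed; };

// Split the pages of a loaded (still compressed) pass into [begin, end) ranges,
// each range forming one decompression batch that fits the budget.
std::vector<std::pair<std::size_t, std::size_t>> split_into_subpasses(
  std::vector<page_size> const& pages, std::size_t decompress_budget)
{
  std::vector<std::pair<std::size_t, std::size_t>> subpasses;
  std::size_t start = 0;
  while (start < pages.size()) {
    std::size_t end = start, bytes = 0;
    // Greedily take pages until the next one would exceed the budget
    // (always take at least one page so we make forward progress).
    while (end < pages.size() &&
           (end == start || bytes + pages[end].decompressed <= decompress_budget)) {
      bytes += pages[end].decompressed;
      ++end;
    }
    subpasses.emplace_back(start, end);
    start = end;
  }
  return subpasses;
}
```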
At the high level this is a reasonably straightforward change. The code involved is the reader's `load_and_decompress_data` (`cudf/cpp/src/io/parquet/reader_impl_preprocess.cu`, line 751 in 301dce1), along with `read_and_decompress_column_chunks` (line 682) and `decompress_page_data` (line 348) in the same file.
There are a couple of fuzzy details though. At the point where we call `select_more_row_groups(L)` we will not know the number of pages or their individual compressed and decompressed sizes. All we will know is the total compressed and decompressed size for an entire column chunk. In our bad case here, this is 120 MB and 19 GB, respectively. We expect to see limit values (L) of maybe 2 GB or 4 GB. It is generally safe to expect that individual pages are roughly 1 MB of uncompressed data (for our example data, the 19 GB of decompressed data is contained in 15,000 pages, about 1.3 MB per page, so it holds roughly true). So how do we choose the granularity of chunks to load?

Option 1: Only consider compressed size and fill the limit L up to some hardcoded ratio of compressed/decompressed data size. Say, 25% of L. So if the user has specified a 4 GB limit, we will read up to 1 GB of compressed data and use the remaining 3 GB for decompression. In our example case, we have 120 MB of compressed data, which would leave >3.8 GB of space for decompressing and processing the pages in batches. But if we had a different file structure where we had 100 of these extremely large chunks, we might buffer up ~6 of them (120 MB x 6) before starting the decompression passes.

Option 2: Consider (compressed + decompressed size). Read until the sum goes over the user-specified limit L. The idea here would be that when we have huge disparities in compressed/decompressed size, we will prefer to read as little as we can and leave maximum space for batches of decompression.
Option 3: ?
For our case, there would be no difference between options 1 and 2 since a single chunk explodes everything. But for other cases, option 1 would bias things towards doing larger IO reads at the cost of having less decompression space to work with. And option 2 would bias things towards maximizing decompression space for extreme cases.
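For illustration, the two heuristics could be sketched like this (hypothetical code; the 25% ratio, the limit L, and the example sizes come from the discussion above, everything else is made up):

```cpp
#include <cstddef>
#include <vector>

// Per column chunk, the footer gives us total compressed and total
// decompressed sizes -- but nothing about the individual pages inside.
struct chunk_sizes {
  std::size_t compressed;
  std::size_t decompressed;
};

// Option 1: fill up to a fixed fraction of L with *compressed* bytes, reserving
// the rest of L for decompression. With L = 4 GB and a 25% ratio we would read
// up to 1 GB of compressed data and keep ~3 GB for decompression.
std::size_t chunks_to_read_option1(std::vector<chunk_sizes> const& chunks, std::size_t L)
{
  std::size_t const compressed_budget = L / 4;  // hard-coded 25% of L
  std::size_t used = 0, count = 0;
  for (auto const& c : chunks) {
    if (count > 0 && used + c.compressed > compressed_budget) break;
    used += c.compressed;
    ++count;  // always take at least one chunk so we can make progress
  }
  return count;
}

// Option 2: accumulate (compressed + decompressed) size and stop once the sum
// would exceed L. A chunk like the 120 MB / 19 GB example blows past L on its
// own, so we would read only that chunk and leave maximum room for
// decompressing it in batches.
std::size_t chunks_to_read_option2(std::vector<chunk_sizes> const& chunks, std::size_t L)
{
  std::size_t used = 0, count = 0;
  for (auto const& c : chunks) {
    if (count > 0 && used + c.compressed + c.decompressed > L) break;
    used += c.compressed + c.decompressed;
    ++count;
  }
  return count;
}
```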
It is worth mentioning that depending on the compression codec used, nvcomp itself may require a very large amount of temporary memory. For example using ZSTD, it may require as much as 2.5x the size of the decompressed data as purely temporary memory. Whereas SNAPPY requires no scratch space at all. And unfortunately, we will not know the compression types (they can be mixed within the file) until we have actually performed the reads and gotten the page headers.
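As a toy illustration of why this matters for sizing (the 2.5x and zero figures are the ones quoted above; real code would query nvcomp for the actual temporary-buffer requirement rather than hard-coding factors):

```cpp
#include <cstddef>

enum class codec_t { snappy, zstd, uncompressed };

// Rough upper bound on decompression scratch space for `decompressed_bytes`
// worth of page data with a given codec, using the figures quoted above.
std::size_t estimated_scratch_bytes(codec_t c, std::size_t decompressed_bytes)
{
  switch (c) {
    case codec_t::zstd: return static_cast<std::size_t>(2.5 * decompressed_bytes);
    case codec_t::snappy:        // no scratch space needed
    case codec_t::uncompressed:  // nothing to decompress
    default: return 0;
  }
}
```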
Note that all of these chunked reader behaviors are optional and not default. The default behavior is to just attempt to read everything at once, requiring no user tweaking. Any implementation here would leave that as is and would be done in such a way that there should be no performance impact when "no limit" is chosen.