Parquet sub-rowgroup reading. (#14360)
closes #14270

Implementation of sub-rowgroup reading of Parquet files. This PR implements an additional layer on top of the existing chunking system. The reader takes two parameters: `input_pass_read_limit`, which specifies a limit on temporary memory usage when reading and decompressing file data, and `output_pass_read_limit`, which specifies a limit on how large an output chunk (a table) can be.
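For orientation, here is a minimal usage sketch, assuming the public `cudf::io::chunked_parquet_reader` API with the output chunk limit as the first constructor argument and the pass (input) limit as the second; the file name and byte values are placeholders, not something introduced by this PR:

```cpp
#include <cudf/io/parquet.hpp>

void read_in_bounded_chunks()
{
  auto const options = cudf::io::parquet_reader_options::builder(
                         cudf::io::source_info{"example.parquet"})
                         .build();

  // First limit: how large each returned table (output chunk) may be.
  // Second limit: temporary memory budget while reading/decompressing a pass.
  auto reader = cudf::io::chunked_parquet_reader(
    512 * 1024 * 1024,   // output chunk limit (output_pass_read_limit in this PR's terms)
    1024 * 1024 * 1024,  // input pass limit   (input_pass_read_limit in this PR's terms)
    options);

  while (reader.has_next()) {
    auto chunk = reader.read_chunk();  // cudf::io::table_with_metadata
    // ... consume chunk.tbl ...
  }
}
```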

Currently, when the user specifies a limit via `input_pass_read_limit`, the reader performs multiple `passes` over the file at row-group granularity. That is, it controls how many row groups it reads at once to conform to the specified limit.

However, there are cases where this is not sufficient, so this PR adds `subpasses` below the top-level `passes`. It works as follows:

- We read a set of input chunks based on the `input_pass_read_limit` but we do not decompress them immediately. This constitutes a `pass`.
- Within each pass of compressed data, we progressively decompress batches of pages as `subpasses`.
- Within each `subpass` we apply the output limit to produce `chunks`.

So the overall structure of the reader is:  (read) `pass` -> (decompress) `subpass` -> (decode) `chunk`
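To make that nesting concrete, here is a small self-contained toy model of the loop structure; the row-group sizes, the assumed 2x decompression ratio, and the limits are all made up for illustration, and none of this is libcudf code:

```cpp
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

int main()
{
  // Compressed size of each row group in an imaginary file.
  std::vector<std::size_t> const row_groups{400, 300, 500, 200};
  std::size_t const input_limit  = 700;  // cap on temporary (compressed/decompressed) memory
  std::size_t const output_limit = 250;  // cap on the size of each emitted output chunk

  for (std::size_t rg = 0; rg < row_groups.size();) {
    // Pass: read compressed row groups up to the input limit; no decompression yet.
    std::size_t pass_bytes = 0;
    while (rg < row_groups.size() && pass_bytes + row_groups[rg] <= input_limit) {
      pass_bytes += row_groups[rg++];
    }
    if (pass_bytes == 0) { pass_bytes = row_groups[rg++]; }  // always make progress
    std::cout << "pass: " << pass_bytes << " compressed bytes\n";

    // Subpasses: decompress the pass in batches that also respect the input limit
    // (assume 2x expansion, so each subpass covers ~input_limit / 2 of compressed data).
    std::size_t done = 0;
    while (done < pass_bytes) {
      std::size_t const subpass_in  = std::min(pass_bytes - done, input_limit / 2);
      std::size_t const subpass_out = subpass_in * 2;
      done += subpass_in;
      std::cout << "  subpass: " << subpass_out << " decompressed bytes\n";

      // Chunks: decode the subpass into output tables no larger than the output limit.
      for (std::size_t out = 0; out < subpass_out; out += output_limit) {
        std::cout << "    chunk: " << std::min(output_limit, subpass_out - out) << " bytes\n";
      }
    }
  }
  return 0;
}
```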

Major sections of code changes:

- Previously, the incoming page data in the file was unsorted. To handle this, we later produced a `page_index` that could be applied to the page array to put it in schema-sorted order. This was getting very unwieldy, so I now sort the pages up front and the `page_index` array has gone away (a simplified sorting sketch appears after this list).

- There are now two sets of pages to be aware of in the code. Within each `pass_intermediate_data` there is the set of all pages within the current set of loaded row groups, and within the `subpass_intermediate_data` struct there is a separate array of pages representing the current batch of decompressed data we are processing. To keep confusion down, I changed a good amount of code to always reference the page array through its associated struct, i.e. `pass.pages` or `subpass.pages`. In addition, I removed `page_info` from `ColumnChunkDesc` to help prevent the kernels from getting confused; `ColumnChunkDesc` now only has a `dict_page` field, which is constant across all subpasses.

- The primary entry point for the chunking mechanism is `handle_chunking`, where we iterate through passes, subpasses, and output chunks. Successive subpasses are computed and preprocessed from here.

- The volume of diffs you'll see in `reader_impl_chunking.cu` is a little deceptive. Much of it is functions (or pieces of functions) that were moved over from `reader_impl_preprocess.cu` or `reader_impl_helpers.cpp`. The most relevant changes are in `handle_chunking`, `compute_input_passes`, `compute_next_subpass`, and `compute_chunks_for_subpass`.
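As a companion to the first bullet above, here is a simplified sketch of sorting pages into schema order with thrust (compile with nvcc). The `PageInfoToy` struct, `page_schema_key` functor, and `sort_pages_by_schema` function are hypothetical stand-ins for illustration; the real code keys pages by `src_col_schema` via the `get_page_key` / `make_page_key_iterator` helpers added in `parquet_gpu.hpp` below.

```cpp
#include <thrust/device_vector.h>
#include <thrust/sort.h>
#include <thrust/transform.h>

// Minimal stand-in for the real PageInfo in parquet_gpu.hpp.
struct PageInfoToy {
  int src_col_schema;
  int uncompressed_page_size;
};

struct page_schema_key {
  __device__ int operator()(PageInfoToy const& p) const { return p.src_col_schema; }
};

// Sort pages into schema order once, up front, instead of carrying a separate
// page_index remapping array through the rest of the reader.
void sort_pages_by_schema(thrust::device_vector<PageInfoToy>& pages)
{
  thrust::device_vector<int> keys(pages.size());
  thrust::transform(pages.begin(), pages.end(), keys.begin(), page_schema_key{});
  // A stable sort keeps pages of the same column in their original (file) order.
  thrust::stable_sort_by_key(keys.begin(), keys.end(), pages.begin());
}
```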

Note on tests:   I renamed `parquet_chunked_reader_tests.cpp` to `parquet_chunked_reader_test.cu` as I needed to use thrust. The only actual changes in the file are the addition of the `ParquetChunkedReaderInputLimitConstrainedTest` and `ParquetChunkedReaderInputLimitTest` test suites at the bottom.

Authors:
  - https://github.com/nvdbaranec
  - Nghia Truong (https://github.com/ttnghia)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Nghia Truong (https://github.com/ttnghia)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #14360
nvdbaranec authored Jan 25, 2024
1 parent f800f5a commit 5b1eef3
Showing 14 changed files with 2,387 additions and 1,070 deletions.
24 changes: 21 additions & 3 deletions cpp/src/io/comp/nvcomp_adapter.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -99,8 +99,8 @@ inline bool operator==(feature_status_parameters const& lhs, feature_status_para
* @param[in] inputs List of input buffers
* @param[out] outputs List of output buffers
* @param[out] results List of output status structures
* @param[in] max_uncomp_chunk_size maximum size of uncompressed chunk
* @param[in] max_total_uncomp_size maximum total size of uncompressed data
* @param[in] max_uncomp_chunk_size Maximum size of any single uncompressed chunk
* @param[in] max_total_uncomp_size Maximum total size of uncompressed data
* @param[in] stream CUDA stream to use
*/
void batched_decompress(compression_type compression,
@@ -111,6 +111,24 @@ void batched_decompress(compression_type compression,
size_t max_total_uncomp_size,
rmm::cuda_stream_view stream);

/**
* @brief Return the amount of temporary space required in bytes for a given decompression
* operation.
*
* The size returned reflects the size of the scratch buffer to be passed to
* `batched_decompress_async`
*
* @param[in] compression Compression type
* @param[in] num_chunks The number of decompression chunks to be processed
* @param[in] max_uncomp_chunk_size Maximum size of any single uncompressed chunk
* @param[in] max_total_uncomp_size Maximum total size of uncompressed data
* @returns The total required size in bytes
*/
size_t batched_decompress_temp_size(compression_type compression,
size_t num_chunks,
size_t max_uncomp_chunk_size,
size_t max_total_uncomp_size);

/**
* @brief Gets the maximum size any chunk could compress to in the batch.
*
9 changes: 4 additions & 5 deletions cpp/src/io/parquet/page_decode.cuh
@@ -1301,16 +1301,15 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
if (((s->col.data_type & 7) == BYTE_ARRAY) && (s->col.str_dict_index)) {
// String dictionary: use index
s->dict_base = reinterpret_cast<uint8_t const*>(s->col.str_dict_index);
s->dict_size = s->col.page_info[0].num_input_values * sizeof(string_index_pair);
s->dict_size = s->col.dict_page->num_input_values * sizeof(string_index_pair);
} else {
s->dict_base =
s->col.page_info[0].page_data; // dictionary is always stored in the first page
s->dict_size = s->col.page_info[0].uncompressed_page_size;
s->dict_base = s->col.dict_page->page_data;
s->dict_size = s->col.dict_page->uncompressed_page_size;
}
s->dict_run = 0;
s->dict_val = 0;
s->dict_bits = (cur < end) ? *cur++ : 0;
if (s->dict_bits > 32 || !s->dict_base) {
if (s->dict_bits > 32 || (!s->dict_base && s->col.dict_page->num_input_values > 0)) {
s->set_error_code(decode_error::INVALID_DICT_WIDTH);
}
break;
27 changes: 15 additions & 12 deletions cpp/src/io/parquet/page_hdr.cu
@@ -348,9 +348,11 @@ struct gpuParsePageHeader {
* @param[in] num_chunks Number of column chunks
*/
// blockDim {128,1,1}
CUDF_KERNEL void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* chunks,
int32_t num_chunks,
kernel_error::pointer error_code)
CUDF_KERNEL
void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* chunks,
chunk_page_info* chunk_pages,
int32_t num_chunks,
kernel_error::pointer error_code)
{
using cudf::detail::warp_size;
gpuParsePageHeader parse_page_header;
@@ -392,11 +394,10 @@ CUDF_KERNEL void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* ch
bs->page.temp_string_buf = nullptr;
bs->page.kernel_mask = decode_kernel_mask::NONE;
}
num_values = bs->ck.num_values;
page_info = bs->ck.page_info;
num_dict_pages = bs->ck.num_dict_pages;
max_num_pages = (page_info) ? bs->ck.max_num_pages : 0;
values_found = 0;
num_values = bs->ck.num_values;
page_info = chunk_pages ? chunk_pages[chunk].pages : nullptr;
max_num_pages = page_info ? bs->ck.max_num_pages : 0;
values_found = 0;
__syncwarp();
while (values_found < num_values && bs->cur < bs->end) {
int index_out = -1;
@@ -495,9 +496,9 @@ CUDF_KERNEL void __launch_bounds__(128)
if (!lane_id && ck->num_dict_pages > 0 && ck->str_dict_index) {
// Data type to describe a string
string_index_pair* dict_index = ck->str_dict_index;
uint8_t const* dict = ck->page_info[0].page_data;
int dict_size = ck->page_info[0].uncompressed_page_size;
int num_entries = ck->page_info[0].num_input_values;
uint8_t const* dict = ck->dict_page->page_data;
int dict_size = ck->dict_page->uncompressed_page_size;
int num_entries = ck->dict_page->num_input_values;
int pos = 0, cur = 0;
for (int i = 0; i < num_entries; i++) {
int len = 0;
@@ -518,13 +519,15 @@ CUDF_KERNEL void __launch_bounds__(128)
}

void __host__ DecodePageHeaders(ColumnChunkDesc* chunks,
chunk_page_info* chunk_pages,
int32_t num_chunks,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream)
{
dim3 dim_block(128, 1);
dim3 dim_grid((num_chunks + 3) >> 2, 1); // 1 chunk per warp, 4 warps per block
gpuDecodePageHeaders<<<dim_grid, dim_block, 0, stream.value()>>>(chunks, num_chunks, error_code);
gpuDecodePageHeaders<<<dim_grid, dim_block, 0, stream.value()>>>(
chunks, chunk_pages, num_chunks, error_code);
}

void __host__ BuildStringDictionaryIndex(ColumnChunkDesc* chunks,
10 changes: 6 additions & 4 deletions cpp/src/io/parquet/page_string_decode.cu
@@ -868,14 +868,16 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size) gpuComputePageStringSi
if (col.str_dict_index) {
// String dictionary: use index
dict_base = reinterpret_cast<const uint8_t*>(col.str_dict_index);
dict_size = col.page_info[0].num_input_values * sizeof(string_index_pair);
dict_size = col.dict_page->num_input_values * sizeof(string_index_pair);
} else {
dict_base = col.page_info[0].page_data; // dictionary is always stored in the first page
dict_size = col.page_info[0].uncompressed_page_size;
dict_base = col.dict_page->page_data;
dict_size = col.dict_page->uncompressed_page_size;
}

// FIXME: need to return an error condition...this won't actually do anything
if (s->dict_bits > 32 || !dict_base) { CUDF_UNREACHABLE("invalid dictionary bit size"); }
if (s->dict_bits > 32 || (!dict_base && col.dict_page->num_input_values > 0)) {
CUDF_UNREACHABLE("invalid dictionary bit size");
}

str_bytes = totalDictEntriesSize(
data, dict_base, s->dict_bits, dict_size, (end - data), start_value, end_value);
60 changes: 43 additions & 17 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION.
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -339,6 +339,21 @@ struct PageInfo {
decode_kernel_mask kernel_mask;
};

/**
* @brief Return the column schema id as the key for a PageInfo struct.
*/
struct get_page_key {
__device__ int32_t operator()(PageInfo const& page) const { return page.src_col_schema; }
};

/**
* @brief Return an iterator that returns they keys for a vector of pages.
*/
inline auto make_page_key_iterator(device_span<PageInfo const> pages)
{
return thrust::make_transform_iterator(pages.begin(), get_page_key{});
}

/**
* @brief Struct describing a particular chunk of column data
*/
@@ -362,7 +377,8 @@ struct ColumnChunkDesc {
int8_t decimal_precision_,
int32_t ts_clock_rate_,
int32_t src_col_index_,
int32_t src_col_schema_)
int32_t src_col_schema_,
float list_bytes_per_row_est_)
: compressed_data(compressed_data_),
compressed_size(compressed_size_),
num_values(num_values_),
@@ -375,7 +391,7 @@
num_data_pages(0),
num_dict_pages(0),
max_num_pages(0),
page_info(nullptr),
dict_page(nullptr),
str_dict_index(nullptr),
valid_map_base{nullptr},
column_data_base{nullptr},
@@ -386,26 +402,25 @@
decimal_precision(decimal_precision_),
ts_clock_rate(ts_clock_rate_),
src_col_index(src_col_index_),
src_col_schema(src_col_schema_)
src_col_schema(src_col_schema_),
list_bytes_per_row_est(list_bytes_per_row_est_)
{
}

uint8_t const* compressed_data{}; // pointer to compressed column chunk data
size_t compressed_size{}; // total compressed data size for this chunk
size_t num_values{}; // total number of values in this column
size_t start_row{}; // starting row of this chunk
uint32_t num_rows{}; // number of rows in this chunk
uint8_t const* compressed_data{}; // pointer to compressed column chunk data
size_t compressed_size{}; // total compressed data size for this chunk
size_t num_values{}; // total number of values in this column
size_t start_row{}; // file-wide, absolute starting row of this chunk
uint32_t num_rows{}; // number of rows in this chunk
int16_t max_level[level_type::NUM_LEVEL_TYPES]{}; // max definition/repetition level
int16_t max_nesting_depth{}; // max nesting depth of the output
uint16_t data_type{}; // basic column data type, ((type_length << 3) |
// parquet::Type)
uint16_t data_type{}; // basic column data type, ((type_length << 3) | // parquet::Type)
uint8_t
level_bits[level_type::NUM_LEVEL_TYPES]{}; // bits to encode max definition/repetition levels
int32_t num_data_pages{}; // number of data pages
int32_t num_dict_pages{}; // number of dictionary pages
int32_t max_num_pages{}; // size of page_info array
PageInfo* page_info{}; // output page info for up to num_dict_pages +
// num_data_pages (dictionary pages first)
level_bits[level_type::NUM_LEVEL_TYPES]{}; // bits to encode max definition/repetition levels
int32_t num_data_pages{}; // number of data pages
int32_t num_dict_pages{}; // number of dictionary pages
int32_t max_num_pages{}; // size of page_info array
PageInfo const* dict_page{};
string_index_pair* str_dict_index{}; // index for string dictionary
bitmask_type** valid_map_base{}; // base pointers of valid bit map for this column
void** column_data_base{}; // base pointers of column data
@@ -418,6 +433,15 @@

int32_t src_col_index{}; // my input column index
int32_t src_col_schema{}; // my schema index in the file

float list_bytes_per_row_est{}; // for LIST columns, an estimate on number of bytes per row
};

/**
* @brief A utility structure for use in decoding page headers.
*/
struct chunk_page_info {
PageInfo* pages;
};

/**
@@ -578,11 +602,13 @@ constexpr bool is_string_col(ColumnChunkDesc const& chunk)
* @brief Launches kernel for parsing the page headers in the column chunks
*
* @param[in] chunks List of column chunks
* @param[in] chunk_pages List of pages associated with the chunks, in chunk-sorted order
* @param[in] num_chunks Number of column chunks
* @param[out] error_code Error code for kernel failures
* @param[in] stream CUDA stream to use
*/
void DecodePageHeaders(ColumnChunkDesc* chunks,
chunk_page_info* chunk_pages,
int32_t num_chunks,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream);