Report number of rows per file read by PQ reader when no row selection and fix segfault in chunked PQ reader when skip_rows > 0 (#16195)

Closes #15389
Closes #16186

This PR adds the capability to calculate and report the number of rows read from each data source in the metadata of the table returned by the Parquet reader (both chunked and normal). The returned vector of counts is only valid (non-empty) when row selection (an AST filter) is not being used.
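For example, a minimal caller-side sketch (file names are hypothetical) of reading several files and inspecting the new field:

```cpp
#include <cudf/io/parquet.hpp>

#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

int main()
{
  // Hypothetical input files; any list of Parquet sources works the same way.
  std::vector<std::string> const files{"part0.parquet", "part1.parquet"};

  auto const options =
    cudf::io::parquet_reader_options::builder(cudf::io::source_info{files}).build();
  auto const result = cudf::io::read_parquet(options);

  // One entry per data source; empty when an AST filter was used.
  for (std::size_t i = 0; i < result.metadata.num_rows_per_source.size(); ++i) {
    std::cout << files[i] << ": " << result.metadata.num_rows_per_source[i] << " rows\n";
  }
  return 0;
}
```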

This PR also fixes a segfault in the chunked Parquet reader when skip_rows > 0 and the number of passes > 1. The segfault was caused by a couple of arithmetic errors when computing the (start_row, num_rows) pairs for the row_group_info, pass, and column chunk descriptor structs.
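A minimal sketch of the chunked-read pattern that exercises the fixed path (skip_rows > 0 with read limits small enough to force multiple passes); the limits, skip value, and file name below are illustrative:

```cpp
#include <cudf/io/parquet.hpp>
#include <cudf/table/table.hpp>

#include <cstddef>
#include <string>

std::size_t count_rows_chunked(std::string const& path)
{
  // Illustrative limits, chosen small enough to force several chunks/passes.
  std::size_t const chunk_read_limit = 1 << 20;  // output chunk size limit (bytes)
  std::size_t const pass_read_limit  = 1 << 20;  // temporary memory limit per pass (bytes)

  auto const options = cudf::io::parquet_reader_options::builder(cudf::io::source_info{path})
                         .skip_rows(1000)  // the case that previously could segfault
                         .build();

  cudf::io::chunked_parquet_reader reader(chunk_read_limit, pass_read_limit, options);

  std::size_t total_rows = 0;
  while (reader.has_next()) {
    auto const chunk = reader.read_chunk();
    total_rows += chunk.tbl->num_rows();
  }
  return total_rows;
}
```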

Both changes are included in this PR because the changes and gtests from the former work were needed to implement the segfault fix.

Authors:
  - Muhammad Haseeb (https://github.com/mhaseeb123)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #16195
mhaseeb123 authored Jul 20, 2024
1 parent 535db9b commit 75335f6
Showing 11 changed files with 796 additions and 43 deletions.
3 changes: 3 additions & 0 deletions cpp/include/cudf/io/types.hpp
@@ -277,6 +277,9 @@ struct column_name_info {
struct table_metadata {
std::vector<column_name_info>
schema_info; //!< Detailed name information for the entire output hierarchy
std::vector<size_t> num_rows_per_source; //!< Number of rows read from each data source.
//!< Currently only computed for Parquet readers if no
//!< AST filters are being used. Empty vector otherwise.
std::map<std::string, std::string> user_data; //!< Format-dependent metadata of the first input
//!< file as key-value pairs (deprecated)
std::vector<std::unordered_map<std::string, std::string>>
86 changes: 83 additions & 3 deletions cpp/src/io/parquet/reader_impl.cpp
@@ -26,6 +26,7 @@

#include <rmm/resource_ref.hpp>

#include <thrust/binary_search.h>
#include <thrust/iterator/counting_iterator.h>

#include <bitset>
@@ -549,7 +550,17 @@ table_with_metadata reader::impl::read_chunk_internal(read_mode mode)
out_columns.reserve(_output_buffers.size());

// no work to do (this can happen on the first pass if we have no rows to read)
if (!has_more_work()) { return finalize_output(out_metadata, out_columns); }
if (!has_more_work()) {
// Check if number of rows per source should be included in output metadata.
if (include_output_num_rows_per_source()) {
// Empty dataframe case: Simply initialize to a list of zeros
out_metadata.num_rows_per_source =
std::vector<size_t>(_file_itm_data.num_rows_per_source.size(), 0);
}

// Finalize output
return finalize_output(mode, out_metadata, out_columns);
}

auto& pass = *_pass_itm_data;
auto& subpass = *pass.subpass;
@@ -585,11 +596,80 @@ table_with_metadata reader::impl::read_chunk_internal(read_mode mode)
}
}

// Check if number of rows per source should be included in output metadata.
if (include_output_num_rows_per_source()) {
// For chunked reading, compute the output number of rows per source
if (mode == read_mode::CHUNKED_READ) {
out_metadata.num_rows_per_source =
calculate_output_num_rows_per_source(read_info.skip_rows, read_info.num_rows);
}
// Simply move the number of rows per file if reading all at once
else {
// Move is okay here as we are reading in one go.
out_metadata.num_rows_per_source = std::move(_file_itm_data.num_rows_per_source);
}
}

// Add empty columns if needed. Filter output columns based on filter.
return finalize_output(out_metadata, out_columns);
return finalize_output(mode, out_metadata, out_columns);
}

std::vector<size_t> reader::impl::calculate_output_num_rows_per_source(size_t const chunk_start_row,
size_t const chunk_num_rows)
{
// Handle base cases.
if (_file_itm_data.num_rows_per_source.size() == 0) {
return {};
} else if (_file_itm_data.num_rows_per_source.size() == 1) {
return {chunk_num_rows};
}

std::vector<size_t> num_rows_per_source(_file_itm_data.num_rows_per_source.size(), 0);

// Subtract global skip rows from the start_row as we took care of that when computing
// _file_itm_data.num_rows_per_source
auto const start_row = chunk_start_row - _file_itm_data.global_skip_rows;
auto const end_row = start_row + chunk_num_rows;
CUDF_EXPECTS(start_row <= end_row and end_row <= _file_itm_data.global_num_rows,
"Encountered invalid output chunk row bounds.");

// Copy reference to a const local variable for better readability
auto const& partial_sum_nrows_source = _file_itm_data.exclusive_sum_num_rows_per_source;

// Binary search start_row and end_row in exclusive_sum_num_rows_per_source vector
auto const start_iter =
std::upper_bound(partial_sum_nrows_source.cbegin(), partial_sum_nrows_source.cend(), start_row);
auto const end_iter =
(end_row == _file_itm_data.global_skip_rows + _file_itm_data.global_num_rows)
? partial_sum_nrows_source.cend() - 1
: std::upper_bound(start_iter, partial_sum_nrows_source.cend(), end_row);

// Compute the array offset index for both iterators
auto const start_idx = std::distance(partial_sum_nrows_source.cbegin(), start_iter);
auto const end_idx = std::distance(partial_sum_nrows_source.cbegin(), end_iter);

CUDF_EXPECTS(start_idx <= end_idx,
"Encountered invalid source files indexes for output chunk row bounds");

// If the entire chunk is from the same source file, then the count is simply num_rows
if (start_idx == end_idx) {
num_rows_per_source[start_idx] = chunk_num_rows;
} else {
// Compute the number of rows from the first source file
num_rows_per_source[start_idx] = partial_sum_nrows_source[start_idx] - start_row;
// Compute the number of rows from the last source file
num_rows_per_source[end_idx] = end_row - partial_sum_nrows_source[end_idx - 1];
// Simply copy the number of rows for each source in range: (start_idx, end_idx)
std::copy(_file_itm_data.num_rows_per_source.cbegin() + start_idx + 1,
_file_itm_data.num_rows_per_source.cbegin() + end_idx,
num_rows_per_source.begin() + start_idx + 1);
}

return num_rows_per_source;
}
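For illustration (not part of the diff), a standalone sketch of the partitioning idea implemented by calculate_output_num_rows_per_source above: build a prefix sum of per-source row counts, then locate the chunk bounds with binary search. This is a simplified re-derivation, not the reader's exact code:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <vector>

// Split a chunk covering global rows [start_row, start_row + num_rows) across the
// per-source row counts. Assumes start_row + num_rows does not exceed the total.
std::vector<std::size_t> split_chunk_by_source(std::vector<std::size_t> const& rows_per_source,
                                               std::size_t start_row,
                                               std::size_t num_rows)
{
  std::vector<std::size_t> out(rows_per_source.size(), 0);
  if (rows_per_source.empty() or num_rows == 0) { return out; }

  // Inclusive prefix sum: cum[i] is the first global row index *after* source i.
  std::vector<std::size_t> cum(rows_per_source.size());
  std::inclusive_scan(rows_per_source.begin(), rows_per_source.end(), cum.begin());

  std::size_t const end_row = start_row + num_rows;
  assert(end_row <= cum.back());

  // First source containing start_row, and the source containing the chunk's last row.
  auto const first     = std::upper_bound(cum.begin(), cum.end(), start_row);
  auto const last      = std::lower_bound(first, cum.end(), end_row);
  auto const first_idx = static_cast<std::size_t>(std::distance(cum.begin(), first));
  auto const last_idx  = static_cast<std::size_t>(std::distance(cum.begin(), last));

  if (first_idx == last_idx) {  // the chunk lies entirely within one source
    out[first_idx] = num_rows;
    return out;
  }
  out[first_idx] = cum[first_idx] - start_row;   // tail of the first source
  out[last_idx]  = end_row - cum[last_idx - 1];  // head of the last source
  for (std::size_t i = first_idx + 1; i < last_idx; ++i) {
    out[i] = rows_per_source[i];  // sources fully covered by the chunk
  }
  return out;
}
```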

table_with_metadata reader::impl::finalize_output(table_metadata& out_metadata,
table_with_metadata reader::impl::finalize_output(read_mode mode,
table_metadata& out_metadata,
std::vector<std::unique_ptr<column>>& out_columns)
{
// Create empty columns as needed (this can happen if we've ended up with no actual data to read)
31 changes: 29 additions & 2 deletions cpp/src/io/parquet/reader_impl.hpp
@@ -262,11 +262,13 @@ class reader::impl {
* @brief Finalize the output table by adding empty columns for the non-selected columns in
* schema.
*
* @param read_mode Value indicating if the data sources are read all at once or chunk by chunk
* @param out_metadata The output table metadata
* @param out_columns The columns for building the output table
* @return The output table along with columns' metadata
*/
table_with_metadata finalize_output(table_metadata& out_metadata,
table_with_metadata finalize_output(read_mode mode,
table_metadata& out_metadata,
std::vector<std::unique_ptr<column>>& out_columns);

/**
@@ -336,11 +338,36 @@
: true;
}

/**
* @brief Check if this is the first output chunk
*
* @return True if this is the first output chunk
*/
[[nodiscard]] bool is_first_output_chunk() const
{
return _file_itm_data._output_chunk_count == 0;
}

/**
* @brief Check if number of rows per source should be included in output metadata.
*
* @return True if AST filter is not present
*/
[[nodiscard]] bool include_output_num_rows_per_source() const
{
return not _expr_conv.get_converted_expr().has_value();
}

/**
* @brief Calculate the number of rows read from each source in the output chunk
*
* @param chunk_start_row The offset of the first row in the output chunk
* @param chunk_num_rows The number of rows in the output chunk
* @return Vector of number of rows from each respective data source in the output chunk
*/
[[nodiscard]] std::vector<size_t> calculate_output_num_rows_per_source(size_t chunk_start_row,
size_t chunk_num_rows);

rmm::cuda_stream_view _stream;
rmm::device_async_resource_ref _mr{rmm::mr::get_current_device_resource()};

@@ -387,7 +414,7 @@ class reader::impl {

// chunked reading happens in 2 parts:
//
// At the top level, the entire file is divided up into "passes" omn which we try and limit the
// At the top level, the entire file is divided up into "passes" on which we try and limit the
// total amount of temporary memory (compressed data, decompressed data) in use
// via _input_pass_read_limit.
//
53 changes: 35 additions & 18 deletions cpp/src/io/parquet/reader_impl_chunking.cu
@@ -1232,22 +1232,22 @@ void reader::impl::setup_next_pass(read_mode mode)
pass.skip_rows = _file_itm_data.global_skip_rows;
pass.num_rows = _file_itm_data.global_num_rows;
} else {
auto const global_start_row = _file_itm_data.global_skip_rows;
auto const global_end_row = global_start_row + _file_itm_data.global_num_rows;
auto const start_row =
std::max(_file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass],
global_start_row);
auto const end_row =
std::min(_file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass + 1],
global_end_row);

// skip_rows is always global in the sense that it is relative to the first row of
// everything we will be reading, regardless of what pass we are on.
// num_rows is how many rows we are reading this pass.
pass.skip_rows =
global_start_row +
// pass_start_row and pass_end_row are computed from the selected row groups relative to the
// global_skip_rows.
auto const pass_start_row =
_file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass];
pass.num_rows = end_row - start_row;
auto const pass_end_row =
std::min(_file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass + 1],
_file_itm_data.global_num_rows);

// pass.skip_rows is always global in the sense that it is relative to the first row of
// the data source (global row number 0), regardless of what pass we are on. Therefore,
// we must re-add global_skip_rows to the pass_start_row which is relative to the
// global_skip_rows.
pass.skip_rows = _file_itm_data.global_skip_rows + pass_start_row;
// num_rows is how many rows we are reading this pass. Since this is a difference, adding
// global_skip_rows to both variables is redundant.
pass.num_rows = pass_end_row - pass_start_row;
}
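A small worked example (made-up numbers, not part of the diff) of the corrected pass-bounds arithmetic above:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

int main()
{
  std::size_t const global_skip_rows = 100;  // rows skipped before any pass
  std::size_t const global_num_rows  = 500;  // rows to read in total
  // Pass start offsets are relative to global_skip_rows (as in input_pass_start_row_count).
  std::size_t const pass_start_row      = 250;
  std::size_t const next_pass_start_row = 600;

  std::size_t const pass_end_row   = std::min(next_pass_start_row, global_num_rows);  // 500
  std::size_t const pass_skip_rows = global_skip_rows + pass_start_row;               // 350, global
  std::size_t const pass_num_rows  = pass_end_row - pass_start_row;                   // 250

  assert(pass_skip_rows == 350 and pass_num_rows == 250);
  return 0;
}
```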

// load page information for the chunk. this retrieves the compressed bytes for all the
@@ -1509,6 +1509,7 @@ void reader::impl::create_global_chunk_info()

// Initialize column chunk information
auto remaining_rows = num_rows;
auto skip_rows = _file_itm_data.global_skip_rows;
for (auto const& rg : row_groups_info) {
auto const& row_group = _metadata->get_row_group(rg.index, rg.source_index);
auto const row_group_start = rg.start_row;
@@ -1561,7 +1562,12 @@
schema.type == BYTE_ARRAY and _strings_to_categorical));
}

remaining_rows -= row_group_rows;
// Adjust for skip_rows when updating the remaining rows after the first group
remaining_rows -=
(skip_rows) ? std::min<int>(rg.start_row + row_group.num_rows - skip_rows, remaining_rows)
: row_group_rows;
// Set skip_rows = 0 as it is no longer needed for subsequent row_groups
skip_rows = 0;
}
}

@@ -1598,6 +1604,9 @@ void reader::impl::compute_input_passes()
_file_itm_data.input_pass_row_group_offsets.push_back(0);
_file_itm_data.input_pass_start_row_count.push_back(0);

// To handle global_skip_rows when computing input passes
int skip_rows = _file_itm_data.global_skip_rows;

for (size_t cur_rg_index = 0; cur_rg_index < row_groups_info.size(); cur_rg_index++) {
auto const& rgi = row_groups_info[cur_rg_index];
auto const& row_group = _metadata->get_row_group(rgi.index, rgi.source_index);
@@ -1606,14 +1615,22 @@
auto const [compressed_rg_size, _ /*compressed + uncompressed*/] =
get_row_group_size(row_group);

// We must use the effective size of the first row group we are reading to accurately calculate
// the first non-zero input_pass_start_row_count.
auto const row_group_rows =
(skip_rows) ? rgi.start_row + row_group.num_rows - skip_rows : row_group.num_rows;

// Set skip_rows = 0 as it is no longer needed for subsequent row_groups
skip_rows = 0;

// can we add this row group
if (cur_pass_byte_size + compressed_rg_size >= comp_read_limit) {
// A single row group (the current one) is larger than the read limit:
// We always need to include at least one row group, so end the pass at the end of the current
// row group
if (cur_rg_start == cur_rg_index) {
_file_itm_data.input_pass_row_group_offsets.push_back(cur_rg_index + 1);
_file_itm_data.input_pass_start_row_count.push_back(cur_row_count + row_group.num_rows);
_file_itm_data.input_pass_start_row_count.push_back(cur_row_count + row_group_rows);
cur_rg_start = cur_rg_index + 1;
cur_pass_byte_size = 0;
}
@@ -1627,7 +1644,7 @@
} else {
cur_pass_byte_size += compressed_rg_size;
}
cur_row_count += row_group.num_rows;
cur_row_count += row_group_rows;
}

// add the last pass if necessary
6 changes: 6 additions & 0 deletions cpp/src/io/parquet/reader_impl_chunking.hpp
@@ -41,6 +41,12 @@ struct file_intermediate_data {
// is not capped by global_skip_rows and global_num_rows.
std::vector<std::size_t> input_pass_start_row_count{};

// number of rows to be read from each data source
std::vector<std::size_t> num_rows_per_source{};

// partial sum of the number of rows per data source
std::vector<std::size_t> exclusive_sum_num_rows_per_source{};

size_t _current_input_pass{0}; // current input pass index
size_t _output_chunk_count{0}; // how many output chunks we have produced

32 changes: 26 additions & 6 deletions cpp/src/io/parquet/reader_impl_helpers.cpp
@@ -945,7 +945,7 @@ std::vector<std::string> aggregate_reader_metadata::get_pandas_index_names() con
return names;
}

std::tuple<int64_t, size_type, std::vector<row_group_info>>
std::tuple<int64_t, size_type, std::vector<row_group_info>, std::vector<size_t>>
aggregate_reader_metadata::select_row_groups(
host_span<std::vector<size_type> const> row_group_indices,
int64_t skip_rows_opt,
@@ -976,6 +976,9 @@
static_cast<size_type>(from_opts.second)};
}();

// Get number of rows in each data source
std::vector<size_t> num_rows_per_source(per_file_metadata.size(), 0);

if (!row_group_indices.empty()) {
CUDF_EXPECTS(row_group_indices.size() == per_file_metadata.size(),
"Must specify row groups for each source");
@@ -989,28 +992,45 @@
selection.emplace_back(rowgroup_idx, rows_to_read, src_idx);
// if page-level indexes are present, then collect extra chunk and page info.
column_info_for_row_group(selection.back(), 0);
rows_to_read += get_row_group(rowgroup_idx, src_idx).num_rows;
auto const rows_this_rg = get_row_group(rowgroup_idx, src_idx).num_rows;
rows_to_read += rows_this_rg;
num_rows_per_source[src_idx] += rows_this_rg;
}
}
} else {
size_type count = 0;
for (size_t src_idx = 0; src_idx < per_file_metadata.size(); ++src_idx) {
auto const& fmd = per_file_metadata[src_idx];
for (size_t rg_idx = 0; rg_idx < fmd.row_groups.size(); ++rg_idx) {
for (size_t rg_idx = 0;
rg_idx < fmd.row_groups.size() and count < rows_to_skip + rows_to_read;
++rg_idx) {
auto const& rg = fmd.row_groups[rg_idx];
auto const chunk_start_row = count;
count += rg.num_rows;
if (count > rows_to_skip || count == 0) {
// start row of this row group adjusted with rows_to_skip
num_rows_per_source[src_idx] += count;
num_rows_per_source[src_idx] -=
(chunk_start_row <= rows_to_skip) ? rows_to_skip : chunk_start_row;

// We need the unadjusted start index of this row group to correctly initialize
// ColumnChunkDesc for this row group in create_global_chunk_info() and calculate
// the row offset for the first pass in compute_input_passes().
selection.emplace_back(rg_idx, chunk_start_row, src_idx);
// if page-level indexes are present, then collect extra chunk and page info.

// If page-level indexes are present, then collect extra chunk and page info.
// The page indexes rely on absolute row numbers, not adjusted for skip_rows.
column_info_for_row_group(selection.back(), chunk_start_row);
}
if (count >= rows_to_skip + rows_to_read) { break; }
// Adjust the number of rows for the last source file.
if (count >= rows_to_skip + rows_to_read) {
num_rows_per_source[src_idx] -= count - rows_to_skip - rows_to_read;
}
}
}
}

return {rows_to_skip, rows_to_read, std::move(selection)};
return {rows_to_skip, rows_to_read, std::move(selection), std::move(num_rows_per_source)};
}
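For illustration (not part of the diff), a simplified sketch of the per-source accounting performed above when only skip/num rows are given; at row granularity it reduces to intersecting each file's row range with the requested window:

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Given each file's total row count and a global (skip, count) window, return how many
// rows would be read from each file. Ignores row-group granularity for simplicity.
std::vector<std::size_t> rows_taken_per_file(std::vector<std::size_t> const& file_rows,
                                             std::size_t rows_to_skip,
                                             std::size_t rows_to_read)
{
  std::vector<std::size_t> taken(file_rows.size(), 0);
  std::size_t const window_end = rows_to_skip + rows_to_read;

  std::size_t file_start = 0;  // global index of the file's first row
  for (std::size_t i = 0; i < file_rows.size(); ++i) {
    std::size_t const file_end = file_start + file_rows[i];
    // Intersect [file_start, file_end) with [rows_to_skip, window_end).
    std::size_t const lo = std::max(file_start, rows_to_skip);
    std::size_t const hi = std::min(file_end, window_end);
    taken[i]   = (hi > lo) ? hi - lo : 0;
    file_start = file_end;
  }
  return taken;
}
```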

std::tuple<std::vector<input_column_info>,
20 changes: 10 additions & 10 deletions cpp/src/io/parquet/reader_impl_helpers.hpp
@@ -282,17 +282,17 @@ class aggregate_reader_metadata {
* @param output_column_schemas schema indices of output columns
* @param filter Optional AST expression to filter row groups based on Column chunk statistics
* @param stream CUDA stream used for device memory operations and kernel launches
* @return A tuple of corrected row_start, row_count and list of row group indexes and its
* starting row
* @return A tuple of corrected row_start, row_count, list of row group indexes and its
* starting row, and list of number of rows per source.
*/
[[nodiscard]] std::tuple<int64_t, size_type, std::vector<row_group_info>> select_row_groups(
host_span<std::vector<size_type> const> row_group_indices,
int64_t row_start,
std::optional<size_type> const& row_count,
host_span<data_type const> output_dtypes,
host_span<int const> output_column_schemas,
std::optional<std::reference_wrapper<ast::expression const>> filter,
rmm::cuda_stream_view stream) const;
[[nodiscard]] std::tuple<int64_t, size_type, std::vector<row_group_info>, std::vector<size_t>>
select_row_groups(host_span<std::vector<size_type> const> row_group_indices,
int64_t row_start,
std::optional<size_type> const& row_count,
host_span<data_type const> output_dtypes,
host_span<int const> output_column_schemas,
std::optional<std::reference_wrapper<ast::expression const>> filter,
rmm::cuda_stream_view stream) const;

/**
* @brief Filters and reduces down to a selection of columns