Fix for encodings listed in the Parquet column chunk metadata #13907

Merged · 21 commits · Aug 26, 2023
21 changes: 21 additions & 0 deletions cpp/src/io/parquet/page_enc.cu
@@ -229,6 +229,16 @@ Encoding __device__ determine_encoding(PageType page_type,
}
}

// operator to use with warp_reduce. stolen from cub::Sum
struct Or {
/// Binary OR operator, returns <tt>a | b</tt>
template <typename T>
__host__ __device__ __forceinline__ T operator()(const T& a, const T& b) const
{
return a | b;
}
};

} // anonymous namespace

// blockDim {512,1,1}
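For context, a minimal sketch of how a functor like Or plugs into cub::WarpReduce, as the decide_compression kernel in the next hunk does. This is illustrative only; it assumes CUB's WarpReduce<T>::Reduce(input, op) interface, and BitOr and or_reduce_demo are hypothetical names.

#include <cub/cub.cuh>
#include <cstdint>

// Any functor exposing T operator()(const T&, const T&) works as the
// reduction operator; this mirrors the Or struct above.
struct BitOr {
  template <typename T>
  __host__ __device__ T operator()(const T& a, const T& b) const { return a | b; }
};

__global__ void or_reduce_demo(uint32_t const* in, uint32_t* out)
{
  using WarpReduce = cub::WarpReduce<uint32_t>;
  __shared__ typename WarpReduce::TempStorage temp_storage;

  // Each of the 32 lanes contributes one bitmask; only lane 0 receives
  // the OR of all of them (as with cub::Sum, other lanes are undefined).
  uint32_t const agg = WarpReduce(temp_storage).Reduce(in[threadIdx.x], BitOr{});
  if (threadIdx.x == 0) { *out = agg; }
}

Note that the kernel below reuses temp_storage[warp_id][0] for both the Sum and the Or reductions; CUB requires a barrier before temporary storage is reused, which is why a __syncwarp() precedes the Or reduction.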
@@ -1445,6 +1455,7 @@ __global__ void __launch_bounds__(decide_compression_block_size)

uint32_t uncompressed_data_size = 0;
uint32_t compressed_data_size = 0;
uint32_t encodings = 0;
auto const num_pages = ck_g[warp_id].num_pages;
for (auto page_id = lane_id; page_id < num_pages; page_id += cudf::detail::warp_size) {
auto const& curr_page = ck_g[warp_id].pages[page_id];
@@ -1457,10 +1468,14 @@
atomicOr(&compression_error[warp_id], 1);
}
}
// collect encoding info for the chunk metadata
encodings |= encoding_to_mask(curr_page.encoding);
}
uncompressed_data_size = warp_reduce(temp_storage[warp_id][0]).Sum(uncompressed_data_size);
compressed_data_size = warp_reduce(temp_storage[warp_id][1]).Sum(compressed_data_size);
__syncwarp();
encodings = warp_reduce(temp_storage[warp_id][0]).Reduce(encodings, Or{});
__syncwarp();

if (lane_id == 0) {
auto const write_compressed = compressed_data_size != 0 and compression_error[warp_id] == 0 and
@@ -1469,6 +1484,12 @@
chunks[chunk_id].bfr_size = uncompressed_data_size;
chunks[chunk_id].compressed_size =
write_compressed ? compressed_data_size : uncompressed_data_size;

// if there is repetition or definition level data add RLE encoding
auto const rle_bits =
ck_g[warp_id].col_desc->num_def_level_bits() + ck_g[warp_id].col_desc->num_rep_level_bits();
if (rle_bits > 0) { encodings |= EncodingMask::RLE; }
chunks[chunk_id].encodings = encodings;
}
}
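The rle_bits check above unpacks the level_bits field documented in parquet_gpu.hpp, which stores definition-level bits in the lower nibble and repetition-level bits in the upper nibble. A worked example with a hypothetical value:

// Hypothetical: 1 definition-level bit, 2 repetition-level bits.
uint8_t const level_bits = 0x21;
uint8_t const def_bits = level_bits & 0xf;  // 1 == num_def_level_bits()
uint8_t const rep_bits = level_bits >> 4;   // 2 == num_rep_level_bits()
// rle_bits = 1 + 2 = 3 > 0, so EncodingMask::RLE is OR'd into the
// chunk's encoding mask.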

29 changes: 27 additions & 2 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -326,8 +326,8 @@ struct parquet_column_device_view : stats_column_desc {
ConvertedType converted_type; //!< logical data type
uint8_t level_bits; //!< bits to encode max definition (lower nibble) & repetition (upper nibble)
//!< levels
constexpr uint8_t num_def_level_bits() { return level_bits & 0xf; }
constexpr uint8_t num_rep_level_bits() { return level_bits >> 4; }
constexpr uint8_t num_def_level_bits() const { return level_bits & 0xf; }
constexpr uint8_t num_rep_level_bits() const { return level_bits >> 4; }
size_type const* const*
nesting_offsets; //!< If column is a nested type, contains offset array of each nesting level

@@ -365,6 +365,28 @@ constexpr size_t kDictScratchSize = (1 << kDictHashBits) * sizeof(uint32_t);
struct EncPage;
struct slot_type;

// convert Encoding to a mask value
constexpr uint32_t encoding_to_mask(Encoding encoding)
{
return 1 << static_cast<uint32_t>(encoding);
}

/**
* @brief Encoding values as mask bits
*/
enum EncodingMask {
PLAIN = encoding_to_mask(Encoding::PLAIN),
GROUP_VAR_INT = encoding_to_mask(Encoding::GROUP_VAR_INT),
PLAIN_DICTIONARY = encoding_to_mask(Encoding::PLAIN_DICTIONARY),
RLE = encoding_to_mask(Encoding::RLE),
BIT_PACKED = encoding_to_mask(Encoding::BIT_PACKED),
DELTA_BINARY_PACKED = encoding_to_mask(Encoding::DELTA_BINARY_PACKED),
DELTA_LENGTH_BYTE_ARRAY = encoding_to_mask(Encoding::DELTA_LENGTH_BYTE_ARRAY),
DELTA_BYTE_ARRAY = encoding_to_mask(Encoding::DELTA_BYTE_ARRAY),
RLE_DICTIONARY = encoding_to_mask(Encoding::RLE_DICTIONARY),
BYTE_STREAM_SPLIT = encoding_to_mask(Encoding::BYTE_STREAM_SPLIT)
};
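As a quick illustration of how these bits compose (the shift amounts follow the Thrift Encoding enum ordering shown above, with PLAIN = 0 and RLE = 3):

uint32_t mask = 0;
mask |= encoding_to_mask(Encoding::PLAIN);  // 1 << 0
mask |= encoding_to_mask(Encoding::RLE);    // 1 << 3
// mask == 0x9; membership is tested with the same bits:
bool const uses_rle = (mask & EncodingMask::RLE) != 0;  // true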

/**
* @brief Struct describing an encoder column chunk
*/
@@ -401,6 +423,7 @@ struct EncColumnChunk {
bool use_dictionary; //!< True if the chunk uses dictionary encoding
uint8_t* column_index_blob; //!< Binary blob containing encoded column index for this chunk
uint32_t column_index_size; //!< Size of column index blob
uint32_t encodings; //!< Mask representing the set of encodings used for this chunk
};

/**
@@ -693,6 +716,8 @@ void EncodePages(device_span<EncPage> pages,
/**
* @brief Launches kernel to make the compressed vs uncompressed chunk-level decision
*
* Also calculates the set of page encodings used for each chunk.
*
* @param[in,out] chunks Column chunks (updated with actual compressed/uncompressed sizes)
* @param[in] stream CUDA stream to use
*/
43 changes: 31 additions & 12 deletions cpp/src/io/parquet/writer_impl.cu
@@ -193,6 +193,34 @@ parquet::Compression to_parquet_compression(compression_type compression)
}
}

/**
* @brief Append the encodings set in a mask to a vector of `Encoding`s.
*
* @param encodings Vector of `Encoding`s to populate
* @param enc_mask Mask of encodings used
*/
void update_chunk_encodings(std::vector<Encoding>& encodings, uint32_t enc_mask)
{
// only convert the encodings we support
if ((enc_mask & gpu::EncodingMask::PLAIN) != 0) { encodings.push_back(Encoding::PLAIN); }
if ((enc_mask & gpu::EncodingMask::PLAIN_DICTIONARY) != 0) {
encodings.push_back(Encoding::PLAIN_DICTIONARY);
}
if ((enc_mask & gpu::EncodingMask::RLE) != 0) { encodings.push_back(Encoding::RLE); }
if ((enc_mask & gpu::EncodingMask::DELTA_BINARY_PACKED) != 0) {
encodings.push_back(Encoding::DELTA_BINARY_PACKED);
}
if ((enc_mask & gpu::EncodingMask::DELTA_LENGTH_BYTE_ARRAY) != 0) {
encodings.push_back(Encoding::DELTA_LENGTH_BYTE_ARRAY);
}
if ((enc_mask & gpu::EncodingMask::DELTA_BYTE_ARRAY) != 0) {
encodings.push_back(Encoding::DELTA_BYTE_ARRAY);
}
if ((enc_mask & gpu::EncodingMask::RLE_DICTIONARY) != 0) {
encodings.push_back(Encoding::RLE_DICTIONARY);
}
}
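A small usage sketch with a hypothetical mask; bits outside the supported set above, such as BIT_PACKED, are intentionally dropped in the conversion:

std::vector<Encoding> encodings;
update_chunk_encodings(
  encodings, gpu::EncodingMask::PLAIN | gpu::EncodingMask::RLE | gpu::EncodingMask::BIT_PACKED);
// encodings now holds {Encoding::PLAIN, Encoding::RLE}; BIT_PACKED is skipped.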

/**
* @brief Compute size (in bytes) of the data stored in the given column.
*
@@ -1671,6 +1699,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
ck.start_row = start_row;
ck.num_rows = (uint32_t)row_group.num_rows;
ck.first_fragment = c * num_fragments + f;
ck.encodings = 0;
auto chunk_fragments = row_group_fragments[c].subspan(f, fragments_in_chunk);
// In fragment struct, add a pointer to the chunk it belongs to
// In each fragment in chunk_fragments, update the chunk pointer here.
@@ -1687,7 +1716,6 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
});
auto& column_chunk_meta = row_group.columns[c].meta_data;
column_chunk_meta.type = parquet_columns[c].physical_type();
column_chunk_meta.encodings = {Encoding::PLAIN, Encoding::RLE};
column_chunk_meta.path_in_schema = parquet_columns[c].get_path_in_schema();
column_chunk_meta.codec = UNCOMPRESSED;
column_chunk_meta.num_values = ck.num_values;
@@ -1703,17 +1731,6 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
row_group_fragments.host_to_device_async(stream);
[[maybe_unused]] auto dict_info_owner = build_chunk_dictionaries(
chunks, col_desc, row_group_fragments, compression, dict_policy, max_dictionary_size, stream);
for (size_t p = 0; p < partitions.size(); p++) {
for (int rg = 0; rg < num_rg_in_part[p]; rg++) {
size_t global_rg = global_rowgroup_base[p] + rg;
for (int col = 0; col < num_columns; col++) {
if (chunks.host_view()[rg][col].use_dictionary) {
agg_meta->file(p).row_groups[global_rg].columns[col].meta_data.encodings.push_back(
Encoding::PLAIN_DICTIONARY);
}
}
}
}

// The code preceding this used a uniform fragment size for all columns. Now recompute
// fragments with a (potentially) varying number of fragments per column.
@@ -1949,6 +1966,8 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
}
max_write_size = std::max(max_write_size, ck.compressed_size);

update_chunk_encodings(column_chunk_meta.encodings, ck.encodings);

if (ck.ck_stat_size != 0) {
std::vector<uint8_t> const stats_blob = cudf::detail::make_std_vector_sync(
device_span<uint8_t const>(dev_bfr, ck.ck_stat_size), stream);
73 changes: 73 additions & 0 deletions cpp/tests/io/parquet_test.cpp
@@ -6599,4 +6599,77 @@ TEST_F(ParquetWriterTest, TimestampMicrosINT96NoOverflow)
CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view());
}

TEST_P(ParquetV2Test, CheckEncodings)
{
using cudf::io::parquet::Encoding;
constexpr auto num_rows = 100'000;
auto const is_v2 = GetParam();

auto const validity = cudf::test::iterators::no_nulls();
// data should be PLAIN for v1, RLE for v2
auto col0_data =
cudf::detail::make_counting_transform_iterator(0, [](auto i) -> bool { return i % 2 == 0; });
// data should be PLAIN for both
auto col1_data = random_values<int32_t>(num_rows);
// data should be PLAIN_DICTIONARY for v1, PLAIN and RLE_DICTIONARY for v2
auto col2_data = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return 1; });

cudf::test::fixed_width_column_wrapper<bool> col0{col0_data, col0_data + num_rows, validity};
column_wrapper<int32_t> col1{col1_data.begin(), col1_data.end(), validity};
column_wrapper<int32_t> col2{col2_data, col2_data + num_rows, validity};

auto expected = table_view{{col0, col1, col2}};

auto const filename = is_v2 ? "CheckEncodingsV2.parquet" : "CheckEncodingsV1.parquet";
auto filepath = temp_env->get_temp_filepath(filename);
cudf::io::parquet_writer_options out_opts =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected)
.max_page_size_rows(num_rows)
.write_v2_headers(is_v2);
cudf::io::write_parquet(out_opts);

// make sure the expected encodings are present
auto contains = [](auto vec, auto enc) {
for (size_t i = 0; i < vec.size(); i++) {
if (vec[i] == enc) return true;
}
return false;
};

auto const source = cudf::io::datasource::create(filepath);
cudf::io::parquet::FileMetaData fmd;

read_footer(source, &fmd);
auto const& chunk0_enc = fmd.row_groups[0].columns[0].meta_data.encodings;
auto const& chunk1_enc = fmd.row_groups[0].columns[1].meta_data.encodings;
auto const& chunk2_enc = fmd.row_groups[0].columns[2].meta_data.encodings;
if (is_v2) {
// col0 should have RLE for rep/def and data
EXPECT_TRUE(chunk0_enc.size() == 1);
EXPECT_TRUE(contains(chunk0_enc, Encoding::RLE));
// col1 should have RLE for rep/def and PLAIN for data
EXPECT_TRUE(chunk1_enc.size() == 2);
EXPECT_TRUE(contains(chunk1_enc, Encoding::RLE));
EXPECT_TRUE(contains(chunk1_enc, Encoding::PLAIN));
// col2 should have RLE for rep/def, PLAIN for dict, and PLAIN_DICTIONARY for data
EXPECT_TRUE(chunk2_enc.size() == 3);
EXPECT_TRUE(contains(chunk2_enc, Encoding::RLE));
EXPECT_TRUE(contains(chunk2_enc, Encoding::PLAIN));
EXPECT_TRUE(contains(chunk2_enc, Encoding::RLE_DICTIONARY));
} else {
// col0 should have RLE for rep/def and PLAIN for data
EXPECT_TRUE(chunk0_enc.size() == 2);
EXPECT_TRUE(contains(chunk0_enc, Encoding::RLE));
EXPECT_TRUE(contains(chunk0_enc, Encoding::PLAIN));
// col1 should have RLE for rep/def and PLAIN for data
EXPECT_TRUE(chunk1_enc.size() == 2);
EXPECT_TRUE(contains(chunk1_enc, Encoding::RLE));
EXPECT_TRUE(contains(chunk1_enc, Encoding::PLAIN));
// col2 should have RLE for rep/def and PLAIN_DICTIONARY for data and dict
EXPECT_TRUE(chunk2_enc.size() == 2);
EXPECT_TRUE(contains(chunk2_enc, Encoding::RLE));
EXPECT_TRUE(contains(chunk2_enc, Encoding::PLAIN_DICTIONARY));
}
}

CUDF_TEST_PROGRAM_MAIN()