From a025db54a92ad967827ad6f6f2b251065fe09c73 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Sat, 26 Aug 2023 01:25:16 -0700 Subject: [PATCH] Fix for encodings listed in the Parquet column chunk metadata (#13907) With the addition of V2 page headers, the encodings used have also changed. This PR correctly determines the encodings used in each column chunk and writes that information to the column chunk metadata. Authors: - Ed Seidl (https://github.com/etseidl) - Nghia Truong (https://github.com/ttnghia) Approvers: - Vukasin Milovanovic (https://github.com/vuule) - Nghia Truong (https://github.com/ttnghia) URL: https://github.com/rapidsai/cudf/pull/13907 --- cpp/src/io/parquet/page_enc.cu | 21 ++++++++ cpp/src/io/parquet/parquet_common.hpp | 1 + cpp/src/io/parquet/parquet_gpu.hpp | 13 ++++- cpp/src/io/parquet/writer_impl.cu | 29 ++++++----- cpp/tests/io/parquet_test.cpp | 70 +++++++++++++++++++++++++++ 5 files changed, 120 insertions(+), 14 deletions(-) diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index d066b454840..0af561be8da 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -229,6 +229,16 @@ Encoding __device__ determine_encoding(PageType page_type, } } +// operator to use with warp_reduce. stolen from cub::Sum +struct BitwiseOr { + /// Binary OR operator, returns a | b + template + __host__ __device__ __forceinline__ T operator()(T const& a, T const& b) const + { + return a | b; + } +}; + } // anonymous namespace // blockDim {512,1,1} @@ -1445,6 +1455,7 @@ __global__ void __launch_bounds__(decide_compression_block_size) uint32_t uncompressed_data_size = 0; uint32_t compressed_data_size = 0; + uint32_t encodings = 0; auto const num_pages = ck_g[warp_id].num_pages; for (auto page_id = lane_id; page_id < num_pages; page_id += cudf::detail::warp_size) { auto const& curr_page = ck_g[warp_id].pages[page_id]; @@ -1457,10 +1468,14 @@ __global__ void __launch_bounds__(decide_compression_block_size) atomicOr(&compression_error[warp_id], 1); } } + // collect encoding info for the chunk metadata + encodings |= encoding_to_mask(curr_page.encoding); } uncompressed_data_size = warp_reduce(temp_storage[warp_id][0]).Sum(uncompressed_data_size); compressed_data_size = warp_reduce(temp_storage[warp_id][1]).Sum(compressed_data_size); __syncwarp(); + encodings = warp_reduce(temp_storage[warp_id][0]).Reduce(encodings, BitwiseOr{}); + __syncwarp(); if (lane_id == 0) { auto const write_compressed = compressed_data_size != 0 and compression_error[warp_id] == 0 and @@ -1469,6 +1484,12 @@ __global__ void __launch_bounds__(decide_compression_block_size) chunks[chunk_id].bfr_size = uncompressed_data_size; chunks[chunk_id].compressed_size = write_compressed ? compressed_data_size : uncompressed_data_size; + + // if there is repetition or definition level data add RLE encoding + auto const rle_bits = + ck_g[warp_id].col_desc->num_def_level_bits() + ck_g[warp_id].col_desc->num_rep_level_bits(); + if (rle_bits > 0) { encodings |= encoding_to_mask(Encoding::RLE); } + chunks[chunk_id].encodings = encodings; } } diff --git a/cpp/src/io/parquet/parquet_common.hpp b/cpp/src/io/parquet/parquet_common.hpp index ab6290c4ed6..5f8f1617cb9 100644 --- a/cpp/src/io/parquet/parquet_common.hpp +++ b/cpp/src/io/parquet/parquet_common.hpp @@ -92,6 +92,7 @@ enum class Encoding : uint8_t { DELTA_BYTE_ARRAY = 7, RLE_DICTIONARY = 8, BYTE_STREAM_SPLIT = 9, + NUM_ENCODINGS = 10, }; /** diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 0a8640aef26..e82b6abc13d 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -345,8 +345,8 @@ struct parquet_column_device_view : stats_column_desc { ConvertedType converted_type; //!< logical data type uint8_t level_bits; //!< bits to encode max definition (lower nibble) & repetition (upper nibble) //!< levels - constexpr uint8_t num_def_level_bits() { return level_bits & 0xf; } - constexpr uint8_t num_rep_level_bits() { return level_bits >> 4; } + constexpr uint8_t num_def_level_bits() const { return level_bits & 0xf; } + constexpr uint8_t num_rep_level_bits() const { return level_bits >> 4; } size_type const* const* nesting_offsets; //!< If column is a nested type, contains offset array of each nesting level @@ -384,6 +384,12 @@ constexpr size_t kDictScratchSize = (1 << kDictHashBits) * sizeof(uint32_t); struct EncPage; struct slot_type; +// convert Encoding to a mask value +constexpr uint32_t encoding_to_mask(Encoding encoding) +{ + return 1 << static_cast(encoding); +} + /** * @brief Struct describing an encoder column chunk */ @@ -420,6 +426,7 @@ struct EncColumnChunk { bool use_dictionary; //!< True if the chunk uses dictionary encoding uint8_t* column_index_blob; //!< Binary blob containing encoded column index for this chunk uint32_t column_index_size; //!< Size of column index blob + uint32_t encodings; //!< Mask representing the set of encodings used for this chunk }; /** @@ -748,6 +755,8 @@ void EncodePages(device_span pages, /** * @brief Launches kernel to make the compressed vs uncompressed chunk-level decision * + * Also calculates the set of page encodings used for each chunk. + * * @param[in,out] chunks Column chunks (updated with actual compressed/uncompressed sizes) * @param[in] stream CUDA stream to use */ diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index c5fc852d20b..d2976a3f5d9 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -193,6 +193,20 @@ parquet::Compression to_parquet_compression(compression_type compression) } } +/** + * @brief Convert a mask of encodings to a vector. + * + * @param encodings Vector of `Encoding`s to populate + * @param enc_mask Mask of encodings used + */ +void update_chunk_encodings(std::vector& encodings, uint32_t enc_mask) +{ + for (uint8_t enc = 0; enc < static_cast(Encoding::NUM_ENCODINGS); enc++) { + auto const enc_enum = static_cast(enc); + if ((enc_mask & gpu::encoding_to_mask(enc_enum)) != 0) { encodings.push_back(enc_enum); } + } +} + /** * @brief Compute size (in bytes) of the data stored in the given column. * @@ -1671,6 +1685,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, ck.start_row = start_row; ck.num_rows = (uint32_t)row_group.num_rows; ck.first_fragment = c * num_fragments + f; + ck.encodings = 0; auto chunk_fragments = row_group_fragments[c].subspan(f, fragments_in_chunk); // In fragment struct, add a pointer to the chunk it belongs to // In each fragment in chunk_fragments, update the chunk pointer here. @@ -1687,7 +1702,6 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, }); auto& column_chunk_meta = row_group.columns[c].meta_data; column_chunk_meta.type = parquet_columns[c].physical_type(); - column_chunk_meta.encodings = {Encoding::PLAIN, Encoding::RLE}; column_chunk_meta.path_in_schema = parquet_columns[c].get_path_in_schema(); column_chunk_meta.codec = UNCOMPRESSED; column_chunk_meta.num_values = ck.num_values; @@ -1703,17 +1717,6 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, row_group_fragments.host_to_device_async(stream); [[maybe_unused]] auto dict_info_owner = build_chunk_dictionaries( chunks, col_desc, row_group_fragments, compression, dict_policy, max_dictionary_size, stream); - for (size_t p = 0; p < partitions.size(); p++) { - for (int rg = 0; rg < num_rg_in_part[p]; rg++) { - size_t global_rg = global_rowgroup_base[p] + rg; - for (int col = 0; col < num_columns; col++) { - if (chunks.host_view()[rg][col].use_dictionary) { - agg_meta->file(p).row_groups[global_rg].columns[col].meta_data.encodings.push_back( - Encoding::PLAIN_DICTIONARY); - } - } - } - } // The code preceding this used a uniform fragment size for all columns. Now recompute // fragments with a (potentially) varying number of fragments per column. @@ -1949,6 +1952,8 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, } max_write_size = std::max(max_write_size, ck.compressed_size); + update_chunk_encodings(column_chunk_meta.encodings, ck.encodings); + if (ck.ck_stat_size != 0) { std::vector const stats_blob = cudf::detail::make_std_vector_sync( device_span(dev_bfr, ck.ck_stat_size), stream); diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index 8c7d598d33f..b210452f619 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -6599,4 +6599,74 @@ TEST_F(ParquetWriterTest, TimestampMicrosINT96NoOverflow) CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); } +TEST_P(ParquetV2Test, CheckEncodings) +{ + using cudf::io::parquet::Encoding; + constexpr auto num_rows = 100'000; + auto const is_v2 = GetParam(); + + auto const validity = cudf::test::iterators::no_nulls(); + // data should be PLAIN for v1, RLE for V2 + auto col0_data = + cudf::detail::make_counting_transform_iterator(0, [](auto i) -> bool { return i % 2 == 0; }); + // data should be PLAIN for both + auto col1_data = random_values(num_rows); + // data should be PLAIN_DICTIONARY for v1, PLAIN and RLE_DICTIONARY for v2 + auto col2_data = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return 1; }); + + cudf::test::fixed_width_column_wrapper col0{col0_data, col0_data + num_rows, validity}; + column_wrapper col1{col1_data.begin(), col1_data.end(), validity}; + column_wrapper col2{col2_data, col2_data + num_rows, validity}; + + auto expected = table_view{{col0, col1, col2}}; + + auto const filename = is_v2 ? "CheckEncodingsV2.parquet" : "CheckEncodingsV1.parquet"; + auto filepath = temp_env->get_temp_filepath(filename); + cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .max_page_size_rows(num_rows) + .write_v2_headers(is_v2); + cudf::io::write_parquet(out_opts); + + // make sure the expected encodings are present + auto contains = [](auto const& vec, auto const& enc) { + return std::find(vec.begin(), vec.end(), enc) != vec.end(); + }; + + auto const source = cudf::io::datasource::create(filepath); + cudf::io::parquet::FileMetaData fmd; + + read_footer(source, &fmd); + auto const& chunk0_enc = fmd.row_groups[0].columns[0].meta_data.encodings; + auto const& chunk1_enc = fmd.row_groups[0].columns[1].meta_data.encodings; + auto const& chunk2_enc = fmd.row_groups[0].columns[2].meta_data.encodings; + if (is_v2) { + // col0 should have RLE for rep/def and data + EXPECT_TRUE(chunk0_enc.size() == 1); + EXPECT_TRUE(contains(chunk0_enc, Encoding::RLE)); + // col1 should have RLE for rep/def and PLAIN for data + EXPECT_TRUE(chunk1_enc.size() == 2); + EXPECT_TRUE(contains(chunk1_enc, Encoding::RLE)); + EXPECT_TRUE(contains(chunk1_enc, Encoding::PLAIN)); + // col2 should have RLE for rep/def, PLAIN for dict, and RLE_DICTIONARY for data + EXPECT_TRUE(chunk2_enc.size() == 3); + EXPECT_TRUE(contains(chunk2_enc, Encoding::RLE)); + EXPECT_TRUE(contains(chunk2_enc, Encoding::PLAIN)); + EXPECT_TRUE(contains(chunk2_enc, Encoding::RLE_DICTIONARY)); + } else { + // col0 should have RLE for rep/def and PLAIN for data + EXPECT_TRUE(chunk0_enc.size() == 2); + EXPECT_TRUE(contains(chunk0_enc, Encoding::RLE)); + EXPECT_TRUE(contains(chunk0_enc, Encoding::PLAIN)); + // col1 should have RLE for rep/def and PLAIN for data + EXPECT_TRUE(chunk1_enc.size() == 2); + EXPECT_TRUE(contains(chunk1_enc, Encoding::RLE)); + EXPECT_TRUE(contains(chunk1_enc, Encoding::PLAIN)); + // col2 should have RLE for rep/def and PLAIN_DICTIONARY for data and dict + EXPECT_TRUE(chunk2_enc.size() == 2); + EXPECT_TRUE(contains(chunk2_enc, Encoding::RLE)); + EXPECT_TRUE(contains(chunk2_enc, Encoding::PLAIN_DICTIONARY)); + } +} + CUDF_TEST_PROGRAM_MAIN()