
Commit

Fix for encodings listed in the Parquet column chunk metadata (#13907)
With the addition of V2 page headers, the set of encodings used has also changed. This PR correctly determines the encodings used in each column chunk and writes that information to the column chunk metadata (a minimal sketch of the approach follows the change summary below).

Authors:
  - Ed Seidl (https://github.com/etseidl)
  - Nghia Truong (https://github.com/ttnghia)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Nghia Truong (https://github.com/ttnghia)

URL: #13907
etseidl authored Aug 26, 2023
1 parent b6d08ca commit a025db5
Showing 5 changed files with 120 additions and 14 deletions.
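
Conceptually, the change works like this: on the device, each page ORs a bit for its encoding into a per-chunk mask (with RLE added whenever repetition or definition level data is present), and on the host that mask is expanded into the `encodings` list written to the column chunk metadata. Below is a minimal host-only sketch of that mask round-trip, not the actual kernel code; it borrows the `encoding_to_mask` idea from this PR, `mask_to_encodings` is a hypothetical stand-in for the new `update_chunk_encodings` helper, and only a few Parquet `Encoding` values are included.

#include <cstdint>
#include <vector>

enum class Encoding : uint8_t { PLAIN = 0, RLE = 3, RLE_DICTIONARY = 8, NUM_ENCODINGS = 10 };

// one bit per encoding, as in the PR's encoding_to_mask helper
constexpr uint32_t encoding_to_mask(Encoding e) { return 1u << static_cast<uint32_t>(e); }

// expand a mask back into the list written to the chunk metadata
std::vector<Encoding> mask_to_encodings(uint32_t mask)
{
  std::vector<Encoding> out;
  for (uint8_t e = 0; e < static_cast<uint8_t>(Encoding::NUM_ENCODINGS); ++e) {
    auto const enc = static_cast<Encoding>(e);
    if (mask & encoding_to_mask(enc)) { out.push_back(enc); }
  }
  return out;
}

int main()
{
  // A chunk with a PLAIN dictionary page, RLE_DICTIONARY data pages, and
  // RLE-encoded repetition/definition levels.
  uint32_t chunk_mask = 0;
  chunk_mask |= encoding_to_mask(Encoding::PLAIN);
  chunk_mask |= encoding_to_mask(Encoding::RLE_DICTIONARY);
  chunk_mask |= encoding_to_mask(Encoding::RLE);
  auto const encodings = mask_to_encodings(chunk_mask);  // yields {PLAIN, RLE, RLE_DICTIONARY}
  return encodings.size() == 3 ? 0 : 1;
}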
21 changes: 21 additions & 0 deletions cpp/src/io/parquet/page_enc.cu
@@ -229,6 +229,16 @@ Encoding __device__ determine_encoding(PageType page_type,
}
}

// operator to use with warp_reduce. stolen from cub::Sum
struct BitwiseOr {
/// Binary OR operator, returns <tt>a | b</tt>
template <typename T>
__host__ __device__ __forceinline__ T operator()(T const& a, T const& b) const
{
return a | b;
}
};

} // anonymous namespace

// blockDim {512,1,1}
@@ -1445,6 +1455,7 @@ __global__ void __launch_bounds__(decide_compression_block_size)

uint32_t uncompressed_data_size = 0;
uint32_t compressed_data_size = 0;
uint32_t encodings = 0;
auto const num_pages = ck_g[warp_id].num_pages;
for (auto page_id = lane_id; page_id < num_pages; page_id += cudf::detail::warp_size) {
auto const& curr_page = ck_g[warp_id].pages[page_id];
@@ -1457,10 +1468,14 @@
atomicOr(&compression_error[warp_id], 1);
}
}
// collect encoding info for the chunk metadata
encodings |= encoding_to_mask(curr_page.encoding);
}
uncompressed_data_size = warp_reduce(temp_storage[warp_id][0]).Sum(uncompressed_data_size);
compressed_data_size = warp_reduce(temp_storage[warp_id][1]).Sum(compressed_data_size);
__syncwarp();
encodings = warp_reduce(temp_storage[warp_id][0]).Reduce(encodings, BitwiseOr{});
__syncwarp();

if (lane_id == 0) {
auto const write_compressed = compressed_data_size != 0 and compression_error[warp_id] == 0 and
@@ -1469,6 +1484,12 @@
chunks[chunk_id].bfr_size = uncompressed_data_size;
chunks[chunk_id].compressed_size =
write_compressed ? compressed_data_size : uncompressed_data_size;

// if there is repetition or definition level data add RLE encoding
auto const rle_bits =
ck_g[warp_id].col_desc->num_def_level_bits() + ck_g[warp_id].col_desc->num_rep_level_bits();
if (rle_bits > 0) { encodings |= encoding_to_mask(Encoding::RLE); }
chunks[chunk_id].encodings = encodings;
}
}

1 change: 1 addition & 0 deletions cpp/src/io/parquet/parquet_common.hpp
@@ -92,6 +92,7 @@ enum class Encoding : uint8_t {
DELTA_BYTE_ARRAY = 7,
RLE_DICTIONARY = 8,
BYTE_STREAM_SPLIT = 9,
NUM_ENCODINGS = 10,
};

/**
13 changes: 11 additions & 2 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -345,8 +345,8 @@ struct parquet_column_device_view : stats_column_desc {
ConvertedType converted_type; //!< logical data type
uint8_t level_bits; //!< bits to encode max definition (lower nibble) & repetition (upper nibble)
//!< levels
constexpr uint8_t num_def_level_bits() { return level_bits & 0xf; }
constexpr uint8_t num_rep_level_bits() { return level_bits >> 4; }
constexpr uint8_t num_def_level_bits() const { return level_bits & 0xf; }
constexpr uint8_t num_rep_level_bits() const { return level_bits >> 4; }
size_type const* const*
nesting_offsets; //!< If column is a nested type, contains offset array of each nesting level

@@ -384,6 +384,12 @@ constexpr size_t kDictScratchSize = (1 << kDictHashBits) * sizeof(uint32_t);
struct EncPage;
struct slot_type;

// convert Encoding to a mask value
constexpr uint32_t encoding_to_mask(Encoding encoding)
{
return 1 << static_cast<uint32_t>(encoding);
}

/**
* @brief Struct describing an encoder column chunk
*/
@@ -420,6 +426,7 @@ struct EncColumnChunk {
bool use_dictionary; //!< True if the chunk uses dictionary encoding
uint8_t* column_index_blob; //!< Binary blob containing encoded column index for this chunk
uint32_t column_index_size; //!< Size of column index blob
uint32_t encodings; //!< Mask representing the set of encodings used for this chunk
};

/**
@@ -748,6 +755,8 @@ void EncodePages(device_span<EncPage> pages,
/**
* @brief Launches kernel to make the compressed vs uncompressed chunk-level decision
*
* Also calculates the set of page encodings used for each chunk.
*
* @param[in,out] chunks Column chunks (updated with actual compressed/uncompressed sizes)
* @param[in] stream CUDA stream to use
*/
29 changes: 17 additions & 12 deletions cpp/src/io/parquet/writer_impl.cu
@@ -193,6 +193,20 @@ parquet::Compression to_parquet_compression(compression_type compression)
}
}

/**
* @brief Convert a mask of encodings to a vector.
*
* @param encodings Vector of `Encoding`s to populate
* @param enc_mask Mask of encodings used
*/
void update_chunk_encodings(std::vector<Encoding>& encodings, uint32_t enc_mask)
{
for (uint8_t enc = 0; enc < static_cast<uint8_t>(Encoding::NUM_ENCODINGS); enc++) {
auto const enc_enum = static_cast<Encoding>(enc);
if ((enc_mask & gpu::encoding_to_mask(enc_enum)) != 0) { encodings.push_back(enc_enum); }
}
}

/**
* @brief Compute size (in bytes) of the data stored in the given column.
*
@@ -1671,6 +1685,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
ck.start_row = start_row;
ck.num_rows = (uint32_t)row_group.num_rows;
ck.first_fragment = c * num_fragments + f;
ck.encodings = 0;
auto chunk_fragments = row_group_fragments[c].subspan(f, fragments_in_chunk);
// In fragment struct, add a pointer to the chunk it belongs to
// In each fragment in chunk_fragments, update the chunk pointer here.
@@ -1687,7 +1702,6 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
});
auto& column_chunk_meta = row_group.columns[c].meta_data;
column_chunk_meta.type = parquet_columns[c].physical_type();
column_chunk_meta.encodings = {Encoding::PLAIN, Encoding::RLE};
column_chunk_meta.path_in_schema = parquet_columns[c].get_path_in_schema();
column_chunk_meta.codec = UNCOMPRESSED;
column_chunk_meta.num_values = ck.num_values;
@@ -1703,17 +1717,6 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
row_group_fragments.host_to_device_async(stream);
[[maybe_unused]] auto dict_info_owner = build_chunk_dictionaries(
chunks, col_desc, row_group_fragments, compression, dict_policy, max_dictionary_size, stream);
for (size_t p = 0; p < partitions.size(); p++) {
for (int rg = 0; rg < num_rg_in_part[p]; rg++) {
size_t global_rg = global_rowgroup_base[p] + rg;
for (int col = 0; col < num_columns; col++) {
if (chunks.host_view()[rg][col].use_dictionary) {
agg_meta->file(p).row_groups[global_rg].columns[col].meta_data.encodings.push_back(
Encoding::PLAIN_DICTIONARY);
}
}
}
}

// The code preceding this used a uniform fragment size for all columns. Now recompute
// fragments with a (potentially) varying number of fragments per column.
@@ -1949,6 +1952,8 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
}
max_write_size = std::max(max_write_size, ck.compressed_size);

update_chunk_encodings(column_chunk_meta.encodings, ck.encodings);

if (ck.ck_stat_size != 0) {
std::vector<uint8_t> const stats_blob = cudf::detail::make_std_vector_sync(
device_span<uint8_t const>(dev_bfr, ck.ck_stat_size), stream);
70 changes: 70 additions & 0 deletions cpp/tests/io/parquet_test.cpp
@@ -6599,4 +6599,74 @@ TEST_F(ParquetWriterTest, TimestampMicrosINT96NoOverflow)
CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view());
}

TEST_P(ParquetV2Test, CheckEncodings)
{
using cudf::io::parquet::Encoding;
constexpr auto num_rows = 100'000;
auto const is_v2 = GetParam();

auto const validity = cudf::test::iterators::no_nulls();
// data should be PLAIN for v1, RLE for V2
auto col0_data =
cudf::detail::make_counting_transform_iterator(0, [](auto i) -> bool { return i % 2 == 0; });
// data should be PLAIN for both
auto col1_data = random_values<int32_t>(num_rows);
// data should be PLAIN_DICTIONARY for v1, PLAIN and RLE_DICTIONARY for v2
auto col2_data = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return 1; });

cudf::test::fixed_width_column_wrapper<bool> col0{col0_data, col0_data + num_rows, validity};
column_wrapper<int32_t> col1{col1_data.begin(), col1_data.end(), validity};
column_wrapper<int32_t> col2{col2_data, col2_data + num_rows, validity};

auto expected = table_view{{col0, col1, col2}};

auto const filename = is_v2 ? "CheckEncodingsV2.parquet" : "CheckEncodingsV1.parquet";
auto filepath = temp_env->get_temp_filepath(filename);
cudf::io::parquet_writer_options out_opts =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected)
.max_page_size_rows(num_rows)
.write_v2_headers(is_v2);
cudf::io::write_parquet(out_opts);

// make sure the expected encodings are present
auto contains = [](auto const& vec, auto const& enc) {
return std::find(vec.begin(), vec.end(), enc) != vec.end();
};

auto const source = cudf::io::datasource::create(filepath);
cudf::io::parquet::FileMetaData fmd;

read_footer(source, &fmd);
auto const& chunk0_enc = fmd.row_groups[0].columns[0].meta_data.encodings;
auto const& chunk1_enc = fmd.row_groups[0].columns[1].meta_data.encodings;
auto const& chunk2_enc = fmd.row_groups[0].columns[2].meta_data.encodings;
if (is_v2) {
// col0 should have RLE for rep/def and data
EXPECT_TRUE(chunk0_enc.size() == 1);
EXPECT_TRUE(contains(chunk0_enc, Encoding::RLE));
// col1 should have RLE for rep/def and PLAIN for data
EXPECT_TRUE(chunk1_enc.size() == 2);
EXPECT_TRUE(contains(chunk1_enc, Encoding::RLE));
EXPECT_TRUE(contains(chunk1_enc, Encoding::PLAIN));
// col2 should have RLE for rep/def, PLAIN for dict, and RLE_DICTIONARY for data
EXPECT_TRUE(chunk2_enc.size() == 3);
EXPECT_TRUE(contains(chunk2_enc, Encoding::RLE));
EXPECT_TRUE(contains(chunk2_enc, Encoding::PLAIN));
EXPECT_TRUE(contains(chunk2_enc, Encoding::RLE_DICTIONARY));
} else {
// col0 should have RLE for rep/def and PLAIN for data
EXPECT_TRUE(chunk0_enc.size() == 2);
EXPECT_TRUE(contains(chunk0_enc, Encoding::RLE));
EXPECT_TRUE(contains(chunk0_enc, Encoding::PLAIN));
// col1 should have RLE for rep/def and PLAIN for data
EXPECT_TRUE(chunk1_enc.size() == 2);
EXPECT_TRUE(contains(chunk1_enc, Encoding::RLE));
EXPECT_TRUE(contains(chunk1_enc, Encoding::PLAIN));
// col2 should have RLE for rep/def and PLAIN_DICTIONARY for data and dict
EXPECT_TRUE(chunk2_enc.size() == 2);
EXPECT_TRUE(contains(chunk2_enc, Encoding::RLE));
EXPECT_TRUE(contains(chunk2_enc, Encoding::PLAIN_DICTIONARY));
}
}

CUDF_TEST_PROGRAM_MAIN()
