Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write file-level statistics when writing ORC files with zero rows #14707

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cpp/include/cudf/io/detail/orc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ class writer {
* @brief Finishes the chunked/streamed write process.
*/
void close();

/**
* @brief Skip writing the footer and close the writer.
*/
void skip_close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't think of a better name for this function. I fear the name implies that it's the close() that's skipped.
skip_footer_write_on_close() is too long.

If we can't think of a succinct name, maybe we should keep skip_close().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

omg, my updated code for this function got lost between commits.

I think skip_close is a pretty accurate name, given that we raise the _closed flag in skip_close and close() returns immediately if that flag is raised.

I'll update the docs above, sorry for the confusion.

};
} // namespace orc::detail
} // namespace cudf::io
8 changes: 7 additions & 1 deletion cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,13 @@ void write_orc(orc_writer_options const& options)
auto writer = std::make_unique<orc::detail::writer>(
std::move(sinks[0]), options, io_detail::single_write_mode::YES, cudf::get_default_stream());

writer->write(options.get_table());
try {
writer->write(options.get_table());
} catch (const std::exception& e) {
vuule marked this conversation as resolved.
Show resolved Hide resolved
// If an exception is thrown, make sure the writer does not throw while trying to close
vuule marked this conversation as resolved.
Show resolved Hide resolved
writer->skip_close();
throw;
}
}

/**
Expand Down
76 changes: 38 additions & 38 deletions cpp/src/io/orc/stats_enc.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -282,7 +282,7 @@ __global__ void __launch_bounds__(encode_threads_per_block)
// optional sint64 maximum = 2;
// optional sint64 sum = 3;
// }
if (s->chunk.has_minmax || s->chunk.has_sum) {
{
ttnghia marked this conversation as resolved.
Show resolved Hide resolved
*cur = 2 * 8 + ProtofType::FIXEDLEN;
cur += 2;
if (s->chunk.has_minmax) {
Expand All @@ -301,7 +301,7 @@ __global__ void __launch_bounds__(encode_threads_per_block)
// optional double maximum = 2;
// optional double sum = 3;
// }
if (s->chunk.has_minmax || s->chunk.has_sum) {
{
*cur = 3 * 8 + ProtofType::FIXEDLEN;
cur += 2;
if (s->chunk.has_minmax) {
Expand All @@ -319,14 +319,14 @@ __global__ void __launch_bounds__(encode_threads_per_block)
// optional string maximum = 2;
// optional sint64 sum = 3; // sum will store the total length of all strings
// }
if (s->chunk.has_minmax || s->chunk.has_sum) {
{
uint32_t sz = 0;
if (s->chunk.has_minmax) {
sz += (pb_put_uint(cur, 1, s->chunk.min_value.str_val.length) - cur) +
(pb_put_uint(cur, 2, s->chunk.max_value.str_val.length) - cur) +
s->chunk.min_value.str_val.length + s->chunk.max_value.str_val.length;
}
if (s->chunk.has_sum) { sz += pb_put_int(cur, 3, s->chunk.sum.i_val) - cur; }
sz += pb_put_int(cur, 3, s->chunk.sum.i_val) - cur;

cur[0] = 4 * 8 + ProtofType::FIXEDLEN;
cur = pb_encode_uint(cur + 1, sz);
Expand All @@ -337,15 +337,15 @@ __global__ void __launch_bounds__(encode_threads_per_block)
cur = pb_put_binary(
cur, 2, s->chunk.max_value.str_val.ptr, s->chunk.max_value.str_val.length);
}
if (s->chunk.has_sum) { cur = pb_put_int(cur, 3, s->chunk.sum.i_val); }
cur = pb_put_int(cur, 3, s->chunk.sum.i_val);
}
break;
case dtype_bool:
// bucketStatistics = 5
// message BucketStatistics {
// repeated uint64 count = 1 [packed=true];
// }
if (s->chunk.has_sum) {
{
cur[0] = 5 * 8 + ProtofType::FIXEDLEN;
// count is equal to the number of 'true' values, despite what specs say
cur = pb_put_packed_uint(cur + 2, 1, s->chunk.sum.u_val);
Expand All @@ -360,7 +360,7 @@ __global__ void __launch_bounds__(encode_threads_per_block)
// optional string maximum = 2;
// optional string sum = 3;
// }
if (s->chunk.has_minmax or s->chunk.has_sum) {
{
auto const scale = s->group.col_dtype.scale();

uint32_t sz = 0;
Expand All @@ -373,9 +373,8 @@ __global__ void __launch_bounds__(encode_threads_per_block)
sz += (pb_put_uint(cur, 1, min_size) - cur) + min_size +
(pb_put_uint(cur, 1, max_size) - cur) + max_size;
}
auto const sum_size =
s->chunk.has_sum ? fixed_point_string_size(s->chunk.sum.d128_val, scale) : 0;
if (s->chunk.has_sum) { sz += (pb_put_uint(cur, 1, sum_size) - cur) + sum_size; }
auto const sum_size = fixed_point_string_size(s->chunk.sum.d128_val, scale);
sz += (pb_put_uint(cur, 1, sum_size) - cur) + sum_size;

cur[0] = 6 * 8 + ProtofType::FIXEDLEN;
cur = pb_encode_uint(cur + 1, sz);
Expand All @@ -384,9 +383,7 @@ __global__ void __launch_bounds__(encode_threads_per_block)
cur = pb_put_decimal(cur, 1, s->chunk.min_value.d128_val, scale, min_size); // minimum
cur = pb_put_decimal(cur, 2, s->chunk.max_value.d128_val, scale, max_size); // maximum
}
if (s->chunk.has_sum) {
cur = pb_put_decimal(cur, 3, s->chunk.sum.d128_val, scale, sum_size); // sum
}
cur = pb_put_decimal(cur, 3, s->chunk.sum.d128_val, scale, sum_size); // sum
}
break;
case dtype_date32:
Expand All @@ -395,11 +392,13 @@ __global__ void __launch_bounds__(encode_threads_per_block)
// optional sint32 minimum = 1;
// optional sint32 maximum = 2;
// }
if (s->chunk.has_minmax) {
{
cur[0] = 7 * 8 + ProtofType::FIXEDLEN;
cur += 2;
cur = pb_put_int(cur, 1, s->chunk.min_value.i_val);
cur = pb_put_int(cur, 2, s->chunk.max_value.i_val);
if (s->chunk.has_minmax) {
cur = pb_put_int(cur, 1, s->chunk.min_value.i_val);
cur = pb_put_int(cur, 2, s->chunk.max_value.i_val);
}
fld_start[1] = cur - (fld_start + 2);
}
break;
Expand All @@ -414,31 +413,32 @@ __global__ void __launch_bounds__(encode_threads_per_block)
// precision
// optional int32 maximumNanos = 6;
// }
if (s->chunk.has_minmax) {
{
cur[0] = 9 * 8 + ProtofType::FIXEDLEN;
cur += 2;
auto const [min_ms, min_ns_remainder] =
split_nanosecond_timestamp(s->chunk.min_value.i_val);
auto const [max_ms, max_ns_remainder] =
split_nanosecond_timestamp(s->chunk.max_value.i_val);

// minimum/maximum are the same as minimumUtc/maximumUtc as we always write files in UTC
cur = pb_put_int(cur, 1, min_ms); // minimum
cur = pb_put_int(cur, 2, max_ms); // maximum
cur = pb_put_int(cur, 3, min_ms); // minimumUtc
cur = pb_put_int(cur, 4, max_ms); // maximumUtc

if constexpr (enable_nanosecond_statistics) {
if (min_ns_remainder != DEFAULT_MIN_NANOS) {
// using uint because positive values are not zigzag encoded
cur = pb_put_uint(cur, 5, min_ns_remainder + 1); // minimumNanos
}
if (max_ns_remainder != DEFAULT_MAX_NANOS) {
// using uint because positive values are not zigzag encoded
cur = pb_put_uint(cur, 6, max_ns_remainder + 1); // maximumNanos
if (s->chunk.has_minmax) {
auto const [min_ms, min_ns_remainder] =
split_nanosecond_timestamp(s->chunk.min_value.i_val);
auto const [max_ms, max_ns_remainder] =
split_nanosecond_timestamp(s->chunk.max_value.i_val);

// minimum/maximum are the same as minimumUtc/maximumUtc as we always write files in UTC
cur = pb_put_int(cur, 1, min_ms); // minimum
cur = pb_put_int(cur, 2, max_ms); // maximum
cur = pb_put_int(cur, 3, min_ms); // minimumUtc
cur = pb_put_int(cur, 4, max_ms); // maximumUtc

if constexpr (enable_nanosecond_statistics) {
if (min_ns_remainder != DEFAULT_MIN_NANOS) {
// using uint because positive values are not zigzag encoded
cur = pb_put_uint(cur, 5, min_ns_remainder + 1); // minimumNanos
}
if (max_ns_remainder != DEFAULT_MAX_NANOS) {
// using uint because positive values are not zigzag encoded
cur = pb_put_uint(cur, 6, max_ns_remainder + 1); // maximumNanos
}
}
}

fld_start[1] = cur - (fld_start + 2);
}
break;
Expand Down
Loading
Loading