Skip to content

Commit

Permalink
Write file-level statistics when writing ORC files with zero rows (#1…
Browse files Browse the repository at this point in the history
…4707)

Fixes #14675
Write file-level statistics even when stripe-level statistics don't exist (no stripes).
Written statistics are in line with Pandas - zero sum, no min/max.

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - MithunR (https://github.com/mythrocks)
  - Nghia Truong (https://github.com/ttnghia)

URL: #14707
  • Loading branch information
vuule authored Jan 8, 2024
1 parent c0aa8bb commit ba7550a
Show file tree
Hide file tree
Showing 6 changed files with 280 additions and 98 deletions.
8 changes: 8 additions & 0 deletions cpp/include/cudf/io/detail/orc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ class writer {
* @brief Finishes the chunked/streamed write process.
*/
void close();

/**
* @brief Skip work done in `close()`; should be called if `write()` failed.
*
* Calling skip_close() prevents the writer from writing the (invalid) file footer and the
* postscript.
*/
void skip_close();
};
} // namespace orc::detail
} // namespace cudf::io
11 changes: 10 additions & 1 deletion cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,16 @@ void write_orc(orc_writer_options const& options)
auto writer = std::make_unique<orc::detail::writer>(
std::move(sinks[0]), options, io_detail::single_write_mode::YES, cudf::get_default_stream());

writer->write(options.get_table());
try {
writer->write(options.get_table());
} catch (...) {
// If an exception is thrown, the output is incomplete/corrupted.
// Make sure the writer will not close with such corrupted data.
// In addition, the writer may throw an exception while trying to close, which would terminate
// the process.
writer->skip_close();
throw;
}
}

/**
Expand Down
76 changes: 38 additions & 38 deletions cpp/src/io/orc/stats_enc.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -282,7 +282,7 @@ __global__ void __launch_bounds__(encode_threads_per_block)
// optional sint64 maximum = 2;
// optional sint64 sum = 3;
// }
if (s->chunk.has_minmax || s->chunk.has_sum) {
{
*cur = 2 * 8 + ProtofType::FIXEDLEN;
cur += 2;
if (s->chunk.has_minmax) {
Expand All @@ -301,7 +301,7 @@ __global__ void __launch_bounds__(encode_threads_per_block)
// optional double maximum = 2;
// optional double sum = 3;
// }
if (s->chunk.has_minmax || s->chunk.has_sum) {
{
*cur = 3 * 8 + ProtofType::FIXEDLEN;
cur += 2;
if (s->chunk.has_minmax) {
Expand All @@ -319,14 +319,14 @@ __global__ void __launch_bounds__(encode_threads_per_block)
// optional string maximum = 2;
// optional sint64 sum = 3; // sum will store the total length of all strings
// }
if (s->chunk.has_minmax || s->chunk.has_sum) {
{
uint32_t sz = 0;
if (s->chunk.has_minmax) {
sz += (pb_put_uint(cur, 1, s->chunk.min_value.str_val.length) - cur) +
(pb_put_uint(cur, 2, s->chunk.max_value.str_val.length) - cur) +
s->chunk.min_value.str_val.length + s->chunk.max_value.str_val.length;
}
if (s->chunk.has_sum) { sz += pb_put_int(cur, 3, s->chunk.sum.i_val) - cur; }
sz += pb_put_int(cur, 3, s->chunk.sum.i_val) - cur;

cur[0] = 4 * 8 + ProtofType::FIXEDLEN;
cur = pb_encode_uint(cur + 1, sz);
Expand All @@ -337,15 +337,15 @@ __global__ void __launch_bounds__(encode_threads_per_block)
cur = pb_put_binary(
cur, 2, s->chunk.max_value.str_val.ptr, s->chunk.max_value.str_val.length);
}
if (s->chunk.has_sum) { cur = pb_put_int(cur, 3, s->chunk.sum.i_val); }
cur = pb_put_int(cur, 3, s->chunk.sum.i_val);
}
break;
case dtype_bool:
// bucketStatistics = 5
// message BucketStatistics {
// repeated uint64 count = 1 [packed=true];
// }
if (s->chunk.has_sum) {
{
cur[0] = 5 * 8 + ProtofType::FIXEDLEN;
// count is equal to the number of 'true' values, despite what specs say
cur = pb_put_packed_uint(cur + 2, 1, s->chunk.sum.u_val);
Expand All @@ -360,7 +360,7 @@ __global__ void __launch_bounds__(encode_threads_per_block)
// optional string maximum = 2;
// optional string sum = 3;
// }
if (s->chunk.has_minmax or s->chunk.has_sum) {
{
auto const scale = s->group.col_dtype.scale();

uint32_t sz = 0;
Expand All @@ -373,9 +373,8 @@ __global__ void __launch_bounds__(encode_threads_per_block)
sz += (pb_put_uint(cur, 1, min_size) - cur) + min_size +
(pb_put_uint(cur, 1, max_size) - cur) + max_size;
}
auto const sum_size =
s->chunk.has_sum ? fixed_point_string_size(s->chunk.sum.d128_val, scale) : 0;
if (s->chunk.has_sum) { sz += (pb_put_uint(cur, 1, sum_size) - cur) + sum_size; }
auto const sum_size = fixed_point_string_size(s->chunk.sum.d128_val, scale);
sz += (pb_put_uint(cur, 1, sum_size) - cur) + sum_size;

cur[0] = 6 * 8 + ProtofType::FIXEDLEN;
cur = pb_encode_uint(cur + 1, sz);
Expand All @@ -384,9 +383,7 @@ __global__ void __launch_bounds__(encode_threads_per_block)
cur = pb_put_decimal(cur, 1, s->chunk.min_value.d128_val, scale, min_size); // minimum
cur = pb_put_decimal(cur, 2, s->chunk.max_value.d128_val, scale, max_size); // maximum
}
if (s->chunk.has_sum) {
cur = pb_put_decimal(cur, 3, s->chunk.sum.d128_val, scale, sum_size); // sum
}
cur = pb_put_decimal(cur, 3, s->chunk.sum.d128_val, scale, sum_size); // sum
}
break;
case dtype_date32:
Expand All @@ -395,11 +392,13 @@ __global__ void __launch_bounds__(encode_threads_per_block)
// optional sint32 minimum = 1;
// optional sint32 maximum = 2;
// }
if (s->chunk.has_minmax) {
{
cur[0] = 7 * 8 + ProtofType::FIXEDLEN;
cur += 2;
cur = pb_put_int(cur, 1, s->chunk.min_value.i_val);
cur = pb_put_int(cur, 2, s->chunk.max_value.i_val);
if (s->chunk.has_minmax) {
cur = pb_put_int(cur, 1, s->chunk.min_value.i_val);
cur = pb_put_int(cur, 2, s->chunk.max_value.i_val);
}
fld_start[1] = cur - (fld_start + 2);
}
break;
Expand All @@ -414,31 +413,32 @@ __global__ void __launch_bounds__(encode_threads_per_block)
// precision
// optional int32 maximumNanos = 6;
// }
if (s->chunk.has_minmax) {
{
cur[0] = 9 * 8 + ProtofType::FIXEDLEN;
cur += 2;
auto const [min_ms, min_ns_remainder] =
split_nanosecond_timestamp(s->chunk.min_value.i_val);
auto const [max_ms, max_ns_remainder] =
split_nanosecond_timestamp(s->chunk.max_value.i_val);

// minimum/maximum are the same as minimumUtc/maximumUtc as we always write files in UTC
cur = pb_put_int(cur, 1, min_ms); // minimum
cur = pb_put_int(cur, 2, max_ms); // maximum
cur = pb_put_int(cur, 3, min_ms); // minimumUtc
cur = pb_put_int(cur, 4, max_ms); // maximumUtc

if constexpr (enable_nanosecond_statistics) {
if (min_ns_remainder != DEFAULT_MIN_NANOS) {
// using uint because positive values are not zigzag encoded
cur = pb_put_uint(cur, 5, min_ns_remainder + 1); // minimumNanos
}
if (max_ns_remainder != DEFAULT_MAX_NANOS) {
// using uint because positive values are not zigzag encoded
cur = pb_put_uint(cur, 6, max_ns_remainder + 1); // maximumNanos
if (s->chunk.has_minmax) {
auto const [min_ms, min_ns_remainder] =
split_nanosecond_timestamp(s->chunk.min_value.i_val);
auto const [max_ms, max_ns_remainder] =
split_nanosecond_timestamp(s->chunk.max_value.i_val);

// minimum/maximum are the same as minimumUtc/maximumUtc as we always write files in UTC
cur = pb_put_int(cur, 1, min_ms); // minimum
cur = pb_put_int(cur, 2, max_ms); // maximum
cur = pb_put_int(cur, 3, min_ms); // minimumUtc
cur = pb_put_int(cur, 4, max_ms); // maximumUtc

if constexpr (enable_nanosecond_statistics) {
if (min_ns_remainder != DEFAULT_MIN_NANOS) {
// using uint because positive values are not zigzag encoded
cur = pb_put_uint(cur, 5, min_ns_remainder + 1); // minimumNanos
}
if (max_ns_remainder != DEFAULT_MAX_NANOS) {
// using uint because positive values are not zigzag encoded
cur = pb_put_uint(cur, 6, max_ns_remainder + 1); // maximumNanos
}
}
}

fld_start[1] = cur - (fld_start + 2);
}
break;
Expand Down
Loading

0 comments on commit ba7550a

Please sign in to comment.