Merge branch 'branch-24.08' into pylibcudf-io-writers

lithomas1 authored Jun 17, 2024
2 parents e242182 + 87f6a7e commit 564358f
Showing 109 changed files with 1,645 additions and 1,113 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/pr_issue_status_automation.yml
@@ -35,7 +35,7 @@ jobs:
update-status:
# This job sets the PR and its linked issues to "In Progress" status
uses: rapidsai/shared-workflows/.github/workflows/project-get-set-single-select-field.yaml@branch-24.08
- if: github.event.pull_request.state == 'open'
+ if: ${{ github.event.pull_request.state == 'open' && needs.get-project-id.outputs.ITEM_PROJECT_ID != '' }}
needs: get-project-id
with:
PROJECT_ID: "PVT_kwDOAp2shc4AiNzl"
@@ -51,7 +51,7 @@
update-sprint:
# This job sets the PR and its linked issues to the current "Weekly Sprint"
uses: rapidsai/shared-workflows/.github/workflows/project-get-set-iteration-field.yaml@branch-24.08
- if: github.event.pull_request.state == 'open'
+ if: ${{ github.event.pull_request.state == 'open' && needs.get-project-id.outputs.ITEM_PROJECT_ID != '' }}
needs: get-project-id
with:
PROJECT_ID: "PVT_kwDOAp2shc4AiNzl"
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -60,7 +60,7 @@ repos:
(?x)^(
^cpp/src/io/parquet/ipc/Schema_generated.h|
^cpp/src/io/parquet/ipc/Message_generated.h|
- ^cpp/include/cudf_test/cxxopts.hpp|
+ ^cpp/include/cudf_test/cxxopts.hpp
)
- repo: https://github.com/sirosen/texthooks
rev: 0.6.6
11 changes: 10 additions & 1 deletion ci/cudf_pandas_scripts/run_tests.sh
@@ -5,6 +5,10 @@

set -eoxu pipefail

+ RAPIDS_TESTS_DIR=${RAPIDS_TESTS_DIR:-"${PWD}/test-results"}
+ RAPIDS_COVERAGE_DIR=${RAPIDS_COVERAGE_DIR:-"${PWD}/coverage-results"}
+ mkdir -p "${RAPIDS_TESTS_DIR}" "${RAPIDS_COVERAGE_DIR}"

# Function to display script usage
function display_usage {
echo "Usage: $0 [--no-cudf]"
@@ -36,4 +40,9 @@ else
python -m pip install $(ls ./local-cudf-dep/cudf*.whl)[test,cudf-pandas-tests]
fi

- python -m pytest -p cudf.pandas ./python/cudf/cudf_pandas_tests/
+ python -m pytest -p cudf.pandas \
+   --cov-config=./python/cudf/.coveragerc \
+   --cov=cudf \
+   --cov-report=xml:"${RAPIDS_COVERAGE_DIR}/cudf-pandas-coverage.xml" \
+   --cov-report=term \
+   ./python/cudf/cudf_pandas_tests/
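With these defaults, a local run drops results under ./test-results and ./coverage-results in the working directory. A minimal local-run sketch (the /tmp paths are illustrative, not part of this change; --no-cudf is the flag documented in the script's usage text):

# Illustrative override of the output directories added above.
RAPIDS_TESTS_DIR=/tmp/cudf-test-results \
RAPIDS_COVERAGE_DIR=/tmp/cudf-coverage \
./ci/cudf_pandas_scripts/run_tests.sh --no-cudf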
5 changes: 5 additions & 0 deletions cpp/benchmarks/CMakeLists.txt
@@ -267,6 +267,11 @@ ConfigureNVBench(PARQUET_MULTITHREAD_READER_NVBENCH io/parquet/parquet_reader_mu
# * orc reader benchmark --------------------------------------------------------------------------
ConfigureNVBench(ORC_READER_NVBENCH io/orc/orc_reader_input.cpp io/orc/orc_reader_options.cpp)

+ # ##################################################################################################
+ # * orc multithreaded benchmark
+ # --------------------------------------------------------------------------
+ ConfigureNVBench(ORC_MULTITHREADED_NVBENCH io/orc/orc_reader_multithreaded.cpp)

# ##################################################################################################
# * csv reader benchmark --------------------------------------------------------------------------
ConfigureNVBench(CSV_READER_NVBENCH io/csv/csv_reader_input.cpp io/csv/csv_reader_options.cpp)
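As a usage sketch for the newly registered target (the build paths are assumptions about a typical CMake layout, not part of this diff; nvbench's -b flag selects a benchmark by name and -a overrides an axis):

# Assumed build directory and binary location.
cmake --build cpp/build --target ORC_MULTITHREADED_NVBENCH
./cpp/build/benchmarks/ORC_MULTITHREADED_NVBENCH \
  -b orc_multithreaded_read_decode_fixed_width -a num_threads=4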
336 changes: 336 additions & 0 deletions cpp/benchmarks/io/orc/orc_reader_multithreaded.cpp
@@ -0,0 +1,336 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <benchmarks/common/generate_input.hpp>
#include <benchmarks/fixture/benchmark_fixture.hpp>
#include <benchmarks/io/cuio_common.hpp>
#include <benchmarks/io/nvbench_helpers.hpp>

#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/io/orc.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/pinned_memory.hpp>
#include <cudf/utilities/thread_pool.hpp>

#include <nvbench/nvbench.cuh>

#include <vector>

size_t get_num_read_threads(nvbench::state const& state) { return state.get_int64("num_threads"); }

size_t get_read_size(nvbench::state const& state)
{
auto const num_reads = get_num_read_threads(state);
return state.get_int64("total_data_size") / num_reads;
}

std::string get_label(std::string const& test_name, nvbench::state const& state)
{
auto const num_cols = state.get_int64("num_cols");
size_t const read_size_mb = get_read_size(state) / (1024 * 1024);
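// e.g. yields "fixed width, 4 columns, 8 threads (64 MB each)" for a 512 MiB
// total_data_size read by 8 threads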
return {test_name + ", " + std::to_string(num_cols) + " columns, " +
std::to_string(get_num_read_threads(state)) + " threads (" +
std::to_string(read_size_mb) + " MB each)"};
}

std::tuple<std::vector<cuio_source_sink_pair>, size_t, size_t> write_file_data(
nvbench::state& state, std::vector<cudf::type_id> const& d_types)
{
auto const cardinality = state.get_int64("cardinality");
auto const run_length = state.get_int64("run_length");
auto const num_cols = state.get_int64("num_cols");
size_t const num_files = get_num_read_threads(state);
size_t const per_file_data_size = get_read_size(state);

std::vector<cuio_source_sink_pair> source_sink_vector;

size_t total_file_size = 0;

for (size_t i = 0; i < num_files; ++i) {
cuio_source_sink_pair source_sink{io_type::HOST_BUFFER};

auto const tbl = create_random_table(
cycle_dtypes(d_types, num_cols),
table_size_bytes{per_file_data_size},
data_profile_builder().cardinality(cardinality).avg_run_length(run_length));
auto const view = tbl->view();

cudf::io::orc_writer_options const write_opts =
cudf::io::orc_writer_options::builder(source_sink.make_sink_info(), view)
.compression(cudf::io::compression_type::SNAPPY);

cudf::io::write_orc(write_opts);
total_file_size += source_sink.size();

source_sink_vector.push_back(std::move(source_sink));
}

return {std::move(source_sink_vector), total_file_size, num_files};
}

void BM_orc_multithreaded_read_common(nvbench::state& state,
std::vector<cudf::type_id> const& d_types,
std::string const& label)
{
auto const data_size = state.get_int64("total_data_size");
auto const num_threads = state.get_int64("num_threads");

auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
cudf::detail::thread_pool threads(num_threads);

auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);
std::vector<cudf::io::source_info> source_info_vector;
std::transform(source_sink_vector.begin(),
source_sink_vector.end(),
std::back_inserter(source_info_vector),
[](auto& source_sink) { return source_sink.make_source_info(); });

auto mem_stats_logger = cudf::memory_stats_logger();

{
cudf::scoped_range range{("(read) " + label).c_str()};
state.exec(nvbench::exec_tag::sync | nvbench::exec_tag::timer,
[&](nvbench::launch& launch, auto& timer) {
auto read_func = [&](int index) {
auto const stream = streams[index % num_threads];
cudf::io::orc_reader_options read_opts =
cudf::io::orc_reader_options::builder(source_info_vector[index]);
cudf::io::read_orc(read_opts, stream, rmm::mr::get_current_device_resource());
};

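// queue all reads while the pool is paused so the timer below measures the
// reads themselves rather than task submission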
threads.paused = true;
for (size_t i = 0; i < num_files; ++i) {
threads.submit(read_func, i);
}
timer.start();
threads.paused = false;
threads.wait_for_tasks();
cudf::detail::join_streams(streams, cudf::get_default_stream());
timer.stop();
});
}

auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
state.add_element_count(static_cast<double>(data_size) / time, "bytes_per_second");
state.add_buffer_size(
mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
state.add_buffer_size(total_file_size, "encoded_file_size", "encoded_file_size");
}

void BM_orc_multithreaded_read_mixed(nvbench::state& state)
{
auto label = get_label("mixed", state);
cudf::scoped_range range{label.c_str()};
BM_orc_multithreaded_read_common(
state, {cudf::type_id::INT32, cudf::type_id::DECIMAL64, cudf::type_id::STRING}, label);
}

void BM_orc_multithreaded_read_fixed_width(nvbench::state& state)
{
auto label = get_label("fixed width", state);
cudf::scoped_range range{label.c_str()};
BM_orc_multithreaded_read_common(state, {cudf::type_id::INT32}, label);
}

void BM_orc_multithreaded_read_string(nvbench::state& state)
{
auto label = get_label("string", state);
cudf::scoped_range range{label.c_str()};
BM_orc_multithreaded_read_common(state, {cudf::type_id::STRING}, label);
}

void BM_orc_multithreaded_read_list(nvbench::state& state)
{
auto label = get_label("list", state);
cudf::scoped_range range{label.c_str()};
BM_orc_multithreaded_read_common(state, {cudf::type_id::LIST}, label);
}

void BM_orc_multithreaded_read_chunked_common(nvbench::state& state,
std::vector<cudf::type_id> const& d_types,
std::string const& label)
{
size_t const data_size = state.get_int64("total_data_size");
auto const num_threads = state.get_int64("num_threads");
size_t const input_limit = state.get_int64("input_limit");
size_t const output_limit = state.get_int64("output_limit");

auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
cudf::detail::thread_pool threads(num_threads);
auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);
std::vector<cudf::io::source_info> source_info_vector;
std::transform(source_sink_vector.begin(),
source_sink_vector.end(),
std::back_inserter(source_info_vector),
[](auto& source_sink) { return source_sink.make_source_info(); });

auto mem_stats_logger = cudf::memory_stats_logger();

{
cudf::scoped_range range{("(read) " + label).c_str()};
std::vector<cudf::io::table_with_metadata> chunks;
state.exec(nvbench::exec_tag::sync | nvbench::exec_tag::timer,
[&](nvbench::launch& launch, auto& timer) {
auto read_func = [&](int index) {
auto const stream = streams[index % num_threads];
cudf::io::orc_reader_options read_opts =
cudf::io::orc_reader_options::builder(source_info_vector[index]);
// divide chunk limits by the number of threads so the number of chunks produced
// is the same for all cases. this seems better than the alternative of keeping
// the limits the same: as the number of threads goes up, the number of chunks
// goes down, so we would no longer be benchmarking the same thing.
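// e.g. with the 640 MiB limits used in the axes below and 4 threads, each
// reader is constructed with 160 MiB input/output limits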
auto reader = cudf::io::chunked_orc_reader(
output_limit / num_threads, input_limit / num_threads, read_opts, stream);

// read all the chunks
do {
auto table = reader.read_chunk();
} while (reader.has_next());
};

threads.paused = true;
for (size_t i = 0; i < num_files; ++i) {
threads.submit(read_func, i);
}
timer.start();
threads.paused = false;
threads.wait_for_tasks();
cudf::detail::join_streams(streams, cudf::get_default_stream());
timer.stop();
});
}

auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
state.add_element_count(static_cast<double>(data_size) / time, "bytes_per_second");
state.add_buffer_size(
mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
state.add_buffer_size(total_file_size, "encoded_file_size", "encoded_file_size");
}

void BM_orc_multithreaded_read_chunked_mixed(nvbench::state& state)
{
auto label = get_label("mixed", state);
cudf::scoped_range range{label.c_str()};
BM_orc_multithreaded_read_chunked_common(
state, {cudf::type_id::INT32, cudf::type_id::DECIMAL64, cudf::type_id::STRING}, label);
}

void BM_orc_multithreaded_read_chunked_fixed_width(nvbench::state& state)
{
auto label = get_label("fixed width", state);
cudf::scoped_range range{label.c_str()};
BM_orc_multithreaded_read_chunked_common(state, {cudf::type_id::INT32}, label);
}

void BM_orc_multithreaded_read_chunked_string(nvbench::state& state)
{
auto label = get_label("string", state);
cudf::scoped_range range{label.c_str()};
BM_orc_multithreaded_read_chunked_common(state, {cudf::type_id::STRING}, label);
}

void BM_orc_multithreaded_read_chunked_list(nvbench::state& state)
{
auto label = get_label("list", state);
cudf::scoped_range range{label.c_str()};
BM_orc_multithreaded_read_chunked_common(state, {cudf::type_id::LIST}, label);
}

auto const thread_range = std::vector<nvbench::int64_t>{1, 2, 4, 8};
auto const total_data_size = std::vector<nvbench::int64_t>{512 * 1024 * 1024, 1024 * 1024 * 1024};

// mixed data types: fixed width and strings
NVBENCH_BENCH(BM_orc_multithreaded_read_mixed)
.set_name("orc_multithreaded_read_decode_mixed")
.set_min_samples(4)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("total_data_size", total_data_size)
.add_int64_axis("num_threads", thread_range)
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8});

NVBENCH_BENCH(BM_orc_multithreaded_read_fixed_width)
.set_name("orc_multithreaded_read_decode_fixed_width")
.set_min_samples(4)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("total_data_size", total_data_size)
.add_int64_axis("num_threads", thread_range)
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8});

NVBENCH_BENCH(BM_orc_multithreaded_read_string)
.set_name("orc_multithreaded_read_decode_string")
.set_min_samples(4)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("total_data_size", total_data_size)
.add_int64_axis("num_threads", thread_range)
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8});

NVBENCH_BENCH(BM_orc_multithreaded_read_list)
.set_name("orc_multithreaded_read_decode_list")
.set_min_samples(4)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("total_data_size", total_data_size)
.add_int64_axis("num_threads", thread_range)
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8});

// mixed data types: fixed width, strings
NVBENCH_BENCH(BM_orc_multithreaded_read_chunked_mixed)
.set_name("orc_multithreaded_read_decode_chunked_mixed")
.set_min_samples(4)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("total_data_size", total_data_size)
.add_int64_axis("num_threads", thread_range)
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8})
.add_int64_axis("input_limit", {640 * 1024 * 1024})
.add_int64_axis("output_limit", {640 * 1024 * 1024});

NVBENCH_BENCH(BM_orc_multithreaded_read_chunked_fixed_width)
.set_name("orc_multithreaded_read_decode_chunked_fixed_width")
.set_min_samples(4)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("total_data_size", total_data_size)
.add_int64_axis("num_threads", thread_range)
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8})
.add_int64_axis("input_limit", {640 * 1024 * 1024})
.add_int64_axis("output_limit", {640 * 1024 * 1024});

NVBENCH_BENCH(BM_orc_multithreaded_read_chunked_string)
.set_name("orc_multithreaded_read_decode_chunked_string")
.set_min_samples(4)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("total_data_size", total_data_size)
.add_int64_axis("num_threads", thread_range)
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8})
.add_int64_axis("input_limit", {640 * 1024 * 1024})
.add_int64_axis("output_limit", {640 * 1024 * 1024});

NVBENCH_BENCH(BM_orc_multithreaded_read_chunked_list)
.set_name("orc_multithreaded_read_decode_chunked_list")
.set_min_samples(4)
.add_int64_axis("cardinality", {1000})
.add_int64_axis("total_data_size", total_data_size)
.add_int64_axis("num_threads", thread_range)
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8})
.add_int64_axis("input_limit", {640 * 1024 * 1024})
.add_int64_axis("output_limit", {640 * 1024 * 1024});