Skip to content

Commit

Permalink
Merge branch 'improve/xdist-worksteal-cudf-pandas' of github.com:Matt…
Browse files Browse the repository at this point in the history
…711/cudf into improve/xdist-worksteal-cudf-pandas
  • Loading branch information
Matt711 committed Oct 2, 2024
2 parents 1b8f73a + 9e29f3b commit ad2cff1
Show file tree
Hide file tree
Showing 12 changed files with 754 additions and 564 deletions.
472 changes: 11 additions & 461 deletions cpp/include/cudf/detail/aggregation/aggregation.cuh

Large diffs are not rendered by default.

443 changes: 443 additions & 0 deletions cpp/include/cudf/detail/aggregation/device_aggregators.cuh

Large diffs are not rendered by default.

18 changes: 10 additions & 8 deletions cpp/include/cudf/detail/tdigest/tdigest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,28 +143,30 @@ std::unique_ptr<column> make_tdigest_column(size_type num_rows,
rmm::device_async_resource_ref mr);

/**
* @brief Create an empty tdigest column.
* @brief Create a tdigest column of empty tdigests.
*
* An empty tdigest column contains a single row of length 0
* The column created contains the specified number of rows of empty tdigests.
*
* @param num_rows The number of rows in the output column.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
*
* @returns An empty tdigest column.
* @returns A tdigest column of empty clusters.
*/
CUDF_EXPORT
std::unique_ptr<column> make_empty_tdigest_column(rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
std::unique_ptr<column> make_empty_tdigests_column(size_type num_rows,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

/**
* @brief Create an empty tdigest scalar.
* @brief Create a scalar of an empty tdigest cluster.
*
* An empty tdigest scalar is a struct_scalar that contains a single row of length 0
* The returned scalar is a struct_scalar that contains a single row of an empty cluster.
*
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
*
* @returns An empty tdigest scalar.
* @returns A scalar of an empty tdigest cluster.
*/
std::unique_ptr<scalar> make_empty_tdigest_scalar(rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
Expand Down
20 changes: 10 additions & 10 deletions cpp/include/cudf_test/tdigest_utilities.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,8 @@ void tdigest_simple_all_nulls_aggregation(Func op)
static_cast<column_view>(values).type(), tdigest_gen{}, op, values, delta);

// NOTE: an empty tdigest column still has 1 row.
auto expected = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto expected = cudf::tdigest::detail::make_empty_tdigests_column(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, *expected);
}
Expand Down Expand Up @@ -562,12 +562,12 @@ template <typename MergeFunc>
void tdigest_merge_empty(MergeFunc merge_op)
{
// 3 empty tdigests all in the same group
auto a = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto b = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto c = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto a = cudf::tdigest::detail::make_empty_tdigests_column(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto b = cudf::tdigest::detail::make_empty_tdigests_column(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto c = cudf::tdigest::detail::make_empty_tdigests_column(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
std::vector<column_view> cols;
cols.push_back(*a);
cols.push_back(*b);
Expand All @@ -577,8 +577,8 @@ void tdigest_merge_empty(MergeFunc merge_op)
auto const delta = 1000;
auto result = merge_op(*values, delta);

auto expected = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto expected = cudf::tdigest::detail::make_empty_tdigests_column(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result);
}
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/aggregation/aggregation.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,9 +15,13 @@
*/

#include <cudf/detail/aggregation/aggregation.cuh>
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/table/table_view.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <vector>

namespace cudf {
namespace detail {
void initialize_with_identity(mutable_table_view& table,
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/groupby/hash/groupby_kernels.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

#include "multi_pass_kernels.cuh"

#include <cudf/detail/aggregation/aggregation.cuh>
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/detail/aggregation/device_aggregators.cuh>
#include <cudf/groupby.hpp>
#include <cudf/utilities/bit.hpp>

Expand Down Expand Up @@ -100,7 +100,7 @@ struct compute_single_pass_aggs_fn {
if (not skip_rows_with_nulls or cudf::bit_is_set(row_bitmask, i)) {
auto const result = set.insert_and_find(i);

cudf::detail::aggregate_row<true, true>(output_values, *result.first, input_values, i, aggs);
cudf::detail::aggregate_row(output_values, *result.first, input_values, i, aggs);
}
}
};
Expand Down
1 change: 1 addition & 0 deletions cpp/src/groupby/sort/group_single_pass_reduction_util.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/utilities/element_argminmax.cuh>
#include <cudf/detail/valid_if.cuh>
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/io/json/nested_json_gpu.cu
Original file line number Diff line number Diff line change
Expand Up @@ -620,10 +620,12 @@ struct PdaSymbolToSymbolGroupId {
// We map the delimiter character to LINE_BREAK symbol group id, and the newline character
// to WHITE_SPACE. Note that delimiter cannot be any of opening(closing) brace, bracket, quote,
// escape, comma, colon or whitespace characters.
auto constexpr newline = '\n';
auto constexpr whitespace = ' ';
auto const symbol_position =
symbol == delimiter
? static_cast<int32_t>('\n')
: (symbol == '\n' ? static_cast<int32_t>(' ') : static_cast<int32_t>(symbol));
? static_cast<int32_t>(newline)
: (symbol == newline ? static_cast<int32_t>(whitespace) : static_cast<int32_t>(symbol));
PdaSymbolGroupIdT symbol_gid =
tos_sg_to_pda_sgid[min(symbol_position, pda_sgid_lookup_size - 1)];
return stack_idx * static_cast<PdaSymbolGroupIdT>(symbol_group_id::NUM_PDA_INPUT_SGS) +
Expand Down
23 changes: 12 additions & 11 deletions cpp/src/quantiles/tdigest/tdigest.cu
Original file line number Diff line number Diff line change
Expand Up @@ -292,32 +292,33 @@ std::unique_ptr<column> make_tdigest_column(size_type num_rows,
return make_structs_column(num_rows, std::move(children), 0, {}, stream, mr);
}

std::unique_ptr<column> make_empty_tdigest_column(rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
std::unique_ptr<column> make_empty_tdigests_column(size_type num_rows,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
auto offsets = cudf::make_fixed_width_column(
data_type(type_id::INT32), 2, mask_state::UNALLOCATED, stream, mr);
data_type(type_id::INT32), num_rows + 1, mask_state::UNALLOCATED, stream, mr);
thrust::fill(rmm::exec_policy(stream),
offsets->mutable_view().begin<size_type>(),
offsets->mutable_view().end<size_type>(),
0);

auto min_col =
cudf::make_numeric_column(data_type(type_id::FLOAT64), 1, mask_state::UNALLOCATED, stream, mr);
auto min_col = cudf::make_numeric_column(
data_type(type_id::FLOAT64), num_rows, mask_state::UNALLOCATED, stream, mr);
thrust::fill(rmm::exec_policy(stream),
min_col->mutable_view().begin<double>(),
min_col->mutable_view().end<double>(),
0);
auto max_col =
cudf::make_numeric_column(data_type(type_id::FLOAT64), 1, mask_state::UNALLOCATED, stream, mr);
auto max_col = cudf::make_numeric_column(
data_type(type_id::FLOAT64), num_rows, mask_state::UNALLOCATED, stream, mr);
thrust::fill(rmm::exec_policy(stream),
max_col->mutable_view().begin<double>(),
max_col->mutable_view().end<double>(),
0);

return make_tdigest_column(1,
make_empty_column(type_id::FLOAT64),
make_empty_column(type_id::FLOAT64),
return make_tdigest_column(num_rows,
cudf::make_empty_column(type_id::FLOAT64),
cudf::make_empty_column(type_id::FLOAT64),
std::move(offsets),
std::move(min_col),
std::move(max_col),
Expand All @@ -338,7 +339,7 @@ std::unique_ptr<column> make_empty_tdigest_column(rmm::cuda_stream_view stream,
std::unique_ptr<scalar> make_empty_tdigest_scalar(rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
auto contents = make_empty_tdigest_column(stream, mr)->release();
auto contents = make_empty_tdigests_column(1, stream, mr)->release();
return std::make_unique<struct_scalar>(
std::move(*std::make_unique<table>(std::move(contents.children))), true, stream, mr);
}
Expand Down
Loading

0 comments on commit ad2cff1

Please sign in to comment.