diff --git a/python/cudf/cudf/_lib/aggregation.pxd b/python/cudf/cudf/_lib/aggregation.pxd index ad2c978801f..f83f170c7c2 100644 --- a/python/cudf/cudf/_lib/aggregation.pxd +++ b/python/cudf/cudf/_lib/aggregation.pxd @@ -1,7 +1,8 @@ -# Copyright (c) 2020-2022, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. from libcpp.memory cimport unique_ptr +from cudf._lib cimport pylibcudf from cudf._lib.cpp.aggregation cimport ( groupby_aggregation, groupby_scan_aggregation, @@ -15,10 +16,7 @@ cdef class RollingAggregation: cdef unique_ptr[rolling_aggregation] c_obj cdef class GroupbyAggregation: - cdef unique_ptr[groupby_aggregation] c_obj - -cdef class GroupbyScanAggregation: - cdef unique_ptr[groupby_scan_aggregation] c_obj + cdef pylibcudf.aggregation.Aggregation c_obj cdef class ReduceAggregation: cdef unique_ptr[reduce_aggregation] c_obj @@ -28,6 +26,5 @@ cdef class ScanAggregation: cdef RollingAggregation make_rolling_aggregation(op, kwargs=*) cdef GroupbyAggregation make_groupby_aggregation(op, kwargs=*) -cdef GroupbyScanAggregation make_groupby_scan_aggregation(op, kwargs=*) cdef ReduceAggregation make_reduce_aggregation(op, kwargs=*) cdef ScanAggregation make_scan_aggregation(op, kwargs=*) diff --git a/python/cudf/cudf/_lib/aggregation.pyx b/python/cudf/cudf/_lib/aggregation.pyx index b202d08ac2e..127580a6ec6 100644 --- a/python/cudf/cudf/_lib/aggregation.pyx +++ b/python/cudf/cudf/_lib/aggregation.pyx @@ -23,13 +23,14 @@ from cudf._lib.types import Interpolation cimport cudf._lib.cpp.aggregation as libcudf_aggregation cimport cudf._lib.cpp.types as libcudf_types -from cudf._lib.cpp.aggregation cimport ( - underlying_type_t_correlation_type, - underlying_type_t_rank_method, -) +from cudf._lib.cpp.aggregation cimport underlying_type_t_correlation_type import cudf +from cudf._lib cimport pylibcudf + +from cudf._lib import pylibcudf + class AggregationKind(Enum): SUM = libcudf_aggregation.aggregation.Kind.SUM @@ -257,226 +258,120 @@ cdef class GroupbyAggregation: like `df.agg(lambda x: x.sum())`; such functions are called with this class as an argument to generation the desired aggregation. """ + def __init__(self, pylibcudf.aggregation.Aggregation agg): + self.c_obj = agg + @property def kind(self): - return AggregationKind(self.c_obj.get()[0].kind).name + return AggregationKind(int(self.c_obj.kind())).name @classmethod def sum(cls): - cdef GroupbyAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation.make_sum_aggregation[groupby_aggregation]()) - return agg + return cls(pylibcudf.aggregation.sum()) @classmethod def min(cls): - cdef GroupbyAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation.make_min_aggregation[groupby_aggregation]()) - return agg + return cls(pylibcudf.aggregation.min()) @classmethod def max(cls): - cdef GroupbyAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation.make_max_aggregation[groupby_aggregation]()) - return agg + return cls(pylibcudf.aggregation.max()) @classmethod def idxmin(cls): - cdef GroupbyAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation.make_argmin_aggregation[ - groupby_aggregation]()) - return agg + return cls(pylibcudf.aggregation.argmin()) @classmethod def idxmax(cls): - cdef GroupbyAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation.make_argmax_aggregation[ - groupby_aggregation]()) - return agg + return cls(pylibcudf.aggregation.argmax()) @classmethod def mean(cls): - cdef GroupbyAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation.make_mean_aggregation[groupby_aggregation]()) - return agg + return cls(pylibcudf.aggregation.mean()) @classmethod def count(cls, dropna=True): - cdef libcudf_types.null_policy c_null_handling - if dropna: - c_null_handling = libcudf_types.null_policy.EXCLUDE - else: - c_null_handling = libcudf_types.null_policy.INCLUDE - - cdef GroupbyAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation.make_count_aggregation[groupby_aggregation]( - c_null_handling - )) - return agg + return cls(pylibcudf.aggregation.count( + pylibcudf.types.NullPolicy.EXCLUDE + if dropna else pylibcudf.types.NullPolicy.INCLUDE + )) @classmethod def size(cls): - cdef GroupbyAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation.make_count_aggregation[groupby_aggregation]( - ( - NullHandling.INCLUDE) - )) - return agg + return cls(pylibcudf.aggregation.count(pylibcudf.types.NullPolicy.INCLUDE)) @classmethod def collect(cls): - cdef GroupbyAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation. - make_collect_list_aggregation[groupby_aggregation]( - libcudf_types.null_policy.INCLUDE - )) - return agg + return cls( + pylibcudf.aggregation.collect_list(pylibcudf.types.NullPolicy.INCLUDE) + ) @classmethod def nunique(cls): - cdef GroupbyAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation. - make_nunique_aggregation[groupby_aggregation]( - libcudf_types.null_policy.EXCLUDE - )) - return agg + return cls(pylibcudf.aggregation.nunique(pylibcudf.types.NullPolicy.EXCLUDE)) @classmethod def nth(cls, libcudf_types.size_type size): - cdef GroupbyAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation. - make_nth_element_aggregation[groupby_aggregation](size)) - return agg + return cls(pylibcudf.aggregation.nth_element(size)) @classmethod def product(cls): - cdef GroupbyAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation. - make_product_aggregation[groupby_aggregation]()) - return agg + return cls(pylibcudf.aggregation.product()) prod = product @classmethod def sum_of_squares(cls): - cdef GroupbyAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation. - make_sum_of_squares_aggregation[groupby_aggregation]() - ) - return agg + return cls(pylibcudf.aggregation.sum_of_squares()) @classmethod def var(cls, ddof=1): - cdef GroupbyAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation. - make_variance_aggregation[groupby_aggregation](ddof)) - return agg + return cls(pylibcudf.aggregation.variance(ddof)) @classmethod def std(cls, ddof=1): - cdef GroupbyAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation. - make_std_aggregation[groupby_aggregation](ddof)) - return agg + return cls(pylibcudf.aggregation.std(ddof)) @classmethod def median(cls): - cdef GroupbyAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation. - make_median_aggregation[groupby_aggregation]()) - return agg + return cls(pylibcudf.aggregation.median()) @classmethod def quantile(cls, q=0.5, interpolation="linear"): - cdef GroupbyAggregation agg = cls() - if not pd.api.types.is_list_like(q): q = [q] - cdef vector[double] c_q = q - cdef libcudf_types.interpolation c_interp = ( - ( - ( - Interpolation[interpolation.upper()] - ) - ) - ) - agg.c_obj = move( - libcudf_aggregation.make_quantile_aggregation[groupby_aggregation]( - c_q, c_interp) - ) - return agg + return cls(pylibcudf.aggregation.quantile( + q, pylibcudf.types.Interpolation[interpolation.upper()] + )) @classmethod def unique(cls): - cdef GroupbyAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation. - make_collect_set_aggregation[groupby_aggregation]( - libcudf_types.null_policy.INCLUDE, - libcudf_types.null_equality.EQUAL, - libcudf_types.nan_equality.ALL_EQUAL, - )) - return agg + return cls(pylibcudf.aggregation.collect_set( + pylibcudf.types.NullPolicy.INCLUDE, + pylibcudf.types.NullEquality.EQUAL, + pylibcudf.types.NanEquality.ALL_EQUAL, + + )) @classmethod def first(cls): - cdef GroupbyAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation. - make_nth_element_aggregation[groupby_aggregation]( - 0, - ( - NullHandling.EXCLUDE - ) - ) + return cls( + pylibcudf.aggregation.nth_element(0, pylibcudf.types.NullPolicy.EXCLUDE) ) - return agg @classmethod def last(cls): - cdef GroupbyAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation. - make_nth_element_aggregation[groupby_aggregation]( - -1, - ( - NullHandling.EXCLUDE - ) - ) + return cls( + pylibcudf.aggregation.nth_element(-1, pylibcudf.types.NullPolicy.EXCLUDE) ) - return agg @classmethod def corr(cls, method, libcudf_types.size_type min_periods): - cdef GroupbyAggregation agg = cls() - cdef libcudf_aggregation.correlation_type c_method = ( - ( - ( - CorrelationType[method.upper()] - ) - ) - ) - agg.c_obj = move( - libcudf_aggregation. - make_correlation_aggregation[groupby_aggregation]( - c_method, min_periods - )) - return agg + return cls(pylibcudf.aggregation.correlation( + pylibcudf.aggregation.CorrelationType[method.upper()], + min_periods + + )) @classmethod def cov( @@ -484,125 +379,36 @@ cdef class GroupbyAggregation: libcudf_types.size_type min_periods, libcudf_types.size_type ddof=1 ): - cdef GroupbyAggregation agg = cls() - - agg.c_obj = move( - libcudf_aggregation. - make_covariance_aggregation[groupby_aggregation]( - min_periods, ddof - )) - return agg - - -cdef class GroupbyScanAggregation: - """A Cython wrapper for groupby scan aggregations. - - **This class should never be instantiated using a standard constructor, - only using one of its many factories.** These factories handle mapping - different cudf operations to their libcudf analogs, e.g. - `cudf.DataFrame.idxmin` -> `libcudf.argmin`. Additionally, they perform - any additional configuration needed to translate Python arguments into - their corresponding C++ types (for instance, C++ enumerations used for - flag arguments). The factory approach is necessary to support operations - like `df.agg(lambda x: x.sum())`; such functions are called with this - class as an argument to generation the desired aggregation. - """ - @property - def kind(self): - return AggregationKind(self.c_obj.get()[0].kind).name - - @classmethod - def sum(cls): - cdef GroupbyScanAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation. - make_sum_aggregation[groupby_scan_aggregation]()) - return agg - - @classmethod - def min(cls): - cdef GroupbyScanAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation. - make_min_aggregation[groupby_scan_aggregation]()) - return agg - - @classmethod - def max(cls): - cdef GroupbyScanAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation. - make_max_aggregation[groupby_scan_aggregation]()) - return agg - - @classmethod - def count(cls, dropna=True): - cdef libcudf_types.null_policy c_null_handling - if dropna: - c_null_handling = libcudf_types.null_policy.EXCLUDE - else: - c_null_handling = libcudf_types.null_policy.INCLUDE - - cdef GroupbyScanAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation. - make_count_aggregation[groupby_scan_aggregation](c_null_handling)) - return agg - - @classmethod - def size(cls): - cdef GroupbyScanAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation. - make_count_aggregation[groupby_scan_aggregation]( - ( - NullHandling.INCLUDE) - )) - return agg + return cls(pylibcudf.aggregation.covariance( + min_periods, + ddof + )) + # scan aggregations @classmethod def cumcount(cls): - cdef GroupbyScanAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation. - make_count_aggregation[groupby_scan_aggregation]( - libcudf_types.null_policy.INCLUDE - )) - return agg + return cls.count(False) - # scan aggregations - # TODO: update this after adding per algorithm aggregation derived types - # https://github.com/rapidsai/cudf/issues/7106 cumsum = sum cummin = min cummax = max @classmethod def rank(cls, method, ascending, na_option, pct): - cdef GroupbyScanAggregation agg = cls() - cdef libcudf_aggregation.rank_method c_method = ( - ( - ( - RankMethod[method.upper()] - ) - ) - ) - agg.c_obj = move( - libcudf_aggregation. - make_rank_aggregation[groupby_scan_aggregation]( - c_method, - (libcudf_types.order.ASCENDING if ascending else - libcudf_types.order.DESCENDING), - (libcudf_types.null_policy.EXCLUDE if na_option == "keep" else - libcudf_types.null_policy.INCLUDE), - (libcudf_types.null_order.BEFORE - if (na_option == "top") == ascending else - libcudf_types.null_order.AFTER), - (libcudf_aggregation.rank_percentage.ZERO_NORMALIZED - if pct else - libcudf_aggregation.rank_percentage.NONE) - )) - return agg + return cls(pylibcudf.aggregation.rank( + pylibcudf.aggregation.RankMethod[method.upper()], + (pylibcudf.types.Order.ASCENDING if ascending else + pylibcudf.types.Order.DESCENDING), + (pylibcudf.types.NullPolicy.EXCLUDE if na_option == "keep" else + pylibcudf.types.NullPolicy.INCLUDE), + (pylibcudf.types.NullOrder.BEFORE + if (na_option == "top") == ascending else + pylibcudf.types.NullOrder.AFTER), + (pylibcudf.aggregation.RankPercentage.ZERO_NORMALIZED + if pct else + pylibcudf.aggregation.RankPercentage.NONE) + + )) cdef class ReduceAggregation: @@ -878,44 +684,6 @@ cdef GroupbyAggregation make_groupby_aggregation(op, kwargs=None): raise TypeError(f"Unknown aggregation {op}") return agg -cdef GroupbyScanAggregation make_groupby_scan_aggregation(op, kwargs=None): - r""" - Parameters - ---------- - op : str or callable - If callable, must meet one of the following requirements: - - * Is of the form lambda x: x.agg(*args, **kwargs), where - `agg` is the name of a supported aggregation. Used to - to specify aggregations that take arguments, e.g., - `lambda x: x.quantile(0.5)`. - * Is a user defined aggregation function that operates on - grouped, scannable values. In this case, the output dtype must be - specified in the `kwargs` dictionary. - \*\*kwargs : dict, optional - Any keyword arguments to be passed to the op. - - Returns - ------- - GroupbyScanAggregation - """ - if kwargs is None: - kwargs = {} - - cdef GroupbyScanAggregation agg - if isinstance(op, str): - agg = getattr(GroupbyScanAggregation, op)(**kwargs) - elif callable(op): - if op is list: - agg = GroupbyScanAggregation.collect() - elif "dtype" in kwargs: - agg = GroupbyScanAggregation.from_udf(op, **kwargs) - else: - agg = op(GroupbyScanAggregation) - else: - raise TypeError(f"Unknown aggregation {op}") - return agg - cdef ReduceAggregation make_reduce_aggregation(op, kwargs=None): r""" Parameters diff --git a/python/cudf/cudf/_lib/groupby.pyx b/python/cudf/cudf/_lib/groupby.pyx index db4c5e6173a..3493d1c4f33 100644 --- a/python/cudf/cudf/_lib/groupby.pyx +++ b/python/cudf/cudf/_lib/groupby.pyx @@ -13,33 +13,16 @@ from cudf.core.dtypes import ( StructDtype, ) -from libcpp cimport bool -from libcpp.memory cimport unique_ptr -from libcpp.pair cimport pair -from libcpp.utility cimport move -from libcpp.vector cimport vector - -from cudf._lib.column cimport Column from cudf._lib.scalar cimport DeviceScalar -from cudf._lib.utils cimport columns_from_unique_ptr, table_view_from_columns +from cudf._lib.utils cimport columns_from_pylibcudf_table from cudf._lib.scalar import as_device_scalar -from libcpp.functional cimport reference_wrapper - -cimport cudf._lib.cpp.groupby as libcudf_groupby -cimport cudf._lib.cpp.types as libcudf_types -from cudf._lib.aggregation cimport ( - GroupbyAggregation, - GroupbyScanAggregation, - make_groupby_aggregation, - make_groupby_scan_aggregation, -) -from cudf._lib.cpp.column.column cimport column +from cudf._lib.aggregation cimport make_groupby_aggregation from cudf._lib.cpp.replace cimport replace_policy from cudf._lib.cpp.scalar.scalar cimport scalar -from cudf._lib.cpp.table.table cimport table, table_view -from cudf._lib.cpp.types cimport size_type + +from cudf._lib import pylibcudf # The sets below define the possible aggregations that can be performed on # different dtypes. These strings must be elements of the AggregationKind enum. @@ -111,67 +94,24 @@ def _(dtype: DecimalDtype): return _DECIMAL_AGGS -cdef _agg_result_from_columns( - vector[libcudf_groupby.aggregation_result]& c_result_columns, - set column_included, - int n_input_columns -): - """Construct the list of result columns from libcudf result. The result - contains the same number of lists as the number of input columns. Result - for an input column that has no applicable aggregations is an empty list. - """ - cdef: - int i - int j - int result_index = 0 - vector[unique_ptr[column]]* c_result - result_columns = [] - for i in range(n_input_columns): - if i in column_included: - c_result = &c_result_columns[result_index].results - result_columns.append([ - Column.from_unique_ptr(move(c_result[0][j])) - for j in range(c_result[0].size()) - ]) - result_index += 1 - else: - result_columns.append([]) - return result_columns - cdef class GroupBy: - cdef unique_ptr[libcudf_groupby.groupby] c_obj cdef dict __dict__ - def __cinit__(self, list keys, bool dropna=True): - cdef libcudf_types.null_policy c_null_handling - cdef table_view keys_view - - if dropna: - c_null_handling = libcudf_types.null_policy.EXCLUDE - else: - c_null_handling = libcudf_types.null_policy.INCLUDE - + def __init__(self, keys, dropna=True): with acquire_spill_lock() as spill_lock: - keys_view = table_view_from_columns(keys) - # We spill lock the columns while this GroupBy instance is alive. - self._spill_lock = spill_lock - - with nogil: - self.c_obj.reset( - new libcudf_groupby.groupby( - keys_view, - c_null_handling, - ) + self._groupby = pylibcudf.groupby.GroupBy( + pylibcudf.table.Table([c.to_pylibcudf(mode="read") for c in keys]), + pylibcudf.types.NullPolicy.EXCLUDE if dropna + else pylibcudf.types.NullPolicy.INCLUDE ) - def __init__(self, list keys, bool dropna=True): - self.keys = keys - self.dropna = dropna + # We spill lock the columns while this GroupBy instance is alive. + self._spill_lock = spill_lock def groups(self, list values): """ - Perform a sort groupby, using ``self.keys`` as the key columns - and ``values`` as the value columns. + Perform a sort groupby, using the keys used to construct the Groupby as the key + columns and ``values`` as the value columns. Parameters ---------- @@ -188,145 +128,17 @@ cdef class GroupBy: Integer offsets such that offsets[i+1] - offsets[i] represents the size of group `i`. """ - cdef table_view values_view = table_view_from_columns(values) - - with nogil: - c_groups = move(self.c_obj.get()[0].get_groups(values_view)) - - grouped_key_cols = columns_from_unique_ptr(move(c_groups.keys)) - - if values: - grouped_value_cols = columns_from_unique_ptr(move(c_groups.values)) - else: - grouped_value_cols = [] - return grouped_key_cols, grouped_value_cols, c_groups.offsets - - def aggregate_internal(self, values, aggregations): - """`values` is a list of columns and `aggregations` is a list of list - of aggregations. `aggregations[i]` is a list of aggregations for - `values[i]`. Returns a tuple containing 1) list of list of aggregation - results, 2) a list of grouped keys, and 3) a list of list of - aggregations performed. - """ - cdef vector[libcudf_groupby.aggregation_request] c_agg_requests - cdef libcudf_groupby.aggregation_request c_agg_request - cdef Column col - cdef GroupbyAggregation agg_obj - - cdef pair[ - unique_ptr[table], - vector[libcudf_groupby.aggregation_result] - ] c_result - - allow_empty = all(len(v) == 0 for v in aggregations) - - included_aggregations = [] - column_included = set() - for i, (col, aggs) in enumerate(zip(values, aggregations)): - dtype = col.dtype - - valid_aggregations = get_valid_aggregation(dtype) - included_aggregations_i = [] - - c_agg_request = move(libcudf_groupby.aggregation_request()) - for agg in aggs: - agg_obj = make_groupby_aggregation(agg) - if (valid_aggregations == "ALL" - or agg_obj.kind in valid_aggregations): - included_aggregations_i.append((agg, agg_obj.kind)) - c_agg_request.aggregations.push_back( - move(agg_obj.c_obj) - ) - included_aggregations.append(included_aggregations_i) - if not c_agg_request.aggregations.empty(): - c_agg_request.values = col.view() - c_agg_requests.push_back( - move(c_agg_request) - ) - column_included.add(i) - if c_agg_requests.empty() and not allow_empty: - raise DataError("All requested aggregations are unsupported.") - - with nogil: - c_result = move( - self.c_obj.get()[0].aggregate( - c_agg_requests - ) - ) - - grouped_keys = columns_from_unique_ptr( - move(c_result.first) - ) - - result_columns = _agg_result_from_columns( - c_result.second, column_included, len(values) - ) - - return result_columns, grouped_keys, included_aggregations - - def scan_internal(self, values, aggregations): - """`values` is a list of columns and `aggregations` is a list of list - of aggregations. `aggregations[i]` is a list of aggregations for - `values[i]`. Returns a tuple containing 1) list of list of aggregation - results, 2) a list of grouped keys, and 3) a list of list of - aggregations performed. - """ - cdef vector[libcudf_groupby.scan_request] c_agg_requests - cdef libcudf_groupby.scan_request c_agg_request - cdef Column col - cdef GroupbyScanAggregation agg_obj - - cdef pair[ - unique_ptr[table], - vector[libcudf_groupby.aggregation_result] - ] c_result - - allow_empty = all(len(v) == 0 for v in aggregations) - - included_aggregations = [] - column_included = set() - for i, (col, aggs) in enumerate(zip(values, aggregations)): - dtype = col.dtype - - valid_aggregations = get_valid_aggregation(dtype) - included_aggregations_i = [] - - c_agg_request = move(libcudf_groupby.scan_request()) - for agg in aggs: - agg_obj = make_groupby_scan_aggregation(agg) - if (valid_aggregations == "ALL" - or agg_obj.kind in valid_aggregations): - included_aggregations_i.append((agg, agg_obj.kind)) - c_agg_request.aggregations.push_back( - move(agg_obj.c_obj) - ) - included_aggregations.append(included_aggregations_i) - if not c_agg_request.aggregations.empty(): - c_agg_request.values = col.view() - c_agg_requests.push_back( - move(c_agg_request) - ) - column_included.add(i) - if c_agg_requests.empty() and not allow_empty: - raise DataError("All requested aggregations are unsupported.") - - with nogil: - c_result = move( - self.c_obj.get()[0].scan( - c_agg_requests - ) - ) - - grouped_keys = columns_from_unique_ptr( - move(c_result.first) + grouped_keys, grouped_values, offsets = self._groupby.get_groups( + pylibcudf.table.Table([c.to_pylibcudf(mode="read") for c in values]) + if values else None ) - result_columns = _agg_result_from_columns( - c_result.second, column_included, len(values) + return ( + columns_from_pylibcudf_table(grouped_keys), + columns_from_pylibcudf_table(grouped_values), + offsets, ) - return result_columns, grouped_keys, included_aggregations - def aggregate(self, values, aggregations): """ Parameters @@ -344,56 +156,61 @@ cdef class GroupBy: ------- Frame of aggregated values """ - if _is_all_scan_aggregate(aggregations): - return self.scan_internal(values, aggregations) + included_aggregations = [] + column_included = [] + requests = [] + for i, (col, aggs) in enumerate(zip(values, aggregations)): + valid_aggregations = get_valid_aggregation(col.dtype) + included_aggregations_i = [] + col_aggregations = [] + for agg in aggs: + agg_obj = make_groupby_aggregation(agg) + if valid_aggregations == "ALL" or agg_obj.kind in valid_aggregations: + included_aggregations_i.append((agg, agg_obj.kind)) + col_aggregations.append(agg_obj.c_obj) + included_aggregations.append(included_aggregations_i) + if col_aggregations: + requests.append(pylibcudf.groupby.GroupByRequest( + col.to_pylibcudf(mode="read"), col_aggregations + )) + column_included.append(i) - return self.aggregate_internal(values, aggregations) + if not requests and any(len(v) > 0 for v in aggregations): + raise DataError("All requested aggregations are unsupported.") - def shift(self, list values, int periods, list fill_values): - cdef table_view view = table_view_from_columns(values) - cdef size_type num_col = view.num_columns() - cdef vector[size_type] offsets = vector[size_type](num_col, periods) - - cdef vector[reference_wrapper[constscalar]] c_fill_values - cdef DeviceScalar d_slr - d_slrs = [] - c_fill_values.reserve(num_col) - for val, col in zip(fill_values, values): - d_slr = as_device_scalar(val, dtype=col.dtype) - d_slrs.append(d_slr) - c_fill_values.push_back( - reference_wrapper[constscalar](d_slr.get_raw_ptr()[0]) - ) + keys, results = self._groupby.scan(requests) if \ + _is_all_scan_aggregate(aggregations) else self._groupby.aggregate(requests) - cdef pair[unique_ptr[table], unique_ptr[table]] c_result + result_columns = [[] for _ in range(len(values))] + for i, result in zip(column_included, results): + result_columns[i] = columns_from_pylibcudf_table(result) - with nogil: - c_result = move( - self.c_obj.get()[0].shift(view, offsets, c_fill_values) - ) + return result_columns, columns_from_pylibcudf_table(keys), included_aggregations - grouped_keys = columns_from_unique_ptr(move(c_result.first)) - shifted = columns_from_unique_ptr(move(c_result.second)) + def shift(self, list values, int periods, list fill_values): + keys, shifts = self._groupby.shift( + pylibcudf.table.Table([c.to_pylibcudf(mode="read") for c in values]), + [periods] * len(values), + [ + ( as_device_scalar(val, dtype=col.dtype)).c_value + for val, col in zip(fill_values, values) + ], + ) - return shifted, grouped_keys + return columns_from_pylibcudf_table(shifts), columns_from_pylibcudf_table(keys) def replace_nulls(self, list values, object method): - cdef table_view val_view = table_view_from_columns(values) - cdef pair[unique_ptr[table], unique_ptr[table]] c_result - cdef replace_policy policy = ( - replace_policy.PRECEDING - if method == 'ffill' else replace_policy.FOLLOWING - ) - cdef vector[replace_policy] policies = vector[replace_policy]( - val_view.num_columns(), policy + # TODO: This is using an enum (replace_policy) that has not been exposed in + # pylibcudf yet. We'll want to fix that import once it is in pylibcudf. + _, replaced = self._groupby.replace_nulls( + pylibcudf.table.Table([c.to_pylibcudf(mode="read") for c in values]), + [ + replace_policy.PRECEDING + if method == 'ffill' else replace_policy.FOLLOWING + ] * len(values), ) - with nogil: - c_result = move( - self.c_obj.get()[0].replace_nulls(val_view, policies) - ) - - return columns_from_unique_ptr(move(c_result.second)) + return columns_from_pylibcudf_table(replaced) _GROUPBY_SCANS = {"cumcount", "cumsum", "cummin", "cummax", "rank"} diff --git a/python/cudf/cudf/_lib/pylibcudf/aggregation.pyx b/python/cudf/cudf/_lib/pylibcudf/aggregation.pyx index 0b91263d720..bde2643d5b1 100644 --- a/python/cudf/cudf/_lib/pylibcudf/aggregation.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/aggregation.pyx @@ -70,6 +70,8 @@ cdef class Aggregation: operations to perform. Using a class for aggregations provides a unified API for handling parametrizable aggregations. This class should never be instantiated directly, only via one of the factory functions. + + For details, see :cpp:class:`cudf::aggregation`. """ def __init__(self): raise ValueError( @@ -128,6 +130,8 @@ cdef class Aggregation: cpdef Aggregation sum(): """Create a sum aggregation. + For details, see :cpp:func:`make_sum_aggregation`. + Returns ------- Aggregation @@ -139,6 +143,8 @@ cpdef Aggregation sum(): cpdef Aggregation product(): """Create a product aggregation. + For details, see :cpp:func:`make_product_aggregation`. + Returns ------- Aggregation @@ -150,6 +156,8 @@ cpdef Aggregation product(): cpdef Aggregation min(): """Create a min aggregation. + For details, see :cpp:func:`make_min_aggregation`. + Returns ------- Aggregation @@ -161,6 +169,8 @@ cpdef Aggregation min(): cpdef Aggregation max(): """Create a max aggregation. + For details, see :cpp:func:`make_max_aggregation`. + Returns ------- Aggregation @@ -172,6 +182,8 @@ cpdef Aggregation max(): cpdef Aggregation count(null_policy null_handling = null_policy.EXCLUDE): """Create a count aggregation. + For details, see :cpp:func:`make_count_aggregation`. + Parameters ---------- null_handling : null_policy, default EXCLUDE @@ -190,6 +202,8 @@ cpdef Aggregation count(null_policy null_handling = null_policy.EXCLUDE): cpdef Aggregation any(): """Create an any aggregation. + For details, see :cpp:func:`make_any_aggregation`. + Returns ------- Aggregation @@ -201,6 +215,8 @@ cpdef Aggregation any(): cpdef Aggregation all(): """Create an all aggregation. + For details, see :cpp:func:`make_all_aggregation`. + Returns ------- Aggregation @@ -212,6 +228,8 @@ cpdef Aggregation all(): cpdef Aggregation sum_of_squares(): """Create a sum_of_squares aggregation. + For details, see :cpp:func:`make_sum_of_squares_aggregation`. + Returns ------- Aggregation @@ -225,6 +243,8 @@ cpdef Aggregation sum_of_squares(): cpdef Aggregation mean(): """Create a mean aggregation. + For details, see :cpp:func:`make_mean_aggregation`. + Returns ------- Aggregation @@ -236,6 +256,8 @@ cpdef Aggregation mean(): cpdef Aggregation variance(size_type ddof=1): """Create a variance aggregation. + For details, see :cpp:func:`make_variance_aggregation`. + Parameters ---------- ddof : int, default 1 @@ -252,6 +274,8 @@ cpdef Aggregation variance(size_type ddof=1): cpdef Aggregation std(size_type ddof=1): """Create a std aggregation. + For details, see :cpp:func:`make_std_aggregation`. + Parameters ---------- ddof : int, default 1 @@ -268,6 +292,8 @@ cpdef Aggregation std(size_type ddof=1): cpdef Aggregation median(): """Create a median aggregation. + For details, see :cpp:func:`make_median_aggregation`. + Returns ------- Aggregation @@ -279,6 +305,8 @@ cpdef Aggregation median(): cpdef Aggregation quantile(list quantiles, interpolation interp = interpolation.LINEAR): """Create a quantile aggregation. + For details, see :cpp:func:`make_quantile_aggregation`. + Parameters ---------- quantiles : list @@ -300,6 +328,8 @@ cpdef Aggregation quantile(list quantiles, interpolation interp = interpolation. cpdef Aggregation argmax(): """Create an argmax aggregation. + For details, see :cpp:func:`make_argmax_aggregation`. + Returns ------- Aggregation @@ -311,6 +341,8 @@ cpdef Aggregation argmax(): cpdef Aggregation argmin(): """Create an argmin aggregation. + For details, see :cpp:func:`make_argmin_aggregation`. + Returns ------- Aggregation @@ -322,6 +354,8 @@ cpdef Aggregation argmin(): cpdef Aggregation nunique(null_policy null_handling = null_policy.EXCLUDE): """Create a nunique aggregation. + For details, see :cpp:func:`make_nunique_aggregation`. + Parameters ---------- null_handling : null_policy, default EXCLUDE @@ -342,6 +376,8 @@ cpdef Aggregation nth_element( ): """Create a nth_element aggregation. + For details, see :cpp:func:`make_nth_element_aggregation`. + Parameters ---------- null_handling : null_policy, default INCLUDE @@ -360,6 +396,8 @@ cpdef Aggregation nth_element( cpdef Aggregation collect_list(null_policy null_handling = null_policy.INCLUDE): """Create a collect_list aggregation. + For details, see :cpp:func:`make_collect_list_aggregation`. + Parameters ---------- null_handling : null_policy, default INCLUDE @@ -382,6 +420,8 @@ cpdef Aggregation collect_set( ): """Create a collect_set aggregation. + For details, see :cpp:func:`make_collect_set_aggregation`. + Parameters ---------- null_handling : null_policy, default INCLUDE @@ -407,6 +447,8 @@ cpdef Aggregation collect_set( cpdef Aggregation udf(str operation, DataType output_type): """Create a udf aggregation. + For details, see :cpp:func:`make_udf_aggregation`. + Parameters ---------- operation : str @@ -433,6 +475,8 @@ cpdef Aggregation udf(str operation, DataType output_type): cpdef Aggregation correlation(correlation_type type, size_type min_periods): """Create a correlation aggregation. + For details, see :cpp:func:`make_correlation_aggregation`. + Parameters ---------- type : correlation_type @@ -454,6 +498,8 @@ cpdef Aggregation correlation(correlation_type type, size_type min_periods): cpdef Aggregation covariance(size_type min_periods, size_type ddof): """Create a covariance aggregation. + For details, see :cpp:func:`make_covariance_aggregation`. + Parameters ---------- min_periods : int @@ -481,6 +527,8 @@ cpdef Aggregation rank( ): """Create a rank aggregation. + For details, see :cpp:func:`make_rank_aggregation`. + Parameters ---------- method : rank_method diff --git a/python/cudf/cudf/_lib/pylibcudf/groupby.pxd b/python/cudf/cudf/_lib/pylibcudf/groupby.pxd index ce472e3c990..d06959b3c31 100644 --- a/python/cudf/cudf/_lib/pylibcudf/groupby.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/groupby.pxd @@ -40,7 +40,7 @@ cdef class GroupBy: cpdef tuple aggregate(self, list requests) cpdef tuple scan(self, list requests) cpdef tuple shift(self, Table values, list offset, list fill_values) - cpdef tuple replace_nulls(self, Table values, list replace_policy) + cpdef tuple replace_nulls(self, Table values, list replace_policies) cpdef tuple get_groups(self, Table values=*) @staticmethod diff --git a/python/cudf/cudf/_lib/pylibcudf/groupby.pyx b/python/cudf/cudf/_lib/pylibcudf/groupby.pyx index f442aafa4bd..d6ce9825ed3 100644 --- a/python/cudf/cudf/_lib/pylibcudf/groupby.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/groupby.pyx @@ -14,6 +14,7 @@ from cudf._lib.cpp.groupby cimport ( groups, scan_request, ) +from cudf._lib.cpp.replace cimport replace_policy from cudf._lib.cpp.scalar.scalar cimport scalar from cudf._lib.cpp.table.table cimport table from cudf._lib.cpp.types cimport size_type @@ -28,6 +29,12 @@ from .utils cimport _as_vector cdef class GroupByRequest: """A request for a groupby aggregation or scan. + This class is functionally polymorphic and can represent either an + aggregation or a scan depending on the algorithm it is used with. For + details on the libcudf types it converts to, see + :cpp:class:`cudf::groupby::aggregation_request` and + :cpp:class:`cudf::groupby::scan_request`. + Parameters ---------- values : Column @@ -73,6 +80,8 @@ cdef class GroupByRequest: cdef class GroupBy: """Group values by keys and compute various aggregate quantities. + For details, see :cpp:class:`cudf::groupby::groupby`. + Parameters ---------- keys : Table @@ -113,6 +122,8 @@ cdef class GroupBy: cpdef tuple aggregate(self, list requests): """Compute aggregations on columns. + For details, see :cpp:func:`cudf::groupby::groupby::aggregate`. + Parameters ---------- requests : List[GroupByRequest] @@ -132,14 +143,16 @@ cdef class GroupBy: for request in requests: c_requests.push_back(move(request._to_libcudf_agg_request())) - cdef pair[unique_ptr[table], vector[aggregation_result]] c_res = move( - dereference(self.c_obj).aggregate(c_requests) - ) + cdef pair[unique_ptr[table], vector[aggregation_result]] c_res + with nogil: + c_res = move(dereference(self.c_obj).aggregate(c_requests)) return GroupBy._parse_outputs(move(c_res)) cpdef tuple scan(self, list requests): """Compute scans on columns. + For details, see :cpp:func:`cudf::groupby::groupby::scan`. + Parameters ---------- requests : List[GroupByRequest] @@ -159,14 +172,16 @@ cdef class GroupBy: for request in requests: c_requests.push_back(move(request._to_libcudf_scan_request())) - cdef pair[unique_ptr[table], vector[aggregation_result]] c_res = move( - dereference(self.c_obj).scan(c_requests) - ) + cdef pair[unique_ptr[table], vector[aggregation_result]] c_res + with nogil: + c_res = move(dereference(self.c_obj).scan(c_requests)) return GroupBy._parse_outputs(move(c_res)) cpdef tuple shift(self, Table values, list offset, list fill_values): """Compute shifts on columns. + For details, see :cpp:func:`cudf::groupby::groupby::shift`. + Parameters ---------- values : Table @@ -186,9 +201,11 @@ cdef class GroupBy: _as_vector(fill_values) cdef vector[size_type] c_offset = offset - cdef pair[unique_ptr[table], unique_ptr[table]] c_res = move( - dereference(self.c_obj).shift(values.view(), c_offset, c_fill_values) - ) + cdef pair[unique_ptr[table], unique_ptr[table]] c_res + with nogil: + c_res = move( + dereference(self.c_obj).shift(values.view(), c_offset, c_fill_values) + ) return ( Table.from_libcudf(move(c_res.first)), @@ -198,6 +215,8 @@ cdef class GroupBy: cpdef tuple replace_nulls(self, Table value, list replace_policies): """Replace nulls in columns. + For details, see :cpp:func:`cudf::groupby::groupby::replace_nulls`. + Parameters ---------- values : Table @@ -211,9 +230,12 @@ cdef class GroupBy: A tuple whose first element is the group's keys and whose second element is a table of values with nulls replaced. """ - cdef pair[unique_ptr[table], unique_ptr[table]] c_res = move( - dereference(self.c_obj).replace_nulls(value.view(), replace_policies) - ) + cdef pair[unique_ptr[table], unique_ptr[table]] c_res + cdef vector[replace_policy] c_replace_policies = replace_policies + with nogil: + c_res = move( + dereference(self.c_obj).replace_nulls(value.view(), c_replace_policies) + ) return ( Table.from_libcudf(move(c_res.first)), @@ -223,6 +245,8 @@ cdef class GroupBy: cpdef tuple get_groups(self, Table values=None): """Get the grouped keys and values labels for each row. + For details, see :cpp:func:`cudf::groupby::groupby::get_groups`. + Parameters ---------- values : Table, optional