diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py
index af771bb32b7..f65000b2b30 100644
--- a/python/cudf/cudf/core/groupby/groupby.py
+++ b/python/cudf/cudf/core/groupby/groupby.py
@@ -82,7 +82,6 @@ def __init__(
         sort=False,
         as_index=True,
         dropna=True,
-        engine="nonjit",
         cache=True,
     ):
         """
@@ -119,8 +118,6 @@ def __init__(
         self._level = level
         self._sort = sort
         self._dropna = dropna
-        self._engine = engine
-        self._cache = cache
 
         if isinstance(by, _Grouping):
             by._obj = self.obj
@@ -551,7 +548,7 @@ def pipe(self, func, *args, **kwargs):
         """
         return cudf.core.common.pipe(self, func, *args, **kwargs)
 
-    def apply(self, function, *args, engine="nonjit", cache=True):
+    def apply(self, function, *args, engine=None, cache=True):
         """Apply a python transformation function over the grouped chunk.
 
         Parameters
@@ -620,44 +617,39 @@ def mult(df):
             raise TypeError(f"type {type(function)} is not callable")
 
         group_names, offsets, _, grouped_values = self._grouped()
-        self._engine = engine
-        self._cache = cache
-        if self._engine == "jit":
+        if engine == "numba":
             chunk_results = jit_groupby_apply(
                 offsets, grouped_values, function, *args, cache=cache
             )
             result = cudf.Series(chunk_results, index=group_names)
             result.index.names = self.grouping.names
-            if self._sort:
-                result = result.sort_index()
-            return result
-
-        ngroups = len(offsets) - 1
-        if ngroups > self._MAX_GROUPS_BEFORE_WARN:
-            warnings.warn(
-                f"GroupBy.apply() performance scales poorly with "
-                f"number of groups. Got {ngroups} groups."
-            )
+        else:
+            ngroups = len(offsets) - 1
+            if ngroups > self._MAX_GROUPS_BEFORE_WARN:
+                warnings.warn(
+                    f"GroupBy.apply() performance scales poorly with "
+                    f"number of groups. Got {ngroups} groups."
+                )
 
-        chunks = [
-            grouped_values[s:e] for s, e in zip(offsets[:-1], offsets[1:])
-        ]
-        chunk_results = [function(chk, *args) for chk in chunks]
+            chunks = [
+                grouped_values[s:e] for s, e in zip(offsets[:-1], offsets[1:])
+            ]
+            chunk_results = [function(chk, *args) for chk in chunks]
 
-        if not len(chunk_results):
-            return self.obj.head(0)
+            if not len(chunk_results):
+                return self.obj.head(0)
 
-        if cudf.api.types.is_scalar(chunk_results[0]):
-            result = cudf.Series(chunk_results, index=group_names)
-            result.index.names = self.grouping.names
-        elif isinstance(chunk_results[0], cudf.Series):
-            if isinstance(self.obj, cudf.DataFrame):
-                result = cudf.concat(chunk_results, axis=1).T
+            if cudf.api.types.is_scalar(chunk_results[0]):
+                result = cudf.Series(chunk_results, index=group_names)
                 result.index.names = self.grouping.names
+            elif isinstance(chunk_results[0], cudf.Series):
+                if isinstance(self.obj, cudf.DataFrame):
+                    result = cudf.concat(chunk_results, axis=1).T
+                    result.index.names = self.grouping.names
+                else:
+                    result = cudf.concat(chunk_results)
             else:
                 result = cudf.concat(chunk_results)
-        else:
-            result = cudf.concat(chunk_results)
 
         if self._sort:
             result = result.sort_index()
diff --git a/python/cudf/cudf/core/udf/function.cu b/python/cudf/cudf/core/udf/function.cu
index 47e3947c65a..749088f8307 100644
--- a/python/cudf/cudf/core/udf/function.cu
+++ b/python/cudf/cudf/core/udf/function.cu
@@ -1,7 +1,23 @@
-// Copyright (c) 2020-2022, NVIDIA CORPORATION.
-
-#include
-#include
+/*
+ * Copyright (c) 2019-2022, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+#include
+
+using size_type = int;
 
 // double atomicAdd
 __device__ __forceinline__ double atomicAdd(double* address, double val)
@@ -47,10 +63,11 @@ __device__ __forceinline__ double atomicMin(double *address, double val)
   return __longlong_as_double(old);
 }
 
-extern "C" __device__ int BlockSum_int64(int64_t *numba_return_value, int64_t *data, int64_t size) {
-  int tid = threadIdx.x; int tb_size = blockDim.x;
+extern "C" __device__ int BlockSum_int64(int64_t *numba_return_value, int64_t const *data, int64_t size) {
+  int tid = threadIdx.x;
+  int tb_size = blockDim.x;
   // Calculate how many elements each thread is working on
-  int ITEMS_PER_THREAD = (size + tb_size - 1) / tb_size;
+  auto const items_per_thread = (size + tb_size - 1) / tb_size;
   int64_t local_sum = 0;
 
   __shared__ int64_t sum;
@@ -62,9 +79,9 @@ extern "C" __device__ int BlockSum_int64(int64_t *numba_return_value, int64_t *d
 
   // Calculate local sum for each thread
   #pragma unroll
-  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
-    if (tid + (ITEM * tb_size) < size) {
-      int64_t load = data[tid + ITEM * tb_size];
+  for (size_type item = 0; item < items_per_thread; item++) {
+    if (tid + (item * tb_size) < size) {
+      int64_t load = data[tid + item * tb_size];
       local_sum += load;
     }
   }
@@ -78,10 +95,11 @@ extern "C" __device__ int BlockSum_int64(int64_t *numba_return_value, int64_t *d
   return 0;
 }
 
-extern "C" __device__ int BlockSum_float64(double *numba_return_value, double *data, int64_t size) {
-  int tid = threadIdx.x; int tb_size = blockDim.x;
+extern "C" __device__ int BlockSum_float64(double *numba_return_value, double const *data, int64_t size) {
+  int tid = threadIdx.x;
+  int tb_size = blockDim.x;
   // Calculate how many elements each thread is working on
-  int ITEMS_PER_THREAD = (size + tb_size - 1) / tb_size;
+  auto const items_per_thread = (size + tb_size - 1) / tb_size;
   double local_sum = 0;
 
   __shared__ double sum;
@@ -93,9 +111,9 @@ extern "C" __device__ int BlockSum_float64(double *numba_return_value, double *d
 
   // Calculate local sum for each thread
   #pragma unroll
-  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
-    if (tid + (ITEM * tb_size) < size) {
-      double load = data[tid + ITEM * tb_size];
+  for (size_type item = 0; item < items_per_thread; item++) {
+    if (tid + (item * tb_size) < size) {
+      double load = data[tid + item * tb_size];
       local_sum += load;
     }
   }
@@ -110,10 +128,11 @@ extern "C" __device__ int BlockSum_float64(double *numba_return_value, double *d
 }
 
-extern "C" __device__ int BlockMean_int64(double *numba_return_value, int64_t *data, int64_t size) {
-  int tid = threadIdx.x; int tb_size = blockDim.x;
+extern "C" __device__ int BlockMean_int64(double *numba_return_value, int64_t const *data, int64_t size) {
+  int tid = threadIdx.x;
+  int tb_size = blockDim.x;
   // Calculate how many elements each thread is working on
-  int ITEMS_PER_THREAD = (size + tb_size - 1) / tb_size;
+  auto const items_per_thread = (size + tb_size - 1) / tb_size;
   int64_t local_sum = 0;
   double mean;
@@ -126,9 +145,9 @@ extern "C" __device__ int BlockMean_int64(double *numba_return_value, int64_t *d
 
   // Calculate local sum for each thread
   #pragma unroll
-  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
-    if (tid + (ITEM * tb_size) < size) {
-      int64_t load = data[tid + ITEM * tb_size];
+  for (size_type item = 0; item < items_per_thread; item++) {
+    if (tid + (item * tb_size) < size) {
+      int64_t load = data[tid + item * tb_size];
       local_sum += load;
     }
   }
@@ -137,7 +156,7 @@ extern "C" __device__ int BlockMean_int64(double *numba_return_value, int64_t *d
 
   __syncthreads();
 
-  mean = sum * 1.0 / size;
+  mean = sum / static_cast<double>(size);
 
   *numba_return_value = mean;
 
@@ -145,10 +164,11 @@ extern "C" __device__ int BlockMean_int64(double *numba_return_value, int64_t *d
   return 0;
 }
 
-extern "C" __device__ int BlockMean_float64(double *numba_return_value, double *data, int64_t size) {
-  int tid = threadIdx.x; int tb_size = blockDim.x;
+extern "C" __device__ int BlockMean_float64(double *numba_return_value, double const *data, int64_t size) {
+  int tid = threadIdx.x;
+  int tb_size = blockDim.x;
   // Calculate how many elements each thread is working on
-  int ITEMS_PER_THREAD = (size + tb_size - 1) / tb_size;
+  auto const items_per_thread = (size + tb_size - 1) / tb_size;
   double local_sum = 0;
   double mean;
@@ -161,9 +181,9 @@ extern "C" __device__ int BlockMean_float64(double *numba_return_value, double *
 
   // Calculate local sum for each thread
   #pragma unroll
-  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
-    if (tid + (ITEM * tb_size) < size) {
-      double load = data[tid + ITEM * tb_size];
+  for (size_type item = 0; item < items_per_thread; item++) {
+    if (tid + (item * tb_size) < size) {
+      double load = data[tid + item * tb_size];
       local_sum += load;
     }
   }
@@ -172,7 +192,7 @@ extern "C" __device__ int BlockMean_float64(double *numba_return_value, double *
 
   __syncthreads();
 
-  mean = sum * 1.0 / size;
+  mean = sum / static_cast<double>(size);
 
   *numba_return_value = mean;
 
@@ -182,10 +202,11 @@ extern "C" __device__ int BlockMean_float64(double *numba_return_value, double *
 
 
-extern "C" __device__ int BlockStd_int64(double *numba_return_value, int64_t *data, int64_t size) {
-  int tid = threadIdx.x; int tb_size = blockDim.x;
+extern "C" __device__ int BlockStd_int64(double *numba_return_value, int64_t const *data, int64_t size) {
+  int tid = threadIdx.x;
+  int tb_size = blockDim.x;
   // Calculate how many elements each thread is working on
-  int ITEMS_PER_THREAD = (size + tb_size - 1) / tb_size;
+  auto const items_per_thread = (size + tb_size - 1) / tb_size;
   int64_t local_sum = 0;
   double local_var = 0;
   double mean;
@@ -203,9 +224,9 @@ extern "C" __device__ int BlockStd_int64(double *numba_return_value, int64_t *da
 
   // Calculate local sum for each thread
   #pragma unroll
-  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
-    if (tid + (ITEM * tb_size) < size) {
-      int64_t load = data[tid + ITEM * tb_size];
+  for (size_type item = 0; item < items_per_thread; item++) {
+    if (tid + (item * tb_size) < size) {
+      int64_t load = data[tid + item * tb_size];
       local_sum += load;
     }
   }
@@ -214,13 +235,13 @@ extern "C" __device__ int BlockStd_int64(double *numba_return_value, int64_t *da
 
   __syncthreads();
 
-  mean = sum * 1.0 / size;
+  mean = sum / static_cast<double>(size);
 
   // Calculate local sum for each thread
   #pragma unroll
-  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
-    if (tid + (ITEM * tb_size) < size) {
-      int64_t load = data[tid + ITEM * tb_size];
+  for (size_type item = 0; item < items_per_thread; item++) {
+    if (tid + (item * tb_size) < size) {
+      int64_t load = data[tid + item * tb_size];
       double temp = load - mean;
       temp = pow(temp, 2);
       local_var += temp;
@@ -238,10 +259,11 @@ extern "C" __device__ int BlockStd_int64(double *numba_return_value, int64_t *da
   return 0;
 }
 
-extern "C" __device__ int BlockStd_float64(double *numba_return_value, double *data, int64_t size) {
-  int tid = threadIdx.x; int tb_size = blockDim.x;
+extern "C" __device__ int BlockStd_float64(double *numba_return_value, double const *data, int64_t size) {
+  int tid = threadIdx.x;
+  int tb_size = blockDim.x;
   // Calculate how many elements each thread is working on
-  int ITEMS_PER_THREAD = (size + tb_size - 1) / tb_size;
+  auto const items_per_thread = (size + tb_size - 1) / tb_size;
   double local_sum = 0;
   double local_var = 0;
   double mean;
@@ -259,9 +281,9 @@ extern "C" __device__ int BlockStd_float64(double *numba_return_value, double *d
 
   // Calculate local sum for each thread
   #pragma unroll
-  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
-    if (tid + (ITEM * tb_size) < size) {
-      double load = data[tid + ITEM * tb_size];
+  for (size_type item = 0; item < items_per_thread; item++) {
+    if (tid + (item * tb_size) < size) {
+      double load = data[tid + item * tb_size];
       local_sum += load;
     }
   }
@@ -270,13 +292,13 @@ extern "C" __device__ int BlockStd_float64(double *numba_return_value, double *d
 
   __syncthreads();
 
-  mean = sum * 1.0 / size;
+  mean = sum / static_cast<double>(size);
 
   // Calculate local sum for each thread
   #pragma unroll
-  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
-    if (tid + (ITEM * tb_size) < size) {
-      double load = data[tid + ITEM * tb_size];
+  for (size_type item = 0; item < items_per_thread; item++) {
+    if (tid + (item * tb_size) < size) {
+      double load = data[tid + item * tb_size];
       double temp = load - mean;
       temp = pow(temp, 2);
       local_var += temp;
@@ -294,10 +316,11 @@ extern "C" __device__ int BlockStd_float64(double *numba_return_value, double *d
   return 0;
 }
 
-extern "C" __device__ int BlockVar_int64(double *numba_return_value, int64_t *data, int64_t size) {
-  int tid = threadIdx.x; int tb_size = blockDim.x;
+extern "C" __device__ int BlockVar_int64(double *numba_return_value, int64_t const *data, int64_t size) {
+  int tid = threadIdx.x;
+  int tb_size = blockDim.x;
   // Calculate how many elements each thread is working on
-  int ITEMS_PER_THREAD = (size + tb_size - 1) / tb_size;
+  auto const items_per_thread = (size + tb_size - 1) / tb_size;
   int64_t local_sum = 0;
   double local_var = 0;
   double mean;
@@ -314,9 +337,9 @@ extern "C" __device__ int BlockVar_int64(double *numba_return_value, int64_t *da
 
   // Calculate local sum for each thread
   #pragma unroll
-  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
-    if (tid + (ITEM * tb_size) < size) {
-      int64_t load = data[tid + ITEM * tb_size];
+  for (size_type item = 0; item < items_per_thread; item++) {
+    if (tid + (item * tb_size) < size) {
+      int64_t load = data[tid + item * tb_size];
       local_sum += load;
     }
   }
@@ -325,13 +348,13 @@ extern "C" __device__ int BlockVar_int64(double *numba_return_value, int64_t *da
 
   __syncthreads();
 
-  mean = sum * 1.0 / size;
+  mean = sum / static_cast<double>(size);
 
   // Calculate local sum for each thread
   #pragma unroll
-  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
-    if (tid + (ITEM * tb_size) < size) {
-      int64_t load = data[tid + ITEM * tb_size];
+  for (size_type item = 0; item < items_per_thread; item++) {
+    if (tid + (item * tb_size) < size) {
+      int64_t load = data[tid + item * tb_size];
       double temp = load - mean;
       temp = pow(temp, 2);
       local_var += temp;
@@ -349,10 +372,11 @@ extern "C" __device__ int BlockVar_int64(double *numba_return_value, int64_t *da
   return 0;
 }
 
-extern "C" __device__ int BlockVar_float64(double *numba_return_value, double *data, int64_t size) {
-  int tid = threadIdx.x; int tb_size = blockDim.x;
+extern "C" __device__ int BlockVar_float64(double *numba_return_value, double const *data, int64_t size) {
+  int tid = threadIdx.x;
+  int tb_size = blockDim.x;
   // Calculate how many elements each thread is working on
-  int ITEMS_PER_THREAD = (size + tb_size - 1) / tb_size;
+  auto const items_per_thread = (size + tb_size - 1) / tb_size;
   double local_sum = 0;
   double local_var = 0;
   double mean;
@@ -369,9 +393,9 @@ extern "C" __device__ int BlockVar_float64(double *numba_return_value, double *d
 
   // Calculate local sum for each thread
   #pragma unroll
-  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
-    if (tid + (ITEM * tb_size) < size) {
-      double load = data[tid + ITEM * tb_size];
+  for (size_type item = 0; item < items_per_thread; item++) {
+    if (tid + (item * tb_size) < size) {
+      double load = data[tid + item * tb_size];
       local_sum += load;
     }
   }
@@ -380,13 +404,13 @@ extern "C" __device__ int BlockVar_float64(double *numba_return_value, double *d
 
   __syncthreads();
 
-  mean = sum * 1.0 / size;
+  mean = sum / static_cast<double>(size);
 
   // Calculate local sum for each thread
   #pragma unroll
-  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
-    if (tid + (ITEM * tb_size) < size) {
-      double load = data[tid + ITEM * tb_size];
+  for (size_type item = 0; item < items_per_thread; item++) {
+    if (tid + (item * tb_size) < size) {
+      double load = data[tid + item * tb_size];
       double temp = load - mean;
       temp = pow(temp, 2);
       local_var += temp;
@@ -407,10 +431,10 @@ extern "C" __device__ int BlockVar_float64(double *numba_return_value, double *d
 
 // Calculate maximum of the group, return the scalar
 extern "C" __device__ int BlockMax_int32(int *numba_return_value, int *data, int64_t size) {
-
-  int tid = threadIdx.x; int tb_size = blockDim.x;
+  int tid = threadIdx.x;
+  int tb_size = blockDim.x;
   // Calculate how many elements each thread is working on
-  int ITEMS_PER_THREAD = (size + tb_size - 1) / tb_size;
+  auto const items_per_thread = (size + tb_size - 1) / tb_size;
   int local_max = INT_MIN;
 
   __shared__ int smax;
@@ -422,9 +446,9 @@ extern "C" __device__ int BlockMax_int32(int *numba_return_value, int *data, int
 
   // Calculate local max for each thread
   #pragma unroll
-  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
-    if (tid + (ITEM * tb_size) < size) {
-      int load = data[tid + ITEM * tb_size];
+  for (size_type item = 0; item < items_per_thread; item++) {
+    if (tid + (item * tb_size) < size) {
+      int load = data[tid + item * tb_size];
       local_max = max(local_max, load);
     }
   }
@@ -442,11 +466,11 @@ extern "C" __device__ int BlockMax_int32(int *numba_return_value, int *data, int
 }
 
 // Calculate maximum of the group, return the scalar
-extern "C" __device__ int BlockMax_int64(int64_t *numba_return_value, int64_t *data, int64_t size) {
-
-  int tid = threadIdx.x; int tb_size = blockDim.x;
+extern "C" __device__ int BlockMax_int64(int64_t *numba_return_value, int64_t const *data, int64_t size) {
+  int tid = threadIdx.x;
+  int tb_size = blockDim.x;
   // Calculate how many elements each thread is working on
-  int ITEMS_PER_THREAD = (size + tb_size - 1) / tb_size;
+  auto const items_per_thread = (size + tb_size - 1) / tb_size;
   int64_t local_max = INT64_MIN;
 
   __shared__ int64_t smax;
@@ -458,9 +482,9 @@ extern "C" __device__ int BlockMax_int64(int64_t *numba_return_value, int64_t *d
 
   // Calculate local max for each thread
   #pragma unroll
-  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
-    if (tid + (ITEM * tb_size) < size) {
-      int64_t load = data[tid + ITEM * tb_size];
+  for (size_type item = 0; item < items_per_thread; item++) {
+    if (tid + (item * tb_size) < size) {
+      int64_t load = data[tid + item * tb_size];
       local_max = max(local_max, load);
     }
   }
@@ -478,11 +502,11 @@ extern "C" __device__ int BlockMax_int64(int64_t *numba_return_value, int64_t *d
 }
 
 // Calculate maximum of the group, return the scalar
-extern "C" __device__ int BlockMax_float64(double *numba_return_value, double *data, int64_t size) {
-
-  int tid = threadIdx.x; int tb_size = blockDim.x;
+extern "C" __device__ int BlockMax_float64(double *numba_return_value, double const *data, int64_t size) {
+  int tid = threadIdx.x;
+  int tb_size = blockDim.x;
   // Calculate how many elements each thread is working on
-  int ITEMS_PER_THREAD = (size + tb_size - 1) / tb_size;
+  auto const items_per_thread = (size + tb_size - 1) / tb_size;
   double local_max = -DBL_MAX;
 
   __shared__ double smax;
@@ -494,9 +518,9 @@ extern "C" __device__ int BlockMax_float64(double *numba_return_value, double *d
 
   // Calculate local max for each thread
   #pragma unroll
-  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
-    if (tid + (ITEM * tb_size) < size) {
-      double load = data[tid + ITEM * tb_size];
+  for (size_type item = 0; item < items_per_thread; item++) {
+    if (tid + (item * tb_size) < size) {
+      double load = data[tid + item * tb_size];
       local_max = max(local_max, load);
     }
   }
@@ -515,10 +539,10 @@ extern "C" __device__ int BlockMax_float64(double *numba_return_value, double *d
 
 // Calculate minimum of the group, return the scalar
 extern "C" __device__ int BlockMin_int32(int *numba_return_value, int *data, int64_t size) {
-
-  int tid = threadIdx.x; int tb_size = blockDim.x;
+  int tid = threadIdx.x;
+  int tb_size = blockDim.x;
   // Calculate how many elements each thread is working on
-  int ITEMS_PER_THREAD = (size + tb_size - 1) / tb_size;
+  auto const items_per_thread = (size + tb_size - 1) / tb_size;
   int local_min = INT_MAX;
 
   __shared__ int smin;
@@ -529,9 +553,9 @@ extern "C" __device__ int BlockMin_int32(int *numba_return_value, int *data, int
   __syncthreads();
 
   #pragma unroll
-  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
-    if (tid + (ITEM * tb_size) < size) {
-      int load = data[tid + ITEM * tb_size];
+  for (size_type item = 0; item < items_per_thread; item++) {
+    if (tid + (item * tb_size) < size) {
+      int load = data[tid + item * tb_size];
       local_min = min(local_min, load);
     }
   }
@@ -549,11 +573,11 @@ extern "C" __device__ int BlockMin_int32(int *numba_return_value, int *data, int
 }
 
 // Calculate minimum of the group, return the scalar
-extern "C" __device__ int BlockMin_int64(int64_t *numba_return_value, int64_t *data, int64_t size) {
-
-  int tid = threadIdx.x; int tb_size = blockDim.x;
+extern "C" __device__ int BlockMin_int64(int64_t *numba_return_value, int64_t const *data, int64_t size) {
+  int tid = threadIdx.x;
+  int tb_size = blockDim.x;
   // Calculate how many elements each thread is working on
-  int ITEMS_PER_THREAD = (size + tb_size - 1) / tb_size;
+  auto const items_per_thread = (size + tb_size - 1) / tb_size;
   int64_t local_min = INT64_MAX;
 
   __shared__ int64_t smin;
@@ -565,9 +589,9 @@ extern "C" __device__ int BlockMin_int64(int64_t *numba_return_value, int64_t *d
 
   // Calculate local max for each thread
   #pragma unroll
-  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
-    if (tid + (ITEM * tb_size) < size) {
-      int64_t load = data[tid + ITEM * tb_size];
+  for (size_type item = 0; item < items_per_thread; item++) {
+    if (tid + (item * tb_size) < size) {
+      int64_t load = data[tid + item * tb_size];
       local_min = min(local_min, load);
     }
   }
@@ -585,11 +609,11 @@ extern "C" __device__ int BlockMin_int64(int64_t *numba_return_value, int64_t *d
 }
 
 // Calculate minimum of the group, return the scalar
-extern "C" __device__ int BlockMin_float64(double *numba_return_value, double *data, int64_t size) {
-
-  int tid = threadIdx.x; int tb_size = blockDim.x;
+extern "C" __device__ int BlockMin_float64(double *numba_return_value, double const *data, int64_t size) {
+  int tid = threadIdx.x;
+  int tb_size = blockDim.x;
   // Calculate how many elements each thread is working on
-  int ITEMS_PER_THREAD = (size + tb_size - 1) / tb_size;
+  auto const items_per_thread = (size + tb_size - 1) / tb_size;
   double local_min = DBL_MAX;
 
   __shared__ double smin;
@@ -601,9 +625,9 @@ extern "C" __device__ int BlockMin_float64(double *numba_return_value, double *d
 
   // Calculate local max for each thread
   #pragma unroll
-  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
-    if (tid + (ITEM * tb_size) < size) {
-      double load = data[tid + ITEM * tb_size];
+  for (size_type item = 0; item < items_per_thread; item++) {
+    if (tid + (item * tb_size) < size) {
+      double load = data[tid + item * tb_size];
       local_min = min(local_min, load);
     }
   }
@@ -621,11 +645,11 @@ extern "C" __device__ int BlockMin_float64(double *numba_return_value, double *d
 }
 
 // Calculate minimum of the group, return the scalar
-extern "C" __device__ int BlockIdxMax_int64(int64_t *numba_return_value, int64_t *data, int64_t* index, int64_t size) {
-
-  int tid = threadIdx.x; int tb_size = blockDim.x;
+extern "C" __device__ int BlockIdxMax_int64(int64_t *numba_return_value, int64_t const *data, int64_t* index, int64_t size) {
+  int tid = threadIdx.x;
+  int tb_size = blockDim.x;
   // Calculate how many elements each thread is working on
-  int ITEMS_PER_THREAD = (size + tb_size - 1) / tb_size;
+  auto const items_per_thread = (size + tb_size - 1) / tb_size;
   int64_t local_max = INT64_MIN;
   int64_t local_idx = -1;
@@ -641,12 +665,12 @@ extern "C" __device__ int BlockIdxMax_int64(int64_t *numba_return_value, int64_t
 
   // Calculate local max for each thread
   #pragma unroll
-  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
-    if (tid + (ITEM * tb_size) < size) {
-      int64_t load = data[tid + ITEM * tb_size];
+  for (size_type item = 0; item < items_per_thread; item++) {
+    if (tid + (item * tb_size) < size) {
+      int64_t load = data[tid + item * tb_size];
       if (load > local_max) {
         local_max = load;
-        local_idx = index[tid + ITEM * tb_size];
+        local_idx = index[tid + item * tb_size];
       }
     }
   }
@@ -670,11 +694,11 @@ extern "C" __device__ int BlockIdxMax_int64(int64_t *numba_return_value, int64_t
 }
 
 // Calculate minimum of the group, return the scalar
-extern "C" __device__ int BlockIdxMax_float64(int64_t *numba_return_value, double *data, int64_t* index, int64_t size) {
-
-  int tid = threadIdx.x; int tb_size = blockDim.x;
+extern "C" __device__ int BlockIdxMax_float64(int64_t *numba_return_value, double const *data, int64_t* index, int64_t size) {
+  int tid = threadIdx.x;
+  int tb_size = blockDim.x;
   // Calculate how many elements each thread is working on
-  int ITEMS_PER_THREAD = (size + tb_size - 1) / tb_size;
+  auto const items_per_thread = (size + tb_size - 1) / tb_size;
   double local_max = -DBL_MAX;
   int64_t local_idx = -1;
@@ -690,12 +714,12 @@ extern "C" __device__ int BlockIdxMax_float64(int64_t *numba_return_value, doubl
 
   // Calculate local max for each thread
   #pragma unroll
-  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
-    if (tid + (ITEM * tb_size) < size) {
-      double load = data[tid + ITEM * tb_size];
+  for (size_type item = 0; item < items_per_thread; item++) {
+    if (tid + (item * tb_size) < size) {
+      double load = data[tid + item * tb_size];
       if (load > local_max) {
         local_max = load;
-        local_idx = index[tid + ITEM * tb_size];
+        local_idx = index[tid + item * tb_size];
       }
     }
   }
@@ -719,11 +743,11 @@ extern "C" __device__ int BlockIdxMax_float64(int64_t *numba_return_value, doubl
 }
 
 // Calculate minimum of the group, return the scalar
-extern "C" __device__ int BlockIdxMin_int64(int64_t *numba_return_value, int64_t *data, int64_t* index, int64_t size) {
-
-  int tid = threadIdx.x; int tb_size = blockDim.x;
+extern "C" __device__ int BlockIdxMin_int64(int64_t *numba_return_value, int64_t const *data, int64_t* index, int64_t size) {
+  int tid = threadIdx.x;
+  int tb_size = blockDim.x;
   // Calculate how many elements each thread is working on
-  int ITEMS_PER_THREAD = (size + tb_size - 1) / tb_size;
+  auto const items_per_thread = (size + tb_size - 1) / tb_size;
   int64_t local_min = INT64_MAX;
   int64_t local_idx = -1;
@@ -739,12 +763,12 @@ extern "C" __device__ int BlockIdxMin_int64(int64_t *numba_return_value, int64_t
 
   // Calculate local max for each thread
   #pragma unroll
-  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
-    if (tid + (ITEM * tb_size) < size) {
-      int64_t load = data[tid + ITEM * tb_size];
+  for (size_type item = 0; item < items_per_thread; item++) {
+    if (tid + (item * tb_size) < size) {
+      int64_t load = data[tid + item * tb_size];
       if (load < local_min) {
         local_min = load;
-        local_idx = index[tid + ITEM * tb_size];
+        local_idx = index[tid + item * tb_size];
       }
     }
   }
@@ -768,11 +792,11 @@ extern "C" __device__ int BlockIdxMin_int64(int64_t *numba_return_value, int64_t
 }
 
 // Calculate minimum of the group, return the scalar
-extern "C" __device__ int BlockIdxMin_float64(int64_t *numba_return_value, double *data, int64_t* index, int64_t size) {
-
-  int tid = threadIdx.x; int tb_size = blockDim.x;
+extern "C" __device__ int BlockIdxMin_float64(int64_t *numba_return_value, double const *data, int64_t* index, int64_t size) {
+  int tid = threadIdx.x;
+  int tb_size = blockDim.x;
   // Calculate how many elements each thread is working on
-  int ITEMS_PER_THREAD = (size + tb_size - 1) / tb_size;
+  auto const items_per_thread = (size + tb_size - 1) / tb_size;
   double local_min = DBL_MAX;
   int64_t local_idx = -1;
@@ -788,12 +812,12 @@ extern "C" __device__ int BlockIdxMin_float64(int64_t *numba_return_value, doubl
 
   // Calculate local max for each thread
   #pragma unroll
-  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++) {
-    if (tid + (ITEM * tb_size) < size) {
-      double load = data[tid + ITEM * tb_size];
+  for (size_type item = 0; item < items_per_thread; item++) {
+    if (tid + (item * tb_size) < size) {
+      double load = data[tid + item * tb_size];
       if (load < local_min) {
         local_min = load;
-        local_idx = index[tid + ITEM * tb_size];
+        local_idx = index[tid + item * tb_size];
       }
     }
   }
diff --git a/python/cudf/cudf/core/udf/groupby_function.py b/python/cudf/cudf/core/udf/groupby_function.py
index 6f8237396cb..0e835b2b9e6 100644
--- a/python/cudf/cudf/core/udf/groupby_function.py
+++ b/python/cudf/cudf/core/udf/groupby_function.py
@@ -816,11 +816,11 @@ def jit_groupby_apply(offsets, grouped_values, function, *args, cache=True):
     max_group_size = cp.diff(offsets).max()
 
-    if max_group_size >= 1024:
-        if ngroups < 100:
-            blocklim = 1024
-        else:
-            blocklim = 256
+    if max_group_size >= 1000:
+        # if ngroups < 100:
+        #     blocklim = 1024
+        # else:
+        blocklim = 256
     else:
         blocklim = ((max_group_size + 32 - 1) / 32) * 32
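
A note on the final hunk: the surviving else branch rounds the largest group size up to a whole number of 32-thread warps before using it as the kernel's block-size limit. A standalone worked example of that arithmetic (not part of the patch; the patch's `/` is Python 3 true division and yields a float, so this sketch uses `//` to keep the limit an integer, which a thread count ultimately has to be):

    max_group_size = 70
    # ceiling-divide by the 32-thread warp size, then scale back up:
    # (70 + 31) // 32 = 3 warps -> 3 * 32 = 96 threads
    blocklim = ((max_group_size + 32 - 1) // 32) * 32
    assert blocklim == 96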
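
Every Block* helper in function.cu above follows the same shape: each of the block's tb_size threads strides through the group's elements with stride tb_size, folds them into a thread-local partial result, and the partials are combined through a shared accumulator after a barrier. A minimal single-block mirror of BlockSum_int64 written with numba.cuda (a sketch with hypothetical names such as block_sum; the shipped JIT path links the compiled device functions above rather than using a kernel like this):

    import numpy as np
    from numba import cuda

    @cuda.jit
    def block_sum(data, out):
        tid = cuda.threadIdx.x
        tb_size = cuda.blockDim.x
        size = data.size
        # Same ceiling division as items_per_thread in function.cu.
        items_per_thread = (size + tb_size - 1) // tb_size
        local_sum = 0
        for item in range(items_per_thread):
            i = tid + item * tb_size
            if i < size:
                local_sum += data[i]
        # function.cu folds partials into a __shared__ value guarded by
        # __syncthreads(); a global atomic add is the simplest equivalent
        # for a one-block sketch.
        cuda.atomic.add(out, 0, local_sum)

    data = cuda.to_device(np.arange(1000, dtype=np.int64))
    out = cuda.to_device(np.zeros(1, dtype=np.int64))
    block_sum[1, 128](data, out)  # one block, like one group in the JIT path
    assert out.copy_to_host()[0] == 499500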
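
Taken together with the groupby.py hunks, the engine choice now lives on each apply() call instead of being stored on the GroupBy object at construction. A usage sketch under assumptions (the frame, column names, and lambda are hypothetical, and the exact set of UDF operations the JIT engine accepts is defined elsewhere in the PR; the Block* device functions above suggest reductions such as sum, mean, std, var, max, min, idxmax and idxmin):

    import cudf

    df = cudf.DataFrame({"key": [0, 0, 1, 1, 1], "val": [1, 2, 3, 4, 5]})

    # Default engine=None: one Python call per group chunk.
    out = df.groupby("key").apply(lambda grp: grp["val"].sum())

    # Opt into the JIT path per call; cache=True reuses the compiled kernel.
    out_jit = df.groupby("key").apply(
        lambda grp: grp["val"].sum(), engine="numba", cache=True
    )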