Commit

Code cleanup #1
bwyogatama committed Aug 19, 2022
1 parent 8659149 commit b7ede43
Showing 3 changed files with 194 additions and 178 deletions.
54 changes: 23 additions & 31 deletions python/cudf/cudf/core/groupby/groupby.py
@@ -82,7 +82,6 @@ def __init__(
sort=False,
as_index=True,
dropna=True,
engine="nonjit",
cache=True,
):
"""
@@ -119,8 +118,6 @@ def __init__(
self._level = level
self._sort = sort
self._dropna = dropna
self._engine = engine
self._cache = cache

if isinstance(by, _Grouping):
by._obj = self.obj
@@ -551,7 +548,7 @@ def pipe(self, func, *args, **kwargs):
"""
return cudf.core.common.pipe(self, func, *args, **kwargs)

def apply(self, function, *args, engine="nonjit", cache=True):
def apply(self, function, *args, engine=None, cache=True):
"""Apply a python transformation function over the grouped chunk.
Parameters
@@ -620,44 +617,39 @@ def mult(df):
raise TypeError(f"type {type(function)} is not callable")
group_names, offsets, _, grouped_values = self._grouped()

self._engine = engine
self._cache = cache
if self._engine == "jit":
if engine == "numba":
chunk_results = jit_groupby_apply(
offsets, grouped_values, function, *args, cache=cache
)
result = cudf.Series(chunk_results, index=group_names)
result.index.names = self.grouping.names
if self._sort:
result = result.sort_index()
return result

ngroups = len(offsets) - 1
if ngroups > self._MAX_GROUPS_BEFORE_WARN:
warnings.warn(
f"GroupBy.apply() performance scales poorly with "
f"number of groups. Got {ngroups} groups."
)
else:
ngroups = len(offsets) - 1
if ngroups > self._MAX_GROUPS_BEFORE_WARN:
warnings.warn(
f"GroupBy.apply() performance scales poorly with "
f"number of groups. Got {ngroups} groups."
)

chunks = [
grouped_values[s:e] for s, e in zip(offsets[:-1], offsets[1:])
]
chunk_results = [function(chk, *args) for chk in chunks]
chunks = [
grouped_values[s:e] for s, e in zip(offsets[:-1], offsets[1:])
]
chunk_results = [function(chk, *args) for chk in chunks]

if not len(chunk_results):
return self.obj.head(0)
if not len(chunk_results):
return self.obj.head(0)

if cudf.api.types.is_scalar(chunk_results[0]):
result = cudf.Series(chunk_results, index=group_names)
result.index.names = self.grouping.names
elif isinstance(chunk_results[0], cudf.Series):
if isinstance(self.obj, cudf.DataFrame):
result = cudf.concat(chunk_results, axis=1).T
if cudf.api.types.is_scalar(chunk_results[0]):
result = cudf.Series(chunk_results, index=group_names)
result.index.names = self.grouping.names
elif isinstance(chunk_results[0], cudf.Series):
if isinstance(self.obj, cudf.DataFrame):
result = cudf.concat(chunk_results, axis=1).T
result.index.names = self.grouping.names
else:
result = cudf.concat(chunk_results)
else:
result = cudf.concat(chunk_results)
else:
result = cudf.concat(chunk_results)

if self._sort:
result = result.sort_index()
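The non-JIT path in the hunk above turns each group into a chunk by slicing the grouped values between consecutive offsets and then calls the user function once per chunk. Below is a minimal standalone sketch of that offsets-based chunking pattern, using NumPy stand-ins rather than cudf internals; the sample data and the `mean` reduction are illustrative only.

```python
import numpy as np

# grouped_values: rows reordered so that each group's rows are contiguous.
grouped_values = np.array([10.0, 12.0, 3.0, 5.0, 7.0, 42.0])
# offsets: start index of each group plus one final end index (3 groups here).
offsets = np.array([0, 2, 5, 6])

# Same slicing idiom as in the diff: one chunk per consecutive pair of offsets.
chunks = [grouped_values[s:e] for s, e in zip(offsets[:-1], offsets[1:])]

# The user function is then applied once per group chunk.
chunk_results = [chunk.mean() for chunk in chunks]
print(chunk_results)  # [11.0, 5.0, 42.0]
```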
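For context, a sketch of how the two engines in this commit's `apply` signature would be selected from user code. This assumes a cudf build containing this commit: the `engine="numba"` value and the `None` default reflect the diff above and may differ in released cudf versions, and the JIT path only handles UDFs that `jit_groupby_apply` can compile.

```python
import cudf

df = cudf.DataFrame({"key": [1, 1, 2, 2], "val": [1.0, 2.0, 3.0, 4.0]})

def range_of_val(group):
    # Per-group reduction over the "val" column.
    return group["val"].max() - group["val"].min()

# Default engine (None): the chunk-per-group loop shown in the diff.
result_default = df.groupby("key").apply(range_of_val)

# JIT engine: routes the call through jit_groupby_apply instead of the chunk loop.
result_jit = df.groupby("key").apply(range_of_val, engine="numba")
```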
