Skip to content

Commit

Permalink
Antialiased line support for where reductions (#1269)
Browse files Browse the repository at this point in the history
* Dynamically create aa_stage_2_clear rather than use parallel_fill

* Move combine function definitions out of where._build_combine for reuse

* Antialiased where reductions on CPU

* Add tests
  • Loading branch information
ianthomas23 committed Aug 16, 2023
1 parent fe567d0 commit ea163e9
Show file tree
Hide file tree
Showing 7 changed files with 373 additions and 150 deletions.
58 changes: 41 additions & 17 deletions datashader/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from .antialias import AntialiasCombination
from .reductions import SpecialColumn, UsesCudaMutex, by, category_codes, summary
from .utils import (isnull, ngjit, parallel_fill,
from .utils import (isnull, ngjit,
nanmax_in_place, nanmin_in_place, nansum_in_place, nanfirst_in_place, nanlast_in_place,
nanmax_n_in_place_3d, nanmax_n_in_place_4d, nanmin_n_in_place_3d, nanmin_n_in_place_4d,
nanfirst_n_in_place_3d, nanfirst_n_in_place_4d, nanlast_n_in_place_3d, nanlast_n_in_place_4d,
Expand Down Expand Up @@ -113,7 +113,7 @@ def compile_components(agg, schema, glyph, *, antialias=False, cuda=False, parti
else:
array_module = np
antialias_stage_2 = antialias_stage_2(array_module)
antialias_stage_2_funcs = make_antialias_stage_2_functions(antialias_stage_2)
antialias_stage_2_funcs = make_antialias_stage_2_functions(antialias_stage_2, bases, cuda, partitioned)
else:
self_intersect = False
antialias_stage_2 = False
Expand Down Expand Up @@ -148,9 +148,9 @@ def _get_antialias_stage_2_combine_func(combination: AntialiasCombination, zero:
n_reduction: bool, categorical: bool):
if n_reduction:
if zero == -1:
if combination == AntialiasCombination.MAX:
if combination in (AntialiasCombination.MAX, AntialiasCombination.LAST):
return row_max_n_in_place_4d if categorical else row_max_n_in_place_3d
elif combination == AntialiasCombination.MIN:
elif combination in (AntialiasCombination.MIN, AntialiasCombination.FIRST):
return row_min_n_in_place_4d if categorical else row_min_n_in_place_3d
else:
raise NotImplementedError
Expand All @@ -170,9 +170,9 @@ def _get_antialias_stage_2_combine_func(combination: AntialiasCombination, zero:
# 2D (ny, nx) if categorical is False. The same combination functions can be for both
# as all elements are independent.
if zero == -1:
if combination == AntialiasCombination.MAX:
if combination in (AntialiasCombination.MAX, AntialiasCombination.LAST):
return row_max_in_place
elif combination == AntialiasCombination.MIN:
elif combination in (AntialiasCombination.MIN, AntialiasCombination.FIRST):
return row_min_in_place
else:
raise NotImplementedError
Expand All @@ -189,18 +189,25 @@ def _get_antialias_stage_2_combine_func(combination: AntialiasCombination, zero:
return nansum_in_place


def make_antialias_stage_2_functions(antialias_stage_2):
def make_antialias_stage_2_functions(antialias_stage_2, bases, cuda, partitioned):
aa_combinations, aa_zeroes, aa_n_reductions, aa_categorical = antialias_stage_2

# Accumulate functions.
funcs = [_get_antialias_stage_2_combine_func(comb, zero, n_red, cat) for comb, zero, n_red, cat
in zip(aa_combinations, aa_zeroes, aa_n_reductions, aa_categorical)]

base_is_where = [b.is_where() for b in bases]
next_base_is_where = base_is_where[1:] + [False]

namespace = {}
namespace["literal_unroll"] = literal_unroll
for func in set(funcs):
namespace[func.__name__] = func

# Generator of unique names for combine functions
names = (f"combine{i}" for i in count())

# aa_stage_2_accumulate
lines = [
"def aa_stage_2_accumulate(aggs_and_copies, first_pass):",
# Don't need to accumulate if first_pass, just copy (opposite of aa_stage_2_copy_back)
Expand All @@ -209,21 +216,38 @@ def make_antialias_stage_2_functions(antialias_stage_2):
" a[1][:] = a[0][:]",
" else:",
]
for i, func in enumerate(funcs):
lines.append(f" {func.__name__}(aggs_and_copies[{i}][1], aggs_and_copies[{i}][0])")

for i, (func, is_where, next_is_where) in enumerate(zip(funcs, base_is_where, next_base_is_where)):
if is_where:
where_reduction = bases[i]
if isinstance(where_reduction, by):
where_reduction = where_reduction.reduction

combine = where_reduction._combine_callback(cuda, partitioned, aa_categorical[i])
name = next(names) # Unique name
namespace[name] = combine

lines.append(f" {name}(aggs_and_copies[{i}][::-1], aggs_and_copies[{i-1}][::-1])")
elif next_is_where:
# This is dealt with as part of the following base which is a where reduction.
pass
else:
lines.append(f" {func.__name__}(aggs_and_copies[{i}][1], aggs_and_copies[{i}][0])")
code = "\n".join(lines)
exec(code, namespace)
aa_stage_2_accumulate = ngjit(namespace["aa_stage_2_accumulate"])

@ngjit
def aa_stage_2_clear(aggs_and_copies):
k = 0
# Numba access to heterogeneous tuples is only permitted using literal_unroll.
for agg_and_copy in literal_unroll(aggs_and_copies):
parallel_fill(agg_and_copy[0], aa_zeroes[k])
k += 1
# aa_stage_2_clear
if np.any(np.isnan(aa_zeroes)):
namespace["nan"] = np.nan

lines = ["def aa_stage_2_clear(aggs_and_copies):"]
for i, aa_zero in enumerate(aa_zeroes):
lines.append(f" aggs_and_copies[{i}][0].fill({aa_zero})")
code = "\n".join(lines)
exec(code, namespace)
aa_stage_2_clear = ngjit(namespace["aa_stage_2_clear"])

# aa_stage_2_copy_back
@ngjit
def aa_stage_2_copy_back(aggs_and_copies):
# Numba access to heterogeneous tuples is only permitted using literal_unroll.
Expand Down
2 changes: 1 addition & 1 deletion datashader/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ def line(self, source, x=None, y=None, agg=None, axis=0, geometry=None,
if not isinstance(non_cat_agg, (
rd.any, rd.count, rd.max, rd.min, rd.sum, rd.summary, rd._sum_zero,
rd._first_or_last, rd.mean, rd.max_n, rd.min_n, rd._first_n_or_last_n,
rd._max_or_min_row_index, rd._max_n_or_min_n_row_index
rd._max_or_min_row_index, rd._max_n_or_min_n_row_index, rd.where,
)):
raise NotImplementedError(
f"{type(non_cat_agg)} reduction not implemented for antialiased lines")
Expand Down
74 changes: 37 additions & 37 deletions datashader/reductions.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ def _build_append(self, dshape, schema, cuda, antialias, self_intersect):
else:
return self._append

def _build_combine(self, dshape, antialias, cuda, partitioned):
def _build_combine(self, dshape, antialias, cuda, partitioned, categorical = False):
return self._combine

def _build_finalize(self, dshape):
Expand Down Expand Up @@ -627,7 +627,7 @@ def _append_no_field_cuda(x, y, agg):
nb_cuda.atomic.add(agg, (y, x), 1)
return 0

def _build_combine(self, dshape, antialias, cuda, partitioned):
def _build_combine(self, dshape, antialias, cuda, partitioned, categorical = False):
if antialias:
return self._combine_antialias
else:
Expand Down Expand Up @@ -751,8 +751,8 @@ def _build_bases(self, cuda, partitioned):
def _build_append(self, dshape, schema, cuda, antialias, self_intersect):
return self.reduction._build_append(dshape, schema, cuda, antialias, self_intersect)

def _build_combine(self, dshape, antialias, cuda, partitioned):
return self.reduction._build_combine(dshape, antialias, cuda, partitioned)
def _build_combine(self, dshape, antialias, cuda, partitioned, categorical = False):
return self.reduction._build_combine(dshape, antialias, cuda, partitioned, True)

def _build_combine_temps(self, cuda, partitioned):
return self.reduction._build_combine_temps(cuda, partitioned)
Expand Down Expand Up @@ -820,7 +820,7 @@ def _append_no_field_antialias(x, y, agg, aa_factor):
_append_cuda =_append
_append_no_field_cuda = _append_no_field

def _build_combine(self, dshape, antialias, cuda, partitioned):
def _build_combine(self, dshape, antialias, cuda, partitioned, categorical = False):
if antialias:
return self._combine_antialias
else:
Expand Down Expand Up @@ -1596,7 +1596,7 @@ def _append_cuda(x, y, agg, field):
return i
return -1

def _build_combine(self, dshape, antialias, cuda, partitioned):
def _build_combine(self, dshape, antialias, cuda, partitioned, categorical = False):
if cuda:
return self._combine_cuda
else:
Expand Down Expand Up @@ -1676,7 +1676,7 @@ def _append_cuda(x, y, agg, field):
return i
return -1

def _build_combine(self, dshape, antialias, cuda, partitioned):
def _build_combine(self, dshape, antialias, cuda, partitioned, categorical = False):
if cuda:
return self._combine_cuda
else:
Expand Down Expand Up @@ -1883,7 +1883,12 @@ def _build_bases(self, cuda, partitioned):
else:
return selector._build_bases(cuda, partitioned) + super()._build_bases(cuda, partitioned)

def _build_combine(self, dshape, antialias, cuda, partitioned):
def _combine_callback(self, cuda, partitioned, categorical):
# Used by:
# 1) where._build_combine()) below, the usual mechanism for combining aggs from
# different dask partitions.
# 2) make_antialias_stage_2_functions() in compiler.py to perform stage 2 combine
# of antialiased aggs.
selector = self.selector
is_n_reduction = isinstance(selector, FloatingNReduction)
if cuda:
Expand Down Expand Up @@ -1989,38 +1994,33 @@ def combine_cuda_n_4d(aggs, selector_aggs):
break
cuda_shift_and_insert(aggs[0][y, x, cat], aggs[1][y, x, cat, i], update_index)

def wrapped_combine(aggs, selector_aggs):
ret = aggs[0], selector_aggs[0]
ndim = aggs[0].ndim
if is_n_reduction:
# ndim is either 3 (ny, nx, n) or 4 (ny, nx, ncat, n)
if cuda:
return combine_cuda_n_4d if categorical else combine_cuda_n_3d
else:
return combine_cpu_n_4d if categorical else combine_cpu_n_3d
else:
# ndim is either 2 (ny, nx) or 3 (ny, nx, ncat)
if cuda:
return combine_cuda_3d if categorical else combine_cuda_2d
else:
return combine_cpu_3d if categorical else combine_cpu_2d

def _build_combine(self, dshape, antialias, cuda, partitioned, categorical = False):
combine = self._combine_callback(cuda, partitioned, categorical)

def wrapped_combine(aggs, selector_aggs):
if len(aggs) == 1:
pass
elif is_n_reduction:
# ndim is either 3 (ny, nx, n) or 4 (ny, nx, ncat, n)
if cuda:
if ndim == 3:
combine_cuda_n_3d[cuda_args(aggs[0].shape[:2])](aggs, selector_aggs)
else:
combine_cuda_n_4d[cuda_args(aggs[0].shape[:3])](aggs, selector_aggs)
else:
if ndim == 3:
combine_cpu_n_3d(aggs, selector_aggs)
else:
combine_cpu_n_4d(aggs, selector_aggs)
elif cuda:
is_n_reduction = isinstance(self.selector, FloatingNReduction)
shape = aggs[0].shape[:-1] if is_n_reduction else aggs[0].shape
combine[cuda_args(shape)](aggs, selector_aggs)
else:
# ndim is either 2 (ny, nx) or 3 (ny, nx, ncat)
if cuda:
if ndim == 2:
combine_cuda_2d[cuda_args(aggs[0].shape)](aggs, selector_aggs)
else:
combine_cuda_3d[cuda_args(aggs[0].shape)](aggs, selector_aggs)
else:
if ndim == 2:
combine_cpu_2d(aggs, selector_aggs)
else:
combine_cpu_3d(aggs, selector_aggs)
combine(aggs, selector_aggs)

return ret
return aggs[0], selector_aggs[0]

return wrapped_combine

Expand Down Expand Up @@ -2226,7 +2226,7 @@ def _append_cuda(x, y, agg, field):
return 0
return -1

def _build_combine(self, dshape, antialias, cuda, partitioned):
def _build_combine(self, dshape, antialias, cuda, partitioned, categorical = False):
if cuda:
return self._combine_cuda
else:
Expand Down Expand Up @@ -2270,7 +2270,7 @@ def uses_cuda_mutex(self) -> UsesCudaMutex:
def uses_row_index(self, cuda, partitioned):
return True

def _build_combine(self, dshape, antialias, cuda, partitioned):
def _build_combine(self, dshape, antialias, cuda, partitioned, categorical = False):
if cuda:
return self._combine_cuda
else:
Expand Down
Loading

0 comments on commit ea163e9

Please sign in to comment.