refactor(transform): add a ReduceVar class and refactor ReduceBase
ljgray committed May 2, 2023
1 parent bff7776 commit 2e64493
Showing 1 changed file with 112 additions and 206 deletions.
318 changes: 112 additions & 206 deletions draco/analysis/transform.py
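
This refactor splits the old monolithic Reduce task into a ReduceBase class that owns the container plumbing (downselection, distribution, output creation) and a `reduction` hook that subclasses implement. A minimal sketch of how a subclass is meant to look, by analogy with the ReduceVar implementation below; the ReduceMean name and its weighting logic are illustrative and not part of the commit, and it assumes `axes` remains a list-typed config property on ReduceBase:

    import numpy as np

    from draco.analysis.transform import ReduceBase


    class ReduceMean(ReduceBase):
        """Illustrative subclass: weighted mean instead of weighted variance."""

        def reduction(self, arr, weight, axis):
            # Unweighted mean; the base class expects a (data, weight) pair back.
            if self.weighting == "none":
                m = np.mean(arr, axis=axis, keepdims=True)
                return m, np.ones_like(m)

            # Masked weighting reduces the weights to a zero/one mask.
            if self.weighting == "masked":
                weight = (weight > 0).astype(weight.dtype)

            # Weighted mean, guarding against all-zero weight slices.
            ws = weight.sum(axis=axis, keepdims=True)
            m = np.sum(weight * arr, axis=axis, keepdims=True) / np.where(ws == 0, 1, ws)
            return m, ws
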
@@ -1399,56 +1399,32 @@ def _apply_sel(arr, sel, axis):
return out


class Reduce(task.SingleTask):
"""Apply a reduction operation across specific axes and datasets.
class ReduceBase(task.SingleTask):
"""Apply a weighted reduction operation across specific axes.
This can apply any numpy reduction operation across multiple axes
of multiple datasets. Downselections can be applied to any axes
before applying the reduction. Any datasets not included in the
reduction are not initialized or preserved.
This is non-functional without overriding the `reduction` method.
If a dataset is distributed, there must be at least one axis not
included in the downselection and one axis not included in the
reduction, although it does not have to be the same axis.
There must be at least one axis not included in the reduction.
Attributes
----------
axes : list
Axis names to apply the reduction to
datasets : list
Dataset names to reduce.
selections : dict, optional
Dict formatted as {<ax_name>:<selection>,} for any axes
that should be downselected before applying the reduction.
Selections can be either slices or a list/tuple of indices.
op : str
Reduction operation to apply. Must be the name of a numpy
attribute, ex. "var". Default is "mean".
weight_op : str
Reduction operation to apply to the weights. Must be the name of a numpy
attribute, ex. "var". When using a boolean operation, such as "all" or "any",
the operation is applied to the boolean array (weights > 0.0). Default is "any".
apply_weight : list, optional
Apply the weights to any dataset in this list by multiplying by the weights.
The downselected weights must be broadcastable to these datasets.
mask_array : bool, optional
If true, use a numpy masked array when reducing. The mask is the boolean array
~(weights > 0.0). This can be used in conjunction with `apply_weight`. Default
is False.
dataset : str
Dataset name to reduce.
weighting : str
Which type of weighting to use, if applicable. Options are "none",
"masked", or "weighted"
distribute_over : str, optional
Optionally prefer to redistribute over a specific axis. This should be
the name of the axis. By default, the best possible axis will be selected
based on the selections to be applied.
"""

axes = config.Property(proptype=list)
datasets = config.Property(proptype=list)
selections = config.Property(proptype=dict, default={})
op = config.Property(proptype=str, default="mean")
weight_op = config.Property(proptype=str, default="any")
apply_weight = config.Property(proptype=list, default=[])
mask_array = config.Property(proptype=bool, default=False)

def setup(self):
"""Get the actual reduction ops and make sure they exist."""
self._op_func = getattr(np, self.op)
self._weight_op_func = getattr(np, self.weight_op)
dataset = config.Property(proptype=str)
weighting = config.enum(["none", "masked", "weighted"], default="none")
distribute_over = config.Property(proptype=str, default=None)
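# (editor sketch, not part of the commit) Configuring the task from a dict,
# as caput's config machinery would from a pipeline file. Assumes `axes`
# remains a list-typed config property on ReduceBase:
#
#     task = ReduceVar()
#     task.read_config(
#         {"dataset": "vis", "axes": ["time"], "weighting": "masked"}
#     )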

def process(self, data: containers.ContainerBase) -> containers.ContainerBase:
"""Downselect and apply the reduction operation to the data.
@@ -1471,58 +1447,32 @@ def process(self, data: containers.ContainerBase) -> containers.ContainerBase:
data.index_map.keys(), key=lambda x: len(data.index_map[x]), reverse=True
)

def _select_dist_axes(options: list | tuple, current_ax: str) -> tuple[int]:
"""Select the best distributed axes for selection and reduction."""
sel_ax_id, red_ax_id = None, None
def _select_dist_axis(options: list | tuple, current_ax: str) -> int:
"""Select the best distributed axis for selection."""
# If possible, use the user-specified axis
if self.distribute_over in options and self.distribute_over != current_ax:
return options.index(self.distribute_over)

new_ax_id = None

# first, check if we can just use the current axis
if current_ax not in self.selections:
sel_ax_id = options.index(current_ax)

if current_ax not in self.axes:
red_ax_id = options.index(current_ax)

# If we can't use the current axis here, walk through until
# we find the highest priority axis that we can use
if sel_ax_id is None:
for ax in ax_priority:
if ax not in options:
continue
if ax not in self.selections:
# Set the first axis to distribute across
sel_ax_id = options.index(ax)
# If possible, use this axis again
if ax not in self.axes and red_ax_id is None:
red_ax_id = sel_ax_id
break

# If we still haven't found a good axis for the second step,
# walk through the priority tree again until we find one
if red_ax_id is None:
new_ax_id = options.index(current_ax)
else:
for ax in ax_priority:
if ax not in options:
continue

if ax not in self.axes:
red_ax_id = options.index(ax)
new_ax_id = options.index(ax)
break

# If we couldn't find a valid axis for at least one step,
# throw an exception
if sel_ax_id is None or red_ax_id is None:
# If we couldn't find a valid axis, throw an exception
if new_ax_id is None:
raise ValueError("Could not find an axis to distribute across.")

return sel_ax_id, red_ax_id

def _apply_sel(arr, sel, axis):
"""Apply a selection to a single axis of an array."""
if type(sel) is slice:
sel = (slice(None),) * axis + (sel,)
return arr[sel]
elif type(sel) in {list, tuple}:
return np.take(arr, sel, axis=axis)
return new_ax_id
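# (editor note) Worked example of the selection above, absent a
# `distribute_over` preference: with index-map lengths
# {"time": 4096, "freq": 1024, "stack": 256}, ax_priority is
# ["time", "freq", "stack"]. If the reduction axes are ["time"] and the
# array is currently distributed over "time", the loop picks "freq" --
# the longest axis that survives the reduction.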

def _select_red_axes(ds_axes: list | tuple) -> tuple[int]:
def _get_axis_indices(ds_axes: list | tuple) -> tuple[int, ...]:
"""Get the indices of the axes that we want to reduce over.
The dataset must contain all the axes that we want to apply
@@ -1533,53 +1483,65 @@ def _select_red_axes(ds_axes: list | tuple) -> tuple[int]:
try:
apply_over.append(ds_axes.index(ax))
except ValueError as e:
raise ValueError(f"Axis {ax} not found in dataset {name}.") from e
raise ValueError(
f"Axis {ax} not found in dataset {self.dataset}."
) from e

return tuple(apply_over)
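# (editor note) Example: for ds_axes = ("freq", "stack", "time") and
# reduction axes ["time"], this returns (2,), i.e. the positional index
# of each reduced axis within the dataset.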

def _downselect(ds):
"""Apply downselection to a dataset, redistributing as required.
out = self.make_output_container(data)
out.add_dataset(self.dataset)

# Get the dataset
ds = data.datasets[self.dataset]
# Get a view of the array and its axis
arr = ds[:]
original_ax_id = arr.axis

# Get the axis indices to apply the operation over
ds_axes = list(ds.attrs["axis"])
# Get the new axis to distribute over
new_ax_id = _select_dist_axis(ds_axes, ds_axes[original_ax_id])
new_ax_name = ds_axes[new_ax_id]

# Redistribute the array to the new axis
arr = arr.redistribute(new_ax_id)
# Distribute the output container over the target axis
out.redistribute(new_ax_name)

# Get the weights
if hasattr(data, "weight"):
weight = data.weight[:]
# The weights should be distributed over the same axis as the array,
# even if they don't share all the same axes
new_weight_ax = list(data.weight.attrs["axis"]).index(new_ax_name)
weight = weight.redistribute(new_weight_ax)
else:
self.log.info("No weights available. Using equal weighting.")
weight = np.ones_like(arr)

If this is a distributed dataset, the output array will be distributed
to the best possible axis for the subsequent reduction.
"""
ds_axes = ds.attrs["axis"]
arr = ds[:]
# Apply the reduction, ensuring that the weights have the correct dimension
weight = np.broadcast_to(weight, arr.shape, subok=False)
apply_over = _get_axis_indices(ds_axes)
reduced, reduced_weight = self.reduction(arr.local_array[:], weight, apply_over)

if ds.distributed:
# Select the best axes to redistribute to
# _select_axes takes the name of the current axis, not the index
original_ax_id = ds.distributed_axis
sel_ax_id, red_ax_id = _select_dist_axes(
ds_axes, ds_axes[original_ax_id]
)
# Redistribute to the axis used for downselection
arr = arr.redistribute(sel_ax_id)
# Add the reduced data and redistribute the container back to the
# original axis
out[self.dataset][:] = reduced[:]

# Apply downselections to the dataset
for ax, sel in self.selections.items():
try:
ax_ind = ds_axes.index(ax)
except ValueError:
continue
arr = _apply_sel(arr, sel, ax_ind)
if hasattr(out, "weight"):
out.weight[:] = reduced_weight[:]

if ds.distributed:
# Distribute to the axis used for reduction
arr = arr.redistribute(red_ax_id)
# Redistribute back to the original axis
out.redistribute(original_ax_id)

return arr
return out

# Figure out the axes for the new container
# Apply the downselections to each axis index_map
output_axes = {
ax: _apply_sel(data.index_map[ax], sel, 0)
for ax, sel in self.selections.items()
}
# Further reduce any axis indices that will be reduced
reduced_axes = {ax: np.array([-1], dtype=int) for ax in self.axes}
# Overwrite any axes which will be reduced after selections are made
output_axes.update(reduced_axes)
def make_output_container(
self, data: containers.ContainerBase
) -> containers.ContainerBase:
"""Create the output container."""
output_axes = {ax: np.array([-1], dtype=int) for ax in self.axes}
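# (editor note) Each reduced axis is kept with length 1 and a placeholder
# index-map value of -1; together with keepdims=True in `reduction`, the
# reduced array then matches the output container's shape exactly.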

# Create the output container without initializing any datasets.
# Add some extra metadata about which axes were reduced and which
@@ -1588,108 +1550,52 @@ def _downselect(ds):
axes_from=data, attrs_from=data, skip_datasets=True, **output_axes
)
out.attrs["reduced"] = True
out.attrs["reduction_op"] = self.op
out.attrs["weight_reduction_op"] = self.weight_op
out.attrs["reduction_axes"] = np.array(self.axes)
out.attrs["downselections"] = self.selections
out.attrs["reduced_datasets"] = np.array(self.datasets)
out.attrs["reduced_dataset"] = self.dataset

out = self.add_output_attrs(out)

# Initialize the weight dataset
if "weight" in data.datasets:
out.add_dataset("weight")
elif "vis_weight" in data.datasets:
out.add_dataset("vis_weight")

# Set up the new weights
if hasattr(out, "weight"):
# Get the current distributed axis, if it is distributed
if data.weight.distributed:
original_ax_id = data.weight.distributed_axis

weight_axes = data.weight.attrs["axis"]
# Get the axes to reduce over
apply_over = _select_red_axes(weight_axes)
# Apply downselections and redistribute for reduction
weight = _downselect(data.weight)

if self.weight_op in {"all", "any"}:
# Convert to boolean before applying the op
w = weight > 0.0
else:
w = weight
return out

# Apply the weight reduction
ds_weight = self._weight_op_func(w, axis=apply_over, keepdims=True)
def add_output_attrs(
self, cont: containers.ContainerBase
) -> containers.ContainerBase:
"""Overwrite to add any additional attributes to the output."""
return cont

if out.weight.distributed:
# Distribute back to the original axis
ds_weight = ds_weight.redistribute(original_ax_id)
def reduction(
self, arr: np.ndarray, weight: np.ndarray, axis: tuple
) -> tuple[np.ndarray, np.ndarray]:
"""Overwrite to implement the reductino operation."""
raise NotImplementedError
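# (editor note) The contract for subclasses: reduce over `axis` with
# keepdims=True and return a (reduced_data, reduced_weight) pair, both
# broadcastable to the output container's length-1 reduced axes.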

out.weight[:] = ds_weight[:]
else:
# Flat weight that can broadcast to whatever shape
self.log.info("No weights available. Using equal weighting.")
weight = np.ones(1, dtype=bool)

# Iterate over the datasets and reduce
for name in self.datasets:
# Get the dataset
ds = data.datasets[name]
# Initialize the dataset in the output container
out.add_dataset(name)
# Get the current distributed axis of this dataset, if it
# is distributed
if ds.distributed:
original_ax_id = ds.distributed_axis

# Get the axes for this dataset
ds_axes = ds.attrs["axis"]
# Get the axes in this dataset to reduce over
apply_over = _select_red_axes(ds_axes)
# Apply downselections and redistribute
arr = _downselect(ds)
# If arr is distributed, get its axis and a numpy view
if isinstance(arr, mpiarray.MPIArray):
# Get the new distributed axis
new_ax = arr.axis
arr = arr.local_array[:]

if name in self.apply_weight or self.mask_array:
# If we need to use the weights, make sure they're distributed
# to the correct axis and can broadcast to this array
if isinstance(weight, mpiarray.MPIArray):
# Redistribute to the equivalent axis in the weights
ax_name = ds_axes[new_ax]
new_weight_ax = weight_axes.index(ax_name)
weight = weight.redistribute(new_weight_ax)

# Broadcast the weights to the array shape
# MPIArrays aren't preserved properly, so must return a ndarray
try:
mask = np.broadcast_to(weight, arr.shape, subok=False)
except ValueError as e:
raise ValueError(
f"Could not broadcast weights. Got dataset {name} with "
f"shape {arr.shape} and weight with shape {weight.shape}."
) from e
class ReduceVar(ReduceBase):
"""Take the weighted variance of a container."""

if name in self.apply_weight:
# Apply the weights to the dataset
arr = arr * mask
def add_output_attrs(self, cont):
"""Add an attribute about the operation being done."""
cont.attrs["reduction_op"] = "variance"

# Apply the optional mask and reduction op
if self.mask_array:
# Use a masked array, ignoring values where the weights are zero
arr = np.ma.array(arr, mask=~(mask > 0.0))
reduced = self._op_func(arr, axis=apply_over, keepdims=True).data
else:
reduced = self._op_func(arr, axis=apply_over, keepdims=True)
return cont

if ds.distributed:
# Redistribute back to the original axis
reduced = mpiarray.MPIArray.wrap(reduced, axis=new_ax)
reduced = reduced.redistribute(original_ax_id)
def reduction(self, arr, weight, axis):
"""Apply a weighted variance."""
if self.weighting == "none":
# Return an equal weight alongside the variance so every branch
# yields a (data, weight) pair, as callers expect.
v = np.var(arr, axis=axis, keepdims=True)
return v, np.ones_like(v)

out[name][:] = reduced[:]
if self.weighting == "masked":
weight = (weight > 0).astype(weight.dtype)

return out
ws = weight.sum(axis=axis, keepdims=True) / np.prod(np.take(weight.shape, axis))
weight = weight * ws

v = np.var(weight**0.5 * arr, axis=axis, keepdims=True)

return v, np.ones_like(v)
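
As a quick sanity check, the new reduction can be exercised on plain arrays outside a pipeline. A minimal sketch, assuming the task can be constructed directly and its `weighting` property set by assignment (which caput's config property descriptors appear to allow); shapes and values are illustrative:

    import numpy as np

    from draco.analysis.transform import ReduceVar

    rng = np.random.default_rng(0)
    arr = rng.standard_normal((4, 100))             # e.g. a (freq, time) block
    weight = rng.uniform(0.5, 1.5, size=arr.shape)  # strictly positive weights

    task = ReduceVar()
    task.weighting = "weighted"

    # Reduce over axis 1; keepdims leaves it in place with length 1.
    v, w = task.reduction(arr, weight, (1,))
    print(v.shape, w.shape)  # -> (4, 1) (4, 1)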
