From 2e64493b5e6bb14805ba413518fdbb7a90e7a8e4 Mon Sep 17 00:00:00 2001
From: ljgray
Date: Mon, 1 May 2023 17:47:21 -0700
Subject: [PATCH] refactor(transform): add a ReduceVar class and refactor ReduceBase

---
 draco/analysis/transform.py | 318 +++++++++++++-----------------------
 1 file changed, 112 insertions(+), 206 deletions(-)

diff --git a/draco/analysis/transform.py b/draco/analysis/transform.py
index 7833235c3..30402b5ba 100644
--- a/draco/analysis/transform.py
+++ b/draco/analysis/transform.py
@@ -1399,56 +1399,32 @@ def _apply_sel(arr, sel, axis):
         return out
 
 
-class Reduce(task.SingleTask):
-    """Apply a reduction operation across specific axes and datasets.
+class ReduceBase(task.SingleTask):
+    """Apply a weighted reduction operation across specific axes.
 
-    This can apply any numpy reduction operation across multiple axes
-    of multiple datasets. Downselections can be applied to any axes
-    before applying the reduction. Any datasets not included in the
-    reduction are not initialized or preserved.
+    This is non-functional without overriding the `reduction` method.
 
-    If a dataset is distributed, there must be at least one axis not
-    included in the downselection and one axis not included in the
-    reduction, although it does not have to be the same axis.
+    There must be at least one axis not included in the reduction.
 
     Attributes
     ----------
     axes : list
         Axis names to apply the reduction to
-    datasets : list
-        Dataset names to reduce.
-    selections : dict, optional
-        Dict formatted as {<axis name>: <selection>, ...} for any axes
-        that should be downselected before applying the reduction.
-        Selections can be either slices or a list/tuple of indices.
-    op : str
-        Reduction operation to apply. Must be the name of a numpy
-        attribute, ex. "var". Default is "mean".
-    weight_op : str
-        Reduction operation to apply to the weights. Must be the name of a numpy
-        attribute, ex. "var". When using a boolean operation, such as "all" or "any",
-        the operation is applied to the boolean array (weights > 0.0). Default is "any".
-    apply_weight : list, optional
-        Apply the weights to any dataset in this list by multiplying by the weights.
-        The downselected weights must be broadcastable to these datasets.
-    mask_array : bool, optional
-        If true, use a numpy masked array when reducing. The mask is the boolean array
-        ~(weights > 0.0). This can be used in conjunction with `apply_weight`. Default
-        is False.
+    dataset : str
+        Dataset name to reduce.
+    weighting : str
+        Which type of weighting to use, if applicable. Options are "none",
+        "masked", or "weighted".
+    distribute_over : str, optional
+        Optionally prefer to redistribute over a specific axis. This should be
+        the name of the axis. By default, the best possible axis will be selected
+        based on the selections to be applied.
""" axes = config.Property(proptype=list) - datasets = config.Property(proptype=list) - selections = config.Property(proptype=dict, default={}) - op = config.Property(proptype=str, default="mean") - weight_op = config.Property(proptype=str, default="any") - apply_weight = config.Property(proptype=list, default=[]) - mask_array = config.Property(proptype=bool, default=False) - - def setup(self): - """Get the actual reduction ops and make sure they exist.""" - self._op_func = getattr(np, self.op) - self._weight_op_func = getattr(np, self.weight_op) + dataset = config.Property(proptype=str) + weighting = config.enum(["none", "masked", "weighted"], default="none") + distribute_over = config.Property(proptype=str, default=None) def process(self, data: containers.ContainerBase) -> containers.ContainerBase: """Downselect and apply the reduction operation to the data. @@ -1471,58 +1447,32 @@ def process(self, data: containers.ContainerBase) -> containers.ContainerBase: data.index_map.keys(), key=lambda x: len(data.index_map[x]), reverse=True ) - def _select_dist_axes(options: list | tuple, current_ax: str) -> tuple[int]: - """Select the best distributed axes for selection and reduction.""" - sel_ax_id, red_ax_id = None, None + def _select_dist_axis(options: list | tuple, current_ax: str) -> tuple[int]: + """Select the best distributed axis for selection.""" + # If possible, use the user-specified axis + if self.distribute_over in options and self.distribute_over != current_ax: + return options.index(self.distribute_over) + + new_ax_id = None # first, check if we can just use the current axis - if current_ax not in self.selections: - sel_ax_id = options.index(current_ax) - if current_ax not in self.axes: - red_ax_id = options.index(current_ax) - - # If we can't use the current axis here, walk through until - # we find the highest priority axis that we can use - if sel_ax_id is None: - for ax in ax_priority: - if ax not in options: - continue - if ax not in self.selections: - # Set the first axis to distribute across - sel_ax_id = options.index(ax) - # If possible, use this axis again - if ax not in self.axes and red_ax_id is None: - red_ax_id = sel_ax_id - break - - # If we stil haven't found a good axis for the second step, - # walk through the priority tree again until we find one - if red_ax_id is None: + new_ax_id = options.index(current_ax) + else: for ax in ax_priority: if ax not in options: continue - if ax not in self.axes: - red_ax_id = options.index(ax) + new_ax_id = options.index(ax) break - # If we couldn't find a valid axis for at least one step, - # throw an exception - if sel_ax_id is None or red_ax_id is None: + # If we couldn't find a valid axis, throw an exception + if new_ax_id is None: raise ValueError("Could not find an axis to distribute across.") - return sel_ax_id, red_ax_id - - def _apply_sel(arr, sel, axis): - """Apply a selection to a single axis of an array.""" - if type(sel) is slice: - sel = (slice(None),) * axis + (sel,) - return arr[sel] - elif type(sel) in {list, tuple}: - return np.take(arr, sel, axis=axis) + return new_ax_id - def _select_red_axes(ds_axes: list | tuple) -> tuple[int]: + def _get_axis_indices(ds_axes: list | tuple) -> tuple[int]: """Get the indices of the axes that we want to reduce over. 
             The dataset must contain all the axes that we want to apply
@@ -1533,53 +1483,65 @@ def _select_red_axes(ds_axes: list | tuple) -> tuple[int]:
                 try:
                     apply_over.append(ds_axes.index(ax))
                 except ValueError as e:
-                    raise ValueError(f"Axis {ax} not found in dataset {name}.") from e
+                    raise ValueError(
+                        f"Axis {ax} not found in dataset {self.dataset}."
+                    ) from e
 
             return tuple(apply_over)
 
-        def _downselect(ds):
-            """Apply downselection to a dataset, redistributing as required.
+        out = self.make_output_container(data)
+        out.add_dataset(self.dataset)
+
+        # Get the dataset
+        ds = data.datasets[self.dataset]
+        # Get a view of the array and its axis
+        arr = ds[:]
+        original_ax_id = arr.axis
+
+        # Get the axis indices to apply the operation over
+        ds_axes = list(ds.attrs["axis"])
+        # Get the new axis to distribute over
+        new_ax_id = _select_dist_axis(ds_axes, ds_axes[original_ax_id])
+        new_ax_name = ds_axes[new_ax_id]
+
+        # Redistribute the array to the new axis
+        arr = arr.redistribute(new_ax_id)
+        # Distribute the output container over the target axis
+        out.redistribute(new_ax_name)
+
+        # Get the weights
+        if hasattr(data, "weight"):
+            weight = data.weight[:]
+            # The weights should be distributed over the same axis as the array,
+            # even if they don't share all the same axes
+            new_weight_ax = list(data.weight.attrs["axis"]).index(new_ax_name)
+            weight = weight.redistribute(new_weight_ax)
+        else:
+            self.log.info("No weights available. Using equal weighting.")
+            weight = np.ones_like(arr)
 
-            If this is a distributed dataset, the output array will be distributed
-            to the best possible axis for the subsequent reduction.
-            """
-            ds_axes = ds.attrs["axis"]
-            arr = ds[:]
+        # Apply the reduction, ensuring that the weights have the correct dimension
+        weight = np.broadcast_to(weight, arr.shape, subok=False)
+        apply_over = _get_axis_indices(ds_axes)
+        reduced, reduced_weight = self.reduction(arr.local_array[:], weight, apply_over)
 
-            if ds.distributed:
-                # Select the best axes to redistribute to
-                # _select_axes takes the name of the current axis, not the index
-                original_ax_id = ds.distributed_axis
-                sel_ax_id, red_ax_id = _select_dist_axes(
-                    ds_axes, ds_axes[original_ax_id]
-                )
-                # Redistribute to the axis used for downselection
-                arr = arr.redistribute(sel_ax_id)
+        # Add the reduced data
+        out[self.dataset][:] = reduced[:]
 
-            # Apply downselections to the dataset
-            for ax, sel in self.selections.items():
-                try:
-                    ax_ind = ds_axes.index(ax)
-                except ValueError:
-                    continue
-                arr = _apply_sel(arr, sel, ax_ind)
+        if hasattr(out, "weight"):
+            out.weight[:] = reduced_weight[:]
 
-            if ds.distributed:
-                # Distribute to the axis used for reduction
-                arr = arr.redistribute(red_ax_id)
+        # Redistribute back to the original axis
+        out.redistribute(ds_axes[original_ax_id])
 
-            return arr
+        return out
 
-        # Figure out the axes for the new container
-        # Apply the downselections to each axis index_map
-        output_axes = {
-            ax: _apply_sel(data.index_map[ax], sel, 0)
-            for ax, sel in self.selections.items()
-        }
-        # Further reduce any axis indices that will be reduced
-        reduced_axes = {ax: np.array([-1], dtype=int) for ax in self.axes}
-        # Overwrite any axes which will be reduced after selections are made
-        output_axes.update(reduced_axes)
+    def make_output_container(
+        self, data: containers.ContainerBase
+    ) -> containers.ContainerBase:
+        """Create the output container."""
+        output_axes = {ax: np.array([-1], dtype=int) for ax in self.axes}
 
         # Create the output container without initializing any datasets.
         # Add some extra metadata about which axes were reduced and which
@@ -1588,11 +1550,10 @@ def _downselect(ds):
             axes_from=data, attrs_from=data, skip_datasets=True, **output_axes
         )
         out.attrs["reduced"] = True
-        out.attrs["reduction_op"] = self.op
-        out.attrs["weight_reduction_op"] = self.weight_op
         out.attrs["reduction_axes"] = np.array(self.axes)
-        out.attrs["downselections"] = self.selections
-        out.attrs["reduced_datasets"] = np.array(self.datasets)
+        out.attrs["reduced_dataset"] = self.dataset
+
+        out = self.add_output_attrs(out)
 
         # Initialize the weight dataset
         if "weight" in data.datasets:
@@ -1600,96 +1561,41 @@ def _downselect(ds):
             out.add_dataset("weight")
         elif "vis_weight" in data.datasets:
             out.add_dataset("vis_weight")
 
-        # Set up the new weights
-        if hasattr(out, "weight"):
-            # Get the current distributed axis, if it is distributed
-            if data.weight.distributed:
-                original_ax_id = data.weight.distributed_axis
-
-            weight_axes = data.weight.attrs["axis"]
-            # Get the axes to reduce over
-            apply_over = _select_red_axes(weight_axes)
-            # Apply downselections and redistribute for reduction
-            weight = _downselect(data.weight)
-
-            if self.weight_op in {"all", "any"}:
-                # Convert to boolean before applying the op
-                w = weight > 0.0
-            else:
-                w = weight
+        return out
 
-            # Apply the weight reduction
-            ds_weight = self._weight_op_func(w, axis=apply_over, keepdims=True)
+    def add_output_attrs(
+        self, cont: containers.ContainerBase
+    ) -> containers.ContainerBase:
+        """Override to add any additional attributes to the output."""
+        return cont
 
-            if out.weight.distributed:
-                # Distribute back to the original axis
-                ds_weight = ds_weight.redistribute(original_ax_id)
+    def reduction(
+        self, arr: np.ndarray, weight: np.ndarray, axis: tuple
+    ) -> tuple[np.ndarray, np.ndarray]:
+        """Override to implement the reduction operation."""
+        raise NotImplementedError
 
-            out.weight[:] = ds_weight[:]
-        else:
-            # Flat weight that can broadcast to whatever shape
-            self.log.info("No weights available. Using equal weighting.")
-            weight = np.ones(1, dtype=bool)
 
-        # Iterate over the datasets and reduce
-        for name in self.datasets:
-            # Get the dataset
-            ds = data.datasets[name]
-            # Initialize the dataset in the output container
-            out.add_dataset(name)
-            # Get the current distributed axis of this dataset, if it
-            # is distributed
-            if ds.distributed:
-                original_ax_id = ds.distributed_axis
-
-            # Get the axes for this dataset
-            ds_axes = ds.attrs["axis"]
-            # Get the axes in this dataset to reduce over
-            apply_over = _select_red_axes(ds_axes)
-            # Apply downselections and redistribute
-            arr = _downselect(ds)
-            # If arr is distributed, get its axis and a numpy view
-            if isinstance(arr, mpiarray.MPIArray):
-                # Get the new distributed axis
-                new_ax = arr.axis
-                arr = arr.local_array[:]
-
-            if name in self.apply_weight or self.mask_array:
-                # If we need to use the weights, make sure they're distributed
-                # to the correct axis and can broadcast to this array
-                if isinstance(weight, mpiarray.MPIArray):
-                    # Redistribute to the equivalent axis in the weights
-                    ax_name = ds_axes[new_ax]
-                    new_weight_ax = weight_axes.index(ax_name)
-                    weight = weight.redistribute(new_weight_ax)
-
-                # Broadcast the weights to the array shape
-                # MPIArrays aren't preserved properly, so must return a ndarray
-                try:
-                    mask = np.broadcast_to(weight, arr.shape, subok=False)
-                except ValueError as e:
-                    raise ValueError(
-                        f"Could not broadcast weights. Got dataset {name} with "
-                        f"shape {arr.shape} and weight with shape {weight.shape}."
-                    ) from e
 
+class ReduceVar(ReduceBase):
+    """Take the weighted variance of a container."""
 
-            if name in self.apply_weight:
-                # Apply the weights to the dataset
-                arr = arr * mask
+    def add_output_attrs(self, cont):
+        """Add an attribute about the operation being done."""
+        cont.attrs["reduction_op"] = "variance"
 
-            # Apply the optional mask and reduction op
-            if self.mask_array:
-                # Use a masked array, ignoring values where the weights are zero
-                arr = np.ma.array(arr, mask=~(mask > 0.0))
-                reduced = self._op_func(arr, axis=apply_over, keepdims=True).data
-            else:
-                reduced = self._op_func(arr, axis=apply_over, keepdims=True)
+        return cont
 
-            if ds.distributed:
-                # Redistribute back to the original axis
-                reduced = mpiarray.MPIArray.wrap(reduced, axis=new_ax)
-                reduced = reduced.redistribute(original_ax_id)
+    def reduction(self, arr, weight, axis):
+        """Apply a weighted variance."""
+        if self.weighting == "none":
+            v = np.var(arr, axis=axis, keepdims=True)
+
+            return v, np.ones_like(v)
 
-            out[name][:] = reduced[:]
+        if self.weighting == "masked":
+            weight = (weight > 0).astype(weight.dtype)
 
-        return out
+        # Normalize the weights to unit mean over the reduction axes,
+        # guarding against regions where the weights are all zero
+        ws = weight.sum(axis=axis, keepdims=True) / np.prod(np.take(weight.shape, axis))
+        weight = np.divide(weight, ws, out=np.zeros_like(weight), where=ws > 0)
+
+        v = np.var(weight**0.5 * arr, axis=axis, keepdims=True)
+
+        return v, np.ones_like(v)
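
---

Note (illustrative, not part of the patch): the snippet below is a minimal
sketch of how another reduction could hook into the new `ReduceBase`
interface. The contract implied by the patch is that `reduction` receives the
local data array, weights already broadcast to the same shape, and a tuple of
axis indices, and returns both the reduced array and a reduced weight array,
with `keepdims=True` semantics. `ReduceMean` is a hypothetical subclass
written only for this note; it is not added by this commit.

    import numpy as np

    from draco.analysis.transform import ReduceBase


    class ReduceMean(ReduceBase):
        """Take the weighted mean of a container (example only)."""

        def add_output_attrs(self, cont):
            """Record which operation was applied."""
            cont.attrs["reduction_op"] = "mean"

            return cont

        def reduction(self, arr, weight, axis):
            """Apply a weighted mean over `axis`."""
            if self.weighting == "none":
                m = np.mean(arr, axis=axis, keepdims=True)

                return m, np.ones_like(m)

            if self.weighting == "masked":
                weight = (weight > 0).astype(weight.dtype)

            # Weighted mean, guarding against fully-masked regions
            ws = weight.sum(axis=axis, keepdims=True)
            m = np.divide(
                (weight * arr).sum(axis=axis, keepdims=True),
                ws,
                out=np.zeros_like(ws, dtype=arr.dtype),
                where=ws > 0,
            )

            # The summed weight is a natural output weight for a mean
            return m, ws

Configured in a pipeline, such a task would take the `axes`, `dataset`, and
`weighting` properties defined on `ReduceBase` above, plus the optional
`distribute_over` hint.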