Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sidereal, containers): sample variance over sidereal days #104

Merged
merged 8 commits into from
May 28, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions draco/analysis/beamform.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ def _sig(pp, freq, dec):
return sig_amps[pp] / freq / np.cos(dec)
ssiegelx marked this conversation as resolved.
Show resolved Hide resolved

def _amp(pp, dec, zenith):

def _flat_top_gauss6(x, A, sig, x0):
"""Flat-top gaussian. Power of 6."""
return A * np.exp(-abs((x - x0) / sig) ** 6)
Expand Down
143 changes: 120 additions & 23 deletions draco/analysis/sidereal.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,33 +491,42 @@ def _regrid(self, vis, weight, lsd):
class SiderealStacker(task.SingleTask):
"""Take in a set of sidereal days, and stack them up.

This will apply relative calibration.
Also computes the variance over sideral days using an
algorithm that updates the sum of square differences from
ssiegelx marked this conversation as resolved.
Show resolved Hide resolved
the current mean, which is less prone to numerical issues.
See West, D.H.D. (1979). https://doi.org/10.1145/359146.359153.

Parameters
Attributes
----------
tag : str
tag : str (default: "stack")
The tag to give the stack.
weight: str (default: "inverse_variance")
The weighting to use in the stack.
Either `uniform` or `inverse_variance`.
with_sample_variance : bool (default: False)
Add a dataset containing the sample variance
of the visibilities over sidereal days to the
sidereal stack.
"""

stack = None
lsd_list = None

tag = config.Property(proptype=str, default="stack")
weight = config.enum(["uniform", "inverse_variance"], default="inverse_variance")
with_sample_variance = config.Property(proptype=bool, default=False)

def process(self, sdata):
"""Stack up sidereal days.

Parameters
----------
sdata : containers.SiderealStream
Individual sidereal day to stack up.
Individual sidereal day to add to stack.
"""

sdata.redistribute("freq")

# Get the LSD label out of the data (resort to using a CSD if it's
# present). If there's no label just use a place holder and stack
# anyway.
# Get the LSD (or CSD) label out of the input's attributes.
# If there is no label, use a placeholder.
if "lsd" in sdata.attrs:
input_lsd = sdata.attrs["lsd"]
elif "csd" in sdata.attrs:
Expand All @@ -527,43 +536,131 @@ def process(self, sdata):

input_lsd = _ensure_list(input_lsd)

# If this is our first sidereal day, then initialize the
# container that will hold the stack.
if self.stack is None:

self.stack = containers.empty_like(sdata)
self.stack.redistribute("freq")

self.stack.vis[:] = sdata.vis[:] * sdata.weight[:]
self.stack.weight[:] = sdata.weight[:]
# Add stack-specific datasets
if "nsample" not in self.stack.datasets:
self.stack.add_dataset("nsample")

self.lsd_list = input_lsd
if self.with_sample_variance and (
"sample_variance" not in self.stack.datasets
):
self.stack.add_dataset("sample_variance")

self.log.info("Starting stack with LSD:%i", sdata.attrs["lsd"])
self.stack.redistribute("freq")

return
# Initialize all datasets to zero.
for data in self.stack.datasets.values():
data[:] = 0

self.log.info("Adding LSD:%i to stack", sdata.attrs["lsd"])
self.lsd_list = []

# note: Eventually we should fix up gains
# Keep track of the sum of squared weights
# to perform Bessel's correction at the end.
if self.with_sample_variance:
self.sum_coeff_sq = np.zeros_like(self.stack.weight[:].view(np.ndarray))

# Combine stacks with inverse `noise' weighting
self.stack.vis[:] += sdata.vis[:] * sdata.weight[:]
self.stack.weight[:] += sdata.weight[:]
# Accumulate
self.log.info("Adding to stack LSD(s): %s" % input_lsd)

self.lsd_list += input_lsd

if "nsample" in sdata.datasets:
# The input sidereal stream is already a stack
# over multiple sidereal days. Use the nsample
# dataset as the weight for the uniform case.
count = sdata.nsample[:]
else:
# The input sidereal stream contains a single
# sidereal day. Use a boolean array that
# indicates a non-zero weight dataset as
# the weight for the uniform case.
dtype = self.stack.nsample.dtype
count = (sdata.weight[:] > 0.0).astype(dtype)

# Determine the weights to be used in the average.
if self.weight == "uniform":
coeff = count.astype(np.float32)
# Accumulate the variances in the stack.weight dataset.
self.stack.weight[:] += (coeff ** 2) * tools.invert_no_zero(sdata.weight[:])
else:
coeff = sdata.weight[:]
# We are using inverse variance weights. In this case,
# we accumulate the inverse variances in the stack.weight
# dataset. Do that directly to avoid an unneccessary
# division in the more general expression above.
self.stack.weight[:] += sdata.weight[:]

# Accumulate the total number of samples.
self.stack.nsample[:] += count

# Below we will need to normalize by the current sum of coefficients.
# Can be found in the stack.nsample dataset for uniform case or
# the stack.weight dataset for inverse variance case.
if self.weight == "uniform":
sum_coeff = self.stack.nsample[:].astype(np.float32)
else:
sum_coeff = self.stack.weight[:]

# Calculate weighted difference between the new data and the current mean.
delta_before = coeff * (sdata.vis[:] - self.stack.vis[:])

# Update the mean.
self.stack.vis[:] += delta_before * tools.invert_no_zero(sum_coeff)

# The calculations below are only required if the sample variance was requested
if self.with_sample_variance:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed on Friday, I don't think this sample variance estimation works with nested stacking, i.e. if I run a set of individual days through this, get a couple of stacks like that, and stack those together with this, this will essentially take the variance of the means. I think what we really want it to do is to take the means of the variances (in some heuristic sense). Does that seem reasonable?

I think that means switching behaviour if you find that your input data already has a sample_variance dataset, but that might be a bit too much work to squeeze into this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah sorry about that, I forgot what was done in this PR.

Are there scenarios in the current stacking pipeline where this task is used for nested stacking? Or is the SiderealStackMatch task used exclusively for that purpose?

I can update this task (and also SiderealStackMatch) so that it handles the sample variance calculation during nested stacking properly, but I think it will be low priority. If you think that it would be useful to have the sample variance of the quarter stacks on the next version of the pipeline processing, then I would merge this PR. Then the sample variance calculation can be turned off when generating a full stack until I can issue a separate PR for it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just SiderealStackerMatch. I agree that it's a low priority to get the variance carried through to the nested stacks, so maybe add it to the Pipeline issues list, and then we'll get this one merged.


# Accumulate the sum of squared coefficients.
self.sum_coeff_sq += coeff ** 2

# Calculate the difference between the new data and the updated mean.
delta_after = sdata.vis[:] - self.stack.vis[:]

# Update the sample variance.
self.stack.sample_variance[0] += delta_before.real * delta_after.real
self.stack.sample_variance[1] += delta_before.real * delta_after.imag
self.stack.sample_variance[2] += delta_before.imag * delta_after.imag

def process_finish(self):
"""Construct and emit sidereal stack.
"""Normalize the stack and return the result.

Returns
-------
stack : containers.SiderealStream
Stack of sidereal days.
"""

self.stack.attrs["tag"] = self.tag
self.stack.attrs["lsd"] = np.array(self.lsd_list)

self.stack.vis[:] *= tools.invert_no_zero(self.stack.weight[:])
# For uniform weighting, normalize the accumulated variances by the total
# number of samples squared and then invert to finalize stack.weight.
if self.weight == "uniform":
norm = self.stack.nsample[:].astype(np.float32)
self.stack.weight[:] = (
tools.invert_no_zero(self.stack.weight[:]) * norm ** 2
)

# We need to normalize the sample variance by the sum of coefficients.
# Can be found in the stack.nsample dataset for uniform case
# or the stack.weight dataset for inverse variance case.
if self.with_sample_variance:

if self.weight != "uniform":
norm = self.stack.weight[:]

# Perform Bessel's correction. In the case of
# uniform weighting, norm will be equal to nsample - 1.
norm = norm - self.sum_coeff_sq * tools.invert_no_zero(norm)

# Normalize the sample variance.
self.stack.sample_variance[:] *= np.where(
self.stack.nsample[:] > 1, tools.invert_no_zero(norm), 0.0
)[np.newaxis, :]

return self.stack

Expand Down
Loading