Skip to content

Commit

Permalink
feat(sidereal): create nearest, linear, and cubic regridders
Browse files Browse the repository at this point in the history
Creates the SiderealRegridderNearest, SiderealRegridderLinear,
and SiderealRegridderCubic pipeline tasks that implement
simple methods for interpolating onto a fixed sidereal grid.
  • Loading branch information
ssiegelx authored and jrs65 committed Oct 21, 2020
1 parent 525305c commit 81badda
Showing 1 changed file with 260 additions and 1 deletion.
261 changes: 260 additions & 1 deletion draco/analysis/sidereal.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import numpy as np

from caput import config, mpiutil, mpiarray, tod
from cora.util import units

from .transform import Regridder
from ..core import task, containers, io
Expand Down Expand Up @@ -176,8 +177,13 @@ class SiderealRegridder(Regridder):
Width of the Lanczos interpolation kernel.
snr_cov: float
Ratio of signal covariance to noise covariance (used for Wiener filter).
down_mix: bool
Down mix the visibility prior to interpolation using the fringe rate
of a source at zenith. This is un-done after the interpolation.
"""

down_mix = config.Property(proptype=bool, default=False)

def setup(self, manager):
"""Set the local observers position.
Expand All @@ -204,12 +210,15 @@ def process(self, data):
sdata : containers.SiderealStream
The regularly gridded sidereal timestream.
"""

self.log.info("Regridding LSD:%i", data.attrs["lsd"])

# Redistribute if needed too
data.redistribute("freq")

sfreq = data.vis.local_offset[0]
efreq = sfreq + data.vis.local_shape[0]
freq = data.freq[sfreq:efreq]

# Convert data timestamps into LSDs
timestamp_lsd = self.observer.unix_to_lsd(data.time)

Expand All @@ -221,9 +230,21 @@ def process(self, data):
weight = data.weight[:].view(np.ndarray)
vis_data = data.vis[:].view(np.ndarray)

# Mix down
if self.down_mix:
self.log.info("Downmixing before regridding.")
phase = self._get_phase(freq, data.prodstack, timestamp_lsd)
vis_data *= phase

# perform regridding
new_grid, sts, ni = self._regrid(vis_data, weight, timestamp_lsd)

# Mix back up
if self.down_mix:
phase = self._get_phase(freq, data.prodstack, new_grid).conj()
sts *= phase
ni *= (np.abs(phase) > 0.0).astype(ni.dtype)

# Wrap to produce MPIArray
sts = mpiarray.MPIArray.wrap(sts, axis=0)
ni = mpiarray.MPIArray.wrap(ni, axis=0)
Expand All @@ -239,6 +260,244 @@ def process(self, data):

return sdata

def _get_phase(self, freq, prod, lsd):

# Determine if any baselines contains masked feeds
# These baselines will be flagged since they do not
# have valid baseline distances.
aa, bb = prod["input_a"], prod["input_b"]

mask = self.observer.feedmask[(aa, bb)].astype(np.float32)[
np.newaxis, :, np.newaxis
]

# Calculate the fringe rate assuming that ha = 0.0 and dec = lat
lmbda = units.c / (freq * 1e6)
u = self.observer.baselines[np.newaxis, :, 0] / lmbda[:, np.newaxis]

omega = -2.0 * np.pi * u * np.cos(np.radians(self.observer.latitude))

# Calculate the local sidereal angle
dphi = 2.0 * np.pi * (lsd - np.floor(lsd))

# Construct a complex sinusoid whose frequency
# is equal to each baselines fringe rate
phase = mask * np.exp(
-1.0j * omega[:, :, np.newaxis] * dphi[np.newaxis, np.newaxis, :]
)

return phase


def _search_nearest(x, xeval):

index_next = np.searchsorted(x, xeval, side="left")

index_previous = np.maximum(0, index_next - 1)
index_next = np.minimum(x.size - 1, index_next)

index = np.where(
np.abs(xeval - x[index_previous]) < np.abs(xeval - x[index_next]),
index_previous,
index_next,
)

return index


class SiderealRegridderNearest(SiderealRegridder):
"""Regrid onto the sidereal day using nearest neighbor interpolation."""

def _regrid(self, vis, weight, lsd):

# Create a regular grid
interp_grid = np.arange(0, self.samples, dtype=np.float64) / self.samples
interp_grid = interp_grid * (self.end - self.start) + self.start

# Find the data points that are closest to the fixed points on the grid
index = _search_nearest(lsd, interp_grid)

interp_vis = vis[..., index]
interp_weight = weight[..., index]

# Flag the re-gridded data if the nearest neighbor was more than one
# sample spacing away. This can occur if the input data does not have
# complete sidereal coverage.
delta = np.median(np.abs(np.diff(lsd)))
distant = np.flatnonzero(np.abs(lsd[index] - interp_grid) > delta)
interp_weight[..., distant] = 0.0

return interp_grid, interp_vis, interp_weight


class SiderealRegridderLinear(SiderealRegridder):
"""Regrid onto the sidereal day using linear interpolation."""

def _regrid(self, vis, weight, lsd):

# Create a regular grid
interp_grid = np.arange(0, self.samples, dtype=np.float64) / self.samples
interp_grid = interp_grid * (self.end - self.start) + self.start

# Find the data points that lie on either side of each point in the fixed grid
index = np.searchsorted(lsd, interp_grid, side="left")

ind1 = index - 1
ind2 = index

# If the fixed grid is outside the range covered by the data,
# then we will extrapolate and later flag as bad.
below = np.flatnonzero(ind1 == -1)
if below.size > 0:
ind1[below] = 0
ind2[below] = 1

above = np.flatnonzero(ind2 == lsd.size)
if above.size > 0:
ind1[above] = lsd.size - 2
ind2[above] = lsd.size - 1

# If the closest data points to the fixed grid point are more than one
# sample spacing away, then we will later flag that data as bad.
# This will occur if the input data does not cover the full sidereal day.
delta = np.median(np.abs(np.diff(lsd)))
distant = np.flatnonzero(
(np.abs(lsd[ind1] - interp_grid) > delta)
| (np.abs(lsd[ind2] - interp_grid) > delta)
)

# Calculate the coefficients for the linear interpolation
dx1 = interp_grid - lsd[ind1]
dx2 = lsd[ind2] - interp_grid

norm = tools.invert_no_zero(dx1 + dx2)
coeff1 = dx2 * norm
coeff2 = dx1 * norm

# Initialize the output arrays
shp = vis.shape[:-1] + (self.samples,)

interp_vis = np.zeros(shp, dtype=vis.dtype)
interp_weight = np.zeros(shp, dtype=weight.dtype)

# Loop over frequencies to reduce memory usage
for ff in range(shp[0]):

fvis = vis[ff]
fweight = weight[ff]

# Consider the data valid if it has nonzero weight
fflag = fweight > 0.0

# Determine the variance from the inverse weight
fvar = tools.invert_no_zero(fweight)

# Require both data points to be valid for the interpolated value to be valid
finterp_flag = fflag[:, ind1] & fflag[:, ind2]

# Interpolate the visibilities and propagate the weights
interp_vis[ff] = coeff1 * fvis[:, ind1] + coeff2 * fvis[:, ind2]

interp_weight[ff] = tools.invert_no_zero(
coeff1 ** 2 * fvar[:, ind1] + coeff2 ** 2 * fvar[:, ind2]
) * finterp_flag.astype(np.float32)

# Flag as bad any values that were extrapolated or that used distant points
interp_weight[..., below] = 0.0
interp_weight[..., above] = 0.0
interp_weight[..., distant] = 0.0

return interp_grid, interp_vis, interp_weight


class SiderealRegridderCubic(SiderealRegridder):
"""Regrid onto the sidereal day using cubic Hermite spline interpolation."""

def _regrid(self, vis, weight, lsd):

# Create a regular grid
interp_grid = np.arange(0, self.samples, dtype=np.float64) / self.samples
interp_grid = interp_grid * (self.end - self.start) + self.start

# Find the data point just after each point on the fixed grid
index = np.searchsorted(lsd, interp_grid, side="left")

# Find the 4 data points that will be used to interpolate
# each point on the fixed grid
index = np.vstack([index + i for i in range(-2, 2)])

# If the fixed grid is outside the range covered by the data,
# then we will extrapolate and later flag as bad
below = np.flatnonzero(np.any(index < 0, axis=0))
if below.size > 0:
index = np.maximum(index, 0)

above = np.flatnonzero(np.any(index >= lsd.size, axis=0))
if above.size > 0:
index = np.minimum(index, lsd.size - 1)

# If the closest data points to the fixed grid point are more than one
# sample spacing away, then we will later flag that data as bad.
# This will occur if the input data does not cover the full sidereal day.
delta = np.median(np.abs(np.diff(lsd)))
distant = np.flatnonzero(
np.any(np.abs(interp_grid - lsd[index]) > (2.0 * delta), axis=0)
)

# Calculate the coefficients for the interpolation
u = (interp_grid - lsd[index[1]]) * tools.invert_no_zero(
lsd[index[2]] - lsd[index[1]]
)

coeff = np.zeros((4, u.size), dtype=np.float64)
coeff[0] = u * ((2 - u) * u - 1)
coeff[1] = u ** 2 * (3 * u - 5) + 2
coeff[2] = u * ((4 - 3 * u) * u + 1)
coeff[3] = u ** 2 * (u - 1)
coeff *= 0.5

# Initialize the output arrays
shp = vis.shape[:-1] + (self.samples,)

interp_vis = np.zeros(shp, dtype=vis.dtype)
interp_weight = np.zeros(shp, dtype=weight.dtype)

# Loop over frequencies to reduce memory usage
for ff in range(shp[0]):

fvis = vis[ff]
fweight = weight[ff]

# Consider the data valid if it has nonzero weight
fflag = fweight > 0.0

# Determine the variance from the inverse weight
fvar = tools.invert_no_zero(fweight)

# Interpolate the visibilities and propagate the weights
finterp_flag = np.ones(shp[1:], dtype=np.bool)
finterp_var = np.zeros(shp[1:], dtype=weight.dtype)

for ii, cc in zip(index, coeff):

finterp_flag &= fflag[:, ii]
finterp_var += cc ** 2 * fvar[:, ii]

interp_vis[ff] += cc * fvis[:, ii]

# Invert the accumulated variances to get the weight
# Require all data points are valid for the interpolated value to be valid
interp_weight[ff] = tools.invert_no_zero(finterp_var) * finterp_flag.astype(
np.float32
)

# Flag as bad any values that were extrapolated or that used distant points
interp_weight[..., below] = 0.0
interp_weight[..., above] = 0.0
interp_weight[..., distant] = 0.0

return interp_grid, interp_vis, interp_weight


class SiderealStacker(task.SingleTask):
"""Take in a set of sidereal days, and stack them up.
Expand Down

0 comments on commit 81badda

Please sign in to comment.