Skip to content

Commit

Permalink
feat(flagging): add pipeline task for rainfall flagging
Browse files Browse the repository at this point in the history
  • Loading branch information
sjforeman committed May 10, 2024
1 parent bb49131 commit 0548c5d
Showing 1 changed file with 194 additions and 1 deletion.
195 changes: 194 additions & 1 deletion ch_pipeline/analysis/flagging.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import numpy as np

from caput import mpiutil, mpiarray, memh5, config, pipeline, tod
from ch_util import rfi, data_quality, tools, ephemeris, cal_utils, andata
from ch_util import rfi, data_quality, tools, ephemeris, cal_utils, andata, finder
from chimedb import dataflag as df
from chimedb.core import connect as connect_database

Expand All @@ -18,6 +18,7 @@
from draco.core import containers as dcontainers

from ..core import containers
from ..core.dataquery import _DEFAULT_NODE_SPOOF


class RFIFilter(task.SingleTask):
Expand Down Expand Up @@ -2260,3 +2261,195 @@ def process(self, data):
data.input_flags[:] = flag

return data


def load_rainfall(start_time, end_time, node_spoof=_DEFAULT_NODE_SPOOF):
"""Load rainfall measurements in a specific time range.
Parameters
----------
start_time, end_time : float
Unix times denoting beginning and end of time range.
node_spoof : dictionary
Host and directory for finding weather data.
Default: {'cedar_online': '/project/rpp-krs/chime/chime_online/'}
Returns
-------
times, rainfall : np.ndarray
Arrays of Unix timestamps and rainfall measurements (in mm).
"""

# Use Finder to fetch weather data files overlapping with specified time interval
f = finder.Finder(node_spoof=node_spoof)
f.only_chime_weather()
f.set_time_range(start_time=start_time, end_time=end_time)
f.accept_all_global_flags()
results_list = f.get_results()

# For each weather file, fetch rainfall measurements and associated timestamps
times = []
rainfall = []
for result in results_list:
result_data = result.as_loaded_data()
times.append(result_data.index_map["station_time_blockhouse"])
rainfall.append(result_data["blockhouse"]["rain"])

# Concatenate timestamp and rainfall arrays, and also sort chronologically
times = np.concatenate(times)
rainfall = np.concatenate(rainfall)
idx_sort = np.argsort(times)
times = times[idx_sort]
rainfall = rainfall[idx_sort]

return times, rainfall


def compute_cumulative_rainfall(
times, accumulation_time=30.0, node_spoof=_DEFAULT_NODE_SPOOF
):
"""Compute cumulative rainfall for a given set of Unix times.
The cumulative rainfall total is computed over the previous
accumulation_time seconds.
Parameters
----------
times : np.ndarray
Unix times to compute cumulative rainfall for. Assumed to be sorted
chronologically.
accumulation_time: float
Number of hours over which to compute accumulated rainfall for each time sample.
Default: 30.
Returns
-------
rainfall : np.ndarray
Cumulative rainfall totals, in mm.
"""

# Extra buffer (in s) for reading rainfall measurements, to ensure that range of
# input times is always fully within range of rainfall timestamps
_TIME_BUFFER = 600

# Compute accumulation time in seconds
accumulation_time_s = accumulation_time * 3600

# Load rainfall measurements within relevant time range
rain_timestamps, rain_meas = load_rainfall(
times[0] - accumulation_time_s * 3600 - _TIME_BUFFER,
times[-1] + _TIME_BUFFER,
node_spoof,
)

# Compute number of rainfall timestamps to accumulate
dtimestamp = np.median(np.diff(rain_timestamps))
n_sum = np.rint(accumulation_time_s / dtimestamp).astype(int)

def _sum_previous_N(arr, N):
# For each element in an array, compute the sum of the element and the N-1
# previous elements

# For each element, compute the sum of the element and all previous elements
cumsum = np.cumsum(arr)

# Make new array to stor results
sums = np.zeros_like(arr)
# First N sums will just be equal to cumsum result
sums[:N] = cumsum[:N]
# Final sums will be difference of cumsum results separated by N entries
sums[N:] = cumsum[N:] - cumsum[: len(arr) - N]

return sums

# Compute cumulative rainfall totals at same timestamps
cumu_rainfall = _sum_previous_N(rain_meas, n_sum)

# For each input time, assign cumulative rainfall from first timestamp
# that occurs after this time. This errs on the side of potentially
# overestimating the cumulative rainfall at a given input time.
time_timestamp_idx = np.searchsorted(rain_timestamps, times, side="left")
cumu_rainfall = cumu_rainfall[time_timestamp_idx]

return cumu_rainfall


class FlagRainfall(task.SingleTask):
"""Flag times following periods of heavy rainfall.
This task uses rainfall measurements from the DRAO weather station to compute
the accumulated rainfall within a given time interval prior to each time sample.
If the rainfall total exceeds some threshold, the weight dataset is set to zero
for that time.
Parameters
----------
accumulation_time : float
Number of hours over which to compute accumulated rainfall for each time sample.
Default: 30.
threshold : float
Rainfall threshold (in mm) for flagging. Default: 1.0.
node_spoof : dictionary
Host and directory for finding weather data.
Default: {'cedar_online': '/project/rpp-krs/chime/chime_online/'}
"""

accumulation_time = config.Property(proptype=float, default=30.0)
threshold = config.Property(proptype=float, default=1.0)
node_spoof = config.Property(proptype=dict, default=_DEFAULT_NODE_SPOOF)

def process(self, timestream):
"""Set weight to zero if cumulative rainfall exceeds desired threshold.
Parameters
----------
timestream : andata.CorrData or dcontainers.SiderealStream or dcontainers.TimeStream
Timestream to flag.
Returns
-------
timestream : andata.CorrData or dcontainers.SiderealStream or dcontainers.TimeStream
Returns the same timestream object with a modified weight dataset.
"""
# Redistribute over the frequency direction
timestream.redistribute("freq")

# Get time axis or convert RA axis to Unix time
if "ra" in timestream.index_map:
ra = timestream.index_map["ra"][:]
if "lsd" in timestream.attrs:
csd = timestream.attrs["lsd"]
else:
csd = timestream.attrs["csd"]
time = ephemeris.csd_to_unix(csd + ra / 360.0)
else:
time = timestream.time

# Compute cumulative rainfall within specified time interval.
# Only run on rank 0, because a database query is required
if self.comm.rank == 0:
rainfall = compute_cumulative_rainfall(
time,
accumulation_time=self.accumulation_time,
node_spoof=self.node_spoof,
)
else:
rainfall = np.empty_like(time)

# Broadcast cumulative rainfall to all ranks
self.comm.Bcast(rainfall, root=0)

# Compute mask corresponding to times when rainfall is below threshold
rainfall_mask = rainfall < self.threshold

# Multiply weights by mask
weight = timestream.weight[:]
weight.local_array[:] *= rainfall_mask[np.newaxis, np.newaxis, :]

# Report how much data has been flagged due to rainfall
self.log.info(
"%0.2f percent of data was flagged due to rainfall."
% (100.0 * (1.0 - np.sum(rainfall_mask) / len(rainfall_mask)))
)

return timestream

0 comments on commit 0548c5d

Please sign in to comment.