diff --git a/ch_pipeline/analysis/beam.py b/ch_pipeline/analysis/beam.py
index 0b63d786..4c400e20 100644
--- a/ch_pipeline/analysis/beam.py
+++ b/ch_pipeline/analysis/beam.py
@@ -2,6 +2,7 @@
 import json
 import yaml
 from os import path, listdir
+import re
 
 import numpy as np
 from scipy import constants
@@ -12,8 +13,9 @@
 from ch_util import ephemeris as ephem
 from ch_util import tools, layout, holography
 
-from chimedb import data_index as di
+from chimedb import data_index as di, dataset
 from chimedb.core import connect as connect_database
+from chimedb.core.exceptions import NotFoundError
 
 from draco.core import task, io
 
@@ -30,6 +32,9 @@
 SPEED_LIGHT = float(constants.c) / 1e6  # 10^6 m / s
 CHIME_CYL_W = 22.0  # m
 
+NULL_ID = "00000000000000000000000000000000"  # null dataset ID
+ACQ_RE = re.compile(r"(\d{8}T\d{6}Z)_")
+
 
 class TransitGrouper(task.SingleTask):
     """Group transits from a sequence of TimeStream objects.
@@ -47,6 +52,9 @@ class TransitGrouper(task.SingleTask):
         This is a hack until a better solution is implemented.
     fail_if_missing: bool
        Raise an exception if no transits are found in the provided data.
+    check_dataset_id: bool
+        Check that the gating state associated with the data's dataset IDs matches
+        the holography source. Only relevant for gated holography.
     """
 
     ha_span = config.Property(proptype=float, default=180.0)
@@ -54,6 +62,7 @@
     source = config.Property(proptype=str)
     db_source = config.Property(proptype=str)
     fail_if_missing = config.Property(proptype=bool, default=True)
+    check_dataset_id = config.Property(proptype=bool, default=False)
 
     def setup(self, observer=None):
         """Set the local observers position if not using CHIME.
@@ -82,6 +91,9 @@ def setup(self, observer=None):
         self.tstreams = []
         self.last_time = 0
         self.n_processed = 0
+        self.acq = None
+        self._uncertain_gating = False
+        self._known_datasets = []
 
         # Get list of holography observations
         # Only allowed to query database from rank0
@@ -112,14 +124,19 @@
         # placeholder for finalized transit when it is ready
         final_ts = None
 
+        # for gated holography, check that the gating state matches the source
+        if not self._match_dataset_id(tstream):
+            self.log.info("Dataset ID check failed. Skipping.")
+            return None
+
         # check if we jumped to another acquisition
-        if (tstream.time[0] - self.last_time) > 5 * (tstream.time[1] - tstream.time[0]):
-            if self.cur_transit is None:
-                # will be true when we start a new transit
-                pass
-            else:
-                # start on a new transit and return current one
-                final_ts = self._finalize_transit()
+        this_acq = ACQ_RE.search(tstream.attrs["filename"]).group(1)
+        if self.acq is None:
+            # this is the first file
+            self.acq = this_acq
+        elif this_acq != self.acq:
+            # start on a new transit and return current one
+            final_ts = self._finalize_transit()
 
         # if this is the start of a new grouping, setup transit bounds
         if self.cur_transit is None:
@@ -218,12 +235,17 @@
                 ephem.unix_to_datetime(self.cur_transit).strftime("%Y%m%dT%H%M%S"),
             )
             ts.attrs["archivefiles"] = filenames
+            if self._uncertain_gating:
+                ts.attrs["uncertain_gating"] = True
+                ts.attrs["tag"] = ts.attrs["tag"] + f"_{self.n_processed}"
         else:
             self.log.info("Transit too short. Skipping.")
             ts = None
 
         self.tstreams = []
         self.cur_transit = None
+        self.acq = None
+        self._uncertain_gating = False
         self.n_processed += 1
 
         return ts
@@ -257,6 +279,66 @@ def _transit_bounds(self):
             self.end_t = min(self.end_t, this_run[0][1][1])
             self.obs_id = this_run[0][0]
 
+    def _match_dataset_id(self, tstream):
+
+        if not self.check_dataset_id:
+            # skip this check
+            return True
+
+        if "dataset_id" not in tstream.flags.keys():
+            self.log.warning(
+                "Data was acquired before dataset IDs were saved. Aborting check."
+            )
+            self._uncertain_gating = (
+                True  # set a flag to indicate we couldn't confirm the source
+            )
+            return True
+
+        # get dataset IDs from file
+        dset_ids = np.unique(tstream.flags["dataset_id"][:].flatten())
+        dset_ids = [d for d in dset_ids if d != NULL_ID]  # exclude null state
+
+        # communicate all dataset IDs to rank 0
+        dset_ids = self.comm.gather(dset_ids, root=0)
+
+        result = True
+        if self.comm.rank == 0:
+            dset_ids = list(set([i for ds in dset_ids for i in ds]))
+
+            # check whether we validated these already
+            known = True
+            for dsi in dset_ids:
+                if dsi not in self._known_datasets:
+                    known = False
+                    break
+
+            if not known:
+                connect_database()
+                for dsi in dset_ids:
+                    dset = dataset.Dataset.get_by_id(dsi)
+                    try:
+                        state = dset.closest_ancestor_of_type("gating")
+                    except NotFoundError:
+                        result = False
+                        self.log.warning(
+                            f"Dataset {dsi} has no associated gating state."
+                        )
+                        break
+                    name = state.dataset_state.data["data"]["data"]["pulsar_name"]
+                    if name not in self.src.names:
+                        result = False
+                        self.log.warning(
+                            f"Gating state associated with {dsi} is for {name}, not {self.source}."
+                        )
+                        break
+                    else:
+                        self._known_datasets.append(dsi)
+
+        # communicate result to all ranks
+        result = self.comm.bcast(result, root=0)
+
+        return result
+
 
 class TransitRegridder(Regridder):
     """Interpolate TimeStream transits onto a regular grid in hour angle.
@@ -771,7 +853,7 @@ def process(self, beam, data):
             attrs_from=beam,
             distributed=True,
             comm=data.comm,
-            **output_kwargs
+            **output_kwargs,
         )
 
         stacked_beam.vis[:] = 0.0
diff --git a/ch_pipeline/processing/beam.py b/ch_pipeline/processing/beam.py
index 78108f41..a243fa2f 100644
--- a/ch_pipeline/processing/beam.py
+++ b/ch_pipeline/processing/beam.py
@@ -101,6 +101,7 @@
           db_source: *db_source_name
           ha_span: *hour_angle
           fail_if_missing: True
+          check_dataset_id: {check_dset}
 
       - type: ch_pipeline.core.dataquery.QueryInputs
         in: transits
@@ -159,6 +160,7 @@ class HolographyFringestop(base.ProcessingType):
         "ompnum": 12,
         "time": "0-0:15:00",
         "timing_corr": "/project/rpp-chime/chime/chime_processed/timing/rev_00/not_referenced/*_chimetiming_delay.h5",
+        "check_dset": False,
     }
 
     default_script = DEFAULT_SCRIPT
@@ -221,6 +223,7 @@ def _finalise_jobparams(self, tag, jobparams):
             # instrument is different for gated observations
             jobparams["src"] = self._revparams["gated_sources"][src_db]
             jobparams["inst"] = "chime26mgated"
+            jobparams["check_dset"] = True
         else:
             jobparams["src"] = self._revparams["sources"][src_db]
             jobparams["inst"] = "chime26m"