feat(analysis.beam.TransitGrouper): Add dataset ID check.
Add an option to check that the gating state associated with the dataset ID
matches the holography source.
tristpinsm committed Jun 9, 2022
1 parent e099f48 commit 868e493
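For orientation, here is a simplified, single-process sketch of the check this commit adds. The helper name gating_matches_source is hypothetical; the chimedb calls are the ones used by TransitGrouper._match_dataset_id in the diff below, which additionally caches validated IDs and only queries the database from MPI rank 0.

from chimedb import dataset
from chimedb.core import connect as connect_database
from chimedb.core.exceptions import NotFoundError

NULL_ID = "00000000000000000000000000000000"  # null dataset ID


def gating_matches_source(dset_ids, source_names):
    """Return True if every non-null dataset ID carries a gating state for one of source_names."""
    connect_database()
    for dsi in (d for d in dset_ids if d != NULL_ID):
        ds = dataset.Dataset.get_by_id(dsi)
        try:
            state = ds.closest_ancestor_of_type("gating")
        except NotFoundError:
            # the data has no gating state at all
            return False
        name = state.dataset_state.data["data"]["data"]["pulsar_name"]
        if name not in source_names:
            # gated, but on a different source
            return False
    return True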
Showing 2 changed files with 94 additions and 9 deletions.
100 changes: 91 additions & 9 deletions ch_pipeline/analysis/beam.py
@@ -2,6 +2,7 @@
import json
import yaml
from os import path, listdir
import re

import numpy as np
from scipy import constants
@@ -12,8 +13,9 @@

from ch_util import ephemeris as ephem
from ch_util import tools, layout, holography
from chimedb import data_index as di
from chimedb import data_index as di, dataset
from chimedb.core import connect as connect_database
from chimedb.core.exceptions import NotFoundError


from draco.core import task, io
@@ -30,6 +32,9 @@
SPEED_LIGHT = float(constants.c) / 1e6 # 10^6 m / s
CHIME_CYL_W = 22.0 # m

NULL_ID = "00000000000000000000000000000000" # null dataset ID
ACQ_RE = re.compile(r"(\d{8}T\d{6}Z)_")  # acquisition timestamp prefix in archive file names


class TransitGrouper(task.SingleTask):
"""Group transits from a sequence of TimeStream objects.
@@ -47,13 +52,17 @@ class TransitGrouper(task.SingleTask):
        This is a hack until a better solution is implemented.
    fail_if_missing: bool
        Raise an exception if no transits are found in the provided data.
    check_dataset_id: bool
        Check that the gating state associated with the dataset ID for the data matches
        the holography source. Only for gated holography.
    """

    ha_span = config.Property(proptype=float, default=180.0)
    min_span = config.Property(proptype=float, default=0.0)
    source = config.Property(proptype=str)
    db_source = config.Property(proptype=str)
    fail_if_missing = config.Property(proptype=bool, default=True)
    check_dataset_id = config.Property(proptype=bool, default=False)

    def setup(self, observer=None):
        """Set the local observers position if not using CHIME.
@@ -82,6 +91,9 @@ def setup(self, observer=None):
        self.tstreams = []
        self.last_time = 0
        self.n_processed = 0
        self.acq = None
        self._uncertain_gating = False
        self._known_datasets = []

        # Get list of holography observations
        # Only allowed to query database from rank0
@@ -112,14 +124,19 @@ def process(self, tstream):
        # placeholder for finalized transit when it is ready
        final_ts = None

        # for gated holography, check gating state matches source
        if not self._match_dataset_id(tstream):
            self.log.info("Dataset ID check failed. Skipping.")
            return None

        # check if we jumped to another acquisition
        if (tstream.time[0] - self.last_time) > 5 * (tstream.time[1] - tstream.time[0]):
            if self.cur_transit is None:
                # will be true when we start a new transit
                pass
            else:
                # start on a new transit and return current one
                final_ts = self._finalize_transit()
        this_acq = ACQ_RE.search(tstream.attrs["filename"]).group(1)
        if self.acq is None:
            # this is the first file
            self.acq = this_acq
        elif this_acq != self.acq:
            # start on a new transit and return current one
            final_ts = self._finalize_transit()

        # if this is the start of a new grouping, setup transit bounds
        if self.cur_transit is None:
@@ -218,12 +235,17 @@ def _finalize_transit(self):
                ephem.unix_to_datetime(self.cur_transit).strftime("%Y%m%dT%H%M%S"),
            )
            ts.attrs["archivefiles"] = filenames
            if self._uncertain_gating:
                ts.attrs["uncertain_gating"] = True
                ts.attrs["tag"] = ts.attrs["tag"] + f"_{self.n_processed}"
        else:
            self.log.info("Transit too short. Skipping.")
            ts = None

        self.tstreams = []
        self.cur_transit = None
        self.acq = None
        self._uncertain_gating = False
        self.n_processed += 1

        return ts
@@ -257,6 +279,66 @@ def _transit_bounds(self):
self.end_t = min(self.end_t, this_run[0][1][1])
self.obs_id = this_run[0][0]

    def _match_dataset_id(self, tstream):
        """Check that the gating state tied to the data's dataset IDs matches the holography source."""

        if not self.check_dataset_id:
            # skip this check
            return True

        if "dataset_id" not in tstream.flags.keys():
            self.log.warning(
                "Data was acquired before dataset IDs were saved. Aborting check."
            )
            self._uncertain_gating = (
                True  # set a flag to indicate we couldn't confirm source
            )
            return True

        # get dataset IDs from file
        dset_ids = np.unique(tstream.flags["dataset_id"][:].flatten())
        dset_ids = [d for d in dset_ids if d != NULL_ID]  # exclude null state

        # communicate all dataset IDs to rank 0
        dset_ids = self.comm.gather(dset_ids, root=0)

        result = True
        if self.comm.rank == 0:
            dset_ids = list(set([i for ds in dset_ids for i in ds]))

            # check whether we validated these already
            known = True
            for dsi in dset_ids:
                if dsi not in self._known_datasets:
                    known = False
                    break

            if not known:
                connect_database()
                for dsi in dset_ids:
                    dset = dataset.Dataset.get_by_id(dsi)
                    try:
                        state = dset.closest_ancestor_of_type("gating")
                    except NotFoundError:
                        result = False
                        self.log.warning(
                            f"Dataset {dsi} has no associated gating state."
                        )
                        break
                    name = state.dataset_state.data["data"]["data"]["pulsar_name"]
                    if name not in self.src.names:
                        result = False
                        self.log.warning(
                            f"Gating state associated with {dsi} is for {name}, not {self.source}."
                        )
                        break
                    else:
                        self._known_datasets.append(dsi)

        # communicate result to all ranks
        result = self.comm.bcast(result, root=0)

        return result


class TransitRegridder(Regridder):
"""Interpolate TimeStream transits onto a regular grid in hour angle.
Expand Down Expand Up @@ -771,7 +853,7 @@ def process(self, beam, data):
            attrs_from=beam,
            distributed=True,
            comm=data.comm,
            **output_kwargs
            **output_kwargs,
        )

        stacked_beam.vis[:] = 0.0
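The new TransitGrouper._match_dataset_id above gathers the dataset IDs seen by every rank, validates them against the database on rank 0 only, and broadcasts the verdict so all ranks skip (or keep) the file together. A minimal sketch of that gather/validate/broadcast pattern, assuming an mpi4py world communicator and hypothetical dataset IDs (the task itself uses its own self.comm):

from mpi4py import MPI

comm = MPI.COMM_WORLD

# each rank contributes the dataset IDs it sees in its local slice of the data
local_ids = ["a1b2c3"] if comm.rank % 2 else ["d4e5f6"]  # hypothetical IDs

# collect everything on rank 0
gathered = comm.gather(local_ids, root=0)

result = True
if comm.rank == 0:
    all_ids = list(set(i for ids in gathered for i in ids))
    # only rank 0 would open a database connection and validate all_ids here
    result = len(all_ids) > 0  # placeholder for the real gating-state check

# every rank receives the same verdict and acts on it consistently
result = comm.bcast(result, root=0)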
3 changes: 3 additions & 0 deletions ch_pipeline/processing/beam.py
@@ -101,6 +101,7 @@
db_source: *db_source_name
ha_span: *hour_angle
fail_if_missing: True
check_dataset_id: {check_dset}
- type: ch_pipeline.core.dataquery.QueryInputs
in: transits
@@ -159,6 +160,7 @@ class HolographyFringestop(base.ProcessingType):
"ompnum": 12,
"time": "0-0:15:00",
"timing_corr": "/project/rpp-chime/chime/chime_processed/timing/rev_00/not_referenced/*_chimetiming_delay.h5",
"check_dset": False,
}
default_script = DEFAULT_SCRIPT

@@ -221,6 +223,7 @@ def _finalise_jobparams(self, tag, jobparams):
            # instrument is different for gated observations
            jobparams["src"] = self._revparams["gated_sources"][src_db]
            jobparams["inst"] = "chime26mgated"
            jobparams["check_dset"] = True
        else:
            jobparams["src"] = self._revparams["sources"][src_db]
            jobparams["inst"] = "chime26m"
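In ch_pipeline/processing/beam.py the option is wired in through the job-script template: check_dset defaults to False in default_params, and _finalise_jobparams turns it on for gated observations before the config is rendered. A small illustration, assuming the {check_dset} placeholder is filled by str.format-style substitution as the braces suggest (the template line shown is a hypothetical excerpt):

template_line = "  check_dataset_id: {check_dset}"

jobparams = {"check_dset": False}  # default_params: check disabled by default
jobparams["check_dset"] = True     # gated observation: enable the check

print(template_line.format(**jobparams))
# prints "  check_dataset_id: True"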
