From 3a7d12b87a80a2fbe97506af9df9825a3f3493c3 Mon Sep 17 00:00:00 2001 From: Simon Foreman Date: Fri, 10 Feb 2023 17:09:54 -0800 Subject: [PATCH] style: apply black 23.1.0 --- draco/analysis/beam.py | 2 -- draco/analysis/beamform.py | 5 ----- draco/analysis/dayenu.py | 13 ------------- draco/analysis/delay.py | 7 ------- draco/analysis/fgfilter.py | 4 ---- draco/analysis/flagging.py | 9 --------- draco/analysis/mapmaker.py | 4 ---- draco/analysis/ringmapmaker.py | 15 --------------- draco/analysis/sensitivity.py | 4 ---- draco/analysis/sidereal.py | 14 -------------- draco/analysis/sourcestack.py | 7 ------- draco/analysis/svdfilter.py | 3 --- draco/analysis/transform.py | 9 --------- draco/core/containers.py | 21 --------------------- draco/core/io.py | 9 --------- draco/core/misc.py | 5 ----- draco/core/task.py | 13 ------------- draco/synthesis/gain.py | 3 --- draco/synthesis/noise.py | 5 ----- draco/synthesis/stream.py | 1 - draco/util/random.py | 1 - draco/util/regrid.py | 1 - draco/util/rfi.py | 2 -- draco/util/tools.py | 3 --- test/test_containers.py | 3 --- test/test_io.py | 4 ---- test/test_tools.py | 3 --- 27 files changed, 170 deletions(-) diff --git a/draco/analysis/beam.py b/draco/analysis/beam.py index eeb9e4ea6..2d3d6f553 100644 --- a/draco/analysis/beam.py +++ b/draco/analysis/beam.py @@ -248,13 +248,11 @@ def _evaluate_beam(self, data): # Loop over local frequencies and polarisations and evaluate the beam # by calling the telescopes beam method. for ff, freq in enumerate(local_freq_index): - if not local_freq_flag[ff]: weight[ff] = 0.0 continue for pp, pol in enumerate(pol_pairs): - bii = self.telescope.beam(map_pol_to_feed[pol[0]], freq, angpos) if pol[0] != pol[1]: diff --git a/draco/analysis/beamform.py b/draco/analysis/beamform.py index 52d89b8ad..6f00168f6 100644 --- a/draco/analysis/beamform.py +++ b/draco/analysis/beamform.py @@ -111,7 +111,6 @@ def setup(self, manager): # Ensure that if we are using variable time tracking, # then we are also collapsing over hour angle. if self.variable_timetrack: - if self.collapse_ha: self.log.info( "Tracking source for declination dependent amount of time " @@ -197,7 +196,6 @@ def process(self): # For each source, beamform and populate container. for src in range(self.nsource): - if src % 1000 == 0: self.log.info(f"Source {src}/{self.nsource}") @@ -269,7 +267,6 @@ def process(self): # Loop over polarisations for pol, pol_str in enumerate(self.process_pol): - primary_beam = self._beamfunc(pol_str, dec, ha_array) # Fringestop and sum over products @@ -452,7 +449,6 @@ def _initialize_beam_with_data(self): # Find the index of the local frequencies in # the frequency axis of the telescope instance if not self.no_beam_model: - self.freq_local_telescope_index = np.array( [ np.argmin(np.abs(nu - self.telescope.frequencies)) @@ -492,7 +488,6 @@ def _beamfunc(self, pol, dec, ha): primary_beam = np.zeros((nfreq, ha.size), dtype=np.float64) for ff, freq in enumerate(self.freq_local_telescope_index): - bii = self.telescope.beam(self.map_pol_feed[pol[0]], freq, angpos) if pol[0] != pol[1]: diff --git a/draco/analysis/dayenu.py b/draco/analysis/dayenu.py index 0279b443d..093382a5e 100644 --- a/draco/analysis/dayenu.py +++ b/draco/analysis/dayenu.py @@ -93,7 +93,6 @@ def process(self, stream): # Loop over products for bb, bcut in enumerate(cutoff): - t0 = time.time() # Flag frequencies and times with zero weight @@ -144,7 +143,6 @@ def process(self, stream): return stream def _get_cut(self, prod): - baselines = ( self.telescope.feedpositions[prod["input_a"], :] - self.telescope.feedpositions[prod["input_b"], :] @@ -204,7 +202,6 @@ def setup(self): """Create the function used to determine the delay cutoff.""" if self.filename is not None: - fcut = containers.DelayCutoff.from_file(self.filename, distributed=False) kind = fcut.attrs.get("kind", "linear") @@ -215,7 +212,6 @@ def setup(self): self._cut_interpolator = {} for pp, pol in enumerate(fcut.pol): - self._cut_interpolator[pol] = scipy.interpolate.interp1d( fcut.el, fcut.cutoff[pp], @@ -270,13 +266,11 @@ def process(self, ringmap): # Loop over beam and polarisation for ind in np.ndindex(*lshp): - wind = ind[1:] kwargs = {ax: ringmap.index_map[ax][ii] for ax, ii in zip(axes, ind)} for ee, el in enumerate(els): - t0 = time.time() slc = ind + (slice(None), slice(None), ee) @@ -319,12 +313,10 @@ def process(self, ringmap): # Apply the filter if self.single_mask: - rm[slc] = np.matmul(NF[0], erm) weight[wslc] = tools.invert_no_zero(np.matmul(NF[0] ** 2, evar)) if self.atten_threshold > 0.0: - diag = np.diag(NF[0]) med_diag = np.median(diag[diag > 0.0]) @@ -333,7 +325,6 @@ def process(self, ringmap): weight[wslc] *= flag_low[:, np.newaxis].astype(np.float32) else: - for ii, rr in enumerate(index): rm[ind][:, rr, ee] = np.matmul(NF[ii], erm[:, rr]) weight[wind][:, rr, ee] = tools.invert_no_zero( @@ -341,7 +332,6 @@ def process(self, ringmap): ) if self.atten_threshold > 0.0: - diag = np.diag(NF[ii]) med_diag = np.median(diag[diag > 0.0]) @@ -449,7 +439,6 @@ def process(self, stream): # Loop over frequencies for ff, nu in enumerate(freq): - t0 = time.time() # The next several lines determine the mask as a function of time @@ -488,7 +477,6 @@ def process(self, stream): # Loop over E-W baselines for uu, ub in enumerate(uniqb): - iub = np.flatnonzero(indexb == uu) visfb = np.ascontiguousarray(vis[ff, iub]) @@ -512,7 +500,6 @@ def process(self, stream): return stream def _get_cut(self, freq, xsep): - lmbda = units.c / (freq * 1e6) u = xsep / lmbda m = instantaneous_m( diff --git a/draco/analysis/delay.py b/draco/analysis/delay.py index d6812b3fb..7ee8208e2 100644 --- a/draco/analysis/delay.py +++ b/draco/analysis/delay.py @@ -92,7 +92,6 @@ def process(self, ss): baselines = tel.feedpositions[ia] - tel.feedpositions[ib] for lbi, bi in ss.vis[:].enumerate(axis=1): - # Select the baseline length to use baseline = baselines[bi] if self.telescope_orientation == "NS": @@ -277,7 +276,6 @@ def process(self, ss: FreqContainerType) -> FreqContainerType: ) for lbi, bi in ss.datasets[dset][:].enumerate(axis=dist_axis_pos): - # Extract the part of the array that we are processing, and # transpose/reshape to make a 2D array with frequency as axis=0 vis_local = _take_view(ssv, lbi, dist_axis_pos) @@ -448,7 +446,6 @@ def process(self, ss): # Iterate over all baselines and use the Gibbs sampler to estimate the spectrum for lbi, bi in delay_spec.spectrum[:].enumerate(axis=0): - self.log.debug("Delay transforming baseline %i/%i", bi, len(baselines)) # Get the local selections @@ -680,7 +677,6 @@ def process(self, ss: FreqContainerType) -> containers.DelaySpectrum: # Iterate over all baselines and use the Gibbs sampler to estimate the spectrum for lbi, bi in delay_spec.spectrum[:].enumerate(axis=0): - self.log.debug(f"Delay transforming baseline {bi}/{nbase}") # Get the local selections @@ -772,7 +768,6 @@ def stokes_I(sstream, tel): # Cache beamclass as it's regenerated every call beamclass = tel.beamclass[:] for ii, ui in enumerate(uinv): - # Skip if not all polarisations were included if ucount[ui] < 4: continue @@ -1047,7 +1042,6 @@ def delay_spectrum_gibbs( # Window the frequency data if window is not None: - # Construct the window function x = fsel * 1.0 / total_freq w = window_generalised(x, window=window) @@ -1173,7 +1167,6 @@ def _draw_ps_sample(d): # Perform the Gibbs sampling iteration for a given number of loops and # return the power spectrum output of them. for ii in range(niter): - d_samp = _draw_signal_sample(S_samp) S_samp = _draw_ps_sample(d_samp) diff --git a/draco/analysis/fgfilter.py b/draco/analysis/fgfilter.py index 3893bbe4a..707219703 100644 --- a/draco/analysis/fgfilter.py +++ b/draco/analysis/fgfilter.py @@ -82,7 +82,6 @@ def _forward(self, mmodes): # Iterate over local m's, project mode and save to disk. for lm, mi in mmodes.vis[:].enumerate(axis=0): - tm = mmodes.vis[mi].transpose((1, 0, 2)).reshape(tel.nfreq, 2 * tel.npairs) svdm = bt.project_vector_telescope_to_svd(mi, tm) @@ -128,7 +127,6 @@ def _backward(self, svdmodes): # Iterate over local m's, project mode and save to disk. for lm, mi in mmodes.vis[:].enumerate(axis=0): - svdm = svdmodes.vis[mi] tm = bt.project_vector_svd_to_telescope(mi, svdm) @@ -190,7 +188,6 @@ def _forward(self, svdmodes): # Iterate over local m's and project mode into KL basis for lm, mi in svdmodes.vis[:].enumerate(axis=0): - sm = svdmodes.vis[mi][: svdmodes.nmode[mi]] klm = kl.project_vector_svd_to_kl(mi, sm, threshold=self.threshold) @@ -227,7 +224,6 @@ def _backward(self, klmodes): # Iterate over local m's and project mode into KL basis for lm, mi in klmodes.vis[:].enumerate(axis=0): - klm = klmodes.vis[mi][: klmodes.nmode[mi]] sm = kl.project_vector_kl_to_svd(mi, klm, threshold=self.threshold) diff --git a/draco/analysis/flagging.py b/draco/analysis/flagging.py index 01e49ef42..dbea61981 100644 --- a/draco/analysis/flagging.py +++ b/draco/analysis/flagging.py @@ -486,7 +486,6 @@ def process(self, data): med_weight = np.zeros(npol, dtype=np.float32) for pp in range(npol): - wlocal = data.weight[:, pp] wglobal = np.zeros(wlocal.global_shape, dtype=wlocal.dtype) @@ -616,7 +615,6 @@ def process(self, data): weight_local = weight.local_array for lfi, gfi in weight.enumerate(axis=0): - # MPIArray takes the local index, returns a local np.ndarray # Find values equal to zero to preserve them in final weights zeromask = weight_local[lfi] == 0.0 @@ -807,7 +805,6 @@ def process(self, sensitivity): stmask[:] = False for li, ii in madmask.enumerate(axis=0): - # Only process this polarisation if we should be including it, # otherwise skip and let it be implicitly set to False (i.e. not # masked) @@ -1033,7 +1030,6 @@ def process( # Get the rank with stack to create the new mask if sstream.comm.rank == rank_with_ind: - # Cut out the right section wf = ssv.local_array[:, self.stack_ind - lstart] ww = ssw.local_array[:, self.stack_ind - lstart] @@ -1271,7 +1267,6 @@ def _bad_freq_mask(self, nfreq: int) -> np.ndarray: mask = np.zeros(nfreq, dtype=bool) for s in self.bad_freq_ind: - if isinstance(s, int): if s < nfreq: mask[s] = True @@ -1470,7 +1465,6 @@ def tv_channels_flag(x, freq, sigma=5, f=0.5, debug=False): freq_end = freq + 0.5 * df for i in range(67): - # Find all frequencies that lie wholly or partially within the TV channel fs = tvstart_freq + i * tvwidth_freq fe = fs + tvwidth_freq @@ -1635,7 +1629,6 @@ def process(self, data): # Find the median offset between the stack and the daily data if self.match_median: - # Find the parts of the both the stack and the daily data that are both # measured mask = ( @@ -1689,7 +1682,6 @@ def process(self, data): weight *= weight_stack else: - # Perform a weighted average of the data to fill in missing samples dset *= weight dset += weight_stack * self.frac * (dset_stack + stack_offset) @@ -1811,7 +1803,6 @@ def process( # If only flagging co-pol baselines, make separate mask to select those, # and multiply into low-weight mask if self.pols_to_flag == "copol": - # Get local section of stack axis local_stack = stream.stack[stream.weight[:].local_bounds] diff --git a/draco/analysis/mapmaker.py b/draco/analysis/mapmaker.py index 10baf3ebd..ccc437711 100644 --- a/draco/analysis/mapmaker.py +++ b/draco/analysis/mapmaker.py @@ -77,7 +77,6 @@ def process(self, mmodes): # Loop over all m's and solve from m-mode visibilities to alms. for mi, m in m_array.enumerate(axis=0): - self.log.debug( "Processing m=%i (local %i/%i)", m, mi + 1, m_array.local_shape[0] ) @@ -156,7 +155,6 @@ class DirtyMapMaker(BaseMapMaker): """ def _solve_m(self, m, f, v, Ni): - bt = self.beamtransfer # Massage the arrays into shape @@ -188,7 +186,6 @@ class MaximumLikelihoodMapMaker(BaseMapMaker): """ def _solve_m(self, m, f, v, Ni): - bt = self.beamtransfer # Massage the arrays into shape @@ -241,7 +238,6 @@ class WienerMapMaker(BaseMapMaker): bt_cache = None def _solve_m(self, m, f, v, Ni): - import scipy.linalg as la bt = self.beamtransfer diff --git a/draco/analysis/ringmapmaker.py b/draco/analysis/ringmapmaker.py index f3f3251f2..cd99ee217 100644 --- a/draco/analysis/ringmapmaker.py +++ b/draco/analysis/ringmapmaker.py @@ -140,7 +140,6 @@ def process(self, sstream): # Unpack visibilities into new array for vis_ind, (p_ind, x_ind, y_ind) in enumerate(zip(pind, xind, yind)): - # Different behavior for intracylinder and intercylinder baselines. gsv[p_ind, :, x_ind, y_ind, :] = ssv[:, vis_ind] gsw[p_ind, :, x_ind, y_ind, :] = ssw[:, vis_ind] @@ -245,7 +244,6 @@ def process(self, gstream): # Loop over local frequencies and fill ring map for lfi, fi in gstream.vis[:].enumerate(1): - # Get the current frequency and wavelength fr = freq[fi] wv = scipy.constants.c * 1e-6 / fr @@ -391,7 +389,6 @@ def process(self, hstream): # Loop over local frequencies and fill ring map for lfi, fi in hstream.vis[:].enumerate(axis=1): - # Rotate the polarisations v = np.tensordot(P, hvv[:, lfi], axes=(1, 0)) b = np.tensordot(P, hvb[:, lfi], axes=(1, 0)) @@ -597,7 +594,6 @@ def process( # Loop over frequencies for lfi, freq in enumerate(local_freq): - find = (slice(None),) * 3 + (lfi,) hvf = hv[find] @@ -748,14 +744,12 @@ def _get_window(self, hybrid_vis_m): for ff in range(nfreq): for ee in range(nel): - mmin = min_m[ff, ee] mmax = max_m[ff, ee] in_range = np.flatnonzero((m >= mmin) & (m <= mmax)) if in_range.size > 0: - x = (m[in_range] - mmin) / (mmax - mmin) window[ff, in_range, ee] = tools.window_generalised( @@ -853,7 +847,6 @@ def process( def _get_beam_mmodes( self, hybrid_vis_m: containers.HybridVisMModes ) -> containers.HybridVisMModes: - # NOTE: Coefficients taken from Mateus's fits, but adjust to fix the definition # of sigma, and be the widths for the "voltage" beam def sig_chime_X(freq, dec): @@ -891,7 +884,6 @@ def B(phi, u, sigma): # Loop over all local frequencies and calculate the beam m-modes for lfi, fi in hybrid_vis_m.vis[:].enumerate(axis=3): - freq = hybrid_vis_m.freq[fi] # Calculate the baseline distance in wavelengths @@ -906,7 +898,6 @@ def B(phi, u, sigma): # each polarisation and declination sig = np.zeros((pol.size, dec.size), dtype=dec.dtype) for pi, (pa, pb) in enumerate(pol): - # Get the effective beamwidth for the polarisation combination sig_a = beam_width[pa](freq, dec) sig_b = beam_width[pb](freq, dec) @@ -946,7 +937,6 @@ class TikhonovRingMapMaker(DeconvolveHybridMBase): inv_SN = config.Property(proptype=float, default=1e-6) def _get_weight(self, inv_var): - if self.weight_ew == "inverse_variance": weight_ew = inv_var @@ -972,7 +962,6 @@ def _get_weight(self, inv_var): return weight_ew def _get_regularisation(self, *args): - return self.inv_SN @@ -1021,7 +1010,6 @@ class WienerRingMapMaker(DeconvolveHybridMBase): weight_ew = "inverse_variance" def _get_regularisation(self, freq, m, *args): - gal = ( self.gal_amp * (freq / self.pivot_freq) ** self.gal_alpha @@ -1036,7 +1024,6 @@ def _get_regularisation(self, freq, m, *args): return tools.invert_no_zero(spectrum[:, np.newaxis, np.newaxis]) def _get_weight(self, inv_var): - weight_ew = inv_var if self.exclude_intracyl: weight_ew[..., 0, :] = 0.0 @@ -1103,11 +1090,9 @@ def process( # Determine the weights that where used to average over the EW baselines if weight_scheme == "inverse_variance": - weight_ew = tools.invert_no_zero(var_time_avg) else: - n_ew = var.shape[-2] if weight_scheme == "uniform": diff --git a/draco/analysis/sensitivity.py b/draco/analysis/sensitivity.py index 731eeab6b..3775c1e84 100644 --- a/draco/analysis/sensitivity.py +++ b/draco/analysis/sensitivity.py @@ -159,13 +159,11 @@ def process(self, data): # Average over selected baseline per polarization for pp, ipol in enumerate(pol_index): - pcnt = cnt[ipol, :] pscale = 2.0 - auto_flag[ipol, np.newaxis] # Loop over frequencies to reduce memory usage for ff in range(nfreq): - fslc = slice((ff % niff) * ntime, ((ff % niff) + 1) * ntime) pfcnt = pcnt[:, index_cnt[fslc]] @@ -196,9 +194,7 @@ def process(self, data): radiometer_counter = np.zeros((nfreq, npol, ntime), dtype=np.float32) for ii, (ai, pi) in enumerate(zip(auto_input, auto_pol)): - for jj, (aj, pj) in enumerate(zip(auto_input, auto_pol)): - if self.exclude_intracyl and (ew_position[ai] == ew_position[aj]): # Exclude intracylinder baselines continue diff --git a/draco/analysis/sidereal.py b/draco/analysis/sidereal.py index 3eaf09ec7..cef22f099 100644 --- a/draco/analysis/sidereal.py +++ b/draco/analysis/sidereal.py @@ -251,7 +251,6 @@ def process(self, data): return sdata def _get_phase(self, freq, prod, lsd): - # Determine if any baselines contains masked feeds # These baselines will be flagged since they do not # have valid baseline distances. @@ -280,7 +279,6 @@ def _get_phase(self, freq, prod, lsd): def _search_nearest(x, xeval): - index_next = np.searchsorted(x, xeval, side="left") index_previous = np.maximum(0, index_next - 1) @@ -299,7 +297,6 @@ class SiderealRegridderNearest(SiderealRegridder): """Regrid onto the sidereal day using nearest neighbor interpolation.""" def _regrid(self, vis, weight, lsd): - # Create a regular grid interp_grid = np.arange(0, self.samples, dtype=np.float64) / self.samples interp_grid = interp_grid * (self.end - self.start) + self.start @@ -324,7 +321,6 @@ class SiderealRegridderLinear(SiderealRegridder): """Regrid onto the sidereal day using linear interpolation.""" def _regrid(self, vis, weight, lsd): - # Create a regular grid interp_grid = np.arange(0, self.samples, dtype=np.float64) / self.samples interp_grid = interp_grid * (self.end - self.start) + self.start @@ -372,7 +368,6 @@ def _regrid(self, vis, weight, lsd): # Loop over frequencies to reduce memory usage for ff in range(shp[0]): - fvis = vis[ff] fweight = weight[ff] @@ -404,7 +399,6 @@ class SiderealRegridderCubic(SiderealRegridder): """Regrid onto the sidereal day using cubic Hermite spline interpolation.""" def _regrid(self, vis, weight, lsd): - # Create a regular grid interp_grid = np.arange(0, self.samples, dtype=np.float64) / self.samples interp_grid = interp_grid * (self.end - self.start) + self.start @@ -454,7 +448,6 @@ def _regrid(self, vis, weight, lsd): # Loop over frequencies to reduce memory usage for ff in range(shp[0]): - fvis = vis[ff] fweight = weight[ff] @@ -469,7 +462,6 @@ def _regrid(self, vis, weight, lsd): finterp_var = np.zeros(shp[1:], dtype=weight.dtype) for ii, cc in zip(index, coeff): - finterp_flag &= fflag[:, ii] finterp_var += cc**2 * fvar[:, ii] @@ -540,7 +532,6 @@ def process(self, sdata): # If this is our first sidereal day, then initialize the # container that will hold the stack. if self.stack is None: - self.stack = containers.empty_like(sdata) # Add stack-specific datasets @@ -615,7 +606,6 @@ def process(self, sdata): # The calculations below are only required if the sample variance was requested if self.with_sample_variance: - # Accumulate the sum of squared coefficients. self.sum_coeff_sq += coeff**2 @@ -650,7 +640,6 @@ def process_finish(self): # Can be found in the stack.nsample dataset for uniform case # or the stack.weight dataset for inverse variance case. if self.with_sample_variance: - if self.weight != "uniform": norm = self.stack.weight[:] @@ -705,7 +694,6 @@ def process(self, sdata): sdata.redistribute("freq") if self.stack is None: - self.log.info("Starting new stack.") self.stack = containers.empty_like(sdata) @@ -791,7 +779,6 @@ def process_finish(self): # of the difficulty mapping the operations we would want to do into what numpy # allows for lfi in range(self.stack.vis[:].local_shape[0]): - Ni_s = self.Ni_s[lfi] N_s = tools.invert_no_zero(Ni_s) V = Va[lfi] * N_s[:, np.newaxis] @@ -823,7 +810,6 @@ def process_finish(self): def _ensure_list(x): - if hasattr(x, "__iter__"): y = [xx for xx in x] else: diff --git a/draco/analysis/sourcestack.py b/draco/analysis/sourcestack.py index 1da1130b9..e0ea12f8e 100644 --- a/draco/analysis/sourcestack.py +++ b/draco/analysis/sourcestack.py @@ -133,7 +133,6 @@ def process(self, formed_beam): # Loop over polarisations for pp, pstr in enumerate(pol): - fb = formed_beam.beam[:, pp].view(np.ndarray) fw = formed_beam.weight[:, pp].view(np.ndarray) @@ -144,7 +143,6 @@ def process(self, formed_beam): count = 0 # Source counter # For each source in the range of this process for lq in range(lshape): - if not source_mask[lq]: # Source not in the data redshift range continue @@ -229,7 +227,6 @@ def setup(self, catalog): # If the catalog is distributed, then we need to make sure that it # is distributed over an axis other than the object_id axis. if catalog.distributed: - axis_size = { key: len(val) for key, val in catalog.index_map.items() @@ -310,7 +307,6 @@ def process(self): # Loop over all datasets and if they have an object_id axis, select the # relevant objects along that axis for name, dset in self.catalog.datasets.items(): - if dset.attrs["axis"][0] == "object_id": new_catalog.datasets[name][:] = dset[:][ind] else: @@ -375,7 +371,6 @@ def process(self, stack): ) if (len(self.stack) % self.ngroup) == 0: - out = self._reset() return out @@ -441,11 +436,9 @@ def _reset(self): # Loop over mock stacks and save to output container for name, odset in out.datasets.items(): - mock_count = 0 for ss, stack in enumerate(self.stack): - dset = stack.datasets[name] if dset.attrs["axis"][0] == "mock": data = dset[:] diff --git a/draco/analysis/svdfilter.py b/draco/analysis/svdfilter.py index 828f54d0d..a378053b5 100644 --- a/draco/analysis/svdfilter.py +++ b/draco/analysis/svdfilter.py @@ -103,7 +103,6 @@ def process(self, mmodes): # Do a quick first pass calculation of all the singular values to get the max on this rank. for mi, m in vis.enumerate(axis=0): - vis_m = vis.local_array[mi].transpose((1, 0, 2)).reshape(vis.shape[2], -1) weight_m = ( weight.local_array[mi].transpose((1, 0, 2)).reshape(vis.shape[2], -1) @@ -124,7 +123,6 @@ def process(self, mmodes): # Loop over all m's and remove modes below the combined cut for mi, m in vis.enumerate(axis=0): - vis_m = vis.local_array[mi].transpose((1, 0, 2)).reshape(vis.shape[2], -1) weight_m = ( weight.local_array[mi].transpose((1, 0, 2)).reshape(vis.shape[2], -1) @@ -183,7 +181,6 @@ def svd_em(A, mask, niter=5, rank=5, full_matrices=False): # missing values, then forming a new estimate of the missing values using a # low rank approximation. for i in range(niter): - u, sig, vh = la.svd(A, full_matrices=full_matrices, overwrite_a=False) low_rank_A = np.dot(u[:, :rank] * sig[:rank], vh[:rank]) diff --git a/draco/analysis/transform.py b/draco/analysis/transform.py index b80e7d2ef..4401c880e 100644 --- a/draco/analysis/transform.py +++ b/draco/analysis/transform.py @@ -64,7 +64,6 @@ def process(self, ss): # Rebin the arrays, do this with a loop to save memory for fi in range(len(ss.freq)): - # Calculate rebinned index ri = fi // self.channel_bin @@ -187,7 +186,6 @@ def process(self, ss): # its representative products so that they contain only feeds that exist # and are not masked in the telescope instance. if ss.is_stacked: - stack_new, stack_flag = tools.redefine_stack_index_map( self.telescope, ss.input, ss.prod, ss.stack, ss.reverse_map["stack"] ) @@ -251,7 +249,6 @@ def process(self, ss): # Infer number of products that went into each stack if self.weight != "inverse_variance": - ssi = ss.input_flags[:] ssp = ss.index_map["prod"][:] sss = ss.reverse_map["stack"]["stack"][:] @@ -278,7 +275,6 @@ def process(self, ss): # Iterate over products (stacked) in the sidereal stream for ss_pi, ((ii, ij), conj) in enumerate(zip(ss_prod, ss_conj)): - # Map the feed indices into ones for the Telescope class bi, bj = input_ind[ii], input_ind[ij] @@ -528,7 +524,6 @@ def process(self, sstream: containers.SiderealContainer) -> containers.MContaine # Divide out the m-mode sinc-suppression caused by the rectangular integration window if self.remove_integration_window: - m = np.arange(mmax + 1) w = np.sinc(m / nra) inv_w = tools.invert_no_zero(w) @@ -647,7 +642,6 @@ def process(self, mmodes: containers.MContainer) -> containers.SiderealContainer # Apply the m-mode sinc-suppression caused by the rectangular integration window if self.apply_integration_window: - m = np.arange(mmodes.mmax + 1) w = np.sinc(m / nra) inv_w = tools.invert_no_zero(w) @@ -836,7 +830,6 @@ def process(self, data): return new_data def _regrid(self, vis_data, weight, times): - # Create a regular grid, padded at either end to supress interpolation issues pad = 5 * self.lanczos_width interp_grid = ( @@ -934,7 +927,6 @@ def process( # Loop over datasets in container for name, dset in sscont.datasets.items(): - if "ra" in dset.attrs["axis"]: # If dataset has RA axis, identify which axis it is ra_axis_idx = np.where(dset.attrs["axis"] == "ra")[0][0] @@ -1003,7 +995,6 @@ def process(self, polcont): YY_ind = list(polcont.index_map["pol"]).index("YY") for name, dset in polcont.datasets.items(): - if "pol" not in dset.attrs["axis"]: outcont.datasets[name][:] = dset[:] else: diff --git a/draco/core/containers.py b/draco/core/containers.py index 801a6f9e0..a56f5a558 100644 --- a/draco/core/containers.py +++ b/draco/core/containers.py @@ -153,7 +153,6 @@ class ContainerBase(memh5.BasicCont): convert_dataset_strings = True def __init__(self, *args, **kwargs): - # Arguments for pulling in definitions from other containers copy_from = kwargs.pop("copy_from", None) axes_from = kwargs.pop("axes_from", copy_from) @@ -192,12 +191,10 @@ def __init__(self, *args, **kwargs): # Create axis entries for axis in self.axes: - axis_map = None # Check if axis is specified in initialiser if axis in kwargs: - # If axis is an integer, turn into an arange as a default definition if isinstance(kwargs[axis], int): axis_map = np.arange(kwargs[axis]) @@ -222,7 +219,6 @@ def __init__(self, *args, **kwargs): # Copy over datasets that have compatible axes if dsets_from is not None: - # Get the list of axes names that have been overriden changed_axes = {ax for ax in self.axes if ax in kwargs} @@ -247,7 +243,6 @@ def __init__(self, *args, **kwargs): # Copy over attributes if attrs_from is not None: - # Copy attributes from container root memh5.copyattrs(attrs_from.attrs, self.attrs) @@ -405,7 +400,6 @@ def dataset_spec(self): # which get added to a temporary dict. We go over the reversed MRO so # that the `tdict.update` overrides tables in base classes.` for cls in inspect.getmro(self.__class__)[::-1]: - try: # NOTE: this is a little ugly as the following line will drop # down to base classes if dataset_spec isn't present, and thus @@ -429,7 +423,6 @@ def _class_axes(cls): # which get added to a temporary dict. We go over the reversed MRO so # that the `tdict.update` overrides tables in base classes. for c in inspect.getmro(cls)[::-1]: - try: axes |= set(c._axes) except AttributeError: @@ -531,7 +524,6 @@ def copy(self, shared=None): # Loop over datasets that exist in the source and either add a view of # the source dataset, or perform a full copy for name, data in self.datasets.items(): - if shared and name in shared: # TODO: find a way to do this that doesn't depend on the # internal implementation of BasicCont and MemGroup @@ -618,7 +610,6 @@ class TableBase(ContainerBase): _table_spec = {} def __init__(self, *args, **kwargs): - # Get the dataset specifiction for this class (not any base classes), or # an empty dictionary if it does not exist. Do the same for the axes entry.. dspec = self.__class__.__dict__.get("_dataset_spec", {}) @@ -627,7 +618,6 @@ def __init__(self, *args, **kwargs): # Iterate over all table_spec entries and construct dataset specifications for # them. for name, spec in self.table_spec.items(): - # Get the specifieid axis or if not present create a unique one for # this table entry axis = spec.get("axis", name + "_index") @@ -675,7 +665,6 @@ def table_spec(self): tdict = {} for cls in inspect.getmro(self.__class__)[::-1]: - try: tdict.update(cls._table_spec) except AttributeError: @@ -860,7 +849,6 @@ class SampleVarianceContainer(ContainerBase): _axes = ("component",) def __init__(self, *args, **kwargs): - # Set component axis to default real-imaginary basis if not already provided if "component" not in kwargs: kwargs["component"] = np.array( @@ -1012,7 +1000,6 @@ class SiderealContainer(ContainerBase): _axes = ("ra",) def __init__(self, ra=None, *args, **kwargs): - # Allow the passing of a number of samples for the RA axis if ra is not None: if isinstance(ra, int): @@ -1049,7 +1036,6 @@ class MContainer(ContainerBase): def __init__( self, mmax: Optional[int] = None, oddra: Optional[bool] = None, *args, **kwargs ): - # Set up axes from passed arguments if mmax is not None: kwargs["m"] = mmax + 1 @@ -1088,7 +1074,6 @@ class HealpixContainer(ContainerBase): _axes = ("pixel",) def __init__(self, nside=None, *args, **kwargs): - # Set up axes from passed arguments if nside is not None: kwargs["pixel"] = 12 * nside**2 @@ -1128,7 +1113,6 @@ class Map(FreqContainer, HealpixContainer): } def __init__(self, polarisation=True, *args, **kwargs): - # Set up axes from passed arguments kwargs["pol"] = ( np.array(["I", "Q", "U", "V"]) if polarisation else np.array(["I"]) @@ -1486,7 +1470,6 @@ class GridBeam(FreqContainer): } def __init__(self, coords="celestial", *args, **kwargs): - super(GridBeam, self).__init__(*args, **kwargs) self.attrs["coords"] = coords @@ -1657,7 +1640,6 @@ def __init__( *args, **kwargs, ): - if theta is not None and phi is not None: if len(theta) != len(phi): raise RuntimeError( @@ -2347,7 +2329,6 @@ class Powerspectrum2D(ContainerBase): } def __init__(self, kperp_edges=None, kpar_edges=None, *args, **kwargs): - # Construct the kperp axis from the bin edges if kperp_edges is not None: centre = 0.5 * (kperp_edges[1:] + kperp_edges[:-1]) @@ -2762,7 +2743,6 @@ def copy_datasets_filter( stack = [source] while stack: - item = stack.pop() if memh5.is_group(item): @@ -2783,7 +2763,6 @@ def copy_datasets_filter( axis_ind = axes.index(axis) if isinstance(item, memh5.MemDatasetDistributed): - if (item.distributed_axis == axis_ind) and not allow_distributed: raise RuntimeError( f"Cannot redistristribute dataset={item.name} along " diff --git a/draco/core/io.py b/draco/core/io.py index c4cebcb9f..2320c38f7 100644 --- a/draco/core/io.py +++ b/draco/core/io.py @@ -63,7 +63,6 @@ def _list_of_filelists(files: Union[List[str], List[List[str]]]) -> List[List[st f2 = [] for filelist in files: - if isinstance(filelist, str): if "*" not in filelist and not os.path.isfile(filelist): raise ConfigError("File not found: %s" % filelist) @@ -110,7 +109,6 @@ def _list_or_glob(files: Union[str, List[str]]) -> List[str]: # We presume a string is an actual path... if isinstance(files, str): - # Check that it exists and is a file (or dir if zarr format) if files.endswith(".zarr"): if not os.path.isdir(files): @@ -156,7 +154,6 @@ def _list_of_filegroups(groups: Union[List[Dict], Dict]) -> List[Dict]: # Iterate over groups, set the tag if needed, and process the file list # through glob for gi, group in enumerate(groups): - try: files = group["files"] except KeyError: @@ -220,7 +217,6 @@ def next(self): # Iterate over all the files in the group, load them into a Map # container and add them all together for mfile in group["files"]: - self.log.debug("Loading file %s", mfile) current_map = containers.Map.from_file(mfile, distributed=True) @@ -233,7 +229,6 @@ def next(self): # Otherwise, check that the new map has consistent frequencies, # nside and pol and stack up. else: - if (current_map.freq != map_stack.freq).all(): raise RuntimeError("Maps do not have consistent frequencies.") @@ -308,7 +303,6 @@ def process(self): # container and add them all together catalog_stack = [] for cfile in group["files"]: - self.log.debug("Loading file %s", cfile) # TODO: read out the weights from the catalogs @@ -451,7 +445,6 @@ def _resolve_sel(self): # To enforce the precedence of range vs index selections, we rely on the fact # that a sort will place the axis_range keys after axis_index keys for k in sorted(self.selections or []): - # Parse the key to get the axis name and type, accounting for the fact the # axis name may contain an underscore *axis, type_ = k.split("_") @@ -633,7 +626,6 @@ class Print(pipeline.TaskBase): """Stupid module which just prints whatever it gets. Good for debugging.""" def next(self, input_): - print(input_) return input_ @@ -953,7 +945,6 @@ def process(self): my_containers = self.containers[self._host_rank :: self._num_hosts] for container in my_containers: - self.log.info(f"Zipping {container}") if not container.endswith(".zarr") or not os.path.isdir(container): diff --git a/draco/core/misc.py b/draco/core/misc.py index d99ac6510..35da3f2fc 100644 --- a/draco/core/misc.py +++ b/draco/core/misc.py @@ -59,7 +59,6 @@ def process(self, tstream, gain): ) if isinstance(gain, containers.StaticGainData): - # Extract gain array and add in a time axis gain_arr = gain.gain[:][..., np.newaxis] @@ -77,7 +76,6 @@ def process(self, tstream, gain): containers.CommonModeSiderealGainData, ), ): - # Extract gain array gain_arr = gain.gain[:] @@ -91,7 +89,6 @@ def process(self, tstream, gain): gain, (containers.SiderealGainData, containers.CommonModeSiderealGainData), ): - # Check that we are defined at the same RA samples if (gain.ra != tstream.ra).any(): raise RuntimeError( @@ -204,7 +201,6 @@ def next(self, input_): self._items.append(input_) def finish(self): - # Remove the internal reference to the items so they don't hang around after the task # finishes items = self._items @@ -239,7 +235,6 @@ def setup(self): start_time = time.time() while time.time() - start_time < self.timeout: - success = all([r.get_status() for r in results]) if success: diff --git a/draco/core/task.py b/draco/core/task.py index 4b67e59a3..0126cac5a 100644 --- a/draco/core/task.py +++ b/draco/core/task.py @@ -28,7 +28,6 @@ class MPILogFilter(logging.Filter): def __init__( self, add_mpi_info=True, level_rank0=logging.INFO, level_all=logging.WARN ): - from mpi4py import MPI self.add_mpi_info = add_mpi_info @@ -39,7 +38,6 @@ def __init__( self.comm = MPI.COMM_WORLD def filter(self, record): - # Add MPI info if desired try: record.mpi_rank @@ -101,7 +99,6 @@ class SetMPILogging(pipeline.TaskBase): level_all = config.Property(proptype=_log_level, default=logging.WARN) def __init__(self): - from mpi4py import MPI import math @@ -133,7 +130,6 @@ class LoggedTask(pipeline.TaskBase): log_level = config.Property(proptype=_log_level, default=None) def __init__(self): - # Get the logger for this task self._log = logging.getLogger( "%s.%s" % (self.__module__, self.__class__.__name__) @@ -157,7 +153,6 @@ class MPITask(pipeline.TaskBase): comm = None def __init__(self): - from mpi4py import MPI # Set default communicator @@ -176,7 +171,6 @@ class _AddRankLogAdapter(logging.LoggerAdapter): calling_obj = None def process(self, msg, kwargs): - if "extra" not in kwargs: kwargs["extra"] = {} @@ -190,7 +184,6 @@ class MPILoggedTask(MPITask, LoggedTask): """A task base that has MPI aware logging.""" def __init__(self): - # Initialise the base classes MPITask.__init__(self) LoggedTask.__init__(self) @@ -411,7 +404,6 @@ def finish(self): return output def _process_output(self, output): - if not isinstance(output, memh5.MemDiskGroup): raise pipeline.PipelineRuntimeError( f"Task must output a valid memh5 container; given {type(output)}" @@ -481,7 +473,6 @@ def walk_dset_tree(grp, root=""): # Routine to write output if needed. if self.save: - # add metadata to output metadata = {"versions": self.versions, "config": self.pipeline_config} for key, value in metadata.items(): @@ -518,7 +509,6 @@ def _nan_process_output(self, output): nan_found = self._nan_check_walk(output) if nan_found and self.nan_dump: - # Construct the filename tag = output.attrs["tag"] if "tag" in output.attrs else self._count outfile = "nandump_" + self.__class__.__name__ + "_" + str(tag) + ".h5" @@ -571,7 +561,6 @@ def _nan_check_walk(self, cont): # Check the dataset for non-finite numbers if isinstance(n, memh5.MemDataset): - # Try to test for NaN's and infs. This will fail for compound datatypes... # casting to ndarray, bc MPI ranks may fall out of sync, if a nan or inf are found arr = n[:].view(np.ndarray) @@ -705,12 +694,10 @@ def group_tasks(*tasks): """ class TaskGroup(*tasks): - # TODO: figure out how to make the setup work at the moment it just picks the first in MRO # def setup(self, x): pass def process(self, x): - for t in tasks: self.log.debug("Calling process for subtask %s", t.__name__) x = t.process(self, x) diff --git a/draco/synthesis/gain.py b/draco/synthesis/gain.py index 1b8729c24..98cc15ca2 100644 --- a/draco/synthesis/gain.py +++ b/draco/synthesis/gain.py @@ -258,7 +258,6 @@ class RandomGains(BaseGains): _prev_phase = None def _generate_amp(self, time): - # Generate the correlation function cf_amp = self._corr_func(self.corr_length_amp, self.sigma_amp) ninput = self.ninput_local @@ -279,7 +278,6 @@ def _generate_amp(self, time): return gain_amp def _generate_phase(self, time): - # Generate the correlation function cf_phase = self._corr_func(self.corr_length_phase, self.sigma_phase) ninput = self.ninput_local @@ -437,7 +435,6 @@ def process_finish(self): def _ensure_list(x): - if hasattr(x, "__iter__"): y = [xx for xx in x] else: diff --git a/draco/synthesis/noise.py b/draco/synthesis/noise.py index edcc0ba5b..b2a142784 100644 --- a/draco/synthesis/noise.py +++ b/draco/synthesis/noise.py @@ -36,10 +36,8 @@ class ReceiverTemperature(task.SingleTask): recv_temp = config.Property(proptype=float, default=0.0) def process(self, data): - # Iterate over the products to find the auto-correlations and add the noise into them for pi, prod in enumerate(data.prodstack): - # Great an auto! if prod[0] == prod[1]: data.vis[:, pi] += self.recv_temp @@ -224,7 +222,6 @@ def process(self, data): # Iterate over the products to find the auto-correlations and add the noise for pi, prod in enumerate(data.prodstack): - # Auto: multiply by sqrt(2) because auto has twice the variance if prod[0] == prod[1]: visdata[:, pi].real += np.sqrt(2) * noise[:, pi].real @@ -300,7 +297,6 @@ def process(self, data_exp): # Iterate over frequencies for lfi, fi in vis_data.enumerate(0): - # Get the frequency interval df = data_exp.index_map["freq"]["width"][fi] * 1e6 @@ -309,7 +305,6 @@ def process(self, data_exp): # Iterate over time for lti, ti in vis_data.enumerate(2): - # Unpack visibilites into full matrix vis_utv = vis_data[lfi, :, lti].view(np.ndarray).copy() vis_mat = np.zeros((nfeed, nfeed), dtype=vis_utv.dtype) diff --git a/draco/synthesis/stream.py b/draco/synthesis/stream.py index 3bd1b4b8e..8de696151 100644 --- a/draco/synthesis/stream.py +++ b/draco/synthesis/stream.py @@ -325,7 +325,6 @@ def process(self): return tstream def _next_time_axis(self): - # Calculate the integration time if self.integration_time is not None: int_time = self.integration_time diff --git a/draco/util/random.py b/draco/util/random.py index 747f0cdbf..e2e83acff 100644 --- a/draco/util/random.py +++ b/draco/util/random.py @@ -298,7 +298,6 @@ def rng(self): """ if self._rng is None: - # Generate a new base seed for all MPI ranks if self.seed is None: # Use seed sequence to generate a random seed diff --git a/draco/util/regrid.py b/draco/util/regrid.py index 33fd1b90c..7f9f9bd53 100644 --- a/draco/util/regrid.py +++ b/draco/util/regrid.py @@ -68,7 +68,6 @@ def band_wiener(R, Ni, Si, y, bw): # Iterate through and solve noise for ki in range(k): - # Upcast noise weights to float type Ni_ki = Ni[ki].astype(np.float64) diff --git a/draco/util/rfi.py b/draco/util/rfi.py index 14b58a6b0..67c3e8739 100644 --- a/draco/util/rfi.py +++ b/draco/util/rfi.py @@ -50,7 +50,6 @@ def sumthreshold_py( m = 1 while m <= max_m: - if m == 1: threshold = threshold1 else: @@ -183,7 +182,6 @@ def sir(basemask, eta=0.2, only_freq=False, only_time=False): newmask = basemask.astype(bool).copy() for pp in range(nprod): - if not only_time: for tt in range(ntime): newmask[:, pp, tt] |= sir1d(basemask[:, pp, tt], eta=eta) diff --git a/draco/util/tools.py b/draco/util/tools.py index 39336195f..a5cbf0b4b 100644 --- a/draco/util/tools.py +++ b/draco/util/tools.py @@ -209,7 +209,6 @@ def apply_gain(vis, gain, axis=1, out=None, prod_map=None): # Iterate over input pairs and set gains for pp in range(nprod): - # Determine the inputs. ii, ij = prod_map[pp] @@ -374,13 +373,11 @@ def redefine_stack_index_map(telescope, inputs, prod, stack, reverse_stack): # Loop over the stacked baselines for sind, (ii, jj) in enumerate(prod[stack["prod"]]): - bi, bj = tel_index[ii], tel_index[jj] # Check that the represenative pair of inputs are present # in the telescope instance and not masked. if (bi is None) or (bj is None) or not telescope.feedmask[bi, bj]: - # Find alternative pairs of inputs using the reverse map this_stack = np.flatnonzero(reverse_stack["stack"] == sind) diff --git a/test/test_containers.py b/test/test_containers.py index 123d0f559..d843f3586 100644 --- a/test/test_containers.py +++ b/test/test_containers.py @@ -9,7 +9,6 @@ @pytest.fixture def ss_container(): - # This is implicitly distributed, so works correctly under MPI ss = containers.SiderealStream( stack=5, input=3, ra=16, freq=np.linspace(800.0, 750.0, 5) @@ -22,7 +21,6 @@ def ss_container(): def test_attrs_from(ss_container): - ts = containers.TimeStream(time=10, axes_from=ss_container, attrs_from=ss_container) # Check that the attributes have been copied over properly @@ -39,7 +37,6 @@ def test_attrs_from(ss_container): def test_copy(ss_container): - ss_container.vis[:] = np.arange(16) ss_container.weight[:] = np.arange(16) diff --git a/test/test_io.py b/test/test_io.py index c0801e50f..152e850ac 100644 --- a/test/test_io.py +++ b/test/test_io.py @@ -11,7 +11,6 @@ @pytest.fixture def mpi_tmp_path(tmp_path_factory): - dirname = None if mpiutil.rank0: dirname = str(tmp_path_factory.mktemp("mpi")) @@ -22,7 +21,6 @@ def mpi_tmp_path(tmp_path_factory): @pytest.fixture def ss_container(): - # This is implicitly distributed, so works correctly under MPI ss = containers.SiderealStream( stack=5, input=3, ra=16, freq=np.linspace(800.0, 750.0, 5) @@ -42,7 +40,6 @@ def ss_container(): def test_LoadBasicCont_simple(ss_container, mpi_tmp_path): - fname = str(mpi_tmp_path / "ss.h5") ss_container.save(fname) @@ -67,7 +64,6 @@ def test_LoadBasicCont_simple(ss_container, mpi_tmp_path): def test_LoadBasicCont_selection(ss_container, mpi_tmp_path): - fname = str(mpi_tmp_path / "ss.h5") ss_container.save(fname) diff --git a/test/test_tools.py b/test/test_tools.py index a3be1690b..47e02d560 100644 --- a/test/test_tools.py +++ b/test/test_tools.py @@ -22,7 +22,6 @@ "a", [random_complex_array, random_float_array, random_double_array] ) def test_invert_no_zero(a): - zero_ind = ((0, 10, 12), (56, 34, 78)) good_ind = np.ones(a.shape, dtype=bool) good_ind[zero_ind] = False @@ -46,7 +45,6 @@ def test_invert_no_zero(a): def test_invert_no_zero_mpiarray(): - comm = MPI.COMM_WORLD comm.Barrier() @@ -65,7 +63,6 @@ def test_invert_no_zero_mpiarray(): def test_invert_no_zero_noncontiguous(): - a = np.arange(100, dtype=np.float64).reshape(10, 10) res = np.ones((10, 10), dtype=np.float64)