Source code for pipeline.hifv.tasks.testBPdcals.testBPdcals

import collections
import os
import shutil
from collections import defaultdict

import numpy as np

import pipeline.hif.heuristics.findrefant as findrefant
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.vdp as vdp
from pipeline.hifv.heuristics import getCalFlaggedSoln
from pipeline.hifv.heuristics import weakbp, do_bandpass, uvrange
from pipeline.hifv.heuristics.lib_EVLApipeutils import vla_minbaselineforcal
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import task_registry
from pipeline.infrastructure import utils
from pipeline.hifv.heuristics import getBCalStatistics


LOG = infrastructure.logging.get_logger(__name__)


class testBPdcalsInputs(vdp.StandardInputs):
    """Inputs class for the hifv_testBPdcals pipeline task.  Used on VLA measurement sets.

    The class inherits from vdp.StandardInputs.

    In addition to the constructor arguments, every instance carries two
    attributes that are not user-settable through the constructor:
    ``gain_solint1`` and ``gain_solint2``, both fixed to ``'int'``.

    """
    # Task parameters declared as vis-dependent properties, with their default values.
    weakbp = vdp.VisDependentProperty(default=False)
    refantignore = vdp.VisDependentProperty(default='')
    doflagundernspwlimit = vdp.VisDependentProperty(default=False)
    flagbaddef = vdp.VisDependentProperty(default=True)
    refant = vdp.VisDependentProperty(default='')

    # Default for the bad deformatter ignore list: a new, empty dict on each call.
    @vdp.VisDependentProperty
    def iglist(self):
        return {}

    # docstring and type hints: supplements hifv_testBPdcals
    def __init__(self, context, vis=None, weakbp=None, refantignore=None, doflagundernspwlimit=None, flagbaddef=None, iglist=None, refant=None):
        """Initialize Inputs.

        Args:
            context (:obj:): Pipeline context

            vis(str, optional): The list of input MeasurementSets. Defaults to the list of MeasurementSets specified in the hifv_importdata task.

            weakbp(Boolean): Activate weak bandpass heuristics. Default is False.
                Weak bandpass heuristics on/off - currently not used - see PIPE-104.

            refantignore(str): String list of antennas to ignore. Default is ''.

                Example:  refantignore='ea02, ea03'

            doflagundernspwlimit(Boolean): If the number of bad spws is greater than zero, and the keyword is True, then spws are flagged individually. Default is False.

            flagbaddef(Boolean, optional): Enable/disable bad deformatter flagging. Default is True.

            iglist(dict, optional): When flagbaddef is True, skip bad deformatter flagging for elements in the ignore list. Default is {}.
                          Format: {antName:{band:{spw}}}
                          Example: {'ea02': {'L': {0, 1, '10~13'}}}

            refant(str): A csv string of reference antenna(s). When used, disables ``refantignore``. Default is ''.

                Example: refant = 'ea01, ea02'

        """
        super().__init__()
        # NOTE(review): context and vis are assigned ahead of the vis-dependent
        # parameters; presumably vdp requires this ordering - confirm before reordering.
        self.context = context
        self.vis = vis
        # NOTE(review): stored on the private name ``_weakbp`` rather than through the
        # ``weakbp`` property declared on the class, unlike every other parameter here.
        # Presumably deliberate while the weak bandpass heuristics are unused
        # (PIPE-104, see docstring) - confirm.
        self._weakbp = weakbp
        self.refantignore = refantignore
        self.doflagundernspwlimit = doflagundernspwlimit
        # Gain solution intervals are hard-coded to 'int'; they are not constructor arguments.
        self.gain_solint1 = 'int'
        self.gain_solint2 = 'int'
        self.flagbaddef = flagbaddef
        self.iglist = iglist
        self.refant = refant


class testBPdcalsResults(basetask.Results):
    """Results container for the hifv_testBPdcals pipeline task (VLA measurement sets).

    Derived from basetask.Results.

    """
    def __init__(self, final=None, pool=None, preceding=None, gain_solint1=None,
                 shortsol1=None, vis=None, bpdgain_touse=None, gtypecaltable=None,
                 ktypecaltable=None, bpcaltable=None, flaggedSolnApplycalbandpass=None,
                 flaggedSolnApplycaldelay=None, result_amp=None, result_phase=None,
                 amp_collection=None, phase_collection=None, num_antennas=None, ignorerefant=None, bad_refant=None):
        """Record the products of one hifv_testBPdcals execution.

        Args:
            final(List, optional): Calibration list applied - not used. Stored as a copy; [] when omitted.
            pool(List, optional): Calibration list assessed - not used. Stored as a copy; [] when omitted.
            preceding(List, optional): DEPRECATED results from worker tasks executed by this task.
                Stored as a copy; [] when omitted.
            gain_solint1(Dict):  Dict of csv strings, keyed by band
            shortsol1(Dict):  Integration time determined from heuristics (1,3,10 x max int time) keyed by band
            vis(str): String name of the measurement set
            bpdgain_touse(Dict):  Dictionary of tables per band
            gtypecaltable(Dict): Dictionary of tables per band
            ktypecaltable(Dict): Dictionary of tables per band
            bpcaltable(Dict): Dictionary of tables per band
            flaggedSolnApplycalbandpass(Dict): returned from getCalFlaggedSoln for bpdgain_touse (per band)
            flaggedSolnApplycaldelay(Dict): returned from getCalFlaggedSoln for ktypecaltable (per band)
            result_amp(Dict):  Bad deformatters amp flagging list per band. An empty list when omitted.
            result_phase(Dict): Bad deformatters phase flagging list per band. An empty list when omitted.
            amp_collection(Dict):  Bad deformatters amp weblog table per band.
                A ``defaultdict(list)`` when omitted.
            phase_collection(Dict): Bad deformatters phase weblog table per band.
                A ``defaultdict(list)`` when omitted.
            num_antennas(Dict):  Number of antennas (same per band, but included for weblog formatting)
            ignorerefant(List):  List of antennas removed if a baseband is determined to be bad for >50% of antennas.
                [] when omitted.
            bad_refant(Dict): Reference antennas rejected per band (PIPE-2580: used for QA score).

        """
        super().__init__()

        self.vis = vis

        # The three list arguments are sliced so this object holds its own copies.
        self.pool = [] if pool is None else pool[:]
        self.final = [] if final is None else final[:]
        self.preceding = [] if preceding is None else preceding[:]
        self.error = set()

        # Per-band calibration products, stored exactly as supplied.
        self.gain_solint1 = gain_solint1
        self.shortsol1 = shortsol1
        self.bpdgain_touse = bpdgain_touse
        self.gtypecaltable = gtypecaltable
        self.ktypecaltable = ktypecaltable
        self.bpcaltable = bpcaltable
        self.flaggedSolnApplycalbandpass = flaggedSolnApplycalbandpass
        self.flaggedSolnApplycaldelay = flaggedSolnApplycaldelay
        self.ignorerefant = [] if ignorerefant is None else ignorerefant

        # Bad deformatter products; an omitted argument is replaced by an empty container.
        self.result_amp = [] if result_amp is None else result_amp
        self.result_phase = [] if result_phase is None else result_phase
        self.amp_collection = (
            collections.defaultdict(list) if amp_collection is None else amp_collection
        )
        self.phase_collection = (
            collections.defaultdict(list) if phase_collection is None else phase_collection
        )
        self.num_antennas = num_antennas
        self.bad_refant = bad_refant

    def merge_with_context(self, context):
        """Publish selected results on the context's EVLA info for this measurement set.

        Looks up the measurement set for ``self.vis`` and writes ``gain_solint1``,
        ``shortsol1`` and ``ignorerefant`` onto ``context.evla['msinfo'][<ms name>]``.

        Args:
            context: Pipeline context to update.
        """
        ms = context.observing_run.get_ms(self.vis)
        msinfo = context.evla['msinfo'][ms.name]
        msinfo.gain_solint1 = self.gain_solint1
        msinfo.shortsol1 = self.shortsol1
        msinfo.ignorerefant = self.ignorerefant


[docs] @task_registry.set_equivalent_casa_task('hifv_testBPdcals') class testBPdcals(basetask.StandardTaskTemplate): """Class for the testBPdcals pipeline task. Used on VLA measurement sets. The class inherits from basetask.StandardTaskTemplate """ Inputs = testBPdcalsInputs
def prepare(self):
    """Bulk of task execution occurs here.

    Science spectral window ids are grouped by band. For each band up to three
    attempts are made; every attempt:

    1. saves the current flags under the flagmanager version "testbpdcals_applycal",
    2. runs ``self._do_testBPdcals(band, spwlist)``,
    3. if ``inputs.flagbaddef`` is set, runs ``self._run_baddeformatters`` on the
       bandpass table. When the amp and phase ratios are both below 0.5, or on the
       third attempt, any returned flagging jobs are executed and the attempts stop.
       Otherwise the reference antenna is added to ``self.ignorerefant`` and to
       ``bad_refant[band]`` before the next attempt.

    The saved flag version is restored after an attempt that does not stop the loop
    (see the NOTE(review) at that step).

    Args:
        None

    Returns:
        testBPdcalsResults() holding the per-band products, keyed by band.

    Raises:
        Exception: whatever the flagmanager save/restore execution raises is logged
            and re-raised.
    """

    # Reference antennas rejected so far; also read by _do_testBPdcals.
    self.ignorerefant = []

    m = self.inputs.context.observing_run.get_ms(self.inputs.vis)
    spw2band = m.get_vla_spw2band()
    band2spw = collections.defaultdict(list)
    spwobjlist = m.get_spectral_windows(science_windows_only=True)
    listspws = [spw.id for spw in spwobjlist]
    # band -> list of science spw ids (as strings)
    for spw, band in spw2band.items():
        if spw in listspws:  # Science intents only
            band2spw[band].append(str(spw))

    # Per-band outputs handed to testBPdcalsResults, all keyed by band.
    gtypecaltable = {}
    ktypecaltable = {}
    bpcaltable = {}
    bpdgain_touse = {}
    flaggedSolnApplycalbandpass = {}
    flaggedSolnApplycaldelay = {}
    gain_solint1 = {}
    shortsol1 = {}
    result_amp = {}
    result_phase = {}
    amp_collection = {}
    phase_collection = {}
    num_antennas = {}

    # Initial bad deformatter values; they are only replaced when flagbaddef is set,
    # so with flagbaddef False every band records these same objects.
    result_amp_perband = []
    result_phase_perband = []
    amp_collection_perband = defaultdict(list)
    phase_collection_perband = defaultdict(list)
    num_antennas_perband = len(m.antennas)
    bad_refant = {}  # PIPE-2580: used for QA score

    # NOTE(review): `vis` is only bound inside the band loop below; if band2spw is
    # empty the return statement raises UnboundLocalError - confirm whether that
    # case can occur.
    for band, spwlist in band2spw.items():
        bad_refant[band] = []
        # At most three attempts per band (i == 2 is the last one).
        for i in [0, 1, 2]:
            # PIPE-1554: backing up the flags before applycal with version name.
            # The given version name is used to restore the flags when required.
            task = casa_tasks.flagmanager(vis=m.name, mode='save', versionname="testbpdcals_applycal")
            try:
                self._executor.execute(task)
            except Exception:
                LOG.error("Failed to save the last applied flags for %s" % m.basename)
                raise
            LOG.debug(" RUNNING FIRST PART TESTBPDCALS ")
            gain_solint1perband, shortsol1perband, vis, bpdgain_tousename, gtypecaltablename, ktypecaltablename, bpcaltablename, \
                flaggedSolnApplycalbandpassperband, flaggedSolnApplycaldelayperband, refant = self._do_testBPdcals(band, spwlist)

            """
            If an entire baseband is determined to be bad for >50% of antennas, the pipeline should do the following:
            1. Do not flag any data due to bad deformatters.
            2. Remove the first reference antenna from the refant list and ignore that antenna in refant calculations for the **entire pipeline run**
            3. Recalculate the reference antenna list
            4. Re-run hifv_testbpdcals and flagbaddef
            5. Repeat up to three times and then just drive ahead.
            """
            # PIPE-1183, adding an option to skip bad deformatter flagging.
            if self.inputs.flagbaddef:
                LOG.debug(" RUNNING SECOND PART BADDEFORMATTERS ")
                result_amp_perband, result_phase_perband, amp_collection_perband, phase_collection_perband, \
                    num_antennas_perband, amp_job, phase_job = self._run_baddeformatters(bpcaltablename)

                # Number of amp / phase results relative to the number of antennas.
                pct_amp_ant = len(result_amp_perband) / num_antennas_perband
                pct_phase_ant = len(result_phase_perband) / num_antennas_perband
                ant_threshold = 0.5
                # Accept this attempt when both ratios are under the threshold, or
                # unconditionally on the final attempt.
                if (pct_amp_ant < ant_threshold and pct_phase_ant < ant_threshold) or i == 2:
                    if amp_job:
                        LOG.info("Executing bad deformatters amp flag commands for band {!s}...".format(band))
                        self._executor.execute(amp_job)
                    if phase_job:
                        LOG.info("Executing bad deformatters phase flag commands for band {!s}...".format(band))
                        self._executor.execute(phase_job)
                    break
                else:
                    # Criteria to finish not met - remove the first reference antenna from consideration
                    self.ignorerefant.append(refant)
                    LOG.warning("A baseband is determined to be bad for >50% of antennas. "
                                "Removing reference antenna(s) {!s} and rerunning the test calibration.".format(','.join(self.ignorerefant)))
                    bad_refant[band].append(refant)
            else:
                # NOTE(review): there is no `break` on this path, so all three attempts
                # run when flagbaddef is False - confirm this is intended.
                LOG.info("Skipping bad deformatter flagging")

            # NOTE(review): the indentation of this restore step is reconstructed; it is
            # placed at loop-body level so that it runs after every attempt that did not
            # `break` above. Confirm against the repository version.
            # PIPE-1554: restoring saved version of the flags as baseband is bad for >50% of antennas.
            flag_version_name = "testbpdcals_applycal"
            task = casa_tasks.flagmanager(vis=m.name, mode='restore', versionname=flag_version_name)
            try:
                # NOTE(review): flag_restore_results is never read in this method.
                flag_restore_results = self._executor.execute(task)
            except Exception:
                LOG.error("Failed to restore the last applied flags for %s" % m.basename)
                raise

        # Record the products of the last attempt made for this band.
        gtypecaltable[band] = gtypecaltablename
        ktypecaltable[band] = ktypecaltablename
        bpcaltable[band] = bpcaltablename
        bpdgain_touse[band] = bpdgain_tousename
        flaggedSolnApplycalbandpass[band] = flaggedSolnApplycalbandpassperband
        flaggedSolnApplycaldelay[band] = flaggedSolnApplycaldelayperband
        gain_solint1[band] = gain_solint1perband
        shortsol1[band] = shortsol1perband
        result_amp[band] = result_amp_perband
        result_phase[band] = result_phase_perband
        amp_collection[band] = amp_collection_perband
        phase_collection[band] = phase_collection_perband
        num_antennas[band] = num_antennas_perband

    return testBPdcalsResults(gain_solint1=gain_solint1, shortsol1=shortsol1, vis=vis,
                              bpdgain_touse=bpdgain_touse, gtypecaltable=gtypecaltable,
                              ktypecaltable=ktypecaltable, bpcaltable=bpcaltable,
                              flaggedSolnApplycalbandpass=flaggedSolnApplycalbandpass,
                              flaggedSolnApplycaldelay=flaggedSolnApplycaldelay,
                              result_amp=result_amp, result_phase=result_phase,
                              amp_collection=amp_collection, phase_collection=phase_collection,
                              num_antennas=num_antennas, ignorerefant=self.ignorerefant, bad_refant=bad_refant)
def analyse(self, results):
    """Pass the given results through without modification.

    Override of the analyse step of basetask.StandardTaskTemplate; no
    additional processing is performed here.

    Args:
        results: the object to hand back (per the task template, the output
            generated by ``prepare``).

    Returns:
        The very same ``results`` object that was passed in.
    """
    return results
def _do_testBPdcals(self, band: str, spwlist: list[str]):
    """Execute testBPdcals heuristics per band and spwlist

    Runs, in order: an initial G-type phase solve on the delay calibrator,
    K-type delay solves (retried with the next reference antenna while too
    many solutions are flagged), trial amp/phase gain solves at 1x, 3x and
    10x the maximum integration time, the test bandpass solve, a clip flag
    on the bandpass table, and finally applycal.

    Args:
        band(str): String band single letter identifier - 'L'  'U'  'X' etc.
        spwlist(List): List of string values for spws - ['0', '1', '2', '3']

    Returns:
        Tuple of, in this order:
            gain_solint1(str): solution interval value
            shortsol1(float): Integration time determined from heuristics (1,3,10 x max int time)
            vis: MS name
            bpdgain_touse(str): bp'd gain table used
            gtypecaltable(str): G-type table from gaincal
            ktypecaltable(str): K-type table from gaincal
            bpcaltable(str): BP cal table
            flaggedSolnApplycalbandpass(Dict): returned from getCalFlaggedSoln for bpdgain_touse
            flaggedSolnApplycaldelay(Dict): returned from getCalFlaggedSoln for ktypecaltable
            refant(str): First entry of the reference antenna list that was used
    """
    LOG.info("Executing for band {!s} spws: {!s}".format(band, ','.join(spwlist)))
    self.parang = True
    m = self.inputs.context.observing_run.get_ms(self.inputs.vis)

    # PIPE-2164: getting setjy result stored in context
    self.setjy_results = self.inputs.context.evla['msinfo'][m.name].setjy_results

    # The previous stage's result may or may not be wrapped in a list-like container.
    try:
        stage_number = self.inputs.context.results[-1].read()[0].stage_number + 1
    except Exception:
        stage_number = self.inputs.context.results[-1].read().stage_number + 1

    tableprefix = os.path.basename(self.inputs.vis) + '.' + 'hifv_testBPdcals.s'
    gtypecaltable = tableprefix + str(stage_number) + '_1.' + 'testdelayinitialgain_{!s}.tbl'.format(band)
    ktypecaltable = tableprefix + str(stage_number) + '_2.' + 'testdelay_{!s}.tbl'.format(band)
    bpcaltable = tableprefix + str(stage_number) + '_4.' + 'testBPcal_{!s}.tbl'.format(band)
    tablebase = tableprefix + str(stage_number) + '_3.' + 'testBPdinitialgain'
    table_suffix = ['_{!s}.tbl'.format(band), '3_{!s}.tbl'.format(band), '10_{!s}.tbl'.format(band)]
    bpdgain_tables = [tablebase + suffix for suffix in table_suffix]

    # PIPE-1703: In multi-band data, the maximum integration time returned was determined
    # by considering the integration time of all scans. The lookup below is restricted
    # to the input band.
    integration_time = m.get_integration_time_stats(stat_type="max", band=band, science_windows_only=True)
    soltimes = [integration_time * x for x in (1.0, 3.0, 10.0)]
    solints = ['int', str(soltimes[1]) + 's', str(soltimes[2]) + 's']
    soltime = soltimes[0]
    solint = solints[0]

    # Remove tables if they exist
    for tablename in [gtypecaltable, ktypecaltable, bpcaltable] + bpdgain_tables:
        if os.path.isdir(tablename):
            LOG.info("Removing table: {!s}".format(tablename))
            shutil.rmtree(tablename)

    refantignore = utils.build_refantignore(refantignore=self.inputs.refantignore,
                                            ignorerefant=self.ignorerefant)
    refantfield = self.inputs.context.evla['msinfo'][m.name].calibrator_field_select_string

    # PIPE-595: if refant list is not provided, compute refants else use provided refant list.
    if len(self.inputs.refant) == 0:
        refantobj = findrefant.RefAntHeuristics(vis=self.inputs.vis, field=refantfield,
                                                geometry=True, flagging=True, intent='',
                                                spw='', refantignore=refantignore)
        RefAntOutput = refantobj.calculate()
    else:
        RefAntOutput = self.inputs.refant.split(",")

    LOG.info("RefAntOutput: {}".format(RefAntOutput))

    self._do_gtype_delaycal(caltable=gtypecaltable, RefAntOutput=RefAntOutput, spwlist=spwlist)
    LOG.info("Initial phase calibration on delay calibrator complete for band {!s}".format(band))

    fracFlaggedSolns = 1.0
    critfrac = m.get_vla_critfrac()

    # Iterate and check the fraction of Flagged solutions, each time running gaincal in 'K' mode
    flagcount = 0
    while fracFlaggedSolns > critfrac and flagcount < 4:
        self._do_ktype_delaycal(caltable=ktypecaltable, addcaltable=gtypecaltable,
                                RefAntOutput=RefAntOutput, spw=','.join(spwlist))
        flaggedSolnResult = getCalFlaggedSoln(ktypecaltable)
        (fracFlaggedSolns, RefAntOutput) = self._check_flagSolns(flaggedSolnResult, RefAntOutput)
        LOG.info("Fraction of flagged solutions = " + str(flaggedSolnResult['all']['fraction']))
        LOG.info("Median fraction of flagged solutions per antenna = " +
                 str(flaggedSolnResult['antmedian']['fraction']))
        flagcount += 1

    # Do initial amplitude and phase gain solutions on the BPcalibrator and delay
    # calibrator; the amplitudes are used for flagging; only phase
    # calibration is applied in final BP calibration, so that solutions are
    # not normalized per spw and take out the baseband filter shape

    # Try running with solint of int_time, 3*int_time, and 10*int_time.
    # If there is still a large fraction of failed solutions with
    # solint=10*int_time the source may be too weak, and calibration via the
    # pipeline has failed; will need to implement a mode to cope with weak
    # calibrators (later)
    bpdgain_touse = bpdgain_tables[0]
    fracFlaggedSolns1 = self._solve_bpdgains_trial(bpdgain_tables[0], ktypecaltable, solint,
                                                   RefAntOutput, spwlist)
    gain_solint1 = solint
    shortsol1 = soltime

    if fracFlaggedSolns1 > 0.05:
        soltime = soltimes[1]
        solint = solints[1]
        fracFlaggedSolns3 = self._solve_bpdgains_trial(bpdgain_tables[1], ktypecaltable, solint,
                                                       RefAntOutput, spwlist)

        # A longer interval is adopted only when it actually lowers the flagged fraction.
        if fracFlaggedSolns3 < fracFlaggedSolns1:
            gain_solint1 = solint
            shortsol1 = soltime
            bpdgain_touse = bpdgain_tables[1]

            if fracFlaggedSolns3 > 0.05:
                soltime = soltimes[2]
                solint = solints[2]
                fracFlaggedSolns10 = self._solve_bpdgains_trial(bpdgain_tables[2], ktypecaltable,
                                                                solint, RefAntOutput, spwlist)

                if fracFlaggedSolns10 < fracFlaggedSolns3:
                    gain_solint1 = solint
                    shortsol1 = soltime
                    bpdgain_touse = bpdgain_tables[2]

                if fracFlaggedSolns10 > 0.05:
                    LOG.warning("There is a large fraction of flagged solutions, " +
                                "there might be something wrong with your data. " +
                                "The fraction of flagged solutions is " + str(fracFlaggedSolns10))

    LOG.info("Test amp and phase calibration on delay and bandpass calibrators complete for band {!s}".format(band))
    # Report the interval that was *selected*; a longer trial may have been run and rejected,
    # in which case the last tried solint differs from gain_solint1.
    if gain_solint1 == solints[0]:
        LOG.info("Using short solint = {!s} = {:.6f}s for band {!s}".format(str(gain_solint1), soltimes[0], band))
    else:
        LOG.info("Using short solint = {!s} for band {!s}".format(str(gain_solint1), band))

    LOG.info("Doing test bandpass calibration for band {!s}".format(band))

    # The value returned by weakbp is not used: the interp list handed to applycal
    # is rebuilt from scratch below.
    if self.inputs.weakbp:
        weakbp(self.inputs.vis, bpcaltable, context=self.inputs.context, RefAntOutput=RefAntOutput,
               ktypecaltable=ktypecaltable, bpdgain_touse=bpdgain_touse, solint='inf', append=False,
               executor=self._executor, spw=','.join(spwlist))
    else:
        do_bandpass(self.inputs.vis, bpcaltable, context=self.inputs.context, RefAntOutput=RefAntOutput,
                    spw=','.join(spwlist), ktypecaltable=ktypecaltable, bpdgain_touse=bpdgain_touse,
                    solint='inf', append=False, executor=self._executor)

    # One interp entry per table passed to applycal: the active callibrary tables plus
    # the three tables appended by _do_applycal; only the bandpass table (last) is set.
    ntables = len(self.inputs.context.callibrary.active.get_caltable()) + 3
    interp = [''] * ntables
    LOG.info("Using 'linear,linearflag' for bandpass table")
    interp[-1] = 'linear,linearflag'

    LOG.info("Test bandpass calibration complete")
    LOG.info("Fraction of flagged solutions = {!s}".format(str(flaggedSolnResult['all']['fraction'])))
    LOG.info("Median fraction of flagged solutions per antenna = " +
             str(flaggedSolnResult['antmedian']['fraction']))

    LOG.info("Executing flagdata in clip mode.")
    self._do_clipflag(bpcaltable)

    LOG.info("Applying test calibrations to BP and delay calibrators for band {!s}".format(band))
    self._do_applycal(ktypecaltable=ktypecaltable, bpdgain_touse=bpdgain_touse, bpcaltable=bpcaltable,
                      interp=interp, spw=','.join(spwlist))

    flaggedSolnApplycalbandpass = getCalFlaggedSoln(bpdgain_touse)
    flaggedSolnApplycaldelay = getCalFlaggedSoln(ktypecaltable)

    return gain_solint1, shortsol1, self.inputs.vis, bpdgain_touse, gtypecaltable, \
        ktypecaltable, bpcaltable, flaggedSolnApplycalbandpass, flaggedSolnApplycaldelay, RefAntOutput[0]


def _antmedian_flagged_fraction(self, flaggedSolnResult: dict) -> float:
    """Return the per-antenna median flagged fraction, or 1.0 when there are no solutions at all."""
    if flaggedSolnResult['all']['total'] > 0:
        return flaggedSolnResult['antmedian']['fraction']
    return 1.0


def _solve_bpdgains_trial(self, caltable: str, ktypecaltable: str, solint: str,
                          RefAntOutput: list[str], spwlist: list[str]) -> float:
    """Run one trial bp'd gain solve at ``solint``, log its flagging statistics, return the flagged fraction."""
    self._do_gtype_bpdgains(caltable, addcaltable=ktypecaltable, solint=solint,
                            RefAntOutput=RefAntOutput, spwlist=spwlist)
    flagged = getCalFlaggedSoln(caltable)
    LOG.info("For solint = " + solint + " fraction of flagged solutions = " +
             str(flagged['all']['fraction']))
    LOG.info("Median fraction of flagged solutions per antenna = " +
             str(flagged['antmedian']['fraction']))
    return self._antmedian_flagged_fraction(flagged)


def _execute_gaincal_per_field(self, task_args: dict, field_ids, caltable: str) -> None:
    """Run gaincal once per field id, using that field's uvrange and appending once ``caltable`` exists."""
    for fieldidstring in field_ids:
        fieldid = int(fieldidstring)
        task_args['field'] = fieldidstring
        task_args['uvrange'] = uvrange(self.setjy_results, fieldid)
        # The first solve creates the table; every later field is appended to it.
        if os.path.exists(caltable):
            task_args['append'] = True
        self._executor.execute(casa_tasks.gaincal(**task_args))


def _do_gtype_delaycal(self, caltable: str = None, RefAntOutput: list[str] = None,
                       spwlist: list[str] = None) -> bool:
    """Perform a G-Type delay calibration with CASA task gaincal

    Args:
        caltable(str): Name of the caltable to be created
        RefAntOutput(List): List of string antenna values to use as reference antennas - ['ea01', 'ea24', ...]
        spwlist(List): List of string values for spws pertaining to the particular band - ['0', '1', '2', ...]
            ``None`` (the default) is treated as an empty list.

    Returns:
        Boolean: always True
    """
    # A None default replaces the former shared mutable ``[]`` default.
    spwlist = [] if spwlist is None else spwlist

    m = self.inputs.context.observing_run.get_ms(self.inputs.vis)
    msinfo = self.inputs.context.evla['msinfo'][m.name]
    delay_field_select_string = msinfo.delay_field_select_string
    tst_delay_spw = m.get_vla_tst_bpass_spw(spwlist=spwlist)
    delay_scan_select_string = msinfo.delay_scan_select_string
    minBL_for_cal = vla_minbaselineforcal()

    delaycal_task_args = {'vis': self.inputs.vis,
                          'caltable': caltable,
                          'field': '',
                          'spw': tst_delay_spw,
                          'intent': '',
                          'selectdata': True,
                          'uvrange': '',
                          'scan': delay_scan_select_string,
                          'solint': 'int',
                          'combine': 'scan',
                          'preavg': -1.0,
                          'refant': ','.join(RefAntOutput),
                          'minblperant': minBL_for_cal,
                          'minsnr': 3.0,
                          'solnorm': False,
                          'gaintype': 'G',
                          'smodel': [],
                          'calmode': 'p',
                          'append': False,
                          'docallib': False,
                          'gaintable': sorted(self.inputs.context.callibrary.active.get_caltable()),
                          'gainfield': [''],
                          'interp': [''],
                          'spwmap': [],
                          'parang': self.parang}

    self._execute_gaincal_per_field(delaycal_task_args, delay_field_select_string.split(','), caltable)

    return True


def _do_ktype_delaycal(self, caltable: str = None, addcaltable: str = None,
                       RefAntOutput: list[str] = None, spw: str = '') -> bool:
    """Perform a K-Type delay calibration with CASA task gaincal

    Args:
        caltable(str): Name of the caltable to be created
        addcaltable(str): String name of table to temporarily be added to the gaincal gaintable parameter
        RefAntOutput(List): List of string antenna values to use as reference antennas - ['ea01', 'ea24', ...]
        spw(str): csv string values for spws pertaining to the particular band - '0,1,2,3,4,5,6'

    Returns:
        Boolean: always True
    """
    m = self.inputs.context.observing_run.get_ms(self.inputs.vis)
    msinfo = self.inputs.context.evla['msinfo'][m.name]
    delay_field_select_string = msinfo.delay_field_select_string
    delay_scan_select_string = msinfo.delay_scan_select_string
    minBL_for_cal = vla_minbaselineforcal()

    GainTables = sorted(self.inputs.context.callibrary.active.get_caltable())
    GainTables.append(addcaltable)

    delaycal_task_args = {'vis': self.inputs.vis,
                          'caltable': caltable,
                          'field': '',
                          'spw': spw,
                          'intent': '',
                          'selectdata': True,
                          'uvrange': '',
                          'scan': delay_scan_select_string,
                          'solint': 'inf',
                          'combine': 'scan',
                          'preavg': -1.0,
                          'refant': ','.join(RefAntOutput),
                          'minblperant': minBL_for_cal,
                          'minsnr': 3.0,
                          'solnorm': False,
                          'gaintype': 'K',
                          'smodel': [],
                          'calmode': 'p',
                          'append': False,
                          'docallib': False,
                          'gaintable': GainTables,
                          'gainfield': [''],
                          'interp': [''],
                          'spwmap': [],
                          'parang': self.parang}

    self._execute_gaincal_per_field(delaycal_task_args, delay_field_select_string.split(','), caltable)

    return True


def _check_flagSolns(self, flaggedSolnResult: dict, RefAntOutput: list[str] = None) -> tuple[float, list[str]]:
    """Change reference antenna list based on a critical fraction of flagged solutions
       (defined in the domain ms object)

    When the per-antenna median flagged fraction exceeds the critical fraction, the first
    reference antenna is dropped and the shortened list is stored on the first
    measurement set of the observing run.

    Args:
        flaggedSolnResult(Dict): Breakdown of flagged solutions
        RefAntOutput(List): List of string antenna values to use as reference antennas - ['ea01', 'ea24', ...]

    Returns:
        fracFlaggedSolns(float): fraction of flagged solutions used in this function
        RefAntOutput(List): List of string antenna values to use as reference antennas - ['ea01', 'ea24', ...]
                            Modified if fraction of flagged solutions is greater than critical fraction
    """
    fracFlaggedSolns = self._antmedian_flagged_fraction(flaggedSolnResult)

    m = self.inputs.context.observing_run.get_ms(self.inputs.vis)
    critfrac = m.get_vla_critfrac()

    if fracFlaggedSolns > critfrac:
        # Slicing keeps a plain list (matching the annotation) instead of the numpy array
        # that np.delete produced.
        remaining = list(RefAntOutput)[1:]
        if not remaining:
            # Dropping the last candidate used to raise IndexError on the log line below
            # and would leave callers without any reference antenna; keep it instead.
            # NOTE(review): retaining the last antenna is a best-effort choice - confirm.
            LOG.warning("Not enough good solutions, but no other reference antenna is left to try.")
            return fracFlaggedSolns, RefAntOutput

        RefAntOutput = remaining
        self.inputs.context.observing_run.measurement_sets[0].reference_antenna = ','.join(RefAntOutput)
        LOG.info("Not enough good solutions, trying a different reference antenna.")
        LOG.info("The pipeline will start with antenna "+RefAntOutput[0].lower()+" as the reference.")

    return fracFlaggedSolns, RefAntOutput


def _do_gtype_bpdgains(self, caltable: str, addcaltable: str = None, solint: str = 'int',
                       RefAntOutput: list[str] = None, spwlist: list[str] = None) -> bool:
    """Perform a G-Type cal with CASA task gaincal on the bp'd gaintable

    Args:
        caltable(str): Name of the caltable to be created
        addcaltable(str): String name of table to temporarily be added to the gaincal gaintable parameter
        solint(str): String value for solint keyword of CASA task gaincal
        RefAntOutput(List): List of string antenna values to use as reference antennas - ['ea01', 'ea24', ...]
        spwlist(List): List of string values for spws pertaining to the particular band - ['0', '1', '2', ...]
            ``None`` (the default) is treated as an empty list.

    Returns:
        Boolean: always True
    """
    # A None default replaces the former shared mutable ``[]`` default.
    spwlist = [] if spwlist is None else spwlist

    m = self.inputs.context.observing_run.get_ms(self.inputs.vis)
    msinfo = self.inputs.context.evla['msinfo'][m.name]
    tst_bpass_spw = m.get_vla_tst_bpass_spw(spwlist=spwlist)
    delay_scan_select_string = msinfo.delay_scan_select_string
    bandpass_scan_select_string = msinfo.bandpass_scan_select_string
    minBL_for_cal = vla_minbaselineforcal()

    # Solve on the bandpass scans, plus the delay scans when those are a different selection.
    if delay_scan_select_string == bandpass_scan_select_string:
        testgainscans = bandpass_scan_select_string
    else:
        testgainscans = bandpass_scan_select_string + ',' + delay_scan_select_string

    GainTables = sorted(self.inputs.context.callibrary.active.get_caltable())
    GainTables.append(addcaltable)

    bpdgains_task_args = {'vis': self.inputs.vis,
                          'caltable': caltable,
                          'field': '',
                          'spw': tst_bpass_spw,
                          'intent': '',
                          'selectdata': True,
                          'uvrange': '',
                          'scan': testgainscans,
                          'solint': solint,
                          'combine': 'scan',
                          'preavg': -1.0,
                          'refant': ','.join(RefAntOutput),
                          'minblperant': minBL_for_cal,
                          'minsnr': 5.0,
                          'solnorm': False,
                          'gaintype': 'G',
                          'smodel': [],
                          'calmode': 'ap',
                          'append': False,
                          'docallib': False,
                          'gaintable': GainTables,
                          'gainfield': [''],
                          'interp': [''],
                          'spwmap': [],
                          'parang': self.parang}

    testgainscanslist = list(map(int, testgainscans.split(',')))
    scanobjlist = m.get_scans(scan_id=testgainscanslist)

    # Unique field ids in scan order; the single-target unpacking requires each scan
    # to carry exactly one field.
    fieldidlist = []
    for scanobj in scanobjlist:
        fieldobj, = scanobj.fields
        if str(fieldobj.id) not in fieldidlist:
            fieldidlist.append(str(fieldobj.id))

    self._execute_gaincal_per_field(bpdgains_task_args, fieldidlist, caltable)

    return True


def _do_clipflag(self, bpcaltable: str):
    """Execute CASA task flagdata on the bpcaltable

    Clips the CPARAM column with clipminmax [0.0, 2.0] and clipoutside=True,
    without writing a flag backup.

    Args:
        bpcaltable(str): caltable to flag

    Returns:
        Whatever the executor returns for the flagdata job
    """
    task_args = {'vis': bpcaltable,
                 'mode': 'clip',
                 'datacolumn': 'CPARAM',
                 'clipminmax': [0.0, 2.0],
                 'correlation': 'ABS_ALL',
                 'clipoutside': True,
                 'flagbackup': False,
                 'savepars': False,
                 'action': 'apply'}

    job = casa_tasks.flagdata(**task_args)

    return self._executor.execute(job)


def _do_applycal(self, ktypecaltable: str = None, bpdgain_touse: str = None, bpcaltable: str = None,
                 interp: list[str] = None, spw: str = ''):
    """Run CASA task applycal with tables from priorcals task plus those generated in testBPdcals

    Args:
        ktypecaltable(str): output from K-type gaincal
        bpdgain_touse(str): gaintable determined to be used from heuristics
        bpcaltable(str): BP caltable to use
        interp(List): applycal CASA task keyword, one entry per applied table
        spw(str): csv string values for spws pertaining to the particular band - '0,1,2,3,4,5,6'

    Returns:
        Whatever the executor returns for the applycal job
    """
    m = self.inputs.context.observing_run.get_ms(self.inputs.vis)
    testgainscans = self.inputs.context.evla['msinfo'][m.name].testgainscans

    AllCalTables = sorted(self.inputs.context.callibrary.active.get_caltable())
    AllCalTables.append(ktypecaltable)
    AllCalTables.append(bpdgain_touse)
    AllCalTables.append(bpcaltable)

    ntables = len(AllCalTables)

    applycal_task_args = {'vis': self.inputs.vis,
                          'field': '',
                          'spw': spw,
                          'intent': '',
                          'selectdata': True,
                          'scan': testgainscans,
                          'docallib': False,
                          'gaintable': AllCalTables,
                          'gainfield': [''],
                          'interp': interp,
                          'spwmap': [],
                          'calwt': [False]*ntables,
                          'parang': self.parang,
                          'applymode': 'calflagstrict',
                          'flagbackup': False}

    job = casa_tasks.applycal(**applycal_task_args)

    return self._executor.execute(job)


def _run_baddeformatters(self, bpcaltable: str):
    """Run the bad deformatter search twice: on amplitude ratio, then on phase difference

    Args:
        bpcaltable(str): BP cal table to use

    Return:
        result_amp(List): Bad deformatters amp flagging commands
        result_phase(List): Bad deformatters phase flagging commands
        amp_collection(Dict): Collection for weblog display
        phase_collection(Dict): Collection for weblog display
        num_antennas(int): Number of antennas (for weblog convenience)
        amp_job: flagdata job request for the amplitude flags (not yet executed), or None
        phase_job: flagdata job request for the phase flags (not yet executed), or None
    """
    method_args = {'testq': 'amp',  # Which quantity to test? ['amp','phase','real','imag']
                   'tstat': 'rat',  # Which stat to use?['min','max','mean','var']or'rat'=min/max or 'diff'=max-min
                   'doprintall': True,  # Print detailed flagging stats
                   'testlimit': 0.15,  # Limit for test (flag values under/over this limit)
                   'testunder': True,
                   'nspwlimit': 4,  # Number of spw per baseband to trigger flagging entire baseband
                   'doflagundernspwlimit': self.inputs.doflagundernspwlimit,  # Flag individual spws when below nspwlimit
                   'doflagemptyspws': False,  # Flag data for spws with no unflagged channel solutions in any poln?
                   'calBPtablename': bpcaltable,  # Define the table
                   'flagreason': 'bad_deformatters_amp or RFI'}  # Define the REASON given for the flags

    (result_amp, amp_collection, num_antennas, amp_job) = self._do_flag_baddeformatters(**method_args)

    method_args = {'testq': 'phase',
                   'tstat': 'diff',
                   'doprintall': True,
                   'testlimit': 50,
                   'testunder': False,
                   'nspwlimit': 4,
                   'doflagundernspwlimit': self.inputs.doflagundernspwlimit,
                   'doflagemptyspws': False,
                   'calBPtablename': bpcaltable,
                   'flagreason': 'bad_deformatters_phase or RFI'}

    (result_phase, phase_collection, num_antennas, phase_job) = self._do_flag_baddeformatters(**method_args)

    return result_amp, result_phase, amp_collection, phase_collection, num_antennas, amp_job, phase_job


def _bandpass_test_value(self, inner_stats: dict, testq: str, tstat: str) -> float:
    """Return the test statistic for one ant/spw/poln: min/max ratio, max-min difference, or a plain stat."""
    quantity = inner_stats[testq]
    if tstat == 'rat':
        bpmax = quantity['max']
        # A zero maximum would divide by zero; report a ratio of 0.0 instead.
        return 0.0 if bpmax == 0.0 else quantity['min'] / bpmax
    if tstat == 'diff':
        return quantity['max'] - quantity['min']
    # simple test on quantity
    return quantity[tstat]


def _do_flag_baddeformatters(self, testq: str = None, tstat: str = None, doprintall: bool = True,
                             testlimit: float = None, testunder: bool = True, nspwlimit: int = 4,
                             doflagundernspwlimit: bool = True, doflagemptyspws: bool = False,
                             calBPtablename: str = None, flagreason: str = None):
    """Determine bad deformatters in the MS and build the flag commands for them

    Looks for bandpass solutions whose test statistic (e.g. min/max amplitude ratio)
    crosses ``testlimit``. The flagdata job is created here but not executed.

    Args:
        testq(str): Which quantity to test? ['amp','phase','real','imag'] Original script: 'amp'
        tstat(str): Which stat to use? ['min','max','mean','var'] or 'rat'=min/max or 'diff'=max-min
            Original script: 'rat'
        doprintall(bool): Print detailed flagging stats Original script: True
        testlimit(float): Limit for test (flag values under/over this limit) Original script: 0.15
        testunder(bool): Will flag values under limit (True) or over limit (False) Original script: True
        nspwlimit(int): Number of spw per baseband to trigger flagging entire baseband Original script: 4
        doflagundernspwlimit(bool): Flag individual spws when below nspwlimit Original script: True
        doflagemptyspws(bool): Flag data for spws with no unflagged channel solutions in any poln?
        calBPtablename(str): caltable name
        flagreason(str): Reason for flagging (only logged here)

    Returns:
        flaglist(List): phase or amp flagging commands list
        weblogflagdict(Dict): collection for weblog display, keyed by antenna name
        num_antennas(int): number of antennas
        job: flagdata job request (not executed), or None when nothing is to be flagged
    """
    m = self.inputs.context.observing_run.get_ms(self.inputs.vis)
    num_antennas = len(m.antennas)
    startdate = m.start_time['m0']['value']
    iglist = self.inputs.iglist

    LOG.info("Start date for flag bad deformatters is: " + str(startdate))
    # Flag commands are only produced for data taken after this start date.
    doflagdata = not (startdate <= 56062.7)

    LOG.info("Will test on quantity: "+testq)
    LOG.info("Will test using statistic: "+tstat)

    if testunder:
        LOG.info("Will flag values under limit = "+str(testlimit))
    else:
        LOG.info("Will flag values over limit = "+str(testlimit))

    LOG.info("Will identify basebands with more than "+str(nspwlimit)+" bad spw")

    if doflagundernspwlimit:
        LOG.info("Will identify individual spw when less than "+str(nspwlimit)+" bad spw")

    if doflagemptyspws:
        LOG.info("Will identify spw with no unflagged channels")

    LOG.info("Will use flag REASON = "+flagreason)

    if doflagdata:
        LOG.info("Will flag data based on what we found")
    else:
        LOG.info("Will NOT flag data based on what we found")

    def crosses_limit(value):
        # "Bad" means below the limit when testunder is set, above it otherwise.
        return value < testlimit if testunder else value > testlimit

    calBPstatresult = getBCalStatistics(calBPtablename)
    flaglist = []
    extflaglist = []
    weblogflagdict = collections.defaultdict(list)
    badpols = collections.defaultdict(lambda: collections.defaultdict(list))
    crosshands = ['RL', 'LR']

    for iant in calBPstatresult['antband']:
        antName = calBPstatresult['antDict'][iant]

        # PIPE-1183, only antenna name provided in ignore list so
        # skip bad deformatter flagging for all bands and spws corresponding to that antenna
        isAntIgnored = antName in iglist
        if isAntIgnored and len(iglist[antName]) == 0:
            LOG.info("Skipping bad deformatter flagging for all bands and spws corresponding to {!s} antenna".format(antName))
            continue

        badspwlist = []
        flaggedspwlist = []
        for rrx in calBPstatresult['antband'][iant]:
            trrx = rrx.replace("EVLA_", "") if "EVLA_" in rrx else rrx

            # PIPE-1183, antenna name and band provided in ignore list so
            # skip bad deformatter flagging for spws corresponding to that antenna and band
            isBandIgnored = isAntIgnored and trrx in iglist[antName]
            if isBandIgnored and len(iglist[antName][trrx]) == 0:
                LOG.info("Skipping bad deformatter flagging for all spws corresponding to {!s} band and {!s} antenna".format(rrx, antName))
                continue

            for bband in calBPstatresult['antband'][iant][rrx]:
                # List of spw in this baseband
                spwl = calBPstatresult['rxBasebandDict'][rrx][bband]

                nbadspws = 0
                badspws = []
                flaggedspws = []
                ignoredSPWs = []
                if len(spwl) > 0:
                    if doprintall:
                        LOG.info(' Ant %s (%s) %s %s processing spws=%s' %
                                 (str(iant), antName, rrx, bband, str(spwl)))

                    if isBandIgnored:
                        for ispw in iglist[antName][trrx]:
                            ignoredSPWs.extend(utils.range_to_list(ispw))

                    for ispw in spwl:
                        # PIPE-1183, skip bad deformatter flagging for SPWs in ignore list
                        if ispw in ignoredSPWs:
                            LOG.info("Skipping bad deformatter flagging for {!s} spw corresponding to {!s} band and {!s} antenna".format(ispw, rrx, antName))
                            continue

                        if ispw not in calBPstatresult['antspw'][iant]:
                            # this spw is missing from this antenna/rx
                            if doprintall:
                                LOG.info(' Ant %s (%s) %s %s spw=%s missing solution' %
                                         (str(iant), antName, rrx, bband, str(ispw)))
                            continue

                        testvalid = False
                        testval = None
                        failing_polns = []
                        spwstats = calBPstatresult['antspw'][iant][ispw]
                        for poln in spwstats:
                            # Get stats of this ant/spw/poln
                            inner = spwstats[poln]['inner']
                            if inner['number'] > 0:
                                tval = self._bandpass_test_value(inner, testq, tstat)

                                # Remember every polarization that fails on its own. The loop
                                # variable must not be read after the loop: it would only ever
                                # name the last polarization, whichever one was actually bad.
                                if crosses_limit(tval):
                                    failing_polns.append(poln)

                                # Track the extreme value across polarizations.
                                if not testvalid:
                                    testval = tval
                                    testvalid = True
                                elif testunder:
                                    testval = min(testval, tval)
                                else:
                                    testval = max(testval, tval)

                        # Test on extrema of the polarizations for this ant/spw
                        if not testvalid:
                            # these have no unflagged channels in any poln
                            flaggedspws.append(ispw)
                        elif crosses_limit(testval):
                            nbadspws += 1
                            badspws.append(ispw)
                            badpols[iant][ispw].extend(failing_polns)
                            if doprintall:
                                LOG.info(' Found Ant %s (%s) %s %s spw=%s %s %s=%6.4f' %
                                         (str(iant), antName, rrx, bband, str(ispw), testq, tstat, testval))

                # Test to see if this baseband should be entirely flagged
                if nbadspws > 0 and nbadspws >= nspwlimit:
                    # Flag all spw in this baseband
                    bbspws = calBPstatresult['rxBasebandDict'][rrx][bband]
                    badspwlist.extend(bbspws)
                    LOG.info('Ant %s (%s) %s %s bad baseband spws=%s' %
                             (str(iant), antName, rrx, bband, str(bbspws)))
                elif nbadspws > 0 and doflagundernspwlimit:
                    # Flag spws individually
                    badspwlist.extend(badspws)
                    LOG.info('Ant %s (%s) %s %s bad spws=%s' %
                             (str(iant), antName, rrx, bband, str(badspws)))

                if len(flaggedspws) > 0:
                    flaggedspwlist.extend(flaggedspws)
                    LOG.info('Ant %s (%s) %s %s no unflagged solutions spws=%s ' %
                             (str(iant), antName, rrx, bband, str(flaggedspws)))

        if len(badspwlist) > 0:
            spw_info_list = []
            for ispw in badspwlist:
                # Reset per spw so that a whole-spw flag does not inherit the correlation
                # string of the spw handled before it.
                corr_str = ''
                # PIPE-1435: get list of bad polarizations for this spw
                pol_list = badpols[iant].get(ispw, [])
                if len(pol_list) > 0:
                    dd = m.get_data_description(ispw)
                    off_corr = [dd.get_polarization_label(polid) for polid in pol_list]
                    all_corr = m.get_vla_corrlist_from_spw(str(ispw))
                    crosshands_in_spw = [c for c in all_corr if c in crosshands]
                    # The cross-hands involve the bad polarization too; sorted() makes
                    # the command text reproducible from run to run.
                    corr_str = ','.join(map(str, sorted(set(off_corr + crosshands_in_spw))))

                if corr_str:
                    flagstr = (
                        f"mode='manual' antenna='{antName}' "
                        f"spw='{ispw}' correlation='{corr_str}' "
                    )
                else:
                    # Spws pulled in by a whole-baseband flag have no per-polarization entry.
                    flagstr = (
                        f"mode='manual' antenna='{antName}' spw='{ispw}' "
                    )

                flaglist.append(flagstr)
                spw_info_list.append({
                    "spw": str(ispw),
                    "correlation": corr_str
                })
            weblogflagdict[antName].extend(spw_info_list)

        if doflagemptyspws and len(flaggedspwlist) > 0:
            spw_info_list = []
            for ispw in flaggedspwlist:
                corr_str = ''
                pol_list = badpols[iant].get(ispw, [])
                if len(pol_list) > 0:
                    corr_str = ','.join(map(str, pol_list))
                    flagstr = (
                        f"mode='manual' antenna='{antName}' "
                        f"spw='{ispw}' correlation='{corr_str}' "
                    )
                else:
                    flagstr = (
                        f"mode='manual' antenna='{antName}' spw='{ispw}' "
                    )
                # Empty-spw commands live in extflaglist only; it is merged into flaglist
                # once at the end, so no command is emitted twice.
                extflaglist.append(flagstr)
                spw_info_list.append({
                    "spw": str(ispw),
                    "correlation": corr_str
                })
            weblogflagdict[antName].extend(spw_info_list)

    # Get basebands matched with spws. Each weblog entry is a dict with a "spw" key.
    tempDict = {}
    for antNamekey, spws in weblogflagdict.items():
        basebands = []
        for spwentry in spws:
            spw_id = spwentry.get("spw")
            if spw_id is None:
                continue
            spw = m.get_spectral_window(spw_id)
            basebands.append(spw.name.split('#')[0] + ' ' + spw.name.split('#')[1])
        basebands = list(set(basebands))  # Unique basebands
        tempDict[antNamekey] = {'spws': spws, 'basebands': basebands}

    weblogflagdict = tempDict

    nflagcmds = len(flaglist) + len(extflaglist)
    if nflagcmds < 1:
        LOG.info("No bad basebands/spws found")
    else:
        LOG.info("Possible bad basebands/spws found:")

        for flagstr in flaglist:
            LOG.info(" "+flagstr)
        if len(extflaglist) > 0:
            LOG.info(" ")
            for flagstr in extflaglist:
                LOG.info(" "+flagstr)
            flaglist.extend(extflaglist)
        if doflagdata:
            LOG.info("Setting flag commands for bad deformatters in the ms (quantity {!s})...".format(testq))

            task_args = {'vis': self.inputs.vis,
                         'mode': 'list',
                         'action': 'apply',
                         'inpfile': flaglist,
                         'savepars': True,
                         'flagbackup': True}

            job = casa_tasks.flagdata(**task_args)

            return flaglist, weblogflagdict, num_antennas, job

    # If the flag commands are not executed.
    return [], collections.defaultdict(list), num_antennas, None