Source code for pipeline.hifv.tasks.finalcals.applycals

import collections
import os

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.vdp as vdp
from pipeline.h.tasks import applycal as happlycal
from pipeline.hif.tasks import applycal
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import task_registry
from pipeline.infrastructure import utils

LOG = infrastructure.get_logger(__name__)


class ApplycalsInputs(applycal.IFApplycalInputs):
    """
    Inputs for the hifv_applycals pipeline task.

    Adds the ``calwt`` and ``gainmap`` options to the interferometry
    applycal inputs and overrides several per-MeasurementSet defaults.
    """
    # Per-MeasurementSet defaults. Declaration order within the class body
    # has no effect on how these descriptors behave.
    parang = vdp.VisDependentProperty(default=True)
    field = vdp.VisDependentProperty(default='')
    spw = vdp.VisDependentProperty(default='')
    intent = vdp.VisDependentProperty(default='')
    flagbackup = vdp.VisDependentProperty(default=True)
    calwt = vdp.VisDependentProperty(default=False)
    gainmap = vdp.VisDependentProperty(default=False)

    # docstring and type hints: supplements hifv_applycals
    def __init__(self, context, output_dir=None, vis=None,
                 # data selection arguments
                 field=None, spw=None, antenna=None, intent=None,
                 # preapply calibrations
                 parang=None, applymode=None, calwt=None,
                 flagbackup=None, flagsum=None, flagdetailedsum=None, gainmap=None):
        """Initialize Inputs.

        Args:
            context: Pipeline context object containing state information.

            output_dir: Output directory.
                Defaults to None, which corresponds to the current working directory.

            vis: The list of input MeasurementSets. Defaults to the list of MeasurementSets specified in the hifv_importdata task.

            field: A string containing the list of field names or field ids to which the calibration will be applied. Defaults to all fields in the pipeline
                context.

                Example: '3C279', '3C279, M82'

            spw: The list of spectral windows and channels to which the calibration will be applied. Defaults to all science windows in the pipeline.

                Example: '17', '11, 15'

            antenna: The selection of antennas to which the calibration will be applied. Defaults to all antennas. Not currently supported.

            intent: A string containing the list of intents against which the selected fields will be matched. Defaults to all supported intents
                in the pipeline context.

                Example: `'*TARGET*'`

            parang: Apply the parallactic angle correction (CASA applycal ``parang``). Defaults to True.

            applymode: Calibration apply mode.

                - 'calflag': calibrate data and apply flags from solutions
                - 'calflagstrict': same as above except flag spws for which calibration is
                  unavailable in one or more tables (instead of allowing them to pass
                  uncalibrated and unflagged)
                - 'trial': report on flags from solutions, dataset entirely unchanged
                - 'flagonly': apply flags from solutions only, data not calibrated
                - 'flagonlystrict': same as above except flag spws for which calibration is
                  unavailable in one or more tables
                - 'calonly': calibrate data only, flags from solutions NOT applied

            calwt: Calibrate the data weights as well as the data (CASA applycal ``calwt``). Defaults to False.

            flagbackup: Backup the flags before the apply.

            flagsum: Compute before and after flagging summary statistics.

            flagdetailedsum: Compute detailed flagging statistics.

            gainmap: Mode to map gainfields to scans.

        """
        super().__init__(
            context,
            output_dir=output_dir,
            vis=vis,
            field=field,
            spw=spw,
            antenna=antenna,
            intent=intent,
            parang=parang,
            applymode=applymode,
            flagbackup=flagbackup,
            flagsum=flagsum,
            flagdetailedsum=flagdetailedsum,
        )
        # These two options are not forwarded to the parent constructor.
        self.calwt = calwt
        self.gainmap = gainmap

    def to_casa_args(self):
        """Return the parent's CASA argument dict with intent, field and spw blanked."""
        casa_args = super().to_casa_args()
        for selector in ('intent', 'field', 'spw'):
            casa_args[selector] = ''
        return casa_args


@task_registry.set_equivalent_casa_task('hifv_applycals')
class Applycals(applycal.SerialIFApplycal):
    """Pipeline task that applies the registered calibrations (hifv_applycals).

    One CASA applycal job is built per calibration data selection. When
    ``inputs.gainmap`` is set, the jobs are additionally split by the
    gainfield/scan groups returned by :meth:`match_fields_scans`, followed by
    one more pass restricted to the ``CALIBRATE*`` intents.
    """
    Inputs = ApplycalsInputs

    # Note this is a temporary workaround
    antenna_to_apply = '*&*'

    def prepare(self):
        """Run applycal and return its result."""
        return self._do_applycal()

    def analyse(self, results):
        """Return the results unchanged."""
        return results

    def _do_applycal(self):
        """Delegate to :meth:`applycal_run`."""
        return self.applycal_run()

    def applycal_run(self):
        """Build and execute the applycal jobs and collect flagging statistics.

        Returns:
            An ``ApplycalResults`` object listing the applied calibrations.
            ``summaries`` is set when ``inputs.flagsum`` is true, and
            ``flagsummary`` when ``inputs.flagdetailedsum`` is true as well.
        """
        inputs = self.inputs

        # Get the target data selection for this task as a CalTo object
        calto = callibrary.get_calto_from_inputs(inputs)

        # Now get the calibration state for that CalTo data selection. The
        # returned dictionary of CalTo:CalFroms specifies the calibrations to
        # be applied and the data selection to apply them to.
        #
        # Note that no 'ignore' argument is given to get_calstate
        # (specifically, we don't say ignore=['calwt'] like many other tasks)
        # as applycal is a task that can handle calwt and so different values
        # of calwt should in this case result in different tasks.
        calstate = inputs.context.callibrary.get_calstate(calto)
        merged = calstate.merged()

        # run a flagdata job to find the flagged state before applycal
        if inputs.flagsum:
            # 20170406 TN
            # flagdata task arguments are indirectly given so that sd applycal task is
            # able to edit them
            summary_args = dict(vis=inputs.vis, mode='summary')
            flagdata_summary_job = casa_tasks.flagdata(**self._get_flagsum_arg(summary_args))
            stats_before = self._executor.execute(flagdata_summary_job)
            stats_before['name'] = 'before'

        if inputs.gainmap:
            applycalgroups = self.match_fields_scans()
        else:
            # Single placeholder group so the loop below runs exactly once.
            applycalgroups = collections.defaultdict(list)
            applycalgroups['1'] = ['']

        jobs = []
        for gainfield, scanlist in applycalgroups.items():
            for target, calfroms in merged.items():
                # if there's nothing to apply for this data selection, continue
                if not calfroms:
                    continue

                args = self._applycal_args(target, calfroms)

                if inputs.gainmap:
                    # Determine which tables the group's gainfield should be used with
                    for i, table in enumerate(args['gaintable']):
                        if 'finalampgaincal' in table or 'finalphasegaincal' in table:
                            taql = f"FIELD_ID == {gainfield}"
                            if utils.get_row_count(table, taql):
                                args['interp'][i] = 'linear'
                                args['gainfield'][i] = gainfield
                            else:
                                # Report the group's gainfield; the original message used a
                                # stale variable left over from the table-filtering loop.
                                LOG.warning(f"No data found for {gainfield} in {table}, using gainfield = ''")

                    # NOTE(review): the flattened source does not show whether the scan
                    # selection and log line sit inside this ``if``. They are kept here
                    # because the placeholder group has no real scans to select or report.
                    scan_selection = ','.join(scanlist)
                    args['scan'] = scan_selection
                    LOG.info("Using gainfield {!s} and scan={!s}".format(gainfield, scan_selection))

                jobs.append(casa_tasks.applycal(**args))

        if inputs.gainmap:
            # Additional pass over the calibrator intents, without the gainfield/scan mapping.
            for target, calfroms in merged.items():
                # if there's nothing to apply for this data selection, continue
                if not calfroms:
                    continue

                args = self._applycal_args(target, calfroms)
                args['intent'] = 'CALIBRATE*'
                jobs.append(casa_tasks.applycal(**args))

        # execute the jobs
        for job in jobs:
            self._executor.execute(job)

        # run a final flagdata job to get the flagging statistics after
        # application of the potentially flagged caltables
        if inputs.flagsum:
            stats_after = self._executor.execute(flagdata_summary_job)
            stats_after['name'] = 'applycal'

        applied = [callibrary.CalApplication(target, calfroms) for target, calfroms in merged.items()]
        result = happlycal.ApplycalResults(applied=applied, data_type=self.applied_data_type)

        if inputs.flagsum:
            result.summaries = [stats_before, stats_after]

        # Flagging stats by spw and antenna
        if inputs.flagsum and inputs.flagdetailedsum:
            result.flagsummary = self._detailed_flagsummary()

        return result

    def _applycal_args(self, calto, calfroms):
        """Build the CASA applycal arguments for one calibration data selection.

        Sets ``inputs.spw``, ``inputs.field`` and ``inputs.intent`` from
        ``calto`` as a side effect, as the original inline code did.
        """
        inputs = self.inputs

        # arrange a calibration job for the unique data selection
        inputs.spw = calto.spw
        inputs.field = calto.field
        inputs.intent = calto.intent

        args = inputs.to_casa_args()
        # These inputs are not CASA applycal task arguments.
        for key in ('flagsum', 'flagdetailedsum', 'gainmap'):
            args.pop(key, None)

        # Note this is a temporary workaround
        args['antenna'] = self.antenna_to_apply

        # set the on-the-fly calibration state for the data selection.
        calapp = callibrary.CalApplication(calto, calfroms)
        args.update(self._present_caltables(calapp))
        args['applymode'] = inputs.applymode
        return args

    def _present_caltables(self, calapp):
        """Return the applycal table arguments for tables that exist and have matching rows.

        PIPE-1729: only gain tables that are present on disk, and that have at
        least one row for their gainfield (when one is given), are included.
        """
        kept = {'gaintable': [], 'gainfield': [], 'spwmap': [], 'interp': [], 'calwt': []}

        for gaintable, gfield, spwmap, interp, calwt in zip(calapp.gaintable, calapp.gainfield, calapp.spwmap,
                                                            calapp.interp, calapp.calwt):
            taql = ''
            if gfield:
                # NOTE(review): this uses the first registered MeasurementSet rather
                # than the one for inputs.vis, as the original code did - confirm.
                ms = self.inputs.context.observing_run.get_measurement_sets()[0]
                gainfield_obj = ms.get_fields(gfield)
                if gainfield_obj:
                    taql = f"FIELD_ID == {gainfield_obj[0].id}"
                else:
                    LOG.warning(f"Unable to get ID for gainfield {gfield}")

            if os.path.exists(gaintable) and utils.get_row_count(gaintable, taql) != 0:
                kept['gaintable'].append(gaintable)
                kept['gainfield'].append(gfield)
                kept['spwmap'].append(spwmap)
                kept['interp'].append(interp)
                kept['calwt'].append(calwt)

        return kept

    def _detailed_flagsummary(self):
        """Run one flagdata summary per spectral window and regroup the reports by field.

        Returns:
            An ``OrderedDict`` keyed by field name; each value maps a report
            key to that report's summary for the field.
        """
        inputs = self.inputs
        ms = inputs.context.observing_run.get_ms(inputs.vis)
        spwids = [spw.id for spw in ms.get_spectral_windows()]

        # Note should intent be set to inputs.intent as shown below or is there
        # a reason not to do this.
        # fields = ms.get_fields(intent=inputs.intent)
        fields = ms.get_fields(intent='BANDPASS,PHASE,AMPLITUDE,CHECK,TARGET')

        if 'VLA' in inputs.context.project_summary.telescope:
            calfields = ms.get_fields(intent='AMPLITUDE,PHASE,BANDPASS')
            alltargetfields = ms.get_fields(intent='TARGET')

            # Keep every nplots-th target field; the step grows by one per 30 targets.
            nplots = (len(alltargetfields) // 30) + 1

            # Copy before extending so the list returned by get_fields is not mutated.
            fields = list(calfields)
            fields.extend(alltargetfields[::nplots])

        flagsummary = collections.OrderedDict()
        for field in fields:
            flagsummary[field.name.strip('"')] = {}

        # NOTE(review): the name= value below has no closing quote. It is kept
        # byte-for-byte because how flagdata's list mode parses it cannot be
        # confirmed from this file.
        flagkwargs = [
            f"spw='{spwid!s}' fieldcnt=True mode='summary' name='AntSpw{str(spwid).zfill(3)}"
            for spwid in spwids
        ]

        # 20170406 TN
        # Tweak flagkwargs (default is do nothing)
        flagkwargs = self._tweak_flagkwargs(flagkwargs)

        # BRK note - Added kwarg fieldcnt based on Justo's changes, July 2015
        # Need to have fieldcnt in the flagline above
        flaggingjob = casa_tasks.flagdata(vis=inputs.vis, mode='list', inpfile=flagkwargs, flagbackup=False)
        flagdicts = self._executor.execute(flaggingjob)

        # BRK note - for Justo's new flagging scheme, need to rearrange
        # the dictionary keys in the order of field, spw report, antenna, with added name and type keys
        # on the third dictionary level.

        # Set into single dictionary report (single spw) if only one dict returned
        if len(flagkwargs) == 1:
            flagdicts = {'report0': flagdicts}

        for key, report in flagdicts.items():  # report level
            fieldnames = [name for name in report if name not in ('name', 'type')]
            for fieldname in fieldnames:
                # Best effort: fields not selected above, or reports without spw
                # entries, are logged at debug level and skipped.
                try:
                    flagsummary[fieldname][key] = report[fieldname]
                    # TODO: review if this relies on order of keys.
                    spwid = list(report[fieldname]['spw'].keys())[0]
                    flagsummary[fieldname][key]['name'] = 'AntSpw' + str(spwid).zfill(3) + 'Field_' + str(fieldname)
                    flagsummary[fieldname][key]['type'] = 'summary'
                except Exception as ex:
                    LOG.debug("No flags to report for " + str(key) + str(ex))

        return flagsummary

    def match_fields_scans(self):
        """Group PHASE/TARGET scans by the field of their bracketing PHASE scan.

        Scans with the PHASE or TARGET intent are walked in the order returned
        by ``get_scans``. A group is closed when a second PHASE scan is seen
        or at the last scan. Each group is keyed by the field id of its
        closing PHASE scan if there is one, otherwise of its opening one.

        Returns:
            A ``defaultdict(list)`` mapping a field id (as a string) to the
            list of scan ids (as strings) to calibrate with that gainfield.
        """
        ms = self.inputs.context.observing_run.get_ms(self.inputs.vis)
        scans = ms.get_scans(scan_intent='PHASE,TARGET')

        TargetGroup = collections.namedtuple("TargetGroup", "phase1 targetscans phase2")

        groups = []
        targetscans = []
        phase1 = False
        phase2 = False
        prev_phase2 = False
        last_idx = len(scans) - 1

        for idx, scan in enumerate(scans):
            if 'PHASE' in scan.intents:
                if not phase1 or not targetscans:
                    phase1 = scan
                else:
                    phase2 = scan
                    prev_phase2 = phase2
                # NOTE(review): the flattened source does not show whether this append
                # belongs to the ``else`` above or to the whole PHASE branch. The
                # branch-level reading is used, so every PHASE scan joins a group.
                targetscans.append(scan)
            elif 'TARGET' in scan.intents:
                # The closing PHASE scan of the previous group opens this one.
                if not phase1 or prev_phase2:
                    phase1 = prev_phase2
                targetscans.append(scan)

            # see if this scan is the last one in the relevant scan list
            # or see if we have a phase2
            # if so, end the group
            if phase2 or idx == last_idx:
                groups.append(TargetGroup(phase1, targetscans, phase2))
                phase1 = False
                phase2 = False
                targetscans = []

        applycalgroups = collections.defaultdict(list)

        for group in groups:
            scan_ids = [str(targetscan.id) for targetscan in group.targetscans]

            # The closing PHASE scan supplies the key when present, otherwise the opening one.
            bracket = group.phase2 or group.phase1
            if not bracket:
                # No PHASE scan brackets these scans, so there is no gainfield to map
                # them to. The original code failed here with an AttributeError.
                LOG.warning("No PHASE scan found for scan(s) {!s}; no gainfield mapping made".format(
                    ','.join(scan_ids)))
                continue

            gainfieldkey = str(list(bracket.fields)[0].id)
            applycalgroups[gainfieldkey].extend(scan_ids)

        for gainfield, scanlist in applycalgroups.items():
            LOG.info("Applycal group: gainfield={!s} scans={!s}".format(gainfield, ','.join(scanlist)))

        return applycalgroups