Source code for pipeline.hifv.tasks.priorcals.priorcals

"""
Example usage:

inputs = pipeline.vla.tasks.priorcals.Priorcals.Inputs(context)
task = pipeline.vla.tasks.priorcals.Priocals(inputs)
result = task.execute()
result.accept(context)

"""
import os

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.vdp as vdp
from pipeline.hifv.tasks.gaincurves import GainCurves
from pipeline.hifv.tasks.opcal import Opcal
from pipeline.hifv.tasks.rqcal import Rqcal
from pipeline.hifv.tasks.swpowcal import Swpowcal
from pipeline.hifv.tasks.tecmaps import TecMaps
from pipeline.infrastructure import casa_tools
from pipeline.infrastructure import task_registry
from casatasks.private.correct_ant_posns_evla import correct_ant_posns_evla as correct_ant_posns
from . import resultobjects
from . import vlaantpos


LOG = infrastructure.get_logger(__name__)


class PriorcalsInputs(vdp.StandardInputs):
    """Inputs class for the hifv_priorcals pipeline task.  Used on VLA measurement sets.

    The class inherits from vdp.StandardInputs.

    """
    swpow_spw = vdp.VisDependentProperty(default='')
    show_tec_maps = vdp.VisDependentProperty(default=True)
    apply_tec_correction = vdp.VisDependentProperty(default=False)
    apply_gaincurves = vdp.VisDependentProperty(default=True)
    apply_opcal = vdp.VisDependentProperty(default=True)
    apply_rqcal = vdp.VisDependentProperty(default=True)
    apply_antpos = vdp.VisDependentProperty(default=True)
    apply_swpowcal = vdp.VisDependentProperty(default=False)
    ant_pos_time_limit = vdp.VisDependentProperty(default=150)

    # docstring and type hints: supplements hifv_priorcals
    def __init__(self, context, vis=None, show_tec_maps=None, apply_tec_correction=None, apply_gaincurves=None, apply_opcal=None,
                 apply_rqcal=None, apply_antpos=None, apply_swpowcal=None, swpow_spw=None, ant_pos_time_limit=None):
        """Initialize Inputs.

        Args:
            context (:obj:): Pipeline context

            vis(str): List of visibility data files. These may be ASDMs, tar files of ASDMs, MSes, or tar files of MSes, If ASDM files are specified, they will be
                converted  to MS format.

                Example: ``vis=['X227.ms', 'asdms.tar.gz']``

            show_tec_maps(bool): Plot tec maps. Display the plot output from the CASA tec_maps recipe function.

            apply_tec_correction: Apply tec correction. CASA tec_maps recipe function is executed -
                this bool determines if gencal is
                executed and the resulting table applied

            apply_gaincurves(bool): Apply gain curves correction, default True.

            apply_opcal(bool): Apply opacities correction, default True.

            apply_rqcal(bool): Apply requantizer gains correction, default True.

            apply_antpos(bool): Apply antenna position corrections, default True.

            apply_swpowcal(bool): Apply switched power table, default False.  If set True, ``apply_rqcal`` is ignored and no requantizer gain correction will be applied.

            swpow_spw(str): Spectral-window(s) for plotting: "" ==>all, spw="6, 14"

            ant_pos_time_limit(int): Antenna position time limit in days, default to 150 days.

        """
        self.context = context
        self.vis = vis
        self.show_tec_maps = show_tec_maps
        self.apply_tec_correction = apply_tec_correction
        self.swpow_spw = swpow_spw
        self.ant_pos_time_limit = ant_pos_time_limit
        self.apply_gaincurves = apply_gaincurves
        self.apply_opcal = apply_opcal
        self.apply_rqcal = apply_rqcal
        self.apply_antpos = apply_antpos
        self.apply_swpowcal = apply_swpowcal
        # PIPE-1665: comment from the ticket
        # If I can added a requirement, there should also be the ability to apply the switched power table.
        # The default for the switched power table should be to not apply it. However, if a user sets application
        # of switched power to true, this should implicity turn off the requantizer table, because switched power
        # includes the requantizer gain information implicitly.
        if apply_swpowcal:
            self.apply_rqcal = False

    def to_casa_args(self):
        raise NotImplementedError


[docs] @task_registry.set_equivalent_casa_task('hifv_priorcals') class Priorcals(basetask.StandardTaskTemplate): """Class for the Priorcals pipeline task. Used on VLA measurement sets. The class inherits from basetask.StandardTaskTemplate """ Inputs = PriorcalsInputs
[docs] def prepare(self): callist = [] gc_result = self._do_gaincurves() oc_result = self._do_opcal() rq_result = self._do_rqcal() sw_result = self._do_swpowcal() antpos_result, antcorrect = self._do_antpos() tecmaps_result = None if self.inputs.show_tec_maps or self.inputs.apply_tec_correction: tecmaps_result = self._do_tecmaps(show_tec_maps=self.inputs.show_tec_maps, apply_tec_correction=self.inputs.apply_tec_correction) # try: # antpos_result.merge_withcontext(self.inputs.context) # except: # LOG.error('No antenna position corrections.') return resultobjects.PriorcalsResults(pool=callist, gc_result=gc_result, oc_result=oc_result, rq_result=rq_result, antpos_result=antpos_result, antcorrect=antcorrect, tecmaps_result=tecmaps_result, sw_result=sw_result)
[docs] def analyse(self, results): return results
def _do_gaincurves(self): """Run gaincurves task""" inputs = GainCurves.Inputs(self.inputs.context, vis=self.inputs.vis) task = GainCurves(inputs) return self._executor.execute(task) def _do_opcal(self): """Run opcal task""" inputs = Opcal.Inputs(self.inputs.context, vis=self.inputs.vis) task = Opcal(inputs) return self._executor.execute(task) def _do_rqcal(self): """Run requantizer gains task""" inputs = Rqcal.Inputs(self.inputs.context, vis=self.inputs.vis) task = Rqcal(inputs) return self._executor.execute(task) def _do_swpowcal(self): """Run switched power task""" inputs = Swpowcal.Inputs(self.inputs.context, vis=self.inputs.vis, spw=self.inputs.swpow_spw) task = Swpowcal(inputs) return self._executor.execute(task) def _do_antpos(self): """Run hif_antpos to correct for antenna positions""" inputs = vlaantpos.VLAAntpos.Inputs(self.inputs.context, vis=self.inputs.vis, ant_pos_time_limit=self.inputs.ant_pos_time_limit) task = vlaantpos.VLAAntpos(inputs) result = self._executor.execute(task) antcorrect = {} try: antpos_caltable = result.final[0].gaintable if os.path.exists(antpos_caltable): LOG.info("Start antenna position corrections") antparamlist = correct_ant_posns(inputs.vis, print_offsets=False, time_limit=inputs.ant_pos_time_limit) LOG.info("End antenna position corrections") self._check_tropdelay(antpos_caltable) antList = antparamlist[1].split(',') N = 3 subList = [antparamlist[2][n:n+N] for n in range(0, len(antparamlist[2]), N)] antcorrect = dict(zip(antList, subList)) except Exception as ex: LOG.info("No offsets found. No caltable created.") LOG.debug(ex) m = self.inputs.context.observing_run.get_ms(self.inputs.vis) fracantcorrect = float(len(antcorrect)) / float(len(m.antennas)) if fracantcorrect > 0.5: LOG.warning("{:5.2f} percent of antennas needed position corrections.".format(100.0 * fracantcorrect)) return result, antcorrect def _do_tecmaps(self, show_tec_maps=True, apply_tec_correction=False): """Run tec_maps function""" inputs = TecMaps.Inputs(self.inputs.context, vis=self.inputs.vis, output_dir='', show_tec_maps=show_tec_maps, apply_tec_correction=apply_tec_correction) task = TecMaps(inputs) return self._executor.execute(task) def _check_tropdelay(self, antpos_caltable): # Insert value if required for testing """ #print "ADDED TEST TROP VALUE" trdelscale = 1.23 tb = casa_tools.table() tb.open(antpos_caltable, nomodify=False) tb.putkeyword('VLATrDelCorr', trdelscale) tb.close() #print "END OF ADDING TEST TROP VALUE" """ # Detect EVLA 16B Trop Del Corr # (Silent if required keyword absent, or has value=0.0) # antpostable = 'cal.antpos' trdelkw = 'VLATrDelCorr' with casa_tools.TableReader(antpos_caltable) as tb: if tb.keywordnames().count(trdelkw) == 1: trdelscale = tb.getkeyword(trdelkw) if trdelscale != 0.0: warning_message = "NB: This EVLA dataset appears to fall within the period of semester 16B " \ "during which the online tropospheric delay model was mis-applied. " \ "A correction for the online tropospheric delay model error WILL BE APPLIED! " \ "Tropospheric delay error correction coefficient="+str(-trdelscale/1000.0)+ " (ps/m) " LOG.debug("EVLA 16B Online Trop Del Corr is ON, scale=" + str(trdelscale)) LOG.warning(warning_message)