Source code for pipeline.hifv.tasks.flagging.flagcal

import os

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.vdp as vdp
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import task_registry

LOG = infrastructure.get_logger(__name__)


class FlagcalResults(basetask.Results):
    def __init__(self, final=[], pool=[], preceding=[]):
        super().__init__()
        self.pool = pool[:]
        self.final = final[:]
        self.preceding = preceding[:]
        self.error = set()

    def merge_with_context(self, context):
        """
        See :method:`~pipeline.infrastructure.api.Results.merge_with_context`
        """
        if not self.final:
            LOG.warning('No flagcal results')
            return

    def __repr__(self):
        #return 'FlagcalResults:\n\t{0}'.format(
        #    '\n\t'.join([ms.name for ms in self.mses]))
        return 'FlagcalResults:'


class FlagcalInputs(vdp.StandardInputs):
    caltable = vdp.VisDependentProperty(default='finalampgaincal.tbl')
    clipminmax = vdp.VisDependentProperty(default=[0.9, 1.1])

    # docstring and type hints: supplements hifv_flagcal
    def __init__(self, context, vis=None, caltable=None, clipminmax=None):
        """Initialize Inputs.

        Args:
            context: Pipeline context object containing state information.

            vis: List of input visibility data.

            caltable: String name of the caltable.

            clipminmax: Range to use for clipping.

        """
        super().__init__()
        self.context = context
        self.vis = vis
        self.caltable = caltable
        self.clipminmax = clipminmax


[docs] @task_registry.set_equivalent_casa_task('hifv_flagcal') class Flagcal(basetask.StandardTaskTemplate): Inputs = FlagcalInputs
[docs] def prepare(self): LOG.info("This Flagcal class is running.") LOG.info(self.inputs.caltable) LOG.info(','.join([str(x) for x in self.inputs.clipminmax])) # Check finalcal stage prefixes. caltable = self.inputs.caltable if not os.path.exists(caltable): m = self.inputs.context.observing_run.get_ms(self.inputs.vis) try: caltable = self.inputs.context.evla['msinfo'][m.name].finalampgaincaltable except AttributeError: LOG.warning("Exception: 'finalampgaincaltable' is not present.") flagcal_result = self._do_flagdata(caltable=caltable, clipminmax=self.inputs.clipminmax) return flagcal_result
[docs] def analyse(self, results): return results
def _do_flagdata(self, caltable=None, clipminmax=None): task_args = {'vis' : caltable, 'mode' : 'clip', 'correlation' : 'ABS_ALL', 'datacolumn' : 'CPARAM', 'clipminmax' : clipminmax, 'clipoutside' : True, 'action' : 'apply', 'flagbackup' : False, 'savepars' : False} job = casa_tasks.flagdata(**task_args) self._executor.execute(job) return FlagcalResults([job])