Source code for pipeline.hif.tasks.polcal.polcalworker

import copy
import os
import shutil

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.vdp as vdp
from pipeline.h.heuristics import caltable as pcaltable
from pipeline.hif.tasks.common import commoncalinputs
from pipeline.infrastructure import casa_tasks

LOG = infrastructure.get_logger(__name__)


class PolcalWorkerResults(basetask.Results):
    """
    PolcalWorkerResults is the results class for the pipeline polcal calibration
    task.
    """

    def __init__(self, final=None, pool=None, polcal_returns=None):
        """
        Construct and return a new PolcalWorkerResults.

        PolcalWorkerResults can be initialised with an optional list of
        CalApplications detailing which calibrations, from a pool of candidate
        calibrations (pool), are considered the best to apply (final).

        :param final: the calibrations selected as the best to apply
        :type final: list of :class:`~pipeline.infrastructure.callibrary.CalApplication`
        :param pool: the pool of all calibrations evaluated by the task
        :type pool: list of :class:`~pipeline.infrastructure.callibrary.CalApplication`
        """
        if final is None:
            final = []
        if pool is None:
            pool = []
        if polcal_returns is None:
            polcal_returns = []

        super().__init__()
        self.pool = pool
        self.final = final
        self.polcal_returns = polcal_returns
        self.error = set()

    def merge_with_context(self, context, to_field=None, to_intent=None):
        if not self.final:
            LOG.error('No results to merge')
            return

        for calapp in self.final:
            calto = self._get_calto(calapp.calto, to_field, to_intent)

            LOG.debug(f'Adding calibration to callibrary:\n{calto}\n{calapp.calfrom}')
            context.callibrary.add(calto, calapp.calfrom)

    def _get_calto(self, calto, to_field, to_intent):
        """
        Prepare and return the CalTo to be used for results merging.
        """
        # Do not modify the CalTo directly, as the original values should be
        # preserved for subsequent applications. The CalLibrary makes a
        # defensive copy of the CalFrom, so we do not need to protect that
        # object ourselves.
        calto_copy = copy.deepcopy(calto)

        # When dividing a multi-vis task up into single-vis tasks, the
        # to_field and to_intent parameters are resolved down to single-vis
        # scope accordingly. Therefore, we can use the to_field and to_intent
        # values directly as they should already be appropriate for the target
        # measurement set specified in this result.

        # Give the astronomer a chance to override the destination field and
        # intents, so that the reduction does not need to be repeated just to
        # change how the caltable should be applied.
        if to_field is not None:
            calto_copy.field = to_field
        if to_intent is not None:
            calto_copy.intent = to_intent

        return calto_copy

    def __repr__(self):
        s = 'PolcalWorkerResults:\n'
        for calapp in self.final:
            s += f'\t{os.path.basename(calapp.vis)}: calibration application for table {calapp.gaintable}\n'
        return s


class PolcalWorkerInputs(commoncalinputs.VdpCommonCalibrationInputs):
    @vdp.VisDependentProperty
    def caltable(self):
        namer = pcaltable.PolcalCaltable()
        casa_args = self._get_task_args(ignore=('caltable',))
        return namer.calculate(output_dir=self.output_dir, stage=self.context.stage, **casa_args)

    @vdp.VisDependentProperty
    def intent(self):
        return 'POLARIZATION,POLANGLE,POLLEAKAGE'

    @intent.convert
    def intent(self, value):
        if isinstance(value, list):
            value = [str(v).replace('*', '') for v in value]
        if isinstance(value, str):
            value = value.replace('*', '')
        return value

    def __init__(self, context, output_dir=None, vis=None, caltable=None, intent=None, field=None, spw=None,
                 refant=None, antenna=None, minblperant=None, selectdata=None, uvrange=None, scan=None, solint=None,
                 combine=None, preavg=None, minsnr=None, poltype=None, smodel=None, append=None):
        super().__init__(context, output_dir=output_dir, vis=vis, intent=intent, field=field, spw=spw, refant=refant,
                         antenna=antenna, minblperant=minblperant, selectdata=selectdata, uvrange=uvrange)

        # Polcal input parameters
        self.caltable = caltable
        self.scan = scan
        self.solint = solint
        self.combine = combine
        self.preavg = preavg
        self.minsnr = minsnr
        self.poltype = poltype
        self.smodel = smodel
        self.append = append


[docs] class PolcalWorker(basetask.StandardTaskTemplate): Inputs = PolcalWorkerInputs
[docs] def prepare(self): inputs = self.inputs # Set caltable to itself to generate a permanent caltable name. inputs.caltable = inputs.caltable # Delete a pre-existing table if append it not set. This is to # replicate the somewhat convoluted old filenamer "dry_run" logic # (see PIPE-1688). if os.path.exists(inputs.caltable) and not inputs.append: LOG.debug(f'Deleting existing calibration table {inputs.caltable}') shutil.rmtree(inputs.caltable, ignore_errors=True) # Retrieve original Spw input, to attach to final CalApplication. origin = [callibrary.CalAppOrigin(task=PolcalWorker, inputs=inputs.to_casa_args())] orig_spw = inputs.spw # Retrieve the on-the-fly calibration state for the data selection. calto = callibrary.get_calto_from_inputs(inputs) calstate = inputs.context.callibrary.get_calstate(calto) jobs = [] # If no on-the-fly calibration is applicable for the data selection # then generate a single polcal job based on inputs. if not calstate.merged(): args = inputs.to_casa_args() jobs.append(casa_tasks.polcal(**args)) # Otherwise, generate a separate polcal job for each data selection # for which the CalLibrary has a separate entry of CalFrom/CalTo. else: for calto, calfroms in calstate.merged().items(): # Update inputs based on CalTo. inputs.spw = calto.spw inputs.field = calto.field inputs.intent = calto.intent inputs.antenna = calto.antenna # Convert to CASA task arguments. args = inputs.to_casa_args() # Set the on-the-fly calibration state for the data selection. calapp = callibrary.CalApplication(calto, calfroms) args['gaintable'] = calapp.gaintable args['gainfield'] = calapp.gainfield args['spwmap'] = calapp.spwmap args['interp'] = calapp.interp jobs.append(casa_tasks.polcal(**args)) # Append subsequent output to the same caltable. inputs.append = True # execute the jobs polcal_returns = [] for job in jobs: polcal_returns.append(self._executor.execute(job)) # create the data selection target defining which data this caltable # should calibrate calto = callibrary.CalTo(vis=inputs.vis, spw=orig_spw) # create the calfrom object describing which data should be selected # from this caltable when applied to other data. Set the table name # (mandatory) and gainfield (to conform to suggested script # standard), leaving spwmap, interp, etc. at their default values. calfrom = callibrary.CalFrom(inputs.caltable, caltype='polarization', gainfield='nearest') calapp = callibrary.CalApplication(calto, calfrom, origin) result = PolcalWorkerResults(pool=[calapp], polcal_returns=polcal_returns) return result
[docs] def analyse(self, result): # Check that the caltable was actually generated on_disk = [table for table in result.pool if table.exists()] result.final[:] = on_disk missing = [table for table in result.pool if table not in on_disk] result.error.clear() result.error.update(missing) return result