Source code for pipeline.hif.tasks.setmodel.setmodel

import copy
import os

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.sessionutils as sessionutils
import pipeline.infrastructure.utils as utils
import pipeline.infrastructure.vdp as vdp
from pipeline.h.heuristics import fieldnames as fieldnames
from pipeline.h.tasks.common import commonfluxresults
from pipeline.infrastructure import task_registry

from . import setjy

LOG = infrastructure.get_logger(__name__)


class SetModelsInputs(vdp.StandardInputs):
    normfluxes = vdp.VisDependentProperty(default=True)
    refintent = vdp.VisDependentProperty(default = 'AMPLITUDE')
    scalebycan = vdp.VisDependentProperty(default=True)
    transintent = vdp.VisDependentProperty(default = 'BANDPASS')

    @vdp.VisDependentProperty
    def reference(self):
        # this will give something like '0542+3243,0343+242'
        field_fn = fieldnames.IntentFieldnames()
        reference_fields = field_fn.calculate(self.ms, self.refintent)
        # run the answer through a set, just in case there are duplicates
        fields = utils.deduplicate(s for s in utils.safe_split(reference_fields))
        return ','.join(fields)

    @vdp.VisDependentProperty
    def reffile(self):
        value = os.path.join(self.context.output_dir, 'flux.csv')
        return value

    @vdp.VisDependentProperty
    def transfer(self):
        transfer_fn = fieldnames.IntentFieldnames()
        # call the heuristic to get the transfer fields as a string
        transfer_fields = transfer_fn.calculate(self.ms, self.transintent)

        # remove the reference field should it also have been observed with
        # the transfer intent
        transfers = set(self.ms.get_fields(task_arg=transfer_fields))
        references = set(self.ms.get_fields(task_arg=self.reference))
        diff = transfers.difference(references)

        transfer_names = sorted(f.name for f in diff)
        fields_with_name = self.ms.get_fields(name=transfer_names)
        if len(fields_with_name) != len(diff) or len(diff) != len(transfer_names):
            return ','.join(str(f.id) for f in sorted(diff, key=lambda f: f.id))
        else:
            return ','.join(transfer_names)

    parallel = sessionutils.parallel_inputs_impl(default=False)

    # docstring and type hints: supplements hif_setmodels
    def __init__(self, context, output_dir=None, vis=None, reference=None,
                 refintent=None, transfer=None, transintent=None,
                 reffile=None, normfluxes=None, scalebychan=None, parallel=None):
        """Initialize Inputs.

        Args:
            context: Pipeline context object containing state information.

            output_dir: Output directory.
                Defaults to ``None``, which corresponds to the current working directory.

            vis: The list of input MeasurementSets. Defaults to the list of MeasurementSets specified in the pipeline context.

                Example: ``['M32A.ms', 'M32B.ms']``

            reference: A string containing a comma delimited list of  field names defining the reference calibrators. Defaults to field names with
                intent 'AMPLITUDE'.

                Example: ``'M82,3C273'``

            refintent: A string containing a comma delimited list of intents used to select the reference calibrators. Defaults to 'AMPLITUDE'.

                Example: ``'BANDPASS'``

            transfer: A string containing a comma delimited list of  field names defining the transfer calibrators. Defaults to field names with
                intent ''.

                Example: ``'J1328+041,J1206+30'``

            transintent: A string containing a comma delimited list of intents defining the transfer calibrators. Defaults to ``'BANDPASS,PHASE,CHECK'``.
                ``''`` stands for no transfer sources.

                Example: ``'PHASE'``

            reffile: The reference file containing a lookup table of point source models This file currently defaults to 'flux.csv' in the working directory. This
                file must conform to the standard pipeline 'flux.csv' format

                Example: ``'myfluxes.csv'``

            normfluxes: Normalize the transfer source flux densities.

            scalebychan: Scale the flux density on a per channel basis or else on a per spw basis

            parallel: Process multiple MeasurementSets in parallel using the casampi parallelization framework.

                Options: ``'automatic'``, ``'true'``, ``'false'``, ``True``, ``False``

                Default: ``None`` (equivalent to ``False``)

        """
        super().__init__()
        self.context = context
        self.vis = vis
        self.output_dir = output_dir
        self.reference = reference
        self.refintent = refintent
        self.transfer = transfer
        self.transintent = transintent
        self.reffile = reffile
        self.normfluxes = normfluxes
        self.scalebychan = scalebychan
        self.parallel = parallel


class SerialSetModels(basetask.StandardTaskTemplate):
    Inputs = SetModelsInputs

    def prepare(self, **parameters):

        # Initialize the result.
        result = commonfluxresults.FluxCalibrationResults(vis=self.inputs.vis)

        # Set reference calibrator models.
        #    These models will be assigned the lookup reference frequency,
        #    Stokes parameters, and spix if they are available. If they are not the
        #    Setjy defaults (spw center frequency, [1.0, 0.0, 0.0, 0.0], 0.0)
        #    will be used
        reference_fields = self.inputs.reference
        reference_intents = self.inputs.refintent
        if reference_fields not in (None, ''):
            refresults = self._do_setjy(
                reference_fields, reference_intents, reffile=self.inputs.reffile,
                normfluxes=False, scalebychan=self.inputs.scalebychan)
            # Add measurements to the results object
            result.measurements.update(copy.deepcopy(refresults.measurements))

        # Set transfer calibrator models.
        #    These models will  be assigned the lookup reference frequency,
        #    Stokes parameters, and spix if they are available. If they are not the
        #    Setjy defaults (spw center frequency, [1.0, 0.0, 0.0, 0.0], 0.0)
        #    will be used . If normfluxes is True then the stokes parameters
        #    will be normalized to a value of 1
        transfer_fields = self.inputs.transfer
        transfer_intents = self.inputs.transintent
        if transfer_fields not in (None, ''):
            transresults = self._do_setjy(
                transfer_fields, transfer_intents, reffile=self.inputs.reffile,
                normfluxes=self.inputs.normfluxes, scalebychan=self.inputs.scalebychan)
            # Add measurements to the results object
            result.measurements.update(copy.deepcopy(transresults.measurements))

        return result

    def analyse(self, result):
        return result

    # Call the Setjy task
    def _do_setjy(self, field, intent, reffile=None, normfluxes=None, scalebychan=None):
        task_args = {
            'output_dir': self.inputs.output_dir,
            'vis': self.inputs.vis,
            'field': field,
            'intent': intent,
            'fluxdensity': -1,
            'reffile': reffile,
            'normfluxes': normfluxes,
            'scalebychan': scalebychan
        }

        task_inputs = vdp.InputsContainer(setjy.Setjy, self.inputs.context, **task_args)
        task = setjy.Setjy(task_inputs)
        results_list = self._executor.execute(task, merge=False)
        return results_list[0]


[docs] @task_registry.set_equivalent_casa_task('hif_setmodels') class SetModels(sessionutils.ParallelTemplate): Inputs = SetModelsInputs Task = SerialSetModels