Source code for pipeline.hif.tasks.refant.referenceantenna

import os

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.sessionutils as sessionutils
import pipeline.infrastructure.utils as utils
import pipeline.infrastructure.vdp as vdp
from pipeline.infrastructure import task_registry
from ...heuristics import findrefant

__all__ = [
    'RefAntInputs',
    'RefAntResults',
    'RefAnt',
    'SerialRefAnt',
]

LOG = infrastructure.get_logger(__name__)


class RefAntInputs(vdp.StandardInputs):
    # PIPE-1691: hif_refant is now implicitly a parallel task, but by default
    # running with parallel=False.
    parallel = sessionutils.parallel_inputs_impl(default=False)

    @vdp.VisDependentProperty
    def field(self):
        # return each field in the current ms that has been observed
        # with the desired intent
        fields = self.ms.get_fields(intent=self.intent)

        unique_field_names = {f.name for f in fields}
        field_ids = {f.id for f in fields}

        # Fields with different intents may have the same name. Check for this
        # and return the IDs if necessary
        if len(unique_field_names) == len(field_ids):
            return ','.join(unique_field_names)
        else:
            return ','.join([str(i) for i in field_ids])

    flagging = vdp.VisDependentProperty(default=True)
    geometry = vdp.VisDependentProperty(default=True)
    intent = vdp.VisDependentProperty(default='AMPLITUDE,BANDPASS,PHASE,POLARIZATION')
    refant = vdp.VisDependentProperty(default='')
    refantignore = vdp.VisDependentProperty(default='')
    spw = vdp.VisDependentProperty(default='')

    hm_refant = vdp.VisDependentProperty(default='automatic')

    @hm_refant.convert
    def hm_refant(self, value):
        return 'manual' if value == 'manual' else 'automatic'

    @vdp.VisDependentProperty
    def spw(self):
        return ','.join(str(spw.id) for spw in self.ms.get_spectral_windows(intent=self.intent))

    def to_casa_args(self):
        # refant does not use CASA tasks
        raise NotImplementedError

    # docstring and type hints: supplements hif_refant
    def __init__(self, context, vis=None, output_dir=None, field=None, spw=None, intent=None, hm_refant=None,
                 refant=None, geometry=None, flagging=None, refantignore=None, parallel=None):
        """Initialize Inputs.

        Args:
            context: Pipeline context object containing state information.

            vis: The list of input MeasurementSets. Defaults to the list of MeasurementSets in the pipeline context.

                Example: ['M31.ms']

            output_dir: Output directory.
                Defaults to None, which corresponds to the current working directory.

            field: The comma delimited list of field names or field ids for which flagging scores are computed if hm_refant='automatic' and  flagging = True

                Example: '' (Default to fields with the specified intents), '3C279', '3C279,M82'

            spw: A string containing the comma delimited list of spectral window ids for which flagging scores are computed if hm_refant='automatic' and  flagging = True.

                Example: '' (all spws observed with the specified intents), '11,13,15,17'

            intent: A string containing a comma delimited list of intents against which the selected fields are matched. Defaults to all supported
                intents.

                Example: 'BANDPASS', 'AMPLITUDE,BANDPASS,PHASE,POLARIZATION'

            hm_refant: The heuristics method or mode for selection the reference antenna. The options are 'manual' and 'automatic. In manual
                mode a user supplied reference antenna refant is supplied.
                In 'automatic' mode the antennas are selected automatically.

            refant: The user supplied reference antenna for hm_refant='manual'. If no antenna list is supplied an empty list is returned.

                Example: 'DV05'

            geometry: Score antenna by proximity to the center of the array. This option is quick as only the ANTENNA table must be read.
                Parameter is available when ``hm_refant``='automatic'.

            flagging: Score antennas by percentage of unflagged data.  This option requires computing flagging statistics.
                Parameter is available when ``hm_refant``='automatic'.

            refantignore: string list to be ignored as reference antennas.

                Example:  refantignore='ea02,ea03'

            parallel: Process multiple MeasurementSets in parallel using the casampi parallelization framework.

                Options: ``'automatic'``, ``'true'``, ``'false'``, ``True``, ``False``

                Default: ``None`` (equivalent to ``False``)

        """
        self.context = context
        self.vis = vis
        self.output_dir = output_dir

        self.field = field
        self.spw = spw
        self.intent = intent

        self.hm_refant = hm_refant
        self.refant = refant
        self.geometry = geometry
        self.flagging = flagging
        self.refantignore = refantignore

        self.parallel = parallel


class RefAntResults(basetask.Results):
    def __init__(self, vis, refant):
        super().__init__()
        self._vis = vis
        self._refant = ','.join([str(ant) for ant in refant])

    def merge_with_context(self, context):
        if self._vis is None or self._refant is None:
            LOG.error('No results to merge')
            return

        # Do we also need to locate the antenna in the measurement set?
        # This might become necessary when using different sessions
        ms = context.observing_run.get_ms(name=self._vis)

        if ms:
            LOG.debug('Setting refant for {!s} to {!r}'.format(ms.basename, self._refant))
            ms.reference_antenna = self._refant

    def __str__(self):
        if self._vis is None or self._refant is None:
            return ('Reference antenna results:\n'
                    '\tNo reference antenna selected')
        else:
            return ('Reference antenna results:\n'
                    '{!s}: refant={!r}'.format(os.path.basename(self._vis), self._refant))

    def __repr__(self):
        return 'RefAntResults({!r}, {!r})'.format(self._vis, self._refant)


[docs] class SerialRefAnt(basetask.StandardTaskTemplate): Inputs = RefAntInputs def __init__(self, inputs): super().__init__(inputs)
[docs] def prepare(self, **parameters): inputs = self.inputs # Get the reference antenna list if inputs.hm_refant == 'manual': refant = inputs.refant.split(',') elif inputs.hm_refant == 'automatic': casa_intents = utils.to_CASA_intent(inputs.ms, inputs.intent) heuristics = findrefant.RefAntHeuristics(vis=inputs.vis, field=inputs.field, spw=inputs.spw, intent=casa_intents, geometry=inputs.geometry, flagging=inputs.flagging, refantignore=inputs.refantignore) refant = heuristics.calculate() else: raise NotImplemented('Unhandled hm_refant value: {!r}'.format(inputs.hm_refant)) return RefAntResults(inputs.vis, refant)
[docs] def analyse(self, results): return results
[docs] @task_registry.set_equivalent_casa_task('hif_refant') @task_registry.set_casa_commands_comment( 'Antennas are prioritized and enumerated based on fraction flagged and position in the array. The best antenna is ' 'used as a reference antenna unless it gets flagged, in which case the next-best antenna is used.\n' 'This stage performs a pipeline calculation without running any CASA commands to be put in this file.' ) class RefAnt(sessionutils.ParallelTemplate): Inputs = RefAntInputs Task = SerialRefAnt