Source code for pipeline.hif.tasks.antpos.antpos

import csv
import os
import warnings

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.vdp as vdp
from pipeline.h.heuristics import caltable as acaltable
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import task_registry

__all__ = [
    'Antpos',
    'AntposInputs',
    'AntposResults'
]

LOG = infrastructure.get_logger(__name__)


class AntposResults(basetask.Results):
    def __init__(self, final=[], pool=[], preceding=[], antenna='', offsets=[]):
        super().__init__()
        self.pool = pool[:]
        self.final = final[:]
        self.preceding = preceding[:]
        self.error = set()
        self.antenna = antenna
        self.offsets = offsets

    def merge_with_context(self, context):
        """
        See :method:`~pipeline.api.Results.merge_with_context`
        """
        if not self.final:
            LOG.warning('No antenna position results to merge')
            return

        for calapp in self.final:
            LOG.debug('Adding calibration to callibrary:\n'
                      '%s\n%s' % (calapp.calto, calapp.calfrom))
            context.callibrary.add(calapp.calto, calapp.calfrom)

    def __repr__(self):
        s = 'AntposResults:\n'
        for calapplication in self.final:
            s += '\tBest caltable for spw #{spw} in {vis} is {name}\n'.format(
                spw=calapplication.spw, vis=os.path.basename(calapplication.vis),
                name=calapplication.gaintable)
        return s


class AntposInputs(vdp.StandardInputs):
    """
    AntposInputs defines the inputs for the Antpos pipeline task.
    """
    antenna = vdp.VisDependentProperty(default='')
    antposfile = vdp.VisDependentProperty(default='antennapos.csv')
    hm_antpos = vdp.VisDependentProperty(default='manual')

    @vdp.VisDependentProperty
    def offsets(self):
        return []

    @vdp.VisDependentProperty
    def caltable(self):
        """Get the caltable argument for these inputs.

        If set to a table-naming heuristic, this should give a sensible name
        considering the current CASA task arguments.
        """
        namer = acaltable.AntposCaltable()
        casa_args = self._get_task_args(ignore=('caltable',))
        return namer.calculate(output_dir=self.output_dir, stage=self.context.stage, **casa_args)

    # docstring and type hints: supplements hif_antpos
    def __init__(self, context, output_dir=None, vis=None, caltable=None, hm_antpos=None, antposfile=None, antenna=None,
                 offsets=None):
        """Initialize Inputs.

        Args:
            context: Pipeline context object containing state information.

            output_dir: Output directory.
                Defaults to None, which corresponds to the current working directory.

            vis: List of input visibility files.

                Example: ``['ngc5921.ms']``

            caltable: Name of output gain calibration tables.

                Example: ``['ngc5921.gcal']``

            hm_antpos: Heuristics method for retrieving the antenna position corrections. The options are ``'online'`` (not yet implemented), ``'manual'``,
                and ``'file'``.

            antposfile: The file(s) containing the antenna offsets. Used if hm_antpos is ``'file'``.

                Example: ``'antennapos.csv'``

            antenna: The list of antennas for which the positions are to be corrected. Available when ``hm_antpos='manual'``.

                Example: ``antenna='DV05,DV07'``

            offsets: The list of antenna offsets for each antenna in ``'antennas'``. Each offset is a set of 3 floating point numbers separated by commas, specified
                in the ITRF frame. Available when ``hm_antpos='manual'``.

                Example: ``offsets=[0.01, 0.02, 0.03, 0.03, 0.02, 0.01]``
        """
        super().__init__()

        # pipeline inputs
        self.context = context
        # vis must be set first, as other properties may depend on it
        self.vis = vis
        self.output_dir = output_dir

        # data selection arguments
        self.antenna = antenna
        self.antposfile = antposfile
        self.caltable = caltable

        # solution parameters
        self.hm_antpos = hm_antpos
        self.offsets = offsets

    def to_casa_args(self):
        # Get the antenna and offset lists.
        if self.hm_antpos == 'manual':
            antenna = self.antenna
            offsets = self.offsets
        elif self.hm_antpos == 'file':
            filename = os.path.join(self.output_dir, self.antposfile)
            antenna, offsets = self._read_antpos_csvfile(
                filename, os.path.basename(self.vis))
        else:
            antenna = ''
            offsets = []

        return {'vis': self.vis,
                'caltable': self.caltable,
                'antenna': antenna,
                'parameter': offsets}

    @staticmethod
    def _read_antpos_csvfile(filename, msbasename):
        """
        Read and return the contents of a file or list of files.
        """

        # This assumes a very simple csv file format containing the following
        # columns
        #    ms
        #    antenna
        #    xoffset in meters
        #    yoffset in meters
        #    zoffset in meters
        #    comment
        antennas = []
        parameters = []

        if not os.path.exists(filename):
            LOG.warning('Antenna position offsets file does not exist')
            return ','.join(antennas), parameters

        with open(filename, 'rt') as f:
            reader = csv.reader(f)

            # First row is header row
            next(reader)

            # Loop over the rows
            for row in reader:
                if len(row) == 6:
                    (ms_name, ant_name, xoffset, yoffset, zoffset, _) = row
                else:
                    msg = "Cannot read antenna position file: %s. Row %s is not correctly formatted." % (filename, reader.line_num)
                    LOG.error(msg)
                    raise Exception(msg)
                if ms_name != msbasename:
                    continue
                antennas.append(ant_name)
                parameters.extend(
                    [float(xoffset), float(yoffset), float(zoffset)])

        # Convert the list to a string since CASA wants it that way?
        return ','.join(antennas), parameters


[docs] @task_registry.set_equivalent_casa_task('hif_antpos') class Antpos(basetask.StandardTaskTemplate): Inputs = AntposInputs
[docs] def prepare(self): inputs = self.inputs gencal_args = inputs.to_casa_args() gencal_job = casa_tasks.gencal(caltype='antpos', **gencal_args) if inputs.hm_antpos == 'file' and gencal_args['antenna'] == '': LOG.info('No antenna position offsets are defined') else: # PIPE-1309: we put the casa task call under the catch_warnings contextmanager to prevent # gencal(caltype='antpos') from raising UserWarnings as exceptions. This could be # removed after CAS-13614 is fixed. with warnings.catch_warnings(): self._executor.execute(gencal_job) calto = callibrary.CalTo(vis=inputs.vis) # careful now! Calling inputs.caltable mid-task will remove the # newly-created caltable, so we must look at the task arguments # instead calfrom = callibrary.CalFrom(gencal_args['caltable'], caltype='antpos', spwmap=[], interp='', calwt=False) calapp = callibrary.CalApplication(calto, calfrom) return AntposResults(pool=[calapp], antenna=gencal_args['antenna'], offsets=gencal_args['parameter'])
[docs] def analyse(self, result): # With no best caltable to find, our task is simply to set the one # caltable as the best result # double-check that the caltable was actually generated on_disk = [ca for ca in result.pool if ca.exists()] result.final[:] = on_disk missing = [ca for ca in result.pool if ca not in on_disk] result.error.clear() result.error.update(missing) return result