Source code for pipeline.hifv.tasks.importdata.importdata

import collections
import os
import shutil

import numpy

import pipeline.h.tasks.importdata.importdata as importdata
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.mpihelpers as mpihelpers
import pipeline.infrastructure.sessionutils as sessionutils
import pipeline.infrastructure.vdp as vdp
from pipeline.hifv.heuristics.vlascanheuristics import VLAScanHeuristics
from pipeline.hifv.heuristics.specline_detect import detect_spectral_lines
from pipeline.infrastructure import casa_tasks, casa_tools, task_registry

LOG = infrastructure.get_logger(__name__)


class VLAImportDataInputs(importdata.ImportDataInputs):
    # Override defaults in ImportDataInputs
    asis = vdp.VisDependentProperty(default='Receiver CalAtmosphere')
    ocorr_mode = vdp.VisDependentProperty(default='co')
    bdfflags = vdp.VisDependentProperty(default=False)
    process_caldevice = vdp.VisDependentProperty(default=True)
    createmms = vdp.VisDependentProperty(default='false')
    specline_spws = vdp.VisDependentProperty(default='auto')
    minparang = vdp.VisDependentProperty(default=0.0)
    parallel = sessionutils.parallel_inputs_impl(default=False)

    # docstring and type hints: supplements hifv_importdata
    def __init__(
        self,
        context,
        vis=None,
        output_dir=None,
        asis=None,
        process_caldevice=None,
        session=None,
        overwrite=None,
        nocopy=None,
        bdfflags=None,
        lazy=None,
        save_flagonline=None,
        createmms=None,
        ocorr_mode=None,
        datacolumns=None,
        specline_spws=None,
        minparang=None,
        parallel=None,
    ):
        """Initialize Inputs.

        Args:
            context: Pipeline context object containing state information.

            vis: List of visibility data files. These may be ASDMs, tar files of ASDMs, MSes, or tar files of MSes, If ASDM files are specified, they will be
                converted  to MS format.

                Example: ``vis=['X227.ms', 'asdms.tar.gz']``

            output_dir: Output directory.
                Defaults to None, which corresponds to the current working directory.

            asis: Creates verbatim copies of the ASDM tables in the output MS. The value given to this option must be a list of table names
                separated by space characters.
                examples:

                - ``asis='Receiver CalAtmosphere'``
                - ``asis='Receiver'``, ``asis=''``

            process_caldevice:

            session: List of sessions to which the visibility files belong. Defaults to a single session containing all the visibility files, otherwise
                a session must be assigned to each vis file.

                Example: ``session=['Session_1', 'Sessions_2']``

            overwrite: Overwrite existing files on import.

            nocopy: When importing an MS, disable copying of the MS to the working directory.

            bdfflags:

            lazy:

            save_flagonline:

            createmms: Create a multi-MeasurementSet ('true') ready for parallel processing, or a standard MeasurementSet ('false'). The default setting
                ('automatic') creates an MMS if running in a cluster environment.

            ocorr_mode: Read in cross- and auto-correlation data(ca), cross- correlation data only (co), or autocorrelation data only (ao).

            datacolumns: Dictionary defining the data types of existing columns. The format is:

                ``{'data': 'data type 1'}``

                or

                ``{'data': 'data type 1', 'corrected': 'data type 2'}``

                For ASDMs the data type can only be RAW and one
                can only specify it for the data column.
                For MSes one can define two different data types
                for the DATA and CORRECTED_DATA columns and they
                can be any of the known data types (RAW,
                REGCAL_CONTLINE_ALL, REGCAL_CONTLINE_SCIENCE,
                SELFCAL_CONTLINE_SCIENCE, REGCAL_LINE_SCIENCE,
                SELFCAL_LINE_SCIENCE, BASELINED, ATMCORR). The
                intent selection strings _ALL or _SCIENCE can be
                skipped. In that case the task determines this
                automatically by inspecting the existing intents
                in the dataset.
                Usually, a single datacolumns dictionary is used
                for all datasets. If necessary, one can define a
                list of dictionaries, one for each EB, with
                different setups per EB.
                If no types are specified,
                {'data':'raw','corrected':'regcal_contline'}
                or {'data':'raw'} will be assumed, depending on
                whether the corrected column exists or not.

            specline_spws: String indicating how the pipeline should determine whether a spw should be processed as a spectral line window or continuum. The default setting of
                'auto' will use defined heuristics to determine this definition. Accepted
                values are 'auto', 'none' (no spws will be defined as spectral line), or
                a string of spw definitions in the CASA format.

                Example: ``specline_spws='2, 3, 4~9, 23'``

            minparang: Minimum required parallactic angle range for polarisation
                calibrator, in degrees. The default of 0.0 is used for
                non-polarisation processing.

            parallel: Process multiple MeasurementSets in parallel using the casampi parallelization framework.

                Options: ``'automatic'``, ``'true'``, ``'false'``, ``True``, ``False``

                Default: ``None`` (equivalent to ``False``)

        """
        super().__init__(context, vis=vis, output_dir=output_dir, asis=asis,
                         process_caldevice=process_caldevice, session=session,
                         overwrite=overwrite, nocopy=nocopy, bdfflags=bdfflags, lazy=lazy,
                         save_flagonline=save_flagonline, createmms=createmms,
                         ocorr_mode=ocorr_mode, datacolumns=datacolumns)
        self.specline_spws = specline_spws
        self.minparang = minparang
        self.parallel = parallel


class VLAImportDataResults(basetask.Results):
    def __init__(self, mses=None, setjy_results=None):
        super().__init__()

        if mses is None:
            mses = []
        self.mses = mses
        self.setjy_results = setjy_results
        self.origin = {}

    def merge_with_context(self, context):
        target = context.observing_run

        for ms in self.mses:
            LOG.info('Adding {0} to context'.format(ms.name))
            target.add_measurement_set(ms)

            if ms.antenna_array.name in ('EVLA', 'VLA', 'JVLA'):
                if not hasattr(context, 'evla'):
                    context.evla = collections.defaultdict(dict)

                msinfos = dict((ms.name, self._do_msinfo_heuristics(ms.name, context)) for ms in self.mses)
                context.evla['msinfo'].update(msinfos)
                context.project_summary.telescope = 'EVLA'
                context.project_summary.observatory = 'Karl G. Jansky Very Large Array'
                # context.evla['msinfo'] = { m.name : msinfo }

                # Dictionaries for per band solution intervals
                m = context.observing_run.get_ms(ms.name)
                spw2band = m.get_vla_spw2band()

                uniqueBands = set(spw2band.values())

                for bandname in uniqueBands:
                    context.evla['msinfo'][m.name].gain_solint1[bandname] = 'int'
                    context.evla['msinfo'][m.name].gain_solint2[bandname] = 'int'
                    context.evla['msinfo'][m.name].shortsol1[bandname] = 0.0
                    context.evla['msinfo'][m.name].shortsol2[bandname] = 0.0
                    context.evla['msinfo'][m.name].longsolint[bandname] = 0.0
                    context.evla['msinfo'][m.name].short_solint[bandname] = 0.0
                    context.evla['msinfo'][m.name].new_gain_solint1[bandname] = '1.0s'
                    context.evla['msinfo'][m.name].setjy_results = self.setjy_results
        if self.setjy_results:
            for result in self.setjy_results:
                result.merge_with_context(context)

    def _do_msinfo_heuristics(self, ms, context):
        """Gets heuristics for VLA via original msinfo script
        """

        msinfo = VLAScanHeuristics(ms)
        msinfo.makescandict()
        msinfo.calibratorIntents()
        msinfo.determine3C84()

        with casa_tools.TableReader(ms) as table:
            scanNums = sorted(numpy.unique(table.getcol('SCAN_NUMBER')))

        # Check for missing scans
        missingScans = 0
        missingScanStr = ''

        for i in range(max(scanNums)):
            if scanNums.count(i + 1) == 1:
                pass
            else:
                LOG.warning("WARNING: Scan " + str(i + 1) + " is not present")
                missingScans += 1
                missingScanStr = missingScanStr + str(i + 1) + ', '

        if missingScans > 0:
            LOG.warning("WARNING: There were " + str(missingScans) + " missing scans in this MS")
        else:
            LOG.info("No missing scans found.")

        return msinfo

    def __repr__(self):
        return 'VLAImportDataResults:\n\t{0}'.format('\n\t'.join([ms.name for ms in self.mses]))


class SerialVLAImportData(importdata.ImportData):
    Inputs = VLAImportDataInputs

    def prepare(self, **parameters):
        # get results object by running super.prepare()
        results = super().prepare()

        # create results object
        myresults = VLAImportDataResults(mses=results.mses, setjy_results=results.setjy_results)

        myresults.origin = results.origin

        for ms in myresults.origin:
            if myresults.origin[ms] == 'ASDM':
                myresults.origin[ms] = 'SDM'

        PbandWarning = ''
        for ms in myresults.mses:
            for key, value in ms.get_vla_spw2band().items():
                if 'P' in value:
                    PbandWarning = 'P-band data detected in the raw data. ' \
                                   'VLA P-band pipeline calibration has not yet been ' \
                                   'commissioned and may even fail. Please inspect all P-band pipeline ' \
                                   'products carefully.'

        if PbandWarning:
            LOG.warning(PbandWarning)

        # Spectral line detection tool
        for mset in myresults.mses:
            detect_spectral_lines(mset=mset, specline_spws=self.inputs.specline_spws)
            LOG.debug("Whether spectral window is designated as spectral line or continuum.")
            for spw in mset.get_all_spectral_windows():
                LOG.debug("SPW ID {}: {}".format(spw.id, spw.specline_window))

        return myresults

    def _do_importasdm(self, asdm):
        inputs = self.inputs
        vis = self._asdm_to_vis_filename(asdm)
        outfile = os.path.join(inputs.output_dir,
                               os.path.basename(asdm) + '.flagonline.txt')

        if inputs.save_flagonline:
            # Create the standard calibration flagging template file
            template_flagsfile = os.path.join(inputs.output_dir, os.path.basename(asdm) + '.flagtemplate.txt')
            self._make_template_flagfile(template_flagsfile, 'User flagging commands file for the calibration pipeline')
            # Create the imaging targets file
            template_flagsfile = os.path.join(inputs.output_dir, os.path.basename(asdm) + '.flagtargetstemplate.txt')
            self._make_template_flagfile(template_flagsfile, 'User flagging commands file for the imaging pipeline')

        createmms = mpihelpers.parse_mpi_input_parameter(inputs.createmms)

        with_pointing_correction = getattr(inputs, 'with_pointing_correction', True)

        task = casa_tasks.importasdm(asdm=asdm,
                                     vis=vis,
                                     savecmds=inputs.save_flagonline,
                                     outfile=outfile,
                                     process_caldevice=inputs.process_caldevice,
                                     asis=inputs.asis,
                                     overwrite=inputs.overwrite,
                                     bdfflags=inputs.bdfflags,
                                     lazy=inputs.lazy,
                                     with_pointing_correction=with_pointing_correction,
                                     ocorr_mode=inputs.ocorr_mode,
                                     process_pointing=True,
                                     createmms=createmms)

        self._executor.execute(task)

        for xml_filename in ['Source.xml', 'SpectralWindow.xml', 'DataDescription.xml']:
            asdm_source = os.path.join(asdm, xml_filename)
            if os.path.exists(asdm_source):
                vis_source = os.path.join(vis, xml_filename)
                LOG.info('Copying %s from ASDM to measurement set', xml_filename)
                LOG.trace('Copying %s: %s to %s', xml_filename, asdm_source, vis_source)
                shutil.copyfile(asdm_source, vis_source)


[docs] @task_registry.set_equivalent_casa_task('hifv_importdata') @task_registry.set_casa_commands_comment('If required, ASDMs are converted to MeasurementSets.') class VLAImportData(sessionutils.ParallelTemplate): """VLAImportData class for parallelization.""" Inputs = VLAImportDataInputs Task = SerialVLAImportData