Source code for pipeline.hsd.tasks.restoredata.restoredata

"""
Restore task module for single dish data, based on h_restoredata.

The restore data module provides a class for reimporting, reflagging, and
recalibrating a subset of the ASDMs belonging to a member OUS, using pipeline
flagging and calibration data products.
"""
from __future__ import annotations

import os
from typing import TYPE_CHECKING

import pipeline.h.tasks.restoredata.restoredata as restoredata
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.vdp as vdp
from pipeline.infrastructure import casa_tools
from pipeline.infrastructure import task_registry
from .. import applycal
from ..importdata import importdata as importdata

if TYPE_CHECKING:
    from pipeline.hsd.tasks.applycal import SDApplycalResults
    from pipeline.hsd.tasks.importdata import SDImportDataResults
    from pipeline.infrastructure.launcher import Context

LOG = infrastructure.logging.get_logger(__name__)


class SDRestoreDataInputs(restoredata.RestoreDataInputs):
    """SDRestoreDataInputs manages the inputs for the SDRestoreData task."""

    asis = vdp.VisDependentProperty(default='SBSummary ExecBlock Annotation Antenna Station Receiver Source CalAtmosphere CalWVR SpectralWindow')
    ocorr_mode = vdp.VisDependentProperty(default='ao')
    hm_rasterscan = vdp.VisDependentProperty(default='time')

    # docstring and type hints: supplements hsd_restoredata
    def __init__(self, context: Context, copytoraw: bool | None = None, products_dir: str | None = None,
                 rawdata_dir: str | None = None, output_dir: str | None = None, session: str | None = None,
                 vis: list[str] = None, bdfflags: bool | None = None, lazy: bool | None = None,
                 asis: str | None = None, ocorr_mode: str | None = None, hm_rasterscan: str | None = None):
        """
        Initialise the Inputs, initialising any property values to those given here.

        Args:
            context: the pipeline Context state object

            copytoraw: Copy calibration and flagging tables from ``products_dir``
                to ``rawdata_dir`` directory.

                Example: ``copytoraw=False``

                Default: ``None`` (equivalent to ``True``)

            products_dir: Name of the data products directory to copy
                calibration products from.
                The parameter is effective only when ``copytoraw=True``.
                When ``copytoraw=False``, calibration products in
                ``rawdata_dir`` will be used.

                Example: products_dir='myproductspath'

                Default: ``None`` (equivalent to ``'../products'``)

            rawdata_dir: Name of the raw data directory.

                Example: ``rawdata_dir='myrawdatapath'``

                Default: ``None`` (equivalent to ``'../rawdata'``)

            output_dir: the working directory for the restored data

            session: List of sessions one per visibility file.

                Example: ``session=['session_3']``

            vis: List of raw visibility data files to be restored.
                Assumed to be in the directory specified by rawdata_dir.

                Example: ``vis=['uid___A002_X30a93d_X43e']``

            bdfflags: Apply BDF flags on import.

                Example: ``bdfflags=False``

                Default: ``None`` (equivalent to ``True``)

            lazy: Use the lazy filler option.

                Example: ``lazy=True``

                Default: ``None`` (equivalent to ``False``)

            asis: Creates verbatim copies of the ASDM tables in the output MS.
                The value given to this option must be a list of table names
                separated by space characters. Default value, None, is equivalent
                to the following list.

                    ``'SBSummary ExecBlock Annotation Antenna Station Receiver Source CalAtmosphere CalWVR SpectralWindow'``

                Example: ``asis='Source Receiver'``

            ocorr_mode: Selection of baseline correlation to import.
                Valid only if input visibility is ASDM. See a document of CASA,
                casatasks::importasdm, for available options.

                Example: ``ocorr_mode='ca'``

                Default: ``None`` (equivalent to ``'ao'``)

            hm_rasterscan: Heuristics method for raster scan analysis.
                Two analysis modes, time-domain analysis ('time') and
                direction analysis ('direction'), are available.

                Default: ``None`` (equivalent to ``'time'``)

        """
        super().__init__(context, copytoraw=copytoraw, products_dir=products_dir,
                         rawdata_dir=rawdata_dir, output_dir=output_dir, session=session,
                         vis=vis, bdfflags=bdfflags, lazy=lazy, asis=asis,
                         ocorr_mode=ocorr_mode)

        self.hm_rasterscan = hm_rasterscan


class SDRestoreDataResults(restoredata.RestoreDataResults):
    """Results object of SDRestoreData."""

    def __init__(self, importdata_results: SDImportDataResults, applycal_results: SDApplycalResults,
                 flagging_summaries: list[dict[str, str]]):
        """
        Initialise the results objects.

        Args:
            importdata_results: results of importdata
            applycal_results: results of applycal
            flagging_summaries: summaries of flagdata
        """
        super().__init__(importdata_results, applycal_results, flagging_summaries)

    def merge_with_context(self, context: Context):
        """
        Call same method of superclass and _merge_k2jycal().

        Args:
            context: the pipeline Context state object
        """
        super().merge_with_context(context)

        # set k2jy factor to ms domain objects
        if isinstance(self.applycal_results, basetask.ResultsList):
            for result in self.applycal_results:
                self._merge_k2jycal(context, result)
        else:
            self._merge_k2jycal(context, self.applycal_results)

    def _merge_k2jycal(self, context: Context, applycal_results: SDApplycalResults):
        """
        Merge k2jycal caltable into context.

        Jy/K factors are retrieved from caltables and are attached to
        measurementset domain object.

        Args:
            context: the pipeline Context state object
            applycal_results: results object of applycal
        """
        for calapp in applycal_results.applied:
            msobj = context.observing_run.get_ms(name=os.path.basename(calapp.vis))
            if not hasattr(msobj, 'k2jy_factor'):
                for _calfrom in calapp.calfrom:
                    if _calfrom.caltype == 'amp' or _calfrom.caltype == 'gaincal':
                        LOG.debug('Adding k2jy factor to {0}'.format(msobj.basename))
                        # k2jy gaincal table
                        k2jytable = _calfrom.gaintable
                        k2jy_factor = {}
                        with casa_tools.TableReader(k2jytable) as tb:
                            spws = tb.getcol('SPECTRAL_WINDOW_ID')
                            antennas = tb.getcol('ANTENNA1')
                            params = tb.getcol('CPARAM').real
                            nrow = tb.nrows()
                        for irow in range(nrow):
                            spwid = spws[irow]
                            antenna = antennas[irow]
                            param = params[:, 0, irow]
                            npol = param.shape[0]
                            antname = msobj.get_antenna(antenna)[0].name
                            dd = msobj.get_data_description(spw=int(spwid))
                            if dd is None:
                                continue
                            for ipol in range(npol):
                                polname = dd.get_polarization_label(ipol)
                                k2jy_factor[(spwid, antname, polname)] = 1.0 / (param[ipol] * param[ipol])
                        msobj.k2jy_factor = k2jy_factor
            LOG.debug('msobj.k2jy_factor = {0}'.format(getattr(msobj, 'k2jy_factor', 'N/A')))


[docs] @task_registry.set_equivalent_casa_task('hsd_restoredata') class SDRestoreData(restoredata.RestoreData): """Restore flagged and calibrated data produced during a previous pipeline run and archived on disk.""" Inputs = SDRestoreDataInputs
[docs] def prepare(self): """Call prepare method of superclass, create Results ofject.""" # run prepare method in the parent class results = super().prepare() # apply baseline table and produce baseline-subtracted MSs # apply final flags for baseline-subtracted MSs sdresults = SDRestoreDataResults(results.importdata_results, results.applycal_results, results.flagging_summaries) return sdresults
def _do_importasdm(self, sessionlist: list[str], vislist: list[str]): """ Execute importasdm task. Args: sessionlist: session list of pipeline vislist: MeasurementSet list of pipeline """ inputs = self.inputs # SDImportDataInputs operate in the scope of a single measurement set. # To operate in the scope of multiple MSes we must use an # InputsContainer. container = vdp.InputsContainer(importdata.SerialSDImportData, inputs.context, vis=vislist, session=sessionlist, save_flagonline=False, lazy=inputs.lazy, bdfflags=inputs.bdfflags, asis=inputs.asis, ocorr_mode=inputs.ocorr_mode, hm_rasterscan=inputs.hm_rasterscan) importdata_task = importdata.SerialSDImportData(container) return self._executor.execute(importdata_task, merge=True) def _do_applycal(self): """Execute applycal task.""" inputs = self.inputs # SDApplyCalInputs operates in the scope of a single measurement set. # To operate in the scope of multiple MSes we must use an # InputsContainer. container = vdp.InputsContainer(applycal.SerialSDApplycal, inputs.context) applycal_task = applycal.SerialSDApplycal(container) return self._executor.execute(applycal_task, merge=True)