Source code for pipeline.hsdn.tasks.restoredata.restoredata

"""
Restore task module for NRO data, based on h_restoredata.

The restore data module provides a class for reimporting, reflagging, and
recalibrating a subset of the MSes belonging to a member OUS, using pipeline
flagging and calibration data products.
"""
from __future__ import annotations

import os
from typing import TYPE_CHECKING

import pipeline.h.tasks.restoredata.restoredata as restoredata
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.vdp as vdp
from pipeline.hsd.tasks.applycal import applycal
from pipeline.infrastructure import casa_tools, task_registry
from pipeline.infrastructure.basetask import ResultsList

from ..importdata import importdata as importdata
from . import ampcal

LOG = infrastructure.logging.get_logger(__name__)

if TYPE_CHECKING:
    from pipeline.h.tasks.applycal import ApplycalResults
    from pipeline.infrastructure.launcher import Context


class NRORestoreDataInputs(restoredata.RestoreDataInputs):
    """NRORestoreDataInputs manages the inputs for the NRORestoreData task."""

    reffile = vdp.VisDependentProperty(default='nroscalefile.csv')
    caltable = vdp.VisDependentProperty(default='')
    hm_rasterscan = vdp.VisDependentProperty(default='time')

    # docstring and type hints: supplements hsdn_retoredata
    def __init__(self, context: Context, vis: list[str] = None, caltable: vdp.VisDependentProperty = None,
                 reffile: vdp.VisDependentProperty = None, products_dir: str = None,
                 copytoraw: vdp.VisDependentProperty = None, rawdata_dir: str = None, output_dir: str = None,
                 hm_rasterscan: str | None = None):
        """
        Initialise the Inputs, initialising any property values to those given here.

        Args:
            context: the pipeline Context state object

            vis: list of raw visibility data files to be restored.
                Assumed to be in the directory specified by rawdata_dir.

                Example: ``vis=['mg2.ms']``

            caltable: Name of output gain calibration tables.

                Example: ``caltable='ngc5921.gcal'``

            reffile: Path to a file containing scaling factors between beams.
                The format is equals to jyperk.csv with five fields:

                    - MS name
                    - beam name (instead of antenna name)
                    - spectral window id
                    - polarization string
                    - the scaling factor

                Example for the file is as follows::

                    #MS,Beam,Spwid,Polarization,Factor
                    mg2-20181016165248-181017.ms,NRO-BEAM0,0,I,1.000000000
                    mg2-20181016165248-181017.ms,NRO-BEAM0,1,I,1.000000000
                    mg2-20181016165248-181017.ms,NRO-BEAM0,2,I,1.000000000
                    mg2-20181016165248-181017.ms,NRO-BEAM0,3,I,1.000000000
                    mg2-20181016165248-181017.ms,NRO-BEAM1,0,I,3.000000000
                    mg2-20181016165248-181017.ms,NRO-BEAM1,1,I,3.000000000
                    mg2-20181016165248-181017.ms,NRO-BEAM1,2,I,3.000000000
                    mg2-20181016165248-181017.ms,NRO-BEAM1,3,I,3.000000000
                    mg2-20181016165248-181017.ms,NRO-BEAM2,0,I,0.500000000
                    mg2-20181016165248-181017.ms,NRO-BEAM2,1,I,0.500000000
                    mg2-20181016165248-181017.ms,NRO-BEAM2,2,I,0.500000000
                    mg2-20181016165248-181017.ms,NRO-BEAM2,3,I,0.500000000
                    mg2-20181016165248-181017.ms,NRO-BEAM3,0,I,2.000000000
                    mg2-20181016165248-181017.ms,NRO-BEAM3,1,I,2.000000000
                    mg2-20181016165248-181017.ms,NRO-BEAM3,2,I,2.000000000
                    mg2-20181016165248-181017.ms,NRO-BEAM3,3,I,2.000000000

                If no file name is specified or specified file doesn't exist,
                all the factors are set to 1.0.

                Example: ``reffile=''``, ``reffile='nroscalefile.csv'``

            products_dir: Name of the data products directory.

                Example: ``products_dir='myproductspath'``

                Default: ``None`` (equivalent to ``'../products'``)

            copytoraw: Copy calibration and flagging tables to
                raw data directory.

                Example: ``copytoraw=False``

                Default: ``None`` (equivalent to ``True``)

            rawdata_dir: Name of the raw data directory.

                Example: ``rawdata_dir='myrawdatapath'``

                Default: ``None`` (equivalent to ``'../rawdata'``)

            output_dir: the working directory for the restored data

            hm_rasterscan: Heuristics method for raster scan analysis.
                Two analysis modes, time-domain analysis ('time') and
                direction analysis ('direction'), are available.

                Default: ``None`` (equivalent to ``'time'``)

        """
        super().__init__(context, vis=vis, products_dir=products_dir,
                         copytoraw=copytoraw, rawdata_dir=rawdata_dir,
                         output_dir=output_dir)

        self.caltable = caltable
        self.reffile = reffile
        self.hm_rasterscan = hm_rasterscan


class NRORestoreDataResults(restoredata.RestoreDataResults):
    """Results object of NRORestoreData."""

    def __init__(self, importdata_results: ResultsList = None, applycal_results: ResultsList = None,
                 ampcal_results: ResultsList = None, flagging_summaries: list[dict[str, str]] = None):
        """
        Initialise the results objects.

        Args:
            importdata_results: results of importdata
            applycal_results: results of applycal
            ampcal_results: results of ampcal
            flagging_summaries: summaries of flagdata
        """
        super().__init__(importdata_results, applycal_results, flagging_summaries)
        self.ampcal_results = ampcal_results

    def merge_with_context(self, context: Context):
        """
        Merge results with context.

        Args:
            context: Context object
        """
        super().merge_with_context(context)

        # set amplitude scaling factor to ms domain objects
        if isinstance(self.applycal_results, ResultsList):
            for result in self.applycal_results:
                self._merge_ampcal(context, result)
        else:
            self._merge_ampcal(context, self.applycal_results)

    def _merge_ampcal(self, context: Context, applycal_results: ApplycalResults):
        """
        Merge results of applycal with context.

        Args:
            context: Context object
            applycal_results: results of applycal
        """
        for calapp in applycal_results.applied:
            msobj = context.observing_run.get_ms(name=os.path.basename(calapp.vis))
            if not hasattr(msobj, 'k2jy_factor'):
                for _calfrom in calapp.calfrom:
                    if _calfrom.caltype == 'amp' or _calfrom.caltype == 'gaincal':
                        LOG.debug('Adding k2jy factor to {0}'.format(msobj.basename))
                        # k2jy gaincal table
                        k2jytable = _calfrom.gaintable
                        k2jy_factor = {}
                        with casa_tools.TableReader(k2jytable) as tb:
                            spws = tb.getcol('SPECTRAL_WINDOW_ID')
                            antennas = tb.getcol('ANTENNA1')
                            params = tb.getcol('CPARAM').real
                            nrow = tb.nrows()
                        for irow in range(nrow):
                            spwid = spws[irow]
                            antenna = antennas[irow]
                            param = params[:, 0, irow]
                            npol = param.shape[0]
                            antname = msobj.get_antenna(antenna)[0].name
                            dd = msobj.get_data_description(spw=int(spwid))
                            if dd is None:
                                continue
                            for ipol in range(npol):
                                polname = dd.get_polarization_label(ipol)
                                k2jy_factor[(spwid, antname, polname)] = 1.0 / (param[ipol] * param[ipol])
                        msobj.k2jy_factor = k2jy_factor
            LOG.debug('msobj.k2jy_factor = {0}'.format(getattr(msobj, 'k2jy_factor', 'N/A')))


[docs] @task_registry.set_equivalent_casa_task('hsdn_restoredata') class NRORestoreData(restoredata.RestoreData): """Restore flagged and calibrated data produced during a previous pipeline run and archived on disk.""" Inputs = NRORestoreDataInputs
[docs] def prepare(self): """Prepare results.""" inputs = self.inputs LOG.debug('prepare inputs = {0}'.format(inputs)) # run prepare method in the parent class results = super().prepare() ampcal_results = self.ampcal_results # apply baseline table and produce baseline-subtracted MSs # apply final flags for baseline-subtracted MSs results = NRORestoreDataResults(results.importdata_results, results.applycal_results, ampcal_results, results.flagging_summaries) return results
def _do_importasdm(self, sessionlist: list[str], vislist: list[str]): """ Execute importasdm. Args: sessionlist: a list of sessions vislist: a list of vis """ inputs = self.inputs # NROImportDataInputs operate in the scope of a single measurement set. # To operate in the scope of multiple MSes we must use an # InputsContainer. LOG.debug('_do_importasdm inputs = {0}'.format(inputs)) container = vdp.InputsContainer(importdata.NROImportData, inputs.context, vis=vislist, output_dir=None, hm_rasterscan=inputs.hm_rasterscan) importdata_task = importdata.NROImportData(container) return self._executor.execute(importdata_task, merge=True) def _do_applycal(self): """Execute applycal.""" inputs = self.inputs LOG.debug('_do_applycal inputs = {0}'.format(inputs)) # Before applycal, sensitively (amplitude) correction using k2jycal task and # a scalefile (=reffile) given by Observatory. This is the special operation for NRO data. # If no scalefile exists in the working directory, skip this process. if os.path.exists(inputs.reffile): container = vdp.InputsContainer(ampcal.SDAmpCal, inputs.context, reffile=inputs.reffile) else: LOG.info('No scale factor file exists. Skip scaling.') container = vdp.InputsContainer(ampcal.SDAmpCal, inputs.context) LOG.debug('ampcal container = {0}'.format(container)) ampcal_task = ampcal.SDAmpCal(container) self.ampcal_results = self._executor.execute(ampcal_task, merge=True) # SDApplyCalInputs operates in the scope of a single measurement set. # To operate in the scope of multiple MSes we must use an # InputsContainer. container = vdp.InputsContainer(applycal.SerialSDApplycal, inputs.context) applycal_task = applycal.SerialSDApplycal(container) LOG.debug('_do_applycal container = {0}'.format(container)) return self._executor.execute(applycal_task, merge=True)