Source code for pipeline.hsd.tasks.importdata.importdata

"""
The hsd_importdata task.

This task loads the specified visibility data into the pipeline
context unpacking and / or converting it as necessary.
"""
from __future__ import annotations

import os
from typing import TYPE_CHECKING, Any

import pipeline.h.tasks.importdata.importdata as importdata
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.sessionutils as sessionutils
import pipeline.infrastructure.vdp as vdp
from pipeline.hsd.tasks.common.inspection_util import merge_reduction_group
from pipeline.infrastructure import task_registry
from pipeline.infrastructure.utils import relative_path

from . import inspection

LOG = infrastructure.logging.get_logger(__name__)

if TYPE_CHECKING:
    from pipeline.domain import MeasurementSet, ObservingRun
    from pipeline.domain.singledish import MSReductionGroupDesc
    from pipeline.h.tasks.common.commonfluxresults import FluxCalibrationResults
    from pipeline.infrastructure.launcher import Context


class SDImportDataInputs(importdata.ImportDataInputs):
    """Class of inputs of SDImportData.

    This class extends importdata.ImportDataInputs.
    """

    asis = vdp.VisDependentProperty(default='SBSummary ExecBlock Annotation Antenna Station Receiver Source CalAtmosphere CalWVR SpectralWindow')
    ocorr_mode = vdp.VisDependentProperty(default='ao')
    with_pointing_correction = vdp.VisDependentProperty(default=True)
    createmms = vdp.VisDependentProperty(default='false')
    hm_rasterscan = vdp.VisDependentProperty(default='time')

    parallel = sessionutils.parallel_inputs_impl()

    # docstring and type hints: supplements hsd_importdata
    def __init__(self,
                 context: Context,
                 vis: list[str] | None = None,
                 output_dir: str | None = None,
                 asis: str | None = None,
                 process_caldevice: bool | None = None,
                 session: list[str] | None = None,
                 overwrite: bool | None = None,
                 nocopy: bool | None = None,
                 bdfflags: bool | None = None,
                 datacolumns: dict | None = None,
                 save_flagonline: bool | None = None,
                 lazy: bool | None = None,
                 with_pointing_correction: bool | None = None,
                 createmms: str | None = None,
                 ocorr_mode: str | None = None,
                 hm_rasterscan: str | None = None,
                 parallel: str | bool | None = None):
        """Initialise SDImportDataInputs class.

        Args:
            context: pipeline context

            vis: List of visibility data files. These may be ASDMs, tar files of ASDMs,
                MSes, or tar files of MSes, If ASDM files are specified, they will be
                converted to MS format.

                Example: ``vis=['X227.ms', 'asdms.tar.gz']``

            output_dir: path of output directory

            asis: Creates verbatim copies of the ASDM tables in the output MS.
                The value given to this option must be a list of table names
                separated by space characters. Default value, None, is
                equivalent to the following list.

                    ``'SBSummary ExecBlock Annotation Antenna Station Receiver Source CalAtmosphere CalWVR SpectralWindow'``

                Example: ``'Receiver'``, ``''``

            process_caldevice: Import the ASDM caldevice table.

                Example: ``True``

                Default: ``None`` (equivalent to ``True``)

            session: List of sessions to which the visibility files belong.
                Defaults to a single session containing all the visibility files,
                otherwise a session must be assigned to each vis file.

                Example: ``session=['Session_1', 'Sessions_2']``

            overwrite: Overwrite existing files on import. When converting
                ASDM to MS, if overwrite=False and the MS already exists in
                output directory, then this existing MS dataset will be used
                instead.

                Default: ``None`` (equivalent to ``False``)

            nocopy: Disable copying of MS to working directory

            bdfflags: Apply BDF flags on import.

            datacolumns: Dictionary defining the data types of existing columns.
                The format is:

                    ``{'data': 'data type 1'}``

                or

                    ``{'data': 'data type 1', 'corrected': 'data type 2'}``

                For ASDMs the data type can only be RAW and one can only specify
                it for the data column.
                For MSes one can define two different data types for the DATA and
                CORRECTED_DATA columns and they can be any of the known data types
                (RAW, REGCAL_CONTLINE_ALL, REGCAL_CONTLINE_SCIENCE,
                SELFCAL_CONTLINE_SCIENCE, REGCAL_LINE_SCIENCE,
                SELFCAL_LINE_SCIENCE, BASELINED, ATMCORR).
                The intent selection strings _ALL or _SCIENCE can be skipped.
                In that case the task determines this automatically by inspecting
                the existing intents in the dataset.
                Usually, a single datacolumns dictionary is used for all datasets.
                If necessary, one can define a list of dictionaries, one for each EB,
                with different setups per EB. If no type is specified, {'data':'raw'}
                will be assumed.

            save_flagonline: Save flag commands, flagging template, imaging targets, to text files。

                Default: ``None`` (equivalent to ``True``)

            lazy: Use the lazy filter import.

                Default: ``None`` (equivalent to ``False``)

            with_pointing_correction: Apply pointing correction to DIRECTION.
                Add (ASDM::Pointing::encoder - ASDM::Pointing::pointingDirection)
                to the value to be written in MS::Pointing::direction.

                Default: ``None`` (equivalent to ``True``)

            createmms: Create an MMS.

                Default: ``None`` (equivalent to ``False``)

            ocorr_mode: Selection of baseline correlation to import.
                Valid only if input visibility is ASDM. See a document of CASA,
                casatasks::importasdm, for available options.

                Default: ``None`` (equivalent to ``'ao'``)

            hm_rasterscan: Heuristics method for raster scan analysis.
                Two analysis modes, time-domain analysis ('time') and
                direction analysis ('direction'), are available.

                Default: ``None`` (equivalent to ``'time'``)

            parallel: Execute using CASA HPC functionality, if available.

                Options: ``'automatic'``, ``'true'``, ``'false'``, ``True``, ``False``

                Default: ``None`` (equivalent to ``'automatic'``)
        """
        super().__init__(context, vis=vis, output_dir=output_dir, asis=asis,
                         process_caldevice=process_caldevice, session=session,
                         overwrite=overwrite, nocopy=nocopy, bdfflags=bdfflags, lazy=lazy,
                         save_flagonline=save_flagonline, createmms=createmms,
                         ocorr_mode=ocorr_mode, datacolumns=datacolumns)
        self.with_pointing_correction = with_pointing_correction
        self.hm_rasterscan = hm_rasterscan
        self.parallel = parallel


class SDImportDataResults(basetask.Results):
    """SDImportDataResults is an equivalent class with ImportDataResults.

    Purpose of SDImportDataResults is to replace QA scoring associated
    with ImportDataResults with single dish specific QA scoring, which
    is associated with this class.

    ImportDataResults holds the results of the ImportData task. It contains
    the resulting MeasurementSet domain objects and optionally the additional
    SetJy results generated from flux entries in Source.xml.
    """

    def __init__(self,
                 mses: list[MeasurementSet] | None = None,
                 reduction_group_list: list[dict[int, MSReductionGroupDesc] | None] = None,
                 datatable_prefix: str | None = None,
                 setjy_results: list[FluxCalibrationResults] | None = None,
                 org_directions: dict[str, str | dict[str, str | float | None]] = None):
        """Initialise SDImportDataResults class.

        Args:
            mses: List of MeasurementSet domain objects
            reduction_group_list: List of dictionaries that consist of reduction group IDs (key) and MSReductionGroupDesc (value)
            datatable_prefix: path to directory that stores DataTable of each MeasurementSet
            setjy_results: the flux results generated from Source.xml
            org_directions: Dict of Direction objects of the origin
        """
        super().__init__()
        self.mses = [] if mses is None else mses
        self.reduction_group_list = reduction_group_list
        self.datatable_prefix = datatable_prefix
        self.setjy_results = setjy_results
        self.org_directions = org_directions
        self.origin = {}
        self.rasterscan_heuristics_results_direction = {}
        self.rasterscan_heuristics_results_time = {}
        self.results = importdata.ImportDataResults(mses=mses, setjy_results=setjy_results)

    def merge_with_context(self, context: Context):
        """Override method of basetask.Results.merge_with_context.

        Args:
            context: pipeline context
        """
        self.results.merge_with_context(context)
        self.__merge_reduction_group(context.observing_run, self.reduction_group_list)
        context.observing_run.ms_datatable_name = self.datatable_prefix
        context.observing_run.org_directions = self.org_directions

    def __merge_reduction_group(self, observing_run: ObservingRun, reduction_group_list: list[dict[int, MSReductionGroupDesc]]):
        """Call merge_reduction_group.

        Args:
            observing_run: pipeline.domain.observingrun.ObservingRun object
            reduction_group_list: List of dictionaries that consist of reduction group IDs (key) and MSReductionGroupDesc (value)
        """
        for reduction_group in reduction_group_list:
            merge_reduction_group(observing_run, reduction_group)

    def __repr__(self) -> str:
        """Override of __repr__.

        Returns:
            str: repr string
        """
        return 'SDImportDataResults:\n\t{0}'.format('\n\t'.join([ms.name for ms in self.mses]))


class SerialSDImportData(importdata.ImportData):
    """Data import execution task of SingleDish.

    This class extends importdata.ImportData class, and methods execute main logics depends on it.
    """

    Inputs = SDImportDataInputs

    def prepare(self, **parameters: dict[str, Any]) -> SDImportDataResults:
        """Prepare job requests for execution.

        Args:
            parameters: the parameters to pass through from the superclass.
        Returns:
            SDImportDataResults : result object
        """
        # get results object by running super.prepare()
        results = super().prepare()

        # per MS inspection
        table_prefix = relative_path(os.path.join(self.inputs.context.name, 'MSDataTable.tbl'),
                                     self.inputs.output_dir)
        reduction_group_list = []
        direction_rsh_result_list = []
        time_rsh_result_list = []
        org_directions_dict = {}
        for ms in results.mses:
            LOG.debug('Start inspection for %s' % ms.basename)
            table_name = os.path.join(table_prefix, ms.basename)
            inspector = inspection.SDInspection(self.inputs.context, table_name, ms=ms, hm_rasterscan=self.inputs.hm_rasterscan)
            reduction_group, org_directions, msglist, direction_rsh_result, time_rsh_result = self._executor.execute(inspector, merge=False)
            reduction_group_list.append(reduction_group)
            direction_rsh_result_list.append(direction_rsh_result)
            time_rsh_result_list.append(time_rsh_result)

            # update org_directions_dict for only new keys in org_directions
            for key in org_directions:
                org_directions_dict.setdefault(key, org_directions[key])

        # create results object
        myresults = SDImportDataResults(mses=results.mses,
                                        reduction_group_list=reduction_group_list,
                                        datatable_prefix=table_prefix,
                                        setjy_results=results.setjy_results,
                                        org_directions=org_directions_dict)

        myresults.origin = results.origin
        myresults.msglist = msglist
        for rsh in direction_rsh_result_list:
            myresults.rasterscan_heuristics_results_direction.setdefault(rsh.ms.origin_ms, []).append(rsh)
        for rsh in time_rsh_result_list:
            myresults.rasterscan_heuristics_results_time.setdefault(rsh.ms.origin_ms, []).append(rsh)
        return myresults

    def _get_fluxes(self, context, observing_run):
        # override _get_fluxes not to create flux.csv (PIPE-1846)
        # do nothing, return empty results
        return None, [], None


# Tier-0 parallelization
[docs] @task_registry.set_equivalent_casa_task('hsd_importdata') @task_registry.set_casa_commands_comment('If required, ASDMs are converted to MeasurementSets.') class SDImportData(sessionutils.ParallelTemplate): """SDImportData class for parallelization.""" Inputs = SDImportDataInputs Task = SerialSDImportData