Source code for pipeline.hsd.tasks.skycal.skycal

"""The skycal task module to calibrate sky background."""
from __future__ import annotations

import collections
import copy
import os
from typing import TYPE_CHECKING

import numpy as np

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.sessionutils as sessionutils
from pipeline.infrastructure.utils import relative_path
import pipeline.infrastructure.vdp as vdp
from pipeline.h.heuristics import caltable as caltable_heuristic
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import casa_tools
from pipeline.infrastructure import task_registry
from pipeline.domain.datatable import OnlineFlagIndex
from ..common import SingleDishResults

if TYPE_CHECKING:
    from numpy import generic
    from numpy.typing import NDArray

    from pipeline.infrastructure.launcher import Context

LOG = infrastructure.get_logger(__name__)

# Threshold of the elevation difference of QA score
ELEVATION_DIFFERENCE_THRESHOLD = 3.0  # deg


class SDSkyCalInputs(vdp.StandardInputs):
    """Inputs class for SDSkyCal task."""
    parallel = sessionutils.parallel_inputs_impl()

    calmode = vdp.VisDependentProperty(default='auto')
    elongated = vdp.VisDependentProperty(default=False)
    field = vdp.VisDependentProperty(default='')
    fraction = vdp.VisDependentProperty(default='10%')
    noff = vdp.VisDependentProperty(default=-1)
    outfile = vdp.VisDependentProperty(default='')
    scan = vdp.VisDependentProperty(default='')
    spw = vdp.VisDependentProperty(default='')
    width = vdp.VisDependentProperty(default=0.5)

    @vdp.VisDependentProperty
    def infiles(self) -> str:
        """Return name of MS. Alias for "vis" attribute."""
        return self.vis

    @infiles.convert
    def infiles(self, value: str | list[str]) -> str | list[str]:
        """Convert value into expected type.

        Currently, no conversion is performed.

        Args:
            value: Name of MS, or the list of names.

        Returns:
            Converted value. Currently return input value as is.
        """
        self.vis = value
        return value

    # docstring and type hints: supplements hsd_skycal
    def __init__(
            self,
            context: Context,
            calmode: str | None = None,
            fraction: float | None = None,
            noff: int | None = None,
            width: float | None = None,
            elongated: bool | None = None,
            output_dir: str | None = None,
            infiles: str | None = None,
            outfile: str | None = None,
            field: str | None = None,
            spw: str | None = None,
            scan: str | None = None,
            parallel: bool | str | None = None
            ):
        """Initialize SDK2JyCalInputs instance.

        Args:
            context: Pipeline context object containing state information.

            calmode: Calibration mode.
                Available options are 'auto' (default), 'ps', 'otf', and
                'otfraster'. When 'auto' is set, the task will use preset
                calibration mode that is determined by inspecting data.
                'ps' mode is simple position switching using explicit reference
                scans. Other two modes, 'otf' and 'otfraster', will generate
                reference data from scans at the edge of the map. Those modes
                are intended for OTF observation and the former is defined for
                generic scanning pattern such as Lissajous, while the latter is
                specific use for raster scan.

                Options: ``'auto'``, ``'ps'``, ``'otf'``, ``'otfraster'``

                Default: ``None`` (equivalent to ``'auto'``)

            fraction: Sub-parameter for calmode. Edge marking parameter for
                'otf' and 'otfraster' mode. It specifies a number of OFF scans
                as a fraction of total number of data points.

                Options: String style like '20%', or float value less than 1.0.
                    For 'otfraster' mode, you can also specify 'auto'.

                Default: ``None`` (equivalent to ``'10%'``)

            noff: Sub-parameter for calmode. Edge marking parameter for
                'otfraster' mode. It is used to specify a number of OFF scans
                near edge directly instead to specify it by fractional
                number by 'fraction'. If it is set, the value will come
                before setting by 'fraction'.

                Options: any positive integer value

                Default: ``None`` (equivalent to ``''``)

            width: Sub-parameter for calmode. Edge marking parameter for
                'otf' mode. It specifies pixel width with respect to
                a median spatial separation between neighboring two data
                in time. Default will be fine in most cases.

                Options: any float value

                Default: ``None`` (equivalent to ``0.5``)

            elongated: Sub-parameter for calmode. Edge marking parameter
                for 'otf' mode. Please set True only if observed area is
                elongated in one direction.

                Default: ``None`` (equivalent to ``False``)

            output_dir: Name of output directory.

            infiles: List of data files. These must be a name of
                MeasurementSets that are registered to context
                via hsd_importdata or hsd_restoredata task.

                Example: ``vis=['X227.ms', 'X228.ms']``

            outfile: Name of the output file.

            field: Data selection by field name.

            spw: Data selection by spw.

                Example: ``'3,4'`` (generate caltable for spw 3 and 4), ``['0','2']`` (spw 0 for first data, 2 for second)

                Default: ``None`` (process all science spws)

            scan: Data selection by scan number. (default all scans)

                Example: ``'22,23'`` (use scan 22 and 23 for calibration), ``['22','24']`` (scan 22 for first data, 24 for second)

                Default: None (process all scans)

            parallel: Execute using CASA HPC functionality, if available.

                Options: ``'automatic'``, ``'true'``, ``'false'``, ``True``, ``False``

                Default: ``None`` (equivalent to ``'automatic'``)
        """
        super().__init__()

        # context and vis must be set first so that properties that require
        # domain objects can be function
        self.context = context
        self.infiles = infiles
        self.output_dir = output_dir
        self.outfile = outfile

        self.calmode = calmode
        self.fraction = fraction
        self.noff = noff
        self.width = width
        self.elongated = elongated

        self.field = field
        self.spw = spw
        self.scan = scan

        self.parallel = parallel

    def to_casa_args(self) -> dict:
        """Convert Inputs instance to the list of keyword arguments for sdcal.

        Returns:
            Keyword arguments for sdcal.
        """
        args = super().to_casa_args()

        # overwrite is always True
        args['overwrite'] = True

        # parameter name for input data is 'infile'
        args['infile'] = args.pop('infiles')

        # vis and parallel are not necessary
        del args['vis']
        del args['parallel']

        return args


class SDSkyCalResults(SingleDishResults):
    """Class to hold processing result of SDSkyCal task."""

    def __init__(
            self,
            task: str | None = None,
            success: bool | None = None,
            outcome: str | None = None
            ) -> None:
        """Initialize SDSkyCalResults instance.

        Args:
            task: Name of task.
            success: A boolean to indicate if the task execution was successful
            (True) or not (False).
            outcome: Outcome of the task.
        """
        super().__init__(task, success, outcome)
        self.final = [calapp for calapp, is_calibratable in self.outcome if is_calibratable]

    def merge_with_context(self, context: Context) -> None:
        """Merge result instance into context.

        The CalApplication instance updated by the skycal task is added to
        the pipeline context.

        Args:
            context: Pipeline context object containing state information.
        """
        super().merge_with_context(context)

        if self.outcome is None:
            return

        for calapp, is_calibratable in self.outcome:
            if is_calibratable:
                context.callibrary.add(calapp.calto, calapp.calfrom)
            else:
                continue

    def _outcome_name(self) -> str:
        """Return string representing the outcome.

        Returns:
            string of outcome.
        """
        return str(self.outcome)


class SerialSDSkyCal(basetask.StandardTaskTemplate):
    """Generate sky calibration table."""

    Inputs = SDSkyCalInputs

    def prepare(self) -> SDSkyCalResults:
        """Prepare arguments for CASA job and execute it.

        Returns:
           SDSkyCalResults object.
        """
        args = self.inputs.to_casa_args()
        LOG.trace('args: {}'.format(args))

        # retrieve ms domain object
        ms = self.inputs.ms
        calibration_strategy = ms.calibration_strategy
        default_field_strategy = calibration_strategy['field_strategy']

        # take calmode from calibration strategy if it is set to 'auto'
        if args['calmode'] is None or args['calmode'].lower() == 'auto':
            args['calmode'] = calibration_strategy['calmode']

        assert args['calmode'] in ['ps', 'otfraster', 'otf']

        # spw selection ---> task.prepare
        if args['spw'] is None or len(args['spw']) == 0:
            spw_list = ms.get_spectral_windows(science_windows_only=True)
            args['spw'] = ','.join(map(str, [spw.id for spw in spw_list]))

        # field selection ---> task.prepare
        if args['field'] is None or len(args['field']) == 0:
            field_strategy = default_field_strategy
        else:
            field_strategy = {}
            field_ids = casa_tools.ms.msseltoindex(vis=ms.name, field=args['field'])
            for field_id in field_ids:
                for target_id, reference_id in default_field_strategy.items():
                    if field_id == target_id:
                        field_strategy[field_id] = default_field_strategy[field_id]
                        continue
                    elif field_id == reference_id:
                        field_strategy[target_id] = field_id
                        continue

        # scan selection
        if args['scan'] is None:
            args['scan'] = ''

        # intent
        if args['calmode'] in ['otf', 'otfraster']:
            args['intent'] = 'OBSERVE_TARGET#ON_SOURCE'

        # output file
        if args["outfile"] is None or len(args["outfile"]) == 0:
            namer = caltable_heuristic.SDSkyCaltable()
            # we temporarily need 'vis'
            myargs = copy.deepcopy(args)
            myargs['vis'] = args['infile']

            full_path = namer.calculate(
                output_dir=self.inputs.output_dir,
                stage=self.inputs.context.stage,
                **myargs
            )

            args['outfile'] = relative_path(full_path)

        # field
        args["field"] = ",".join(sorted(map(str, field_strategy.values())))

        LOG.debug(f'args for sdcal: {args}')

        # create job
        job = casa_tasks.sdcal(**args)

        # execute job
        try:
            self._executor.execute(job)
        except Exception as e:
            LOG.warning(f"Error occurred during sdcal execution: {e}")

        has_caltable = os.path.exists(args['outfile'])

        # make a note of the current inputs state before we start fiddling
        # with it. This origin will be attached to the final CalApplication.
        origin = callibrary.CalAppOrigin(task=SerialSDSkyCal,
                                         inputs=args)

        calapps_with_status = []

        for target_id, reference_id in field_strategy.items():
            # check if caltable is empty
            if has_caltable:
                with casa_tools.TableReader(args['outfile']) as tb:
                    taql = f"FIELD_ID=={reference_id}"
                    try:
                        selected = tb.query(taql)
                        nrows = selected.nrows()
                        selected.close()
                    except Exception as e:
                        nrows = 0
                    is_calibratable = nrows > 0
            else:
                is_calibratable = False

            if not is_calibratable:
                _infile = os.path.basename(args['infile'])
                LOG.warning(
                    "No calibration solution found for "
                    f"MS {os.path.basename(args['outfile'])}, "
                    f"field {reference_id}. "
                    f"Corresponding data in {_infile} "
                    "should be excluded from the processing."
                )

            calto = callibrary.CalTo(vis=args['infile'],
                                     spw=args['spw'],
                                     field=str(target_id),
                                     intent='TARGET')

            # create SDCalFrom object
            calfrom = callibrary.CalFrom(gaintable=args['outfile'],
                                         gainfield=str(reference_id),
                                         interp='linear,linear',
                                         caltype=args['calmode'])

            # create CalApplication object
            calapp = callibrary.CalApplication(calto, calfrom, origin)
            calapps_with_status.append((calapp, is_calibratable))

        results = SDSkyCalResults(task=self.__class__,
                                  success=True,
                                  outcome=calapps_with_status)
        return results

    def analyse(self, result: SDSkyCalResults) -> SDSkyCalResults:
        """Analyse SDSkyCalResults instance produced by prepare.

        Args:
            result: SDSkyCalResults instance.

        Returns:
            Updated SDSkyCalResults instance.
        """
        return result


[docs] @task_registry.set_equivalent_casa_task('hsd_skycal') @task_registry.set_casa_commands_comment('Generates sky calibration table according to calibration strategy.') class SDSkyCal(sessionutils.ParallelTemplate): """Class to generate sky calibration table.""" Inputs = SDSkyCalInputs Task = SerialSDSkyCal
def get_elevation( datatable_name: str, spw_id: int | str, antenna_id: int | str, field_id: int | str, on_source: bool ) -> dict[str, NDArray[generic]]: """Get elevation and associated time and flag from datatable. Args: datatable_name: Name of the datatable. spw_id: Spectral window ID. antenna_id: Antenna ID. field_id: Field ID. on_source: If True, get elevation for on-source data, otherwise for off-source data. Returns: Dictionary with time and elevation. - time: Array of time in seconds. - el: Array of elevation in radians. - online_flag: Array of online flags (False for valid, True for invalid data). """ ro_datatable_name = os.path.join(datatable_name, 'RO') rw_datatable_name = os.path.join(datatable_name, 'RW') with casa_tools.TableReader(ro_datatable_name) as tb: taql = f'IF=={spw_id}&&ANTENNA=={antenna_id}&&FIELD_ID=={field_id}' if on_source: taql += '&&SRCTYPE==0' else: taql += '&&SRCTYPE!=0' selected = tb.query(taql) if selected.nrows() == 0: selected.close() return { 'time': np.array([], dtype=float), 'el': np.array([], dtype=float), 'online_flag': np.array([], dtype=bool) } npol = selected.getcell('NPOL', 0) time = selected.getcol('TIME') el = selected.getcol('EL') rows = selected.rownumbers() selected.close() with casa_tools.TableReader(rw_datatable_name) as tb: it = ( np.all(tb.getcellslice( 'FLAG_PERMANENT', i, blc=[0, OnlineFlagIndex], trc=[npol - 1, OnlineFlagIndex], incr=[1, 1] ) != 1) for i in rows ) online_flag = np.fromiter(it, dtype=bool) return {'time': time, 'el': el, 'online_flag': online_flag} def compute_elevation_difference(context: Context, results: SDSkyCalResults) -> dict: """Compute elevation difference. Args: context: Pipeline context object containing state information. results: SDSkyCalResults instance. Returns: dictionary[field_id][antenna_id][spw_id] Value of the dictionary should be ElevationDifference and the value should contain the result from one MS (given that SDSkyCal is per-MS task). """ ElevationDifference = collections.namedtuple('ElevationDifference', ['timeon', 'elon', 'flagon', 'timecal', 'elcal', 'time0', 'eldiff0', 'time1', 'eldiff1']) if not isinstance(results, SDSkyCalResults): raise TypeError('Results type should be SDSkyCalResults') calapps = results.final resultdict = {} for calapp in calapps: calto = calapp.calto vis = calto.vis ms = context.observing_run.get_ms(vis) target_field = calto.field if target_field.isdigit(): field_id_on = int(target_field) else: fields = ms.get_fields(name=target_field) assert len(fields) > 0 field_id_on = fields[0].id antenna_ids = [ant.id for ant in ms.antennas] science_spw = ms.get_spectral_windows(science_windows_only=True) calfroms = calapp.calfrom for calfrom in calfroms: caltable = calfrom.gaintable # FIELD_ID gainfield = calfrom.gainfield if gainfield.isdigit(): field_id_off = int(gainfield) else: fields = ms.get_fields(name=gainfield) assert len(fields) > 0 field_id_off = fields[0].id LOG.info('Computing elevation difference for "{}" Field ID {} (ON) and {} (OFF)' ''.format(ms.basename, field_id_on, field_id_off)) resultfield = {} for antenna_id in antenna_ids: resultant = {} for spw in science_spw: spw_id = spw.id # get timestamp from caltable with casa_tools.TableReader(caltable) as tb: selected = tb.query("&&".join([ f'FIELD_ID=={field_id_off}', f'SPECTRAL_WINDOW_ID=={spw_id}', f'ANTENNA1=={antenna_id}' ])) timecal = selected.getcol('TIME') / 86400.0 # sec -> day selected.close() if len(timecal) == 0: LOG.info( f"No calibration data for field {field_id_off}, " f"spw {spw_id}, antenna {antenna_id}. " ) continue # access DataTable to get elevation datatable_name = os.path.join( context.observing_run.ms_datatable_name, os.path.basename(ms.origin_ms) ) data_on = get_elevation( datatable_name, spw_id, antenna_id, field_id_on, on_source=True ) timeon = data_on['time'] elon = data_on['el'] flagon = data_on['online_flag'] data_off = get_elevation( datatable_name, spw_id, antenna_id, field_id_off, on_source=False ) timeoff = data_off['time'] eloff = data_off['el'] flagoff = data_off['online_flag'] eloff_valid = eloff[np.logical_not(flagoff)] timeoff_valid = timeoff[np.logical_not(flagoff)] if len(timeoff_valid) == 0: LOG.info( f"No valid off-source data for field {field_id_off}, " f"spw {spw_id}, antenna {antenna_id}. " ) continue elcal = eloff_valid[ [np.argmin(np.abs(timeoff_valid - t)) for t in timecal] ] eldiff0 = [] eldiff1 = [] time0 = [] time1 = [] for t, el, flg in zip(timeon, elon, flagon): # do not process flagged data if flg: continue dt = timecal - t idx0 = np.where(dt < 0)[0] if len(idx0) > 0: i = np.argmax(timecal[idx0]) time0.append(t) eldiff0.append(el - elcal[idx0[i]]) idx1 = np.where(dt >= 0)[0] if len(idx1) > 0: i = np.argmin(timecal[idx1]) time1.append(t) eldiff1.append(el - elcal[idx1[i]]) eldiff0 = np.asarray(eldiff0) eldiff1 = np.asarray(eldiff1) time0 = np.asarray(time0) time1 = np.asarray(time1) result = ElevationDifference(timeon=timeon, elon=elon, flagon=flagon, timecal=timecal, elcal=elcal, time0=time0, eldiff0=eldiff0, time1=time1, eldiff1=eldiff1) resultant[spw_id] = result resultfield[antenna_id] = resultant resultdict[field_id_on] = resultfield return resultdict