# Source code for pipeline.hifa.tasks.lock_refant.lock_refant

from __future__ import annotations

from typing import TYPE_CHECKING

import os
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.vdp as vdp
from pipeline.infrastructure import task_registry

if TYPE_CHECKING:
    from pipeline.infrastructure.launcher import Context

# Names exported by ``from ... import *``: the task class together with its
# Inputs and Results classes.
__all__ = [
    'LockRefAnt',
    'LockRefAntInputs',
    'LockRefAntResults',
]

# Module-level logger, obtained from the pipeline logging helper and named
# after this module.
LOG = infrastructure.logging.get_logger(__name__)


class LockRefAntInputs(vdp.StandardInputs):
    """Inputs for the reference-antenna locking task (hifa_lock_refant)."""

    # PIPE-3016: the default use case of lock_refant is within the polcal
    # recipe, prior to the second loop of calibration stages. Before that
    # second loop the spwphaseup caltables need to be unregistered, so
    # unregistering is enabled by default (i.e. unregister_spwphaseup = True).
    unregister_spwphaseup = vdp.VisDependentProperty(default=True)

    def to_casa_args(self):
        """Always raise NotImplementedError.

        Raises:
            NotImplementedError: Unconditionally; this task has no CASA task
                arguments to produce.
        """
        # refant does not use CASA tasks
        raise NotImplementedError

    # docstring and type hints: supplements hifa_lock_refant
    def __init__(
            self,
            context: Context,
            output_dir: str | None = None,
            vis: str | list[str] | None = None,
            unregister_spwphaseup: bool | None = None,
    ) -> None:
        """Initialize Inputs.

        Args:
            context: Pipeline context object containing state information.

            output_dir: Output directory.
                Defaults to None, which corresponds to the current working directory.

            vis: List of input MeasurementSets. Defaults to the list of
                MeasurementSets specified in the pipeline context.

                Example: ``vis=['ngc5921.ms']``

            unregister_spwphaseup: Boolean option to remove any caltable created by
                any hifa_spwphaseup stage run prior to lock_refant.
                In the current Pipeline use case, hifa_spwphaseup makes phase
                offset tables (per spectral spec) to align spws.
                Defaults to True.

        """
        self.context = context
        # NOTE(review): unregister_spwphaseup is declared as a
        # VisDependentProperty, so it presumably needs ``vis`` to be assigned
        # first - keep this assignment order unless confirmed otherwise.
        self.vis = vis
        self.output_dir = output_dir
        self.unregister_spwphaseup = unregister_spwphaseup

        
class LockRefAntResults(basetask.Results):
    """Results object for the reference-antenna locking stage.

    On merge it marks the MeasurementSet's reference antenna list as locked
    and, when requested, unregisters earlier hifa_spwphaseup caltables that
    were registered for the same MeasurementSet.
    """

    def __init__(self, vis: str, unregister_spwphaseup: bool):
        """Store the MeasurementSet name and the unregister option.

        Args:
            vis: Name of the MeasurementSet these results refer to.
            unregister_spwphaseup: Whether hifa_spwphaseup caltables for this
                MeasurementSet should be unregistered when merging.
        """
        super().__init__()
        self._vis = vis
        self.unregister_spwphaseup = unregister_spwphaseup

    def merge_with_context(self, context: Context):
        """Apply these results to the pipeline context.

        Args:
            context: Pipeline context whose observing run and calibration
                library are updated.
        """
        # Nothing can be merged without a MeasurementSet name.
        if self._vis is None:
            LOG.error('No results to merge')
            return

        target_ms = context.observing_run.get_ms(name=self._vis)
        if target_ms:
            LOG.debug('Locking refant for %s', target_ms.basename)
            target_ms.reference_antenna_locked = True

        # PIPE-3016: previous hifa_spwphaseup caltables are only removed from
        # the context when this was requested (it is by default).
        if not self.unregister_spwphaseup:
            return

        # Calibrations are matched against the MS by base name only.
        ms_basename = os.path.basename(str(self._vis))

        def is_spwphaseup_entry_for_ms(calto: callibrary.CalToArgs, calfrom: callibrary.CalFrom) -> bool:
            # Predicate for the calibration library: True only for a
            # spwphaseup caltable registered against this MS.
            registered_names = set(map(os.path.basename, calto.vis))
            if 'hifa_spwphaseup' not in calfrom.gaintable or ms_basename not in registered_names:
                return False
            LOG.info('Unregistering previous spwphaseup offset table for %s', ms_basename)
            return True

        context.callibrary.unregister_calibrations(is_spwphaseup_entry_for_ms)

    def __str__(self):
        """Return a fixed human-readable summary of the stage outcome."""
        return 'Lock reference antenna results: refant list locked'

    def __repr__(self):
        """Return a short representation that includes the MeasurementSet name."""
        return f'LockRefAntResults({self._vis})'


@task_registry.set_equivalent_casa_task('hifa_lock_refant')
@task_registry.set_casa_commands_comment(
    'The reference antenna list for all measurement sets is locked to prevent further modification'
)
class LockRefAnt(basetask.StandardTaskTemplate):
    """Task that locks the reference antenna list of a MeasurementSet.

    The task performs no processing of its own: the lock, and the optional
    unregistering of hifa_spwphaseup caltables, are applied when the returned
    results object is merged into the context.
    """

    Inputs = LockRefAntInputs

    def prepare(self, **parameters):
        """Build the results object from the task inputs.

        Args:
            **parameters: Accepted for interface compatibility; not used here.

        Returns:
            A LockRefAntResults carrying ``inputs.vis`` and
            ``inputs.unregister_spwphaseup``.
        """
        return LockRefAntResults(self.inputs.vis, self.inputs.unregister_spwphaseup)

    def analyse(self, results):
        """Return ``results`` unchanged; there is nothing to analyse.

        Args:
            results: The object returned by ``prepare``.

        Returns:
            The same ``results`` object.
        """
        return results