Source code for pipeline.hifa.tasks.wvrgcal.wvrgcal

from __future__ import annotations

import collections
import os
import shutil
from typing import TYPE_CHECKING

import numpy as np

import pipeline.domain.measures as measures
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.vdp as vdp
from pipeline.h.heuristics import caltable as caltable_heuristic
from pipeline.h.tasks.common import commonhelpermethods
from pipeline.hif.tasks import gaincal
from pipeline.hifa.heuristics import atm as atm_heuristic
from pipeline.hifa.heuristics import wvrgcal as wvrgcal_heuristic
from pipeline.hifa.tasks import bandpass
from pipeline.infrastructure import casa_tasks, task_registry
from . import resultobjects
from . import wvrg_qa

if TYPE_CHECKING:
    from collections.abc import Callable

    from pipeline.hif.tasks.bandpass.common import BandpassResults
    from pipeline.hif.tasks.gaincal.common import GaincalResults

__all__ = [
    'Wvrgcal',
    'WvrgcalInputs'
]

LOG = infrastructure.logging.get_logger(__name__)

WVRInfo = collections.namedtuple('WVRInfo',
                                 'antenna wvr flag rms disc')


class WvrgcalInputs(vdp.StandardInputs):
    """
    WvrgcalInputs defines the inputs for the Wvrgcal pipeline task.
    """
    accept_threshold = vdp.VisDependentProperty(default=1.0)
    bandpass_result = vdp.VisDependentProperty(default='', hidden=True)
    disperse = vdp.VisDependentProperty(default=False)
    hm_smooth = vdp.VisDependentProperty(default='automatic')
    hm_tie = vdp.VisDependentProperty(default='automatic')
    hm_toffset = vdp.VisDependentProperty(default='automatic')

    @vdp.VisDependentProperty(null_input=[None, '', -1, -1.0])
    def maxdistm(self):
        # Identify fraction of antennas that are 7m.
        nants_7m = len([ant
                        for ant in self.ms.antennas
                        if ant.diameter == 7.0])
        frac = float(nants_7m) / len(self.ms.antennas)

        # If more than 50% of the antennas are 7m, then set maxdistm to a
        # lower value.
        if frac > 0.5:
            return 100.0
        else:
            return 500.0

    minnumants = vdp.VisDependentProperty(default=2)
    mingoodfrac = vdp.VisDependentProperty(default=0.8)
    nowvr_result = vdp.VisDependentProperty(default='', hidden=True)
    nsol = vdp.VisDependentProperty(default=1)
    # Default for offsetstable is blank, which means the wvrgcal task will not
    # apply any offsets.
    offsetstable = vdp.VisDependentProperty(default='')
    refant = vdp.VisDependentProperty(default='')
    # Default for qa_bandpass_intent is blank, which allows the bandpass task
    # to select a sensible default if the dataset lacks data with BANDPASS
    # intent.
    qa_bandpass_intent = vdp.VisDependentProperty(default='')
    qa_intent = vdp.VisDependentProperty(default='')
    qa_spw = vdp.VisDependentProperty(default='', hidden=True)
    scale = vdp.VisDependentProperty(default=1.0)
    segsource = vdp.VisDependentProperty(default=True)
    smooth = vdp.VisDependentProperty(default='')

    @vdp.VisDependentProperty
    def sourceflag(self):
        return []

    @vdp.VisDependentProperty
    def tie(self):
        return []

    toffset = vdp.VisDependentProperty(default=0)

    @vdp.VisDependentProperty
    def wvrflag(self):
        return []

    @wvrflag.convert
    def wvrflag(self, value):
        if isinstance(value, str):
            if value.startswith('['):
                # Remove the characters [, ], and ' from the value.
                value = value.translate(str.maketrans("[]'"))
            return value.split(',')
        else:
            return value

    # docstring and type hints: supplements hifa_wvrgcal
    def __init__(self, context, output_dir=None, vis=None, caltable=None, offsetstable=None, hm_toffset=None,
                 toffset=None, segsource=None, hm_tie=None, tie=None, sourceflag=None, nsol=None, disperse=None,
                 wvrflag=None, hm_smooth=None, smooth=None, scale=None, maxdistm=None, minnumants=None,
                 mingoodfrac=None, refant=None, qa_intent=None, qa_bandpass_intent=None, qa_spw=None,
                 accept_threshold=None, bandpass_result=None, nowvr_result=None):
        """Initialize Inputs.

        Args:
            context: Pipeline context object containing state information.

            output_dir: Output directory.
                Defaults to ``None``, which corresponds to the current working directory.

            vis: List of input visibility files.
                Default: ``None``, in which case the vis files to be used
                will be read from the context.

                Example: ``vis=['ngc5921.ms']``

            caltable: List of output gain calibration tables.
                Default: none, in which case the names of the caltables
                will be generated automatically.

                Example: ``caltable='ngc5921.wvr'``

            offsetstable: List of input temperature offsets table files to subtract
                from WVR measurements before calculating phase corrections.
                Default: none, in which case no offsets are applied.

                Example: ``offsetstable=['ngc5921.cloud_offsets']``

            hm_toffset: If 'manual', set the ``toffset`` parameter to the user-specified value.
                If 'automatic', set the ``toffset`` parameter according to the
                date of the MeasurementSet; ``toffset=-1`` if before 2013-01-21T00:00:00
                ``toffset=0`` otherwise.

            toffset: Time offset (sec) between interferometric and WVR data.

            segsource: If True calculate new atmospheric phase correction coefficients
                for each source, subject to the constraints of the ``tie`` parameter.
                'segsource' is forced to be True if the ``tie`` parameter is set to a
                non-empty value by the user or by the automatic heuristic.

            hm_tie: If 'manual', set the ``tie`` parameter to the
                user-specified value. If 'automatic', set the ``tie``
                parameter to include with the target all calibrators
                that are within 15 degrees of it: if no calibrators are
                that close then ``tie`` is left empty.

            tie: Use the same atmospheric phase correction coefficients
                when calculating the WVR correction for all sources in
                the ``tie``. If ``tie`` is not empty then ``segsource``
                is forced to be True. Ignored unless ``hm_tie`` = 'manual'.

                Example: ``tie=['3C273,NGC253', 'IC433,3C279']``

            sourceflag: Flag the WVR data for these source(s) as bad and do not
                produce corrections for it. Requires ``segsource`` = True.

                Example: ``['3C273']``

            nsol: Number of solutions for phase correction coefficients during this
                observation, evenly distributed in time throughout the observation. It
                is used only if ``segsource`` = False because if ``segsource`` = True then the
                coefficients are recomputed whenever the telescope moves to a new
                source (within the limits imposed by ``tie``).

            disperse: Apply correction for dispersion. (Deprecated; will be removed)

            wvrflag: Flag the WVR data for the listed antennas as bad and replace
                their data with values interpolated from the 3 nearest antennas with
                unflagged data.

                Example: ``['DV03','DA05','PM02']``

            hm_smooth: If 'manual' set the ``smooth`` parameter to the user-specified value.
                If 'automatic', run the wvrgcal task with the range of ``smooth`` parameters
                required to match the integration time of the wvr data to that of the
                interferometric data in each spectral window.

            smooth: Smooth WVR data on this timescale before calculating the correction.
                Ignored unless hm_smooth='manual'.

            scale: Scale the entire phase correction by this factor.

            maxdistm: Maximum distance in meters of an antenna used for interpolation
                from a flagged antenna.
                Default: -1  (automatically set to 100m if >50% of
                antennas are 7m antennas without WVR and otherwise set to
                500m).

                Example: ``maxdistm=550``

            minnumants: Minimum number of nearby antennas (up to 3) used for
                interpolation from a flagged antenna.

                Example: ``minnumants=3``

            mingoodfrac: Minimum fraction of good data per antenna.

            refant: Ranked comma delimited list of reference antennas.

                Example: ``refant='DV01,DV02'``

            qa_intent: The list of data intents on which the wvr correction is to be
                tried as a means of estimating its effectiveness.
                A QA 'view' will be calculated for each specified intent, in each spectral
                window in each vis file.
                Each QA 'view' will consist of a pair of 2-d images with dimensions
                ['ANTENNA', 'TIME'], one showing the data phase-noise before the
                wvr application, the second showing the phase noise after (both 'before'
                and 'after' images have a bandpass calibration applied as well).
                An overall QA score is calculated for each vis file, by dividing the
                'before' images by the 'after' and taking the median of the result. An
                overall score of 1 would correspond to no change in the phase noise,
                a score > 1 implies an improvement.
                If the overall score for a vis file is less than the value in
                'accept_threshold' then the wvr calibration file is not made available
                for merging into the context for use in the subsequent reduction.
                If you do not want any QA calculations then set qa_intent=''.

                Example: ``qa_intent='PHASE'``

            qa_bandpass_intent: The data intent to use for the bandpass calibration in
                the qa calculation. The default is blank to allow the underlying bandpass
                task to select a sensible intent if the dataset lacks BANDPASS data.

            qa_spw: The SpW(s) to use for the qa calculation, in the order that they
                should be tried. Input as a comma-separated list. The default is blank, in
                which case the task will try SpWs in order of decreasing median sky
                opacity.

            accept_threshold: The phase-rms improvement ratio
                (rms without wvr / rms with wvr) above which the wrvg file will be
                accepted into the context for subsequent application.

            bandpass_result:

            nowvr_result:

        """
        super().__init__()

        # pipeline inputs
        self.context = context
        # vis must be set first, as other properties may depend on it
        self.vis = vis
        self.output_dir = output_dir

        # data selection arguments
        self.caltable = caltable
        self.offsetstable = offsetstable

        # solution parameters
        self.hm_toffset = hm_toffset
        self.toffset = toffset
        self.segsource = segsource
        self.hm_tie = hm_tie
        self.tie = tie
        self.sourceflag = sourceflag
        self.nsol = nsol
        self.disperse = disperse
        self.wvrflag = wvrflag
        self.hm_smooth = hm_smooth
        self.smooth = smooth
        self.scale = scale
        self.maxdistm = maxdistm
        self.minnumants = minnumants
        self.mingoodfrac = mingoodfrac
        self.refant = refant
        self.qa_intent = qa_intent
        self.qa_bandpass_intent = qa_bandpass_intent
        self.qa_spw = qa_spw
        self.accept_threshold = accept_threshold
        self.bandpass_result = bandpass_result
        self.nowvr_result = nowvr_result


[docs] @task_registry.set_equivalent_casa_task('hifa_wvrgcal') class Wvrgcal(basetask.StandardTaskTemplate): Inputs = WvrgcalInputs def __init__(self, inputs): super().__init__(inputs)
[docs] def prepare(self): inputs = self.inputs result = resultobjects.WvrgcalResult(vis=inputs.vis) jobs = [] # get parameters that can be set from outside or which will be derived # from heuristics otherwise wvrheuristics = wvrgcal_heuristic.WvrgcalHeuristics( context=inputs.context, vis=inputs.vis, hm_tie=inputs.hm_tie, tie=inputs.tie, hm_smooth=inputs.hm_smooth, smooth=inputs.smooth, sourceflag=inputs.sourceflag, nsol=inputs.nsol, segsource=inputs.segsource) # return an empty results object if no WVR data available if not wvrheuristics.wvr_available(): # only 12m antennas are expected to have WVRs fitted if any([a for a in inputs.ms.antennas if a.diameter == 12.0]): LOG.error('WVR data expected but not found in %s' '' % os.path.basename(inputs.vis)) return result elif len([a for a in inputs.ms.antennas if a.diameter == 12.0]) <= 1: # WVR data available for only 1 antenna LOG.error('WVR data available for only 1 antenna in %s' '' % os.path.basename(inputs.vis)) return result if inputs.hm_toffset == 'automatic': toffset = wvrheuristics.toffset() else: toffset = inputs.toffset if inputs.segsource is None: segsource = wvrheuristics.segsource() else: segsource = inputs.segsource if inputs.hm_tie == 'automatic': tie = wvrheuristics.tie() else: tie = inputs.tie # add tie to results object for display in the weblog. Tie is not spw # dependent, so we can add it early. result.tie = tie if inputs.sourceflag is None: sourceflag = wvrheuristics.sourceflag() else: sourceflag = inputs.sourceflag if inputs.nsol is None: nsol = wvrheuristics.nsol() else: nsol = inputs.nsol # get parameters that must be set from outside disperse = inputs.disperse wvrflag = inputs.wvrflag scale = inputs.scale maxdistm = inputs.maxdistm minnumants = inputs.minnumants mingoodfrac = inputs.mingoodfrac ms = inputs.context.observing_run.get_ms(name=inputs.vis) # Set the reference antenna refant = inputs.refant if refant == '': refant = ms.reference_antenna if not (refant and refant.strip()): msg = ('No reference antenna specified and none found in ' 'context for %s' % ms.basename) LOG.error(msg) refant = [] else: refant = refant.split(',') # smooth may vary with spectral window so need to ensure we calculate # results that can cover them all science_spws = ms.get_spectral_windows(science_windows_only=True) science_spwids = [spw.id for spw in science_spws] wvr_spwids = sorted([spw.id for spw in ms.spectral_windows if spw.num_channels == 4 and 'WVR' in spw.intents]) smooths_done = set() callist = [] caltables = [] for spw in science_spwids: if inputs.hm_smooth == 'automatic': # smooth = wvrheuristics.smooth(spw) # Force the smooth heuristics to a single value # (If integration times vary between spws this may not be # the right thing to do). Integration times varying between # intents within an spw then it is right to smooth according # to the longest. smooth = wvrheuristics.smoothall(science_spwids) else: smooth = inputs.smooth # prepare to run the wvrgcal task if necessary caltable = caltable_heuristic.WvrgCaltable() caltable = caltable(output_dir=inputs.output_dir, stage=inputs.context.stage, vis=inputs.vis, smooth=smooth) if smooth not in smooths_done: # different caltable for each smoothing, remove old versions shutil.rmtree(caltable, ignore_errors=True) task = casa_tasks.wvrgcal(vis=inputs.vis, caltable=caltable, offsetstable=inputs.offsetstable, toffset=toffset, segsource=segsource, tie=tie, sourceflag=sourceflag, nsol=nsol, disperse=disperse, wvrflag=wvrflag, smooth=smooth, scale=scale, maxdistm=maxdistm, minnumants=minnumants, mingoodfrac=mingoodfrac, spw=science_spwids, wvrspw=[wvr_spwids[0]], refant=refant) jobs.append(task) smooths_done.add(smooth) # add this wvrg table to the callibrary for this spw calto = callibrary.CalTo(vis=inputs.vis, spw=spw) calfrom = callibrary.CalFrom(caltable, caltype='wvr', spwmap=[], interp='nearest', calwt=False) calapp = callibrary.CalApplication(calto, calfrom) callist.append(calapp) caltables.append(caltable) # execute the jobs for job in jobs: job_result = self._executor.execute(job) if job_result['success']: # extract flags found by CASA job job_name = np.array(job_result['Name']) job_flag = np.array(job_result['Flag']) job_wvrflag = set(job_name[job_flag]) else: LOG.warning('CASA wvrgcal job terminated unexpectedly with ' 'exit code %s; no flags generated.' % (job_result['rval'])) job_wvrflag = set([]) input_wvrflag = set(wvrflag) generated_wvrflag = job_wvrflag.difference(input_wvrflag) if generated_wvrflag: LOG.warning('%s wvrgcal has flagged antennas: %s' % ( os.path.basename(inputs.vis), list(generated_wvrflag))) wvrflag_set = set(result.wvrflag) wvrflag_set.update(job_wvrflag) result.wvrflag = list(wvrflag_set) result.wvr_infos = self._get_wvrinfos(job_result) LOG.info('wvrgcal complete') # removed any 'unsmoothed' wvrcal tables generated by the wvrgcal jobs for caltable in caltables: shutil.rmtree('%s_unsmoothed' % caltable, ignore_errors=True) result.pool[:] = callist return result
[docs] def analyse(self, result): inputs = self.inputs # check that the caltable was actually generated on_disk = [ca for ca in result.pool if ca.exists()] missing = [ca for ca in result.pool if ca not in on_disk] result.error.clear() result.error.update(missing) # wvrcal files to be applied result.final[:] = on_disk # If no QA intent was provided, then return early. Otherwise, continue # with computing the QA results. qa_intent = inputs.qa_intent.strip() if not qa_intent: return result # Return early if no WVR caltable was created. if not result.final: return result # Determine the list of SpWs for which QA results are needed. If no QA # SpW order was specified explicitly, then calculate an order here based # on SpW bandwidth and Tsys; otherwise use the specified order. if inputs.qa_spw == '': # PIPE-2056: if the observing mode is band-to-band, then restrict # the list of QA SpWs to only the diffgain on-source SpWs, otherwise # use all science SpWs. if inputs.ms.is_band_to_band: spws = inputs.ms.get_spectral_windows(intent='DIFFGAINSRC') else: spws = inputs.ms.get_spectral_windows(science_windows_only=True) # Initialize atmosphere heuristics. atmheuristics = atm_heuristic.AtmHeuristics(context=inputs.context, vis=inputs.vis, spw=spws) # Create a ranked list preferably by Tsys and SpW bandwidth. qa_spw_list = atmheuristics.spwid_rank_by_tsys_and_bandwidth(qa_intent) # If ranking by Tsys failed (e.g. no Tsys table, or due to # flagging), then fall back to ranking by opacity and bandwidth: if qa_spw_list is None: LOG.info(f"qa: ranking spws by bandwidth and Tsys failed for {inputs.ms.basename}; will rank by" f" bandwidth and opacity instead.") qa_spw_list = atmheuristics.spwid_rank_by_opacity_and_bandwidth() else: qa_spw_list = inputs.qa_spw.split(',') for qa_spw in qa_spw_list: LOG.info(f'qa: {inputs.ms.basename} attempting to calculate wvrgcal QA using spw {qa_spw}') inputs.qa_spw = qa_spw # Do a bandpass calibration LOG.info('qa: calculating bandpass calibration') bp_result = self._do_qa_bandpass(inputs) # Do a gain calibration if not bp_result.final: LOG.warning('qa: calculating phase calibration without bandpass applied') else: LOG.info('qa: calculating phase calibration with bandpass applied') nowvr_result = self._do_nowvr_gaincal(inputs) if not nowvr_result.final: continue inputs.bandpass_result = bp_result inputs.nowvr_result = nowvr_result result.qa_wvr.bandpass_result = bp_result result.qa_wvr.nowvr_result = nowvr_result result.qa_wvr.qa_spw = qa_spw LOG.info('qa: wvrgcal QA calculation was successful') break # Accept the result object into the local copy of the context, thus # adding the WVR table to the callibrary of this local copy. # PIPE-1058: accepting the result requires a stage number. Normally, # this would be added by the task execution infrastructure. In this # the result of the currently-still-running task is being accepted # into a local context already, before the result has been finalised, # so need to explicitly set stage number now. LOG.debug('qa: accept WVR results into copy of context') result.stage_number = inputs.context.task_counter result.accept(inputs.context) # do a phase calibration on the bandpass and phase calibrators, now # with bandpas *and* wvr preapplied. if not bp_result.final: LOG.warning('qa: calculating phase calibration with wvr applied') else: LOG.info('qa: calculating phase calibration with bandpass and wvr applied') wvr_result = self._do_wvr_gaincal(inputs) nowvr_caltable = nowvr_result.inputs['caltable'] wvr_caltable = wvr_result.inputs['caltable'] result.qa_wvr.gaintable_wvr = wvr_caltable # Edited for PIPE-1837 adding BPgood and PHgood LOG.info('qa: calculate ratio with-WVR phase RMS / without-WVR phase rms') PHnoisy, BPnoisy, PHgood, BPgood = wvrg_qa.calculate_view(inputs.context, nowvr_caltable, wvr_caltable, result.qa_wvr, qa_intent) result.PHnoisy = PHnoisy result.BPnoisy = BPnoisy result.PHgood = PHgood result.BPgood = BPgood suggest_remcloud = wvrg_qa.calculate_qa_numbers(result.qa_wvr, result.wvr_infos, PHnoisy, BPnoisy) result.suggest_remcloud = suggest_remcloud # PIPE-846: report improvement factors. self._report_wvr_improvement(result) # if the qa score indicates that applying the wvrg file will # make things worse then remove it from the results so that # it cannot be accepted into the context. if result.qa_wvr.overall_score < inputs.accept_threshold: LOG.warning('wvrgcal has qa score (%s) below accept_threshold ' '(%s) and will not be applied' % (result.qa_wvr.overall_score, inputs.accept_threshold)) result.final[:] = [] return result
def _report_wvr_improvement(self, result: resultobjects.WvrgcalResult): """ Report the improvement factors from QA data views to the log (PIPE-846). """ # Get translation from antenna IDs to names. ms = self.inputs.context.observing_run.get_ms(name=self.inputs.vis) ant_names, _ = commonhelpermethods.get_antenna_names(ms) # Report improvement values for each view: for description in result.qa_wvr.descriptions(): # Get list of antennas and timestamps from data view. qa_result = result.qa_wvr.last(description) antids = qa_result.axes[0].data timestamps = qa_result.axes[1].data # For each antenna, and then each timestamp, report the improvement # factor, and whether that entry was flagged. LOG.info(f"Phase improvement ratios for {result.vis}, intent {qa_result.intent}, SpW {qa_result.spw}:") for xid, antid in enumerate(antids): for yid, timestamp in enumerate(timestamps): LOG.info(f"Ant #{antid} ({ant_names[antid]}), time {timestamp}: {qa_result.data[xid, yid]:.2f}" f" {'(flagged)' if qa_result.flag[xid, yid] else ''}") def _do_qa_bandpass(self, inputs: WvrgcalInputs) -> BandpassResults: """ Create a bandpass caltable for QA analysis, returning the result of the worker bandpass task. If a suitable bandpass caltable already exists, it will be reused. """ if inputs.bandpass_result: # table already exists use it LOG.info('Reusing B calibration result:\n%s' % inputs.bandpass_result) return self._do_user_qa_bandpass(inputs) else: LOG.info('Calculating new bandpass for QA analysis') result = self._do_new_qa_bandpass(inputs) return result @staticmethod def _do_user_qa_bandpass(inputs: WvrgcalInputs) -> BandpassResults: """ Accept and return the bandpass result affixed to the inputs. This code path is used as an optimisation, so identical caltables need not be recalculated. """ bp_result = inputs.bandpass_result bp_result.accept(inputs.context) return bp_result def _do_new_qa_bandpass(self, inputs: WvrgcalInputs) -> BandpassResults: """ Create a new bandpass caltable by spawning a bandpass worker task, merging the results with the context. """ # passing an empty string as intent tells bandpass to use all intents, # which resolves to all fields. Convert empty/blank string to None, # which will tell bandpass to use the default bandpass intents, as # expected. if inputs.qa_bandpass_intent and inputs.qa_bandpass_intent.strip(): # qa_bandpass_intent is not None and is not empty/blank string. intent = inputs.qa_bandpass_intent else: intent = None args = {'vis': inputs.vis, 'mode': 'channel', 'intent': intent, 'spw': inputs.qa_spw, 'hm_phaseup': 'manual', 'hm_bandpass': 'fixed', 'solint': 'inf,7.8125MHz'} inputs = bandpass.SerialALMAPhcorBandpass.Inputs(inputs.context, **args) task = bandpass.SerialALMAPhcorBandpass(inputs) result = self._executor.execute(task, merge=False) if not result.final: pass else: result.accept(inputs.context) return result def _do_nowvr_gaincal(self, inputs: WvrgcalInputs) -> GaincalResults: # do a phase calibration on the bandpass and phase # calibrators with B preapplied LOG.info('Calculating phase calibration with B applied') if inputs.nowvr_result: # if table already exists use it LOG.debug('Reusing WVR-uncorrected gain result for RMS:\n %s' % inputs.nowvr_result) return inputs.nowvr_result else: LOG.debug('Calculating new gaincal with B but no WVR') # get namer that will add '.wvr' to caltable filename nowvr_caltable_namer = self._get_nowvr_caltable_namer() result = self._do_qa_gaincal(inputs, nowvr_caltable_namer) return result def _do_wvr_gaincal(self, inputs: WvrgcalInputs) -> GaincalResults: # get namer that will add '.flags_1_2.wvr' to caltable filename wvr_caltable_namer = self._get_wvr_caltable_namer(inputs) return self._do_qa_gaincal(inputs, wvr_caltable_namer) def _do_qa_gaincal(self, inputs: WvrgcalInputs, caltable_namer: Callable[[str], str]) -> GaincalResults: """ Generate a new gain caltable via a call to a child pipeline task. Analysing the improvement gained by applying the WVR requires that exactly the same gaincal job is called with and without the WVR preapply. Coding the gaincal as a separate function with minimal outside interaction helps enforce that requirement. """ args = {'vis': inputs.vis, 'intent': inputs.qa_intent, 'spw': inputs.qa_spw, 'solint': 'int', 'calmode': 'p', 'minsnr': 0.0} inputs = gaincal.GTypeGaincal.Inputs(inputs.context, **args) # give calling code a chance to customise the caltable name via the # callback passed as an argument inputs.caltable = caltable_namer(inputs.caltable) task = gaincal.GTypeGaincal(inputs) result = self._executor.execute(task, merge=False) return result @staticmethod def _get_nowvr_caltable_namer() -> Callable[[str], str]: """ Returns a function that inserts a '.nowvr' component into a filename. """ def caltable_namer(caltable: str) -> str: root, ext = os.path.splitext(caltable) new_caltable_name = '%s.nowvr%s' % (root, ext) LOG.debug('WVR uncorrected phase RMS gain table is %s' % new_caltable_name) return new_caltable_name return caltable_namer @staticmethod def _get_wvr_caltable_namer(inputs: WvrgcalInputs) -> Callable[[str], str]: """ Returns a function that inserts a ''.flagged_<N>_antennas.wvr' component into a filename. """ flags = '.flagged_%d_antennas' % len(inputs.wvrflag) if inputs.wvrflag else '' def caltable_namer(caltable: str) -> str: root, ext = os.path.splitext(caltable) new_caltable = '%s%s.wvr%s' % (root, flags, ext) LOG.debug('WVR-corrected phase RMS gain table is %s' % os.path.basename(new_caltable)) return new_caltable return caltable_namer @staticmethod def _get_wvrinfos(result: dict) -> list[WVRInfo]: """ Retrieve necessary information from the result returned by the CASA 'wvrgcal' task. Args: result: result dictionary returned by CASA 'wvrgcal' Returns: List containing WVR info for each antenna. """ def to_microns(x: float) -> measures.Distance: return measures.Distance(x, measures.DistanceUnits.MICROMETRE) # copy result in case we need it unaltered elsewhere, then convert raw # values to domain measures copied = dict(result) copied['RMS_um'] = [to_microns(v) for v in copied['RMS_um']] copied['Disc_um'] = [to_microns(v) for v in copied['Disc_um']] attrs = ['Name', 'WVR', 'Flag', 'RMS_um', 'Disc_um'] zipped = list(zip(*[copied.get(attr) for attr in attrs])) wvr_infos = [WVRInfo(*row) for row in zipped] return wvr_infos