Source code for pipeline.hif.tasks.selfcal.selfcal

from __future__ import annotations

import copy
import json
import os
import shutil
import tarfile
import traceback
from datetime import datetime, timezone
from fnmatch import fnmatch
from pathlib import Path
from typing import TYPE_CHECKING

import numpy as np
from astropy.utils.misc import JsonCustomEncoder

import pipeline.domain.measures as measures
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.filenamer as filenamer
import pipeline.infrastructure.mpihelpers as mpihelpers
import pipeline.infrastructure.vdp as vdp
from pipeline import environment
from pipeline.domain import DataType
from pipeline.hif.heuristics.auto_selfcal import auto_selfcal
from pipeline.hif.tasks.applycal import SerialIFApplycal
from pipeline.hif.tasks.makeimlist import MakeImList
from pipeline.infrastructure import callibrary, casa_tasks, casa_tools, task_registry, utils
from pipeline.infrastructure.contfilehandler import contfile_to_chansel
from pipeline.infrastructure.mpihelpers import TaskQueue

if TYPE_CHECKING:
    from typing import Any

LOG = infrastructure.get_logger(__name__)


class SelfcalResults(basetask.Results):
    def __init__(self, targets, applycal_result_contline=None, applycal_result_line=None, selfcal_resources=None,
                 is_restore=False):
        super().__init__()
        self.pipeline_casa_task = 'Selfcal'
        self.targets = targets
        self.applycal_result_contline = applycal_result_contline
        self.applycal_result_line = applycal_result_line
        self.is_restore = is_restore
        self.selfcal_resources = selfcal_resources

    def merge_with_context(self, context):
        """See :method:`~pipeline.infrastructure.api.Results.merge_with_context`."""

        # save selfcal results into the Pipeline context
        if context.selfcal_targets:
            LOG.warning('context.selfcal_targets is being over-written.')

        scal_targets_ctx = copy.deepcopy(self.targets)
        for target in scal_targets_ctx:
            target.pop('heuristics', None)
        context.selfcal_targets = scal_targets_ctx

        # if selfcal_resources is None, then the selfcal solver is not triggered and no need to register the
        # selfcal resources for auxproducts exporting.
        if self.selfcal_resources is not None:
            context.selfcal_resources = self.selfcal_resources

        if self.applycal_result_contline is not None:
            self._register_datatype(context, self.applycal_result_contline)
        if self.applycal_result_line is not None:
            self._register_datatype(context, self.applycal_result_line)

    def _register_datatype(self, context, appycal_result):

        calto_list = []
        for r in appycal_result:
            for calapp in r.applied:
                calto_list.append(calapp.calto)

        # register the selfcal results to the observing run
        for calto in calto_list:

            vis = calto.vis
            spw_sel = calto.spw

            # Create a mapping of regcal data types to their applied selfcal types
            data_type_mapping = {
                DataType.REGCAL_CONTLINE_SCIENCE: DataType.SELFCAL_CONTLINE_SCIENCE,
                DataType.REGCAL_LINE_SCIENCE: DataType.SELFCAL_LINE_SCIENCE,
                DataType.REGCAL_CONT_SCIENCE: DataType.SELFCAL_CONT_SCIENCE
            }

            # `calto.field` is a reduced CASA-style selection string derived from the calapp-apply consolidation.
            # The value can be a field name, a comma separated field id selection string, or an empty string.
            # See `CaltoIDAdapter.field` for more details on this structure.
            # Here, its selection scope needs to be translated back to the source name(s) used in `cleantarget`.

            # Retrieve the MS object for the observing run associated with the given visibility (`vis`).
            ms = context.observing_run.get_ms(vis)

            # Fetch the field names matching the given `calto.field` and `calto.intent`.
            # Use a set comprehension to collect unique field names, and join them into a single string.
            field_name = ','.join({field.name for field in ms.get_fields(task_arg=calto.field, intent=calto.intent)})

            # Retrieve the data type
            data_dtype = ms.get_data_type('DATA', field_name, spw_sel)

            # Find the corresponding selfcal type
            dtype_applied = data_type_mapping.get(data_dtype, None)
            if dtype_applied is None:
                LOG.warning('No selfcal data type found corresponding to the data type: %s '
                            'associated with field=%r, spw=%r in vis=%s. Skipping registration.',
                            data_dtype, field_name, spw_sel, vis)
                continue

            with casa_tools.TableReader(vis) as tb:
                # check for the existance of CORRECTED_DATA first
                if 'CORRECTED_DATA' not in tb.colnames():
                    LOG.warning('No CORRECTED_DATA column in %s, skip %s registration', vis, dtype_applied)
                    continue
                LOG.debug('DataType registration: overwrite=%s', self.inputs['overwrite'])
                LOG.info('Registering the CORRECTED_DATA column as %s for %s: field=%r spw=%r',
                         dtype_applied, vis, field_name, spw_sel)
                ms.set_data_column(dtype_applied, 'CORRECTED_DATA', source=field_name,
                                   spw=spw_sel, overwrite=self.inputs['overwrite'])

    def __repr__(self):
        return 'SelfcalResults:'


class SelfcalInputs(vdp.StandardInputs):

    # restrict input vis to be of type REGCAL_CONTLINE_SCIENCE
    # potentially we could allow REGCAL_CONTLINE_ALL here (e.g. tmp ms splitted from 'corrected' data column),
    # but there is no space for applying final selfcal solutions to the data.

    processing_data_type = [DataType.REGCAL_CONT_SCIENCE, DataType.REGCAL_CONTLINE_SCIENCE]

    field = vdp.VisDependentProperty(default='')

    @field.convert
    def field(self, val):
        if not isinstance(val, (str, type(None))):
            # PIPE-1881: allow field names that mistakenly get casted into non-string datatype by
            # recipereducer (utils.string_to_val) and executeppr (XmlObjectifier.castType)
            LOG.warning('The field selection input %r is not a string and will be converted.', val)
            val = str(val)
        return val

    spw = vdp.VisDependentProperty(default='')
    contfile = vdp.VisDependentProperty(default='cont.dat')
    hm_imsize = vdp.VisDependentProperty(default=None)
    hm_cell = vdp.VisDependentProperty(default=None)
    apply = vdp.VisDependentProperty(default=True)
    parallel = vdp.VisDependentProperty(default='automatic')
    recal = vdp.VisDependentProperty(default=False)
    restore_only = vdp.VisDependentProperty(default=False)
    overwrite = vdp.VisDependentProperty(default=False)

    n_solints = vdp.VisDependentProperty(default=4.0)
    amplitude_selfcal = vdp.VisDependentProperty(default=False)
    gaincal_minsnr = vdp.VisDependentProperty(default=2.0)
    minsnr_to_proceed = vdp.VisDependentProperty(default=3.0)
    delta_beam_thresh = vdp.VisDependentProperty(default=0.05)
    apply_cal_mode_default = vdp.VisDependentProperty(default='calflag')
    rel_thresh_scaling = vdp.VisDependentProperty(default='log10')
    dividing_factor = vdp.VisDependentProperty(default=None)
    check_all_spws = vdp.VisDependentProperty(default=False)
    inf_EB_gaincal_combine = vdp.VisDependentProperty(default=False)
    refantignore = vdp.VisDependentProperty(default='')

    @refantignore.postprocess
    def refantignore(self, unprocessed):
        if not isinstance(unprocessed, (str, dict)):
            LOG.error('refantignore must be string or dictionary')
            raise ValueError('refantignore must be string or dictionary')
        refantignore_per_ms = {}
        if isinstance(unprocessed, dict):
            vis_not_selected = set(unprocessed) - set(self.vis)
            if vis_not_selected:
                LOG.warning(
                    '%s specified in refantignore not in task input MS list, will be ignored.',
                    utils.commafy(sorted(vis_not_selected), quotes=False),
                )
        for vis in self.vis:
            if isinstance(unprocessed, str):
                refantignore_per_ms[vis] = unprocessed
            if isinstance(unprocessed, dict):
                refantignore_per_ms[vis] = unprocessed.get(vis, '')

        return refantignore_per_ms

    usermask = vdp.VisDependentProperty(default=None)
    # input as dictionary for individual clean targets, e.g.
    #   usermask={'IRAS32':'IRAS32.rgn', 'IRS5N':'IRS5N.rgn'}
    #   usermask={'IRAS32':{'Band_6':'IRAS32.rgn'}, 'IRS5N':{'Band_6': 'IRS5N.rgn'}}
    #   in which .rgn is a CRTF region (CASA region format)
    # for clean targets not found in the (nested) dictionary structure (source->band), the usermask value will be empty (default)
    usermodel = vdp.VisDependentProperty(default=None)
    # input as dictiionary for individual clean targets, e.g.
    #   usermodel={'IRAS32':['IRAS32-model.tt0','IRAS32-model.tt1'], 'IRS5N':['IRS5N-model.tt0','IRS5N-model.tt1']}
    #   usermodel={'IRAS32':{'Band_6':['IRAS32-model.tt0','IRAS32-model.tt1']}, 'IRS5N':{'Band_6':['IRS5N-model.tt0','IRS5N-model.tt1']}}
    # for clean targets not found in the (nested) dictionary structure (source->band), the usermask value will be empty (default)

    allow_wproject = vdp.VisDependentProperty(default=False)
    restore_resources = vdp.VisDependentProperty(default=None)

    # docstring and type hints: supplements hif_selfcal
    def __init__(self, context, vis=None, field=None, spw=None, contfile=None, hm_imsize=None, hm_cell=None, n_solints=None,
                 amplitude_selfcal=None, gaincal_minsnr=None, refantignore=None,
                 minsnr_to_proceed=None, delta_beam_thresh=None, apply_cal_mode_default=None,
                 rel_thresh_scaling=None, dividing_factor=None, check_all_spws=None, inf_EB_gaincal_combine=None,
                 usermask=None, usermodel=None, allow_wproject=None,
                 apply=None, parallel=None, recal=None, restore_only=None, overwrite=None,
                 restore_resources=None):
        r"""Initialize Inputs.

        Args:
            context: Pipeline context object containing state information.

            vis: The list of input MeasurementSets. Defaults to the list of MeasurementSets defined in the pipeline context.

            field: Select field(s) for self-calibration. Use field name(s) NOT id(s). Mosaics are assumed to have common source/field names.

            spw: Select spectral windows for self-calibration.

            contfile: Name of file to specify line-free frequency ranges for selfcal continuum imaging. Defaults to ``'cont.dat'``.


            hm_imsize: self-calibration imaging dimension in pixels, or PB level for single fields.

            hm_cell: self-calibration imaging cell size.

            n_solints: number of solution intervals to attempt for self-calibration. Defaults to ``4``.

            amplitude_selfcal: Attempt amplitude self-calibration following phase-only self-calibration; if median time
                between scans of a given target is < 150s, solution intervals of ``300s`` and ``inf`` will be attempted,
                otherwise just ``inf`` will be attempted. Defaults to ``False``.

            gaincal_minsnr: Minimum S/N for a solution to not be flagged by gaincal. Defaults to ``2.0``.

            refantignore: Antennas to be ignored as reference antennas. Defaults to ``''``. Examples:

                * ``refantignore='ea02,ea03'``
                * ``refantignore={'ms1.ms':'ea02,ea03','ms2.ms': 'ea03'}``, specified at the per-ms level.

            minsnr_to_proceed: Minimum estimated S/N on a per antenna basis to attempt self-calibration of a source.
                Defaults to ``3.0``.

            delta_beam_thresh: Allowed fractional change in beam size for selfcalibration to accept results of a solution interval.
                Defaults to ``0.05``.

            apply_cal_mode_default: Apply mode to use for applycal task during self-calibration; same options as applycal.
                Defaults to ``'calflag'``.

            rel_thresh_scaling: Scaling type to determine how clean thresholds per solution interval should be determined going
                from the starting clean threshold to 3.0 * RMS for the final solution interval. Defaults to ``'log10'``.
                Available options: ``'linear'``, ``'log10'``, or ``'loge'`` (natural log)

            dividing_factor: Scaling factor to determine clean threshold for first self-calibration solution interval.
                Equivalent to (Peak S/N / dividing_factor) \* RMS as the first clean threshold;
                however, if  `(Peak S/N / dividing_factor) < 5.0`;
                a value of 5.0 is used for the first clean threshold.
                Defaults to ``40`` for < 8 GHz or ``15`` for > 8 GHz.

            check_all_spws: If ``True``, the S/N of mfs images created on a per-spectral-window basis will be compared at the
                initial stages final self-calibration. Defaults to ``False``.

            inf_EB_gaincal_combine: change gain solution combination parameters for the inf_EB solution interval. if ``True``,
                the gaincal combine parameter will be set to ``'scan,spw'``; if ``False``, the gaincal combine parameter will
                be set to ``'scan'``. Defaults to ``False``.


            allow_wproject: Allow the wproject heuristics for self-calibration imaging. Defaults to ``False``.

            apply: Apply final selfcal solutions back to the input MeasurementSets. Defaults to ``True``.

            parallel: Process multiple MeasurementSets in parallel using the casampi parallelization framework, and use CASA/tclean
                parallel imaging, when possible.

                Options: ``'automatic'``, ``'true'``, ``'false'``, ``True``, ``False``

                Default: ``None`` (equivalent to ```'automatic'``)

            recal: Always re-do self-calibration even solutions/caltables are found in the Pipeline context or json restore file.
                Defaults to `False`.

                Note that the selfcal solutions might not be applied if self-calibrated data labeled by the pipeline Datatypes already
                exists. See ``overwrite`` below.

            restore_only:   Only attempt to apply pre-existing selfcal calibration tables and would not run
                            the self-calibration sequence if their records (.selfcal.json, gaintables) are not present.
                            Defaults to `False`. ``restore_only`` will take precedence over ``recal``.

            overwrite: Allow overwriting pre-existing self-calibrated data of applicable field/spw labeled by DataType.
                Defaults to ``False``.

            restore_resources: Path to the restore resources from a standard run of hif_selfcal. hif_selfcal will automatically
                do an exhaustive search to lookup/extract/verify
                the selfcal restore resources, i.e., selfcal.json and all selfcal-caltable referred
                in selfcal.json, starting from *working/*, to *products/* and *rawdata/*.
                If restore_resources is specified, this file path will be evaluated first
                before the pre-defined exhaustive search list.
                The value can be the file path of *\*auxproducts.tgz* file or *\*selfcal.json* file.

            usermask: User mask to be used for self-calibration imaging. (not implemented)

            usermodel: User model to be used for self-calibration imaging. (not implemented)

        """
        super().__init__()
        self.context = context
        self.vis = vis
        self.field = field
        self.spw = spw
        self.contfile = contfile
        self.hm_imsize = hm_imsize
        self.hm_cell = hm_cell
        self.apply = apply
        self.parallel = parallel
        self.recal = recal
        self.restore_only = restore_only
        self.overwrite = overwrite
        self.refantignore = refantignore

        self.n_solints = n_solints
        self.amplitude_selfcal = amplitude_selfcal
        self.gaincal_minsnr = gaincal_minsnr
        self.minsnr_to_proceed = minsnr_to_proceed
        self.delta_beam_thresh = delta_beam_thresh
        self.apply_cal_mode_default = apply_cal_mode_default
        self.rel_thresh_scaling = rel_thresh_scaling
        self.dividing_factor = dividing_factor
        self.check_all_spws = check_all_spws
        self.inf_EB_gaincal_combine = inf_EB_gaincal_combine
        self.restore_resources = restore_resources
        self.usermask = usermask
        self.usermodel = usermodel
        self.allow_wproject = allow_wproject


[docs] @task_registry.set_equivalent_casa_task('hif_selfcal') @task_registry.set_casa_commands_comment('Run self-calibration using the science target visibilities.') class Selfcal(basetask.StandardTaskTemplate): Inputs = SelfcalInputs is_multi_vis_task = True def __init__(self, inputs): super().__init__(inputs) @staticmethod def _scal_targets_to_json(scal_targets: list[dict[str, Any]], filename: str | Path = 'selfcal.json') -> None: """Serialize scal_targets to a JSON file. Creates a JSON file containing the scal_targets data along with metadata including version, timestamp, and pipeline version. The 'heuristics' key is removed from each target before serialization. Args: scal_targets: List of target dictionaries containing calibration data filename: Output JSON filename or path """ current_version = 1.0 current_datetime = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S') scal_targets_copy = copy.deepcopy(scal_targets) # Remove heuristics from each target for target in scal_targets_copy: target.pop('heuristics', None) scal_targets_json = { 'scal_targets': scal_targets_copy, 'version': current_version, 'datetime': current_datetime, 'pipeline_version': environment.pipeline_revision, } output_path = Path(filename) # Write JSON with custom formatting (keys may include mixed strings/integers, so no sorting) with output_path.open('w', encoding='utf-8') as fp: json.dump( scal_targets_json, fp, sort_keys=False, indent=2, cls=JsonCustomEncoder, separators=(',', ': '), ) @staticmethod def _json_debug_to_json_lite(filename='selfcal.debug.json'): """Convert a debug JSON file to a lightweight JSON file. This function reads a debug selfcal JSON file containing full self-calibration targets/library data and generates a lightweight version of with essential calibration data fields suitable for archival and selfcal solution restoration. Args: filename: Path to the debug JSON file. Defaults to 'selfcal.debug.json'. """ json_path = Path(filename) scal_targets = json.loads(json_path.read_text(encoding='utf-8')) Selfcal._scal_targets_to_json_lite(scal_targets['scal_targets'], filename=filename.replace('.debug.json', '.json')) @staticmethod def _scal_targets_to_json_lite(scal_targets: list[dict[str, Any]], filename: str | Path = 'selfcal.json') -> None: """Serialize scal_targets to a lightweight JSON file. Creates a JSON file with only essential calibration data fields. This reduces file size by including only critical information like field, spw_real, exceptions, and selected calibration library data. Args: scal_targets: List of target dictionaries containing calibration data. filename: Output JSON filename or path. Defaults to 'selfcal.json'. PIPE-2769: This function is not currently in use. It was originally intended to generate a "lite" selfcal.json file as described in PIPE-2646. Howeever, we decide to export a full selfcal diagnostic instead as of PL2025. This private method is kept for future reference. """ current_version = 1.0 current_datetime = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S') scal_targets_copy: list[dict[str, Any]] = [] for target in scal_targets: target_lite = { 'field': target['field'], 'phasecenter': target['phasecenter'], 'cell': target['cell'], 'imsize': target['imsize'], 'field_name': target['field_name'], 'is_repr_target': target['is_repr_target'], 'is_mosaic': target['is_mosaic'], 'spw': target['spw'], 'spw_real': target['spw_real'], 'sc_exception': target['sc_exception'], 'sc_workdir': target['sc_workdir'], } # Extract essential calibration library data if present if not target_lite['sc_exception'] and isinstance(target.get('sc_lib'), dict): target_lite['sc_lib'] = {} sc_lib = target['sc_lib'] # Copy success status and RMS values; required for QA and weblog rendering for key in ('SC_success', 'RMS_orig', 'RMS_final'): if key in sc_lib: target_lite['sc_lib'][key] = sc_lib[key] # Copy solution intervals and band information; required for QA and weblog rendering for key in ('sc_solints', 'sc_band'): if key in target: target_lite[key] = target[key] # Copy final solution intervals; required for QA and weblog rendering for key in ('final_solints', 'final_phase_solints'): if key in sc_lib: target_lite['sc_lib'][key] = sc_lib[key] # Copy visibility list and essential calibration parameters if 'vislist' in sc_lib: target_lite['sc_lib']['vislist'] = sc_lib['vislist'] # Copy essential calibration parameters for each visibility for vis in target_lite['sc_lib']['vislist']: target_lite['sc_lib'][vis] = {} essential_keys = [ 'gaintable_final', 'applycal_interpolate_final', 'spwmap_final', ] # Copy essential keys if they exist; required for QA and weblog rendering for key in essential_keys: if key in sc_lib.get(vis, {}): target_lite['sc_lib'][vis][key] = sc_lib[vis][key] # Copy solution interval data for solint in target_lite.get('sc_solints', []): if solint in sc_lib.get(vis, {}): target_lite['sc_lib'][vis][solint] = {} scal_targets_copy.append(target_lite) scal_targets_json = { 'scal_targets': scal_targets_copy, 'version': current_version, 'datetime': current_datetime, 'pipeline_version': environment.pipeline_revision, } output_path = Path(filename) # Write JSON with custom formatting (keys may include mixed strings/integers, so no sorting) with output_path.open('w', encoding='utf-8') as fp: json.dump( scal_targets_json, fp, sort_keys=False, indent=2, cls=JsonCustomEncoder, separators=(',', ': '), ) @staticmethod def _scal_targets_from_json(filename='selfcal.json'): """Deserilize scal_targets from a json file.""" LOG.info('Reading the selfcal targets list from %s', filename) with open(filename, 'r', encoding='utf-8') as fp: scal_targets_json = json.load(fp) return scal_targets_json['scal_targets'] def _apply_scal_check_caltable(self, sc_targets, mses=None): """Check if all calibration tables required for applying the selfcal solutions are ready to use. Returns: caltable_list: a list of calibration tables requested based on the calapply information inside scal_targets caltable_ready: a list of the requested cal tables that are ready to be applied. """ if mses is None: obs_run = self.inputs.context.observing_run mses_regcal_contline = obs_run.get_measurement_sets_of_type( [DataType.REGCAL_CONT_SCIENCE, DataType.REGCAL_CONTLINE_SCIENCE], msonly=True) mses_regcal_line = obs_run.get_measurement_sets_of_type([DataType.REGCAL_LINE_SCIENCE], msonly=True) mses = mses_regcal_contline+mses_regcal_line # collect a name list of MSes which we could apply the selfcal solutions to. vislist_calto = [ms.name for ms in mses] caltable_list = [] caltable_ready = [] for cleantarget in sc_targets: if cleantarget['sc_exception']: continue sc_lib = cleantarget['sc_lib'] sc_workdir = cleantarget['sc_workdir'] if not sc_lib['SC_success']: continue for vis in sc_lib['vislist']: vis_base = os.path.splitext(os.path.basename(vis))[0] if vis_base.endswith('_cont'): vis_base = vis_base[:-5] for idx, gaintable in enumerate(sc_lib[vis]['gaintable_final']): for vis_calto in vislist_calto: if vis_calto.startswith(vis_base): gaintable = os.path.join(sc_workdir, sc_lib[vis]['gaintable_final'][idx]) if gaintable not in caltable_list: caltable_list.append(gaintable) if os.path.exists(gaintable): if gaintable not in caltable_ready: caltable_ready.append(gaintable) LOG.info('selfcal/caltable(s) request : %r', caltable_list) LOG.info('selfcal/caltable(s) ready : %r', caltable_ready) return caltable_list, caltable_ready def _check_restore_from_resources(self): """Check if we can do selfcal restore from the restore resources.""" scal_targets = None pat_list = ['*.auxproducts.tgz', '*.selfcal.json', '../products/*.auxproducts.tgz', '*.selfcal.json', '../rawdata/*.auxproducts.tgz', '*.selfcal.json'] match_list = ('sc_workdir*', '*.selfcal.json') if self.inputs.restore_resources is not None: pat_list = [self.inputs.restore_resources, '*selfcal.json']+pat_list for pat in pat_list: for check_file in utils.glob_ordered(pat): if check_file.endswith('.auxproducts.tgz') and tarfile.is_tarfile(check_file): with tarfile.open(check_file, 'r:gz') as tar: members = [] for member in tar.getmembers(): is_resource = any([fnmatch(member.name, pat) for pat in match_list]) if is_resource: members.append(member) LOG.info('extracting: %s from %s', member.name, check_file) tar.extractall(members=members, filter='fully_trusted') if check_file.endswith('.selfcal.json'): scal_targets_json = self._scal_targets_from_json(check_file) if scal_targets_json is not None: caltable_list, caltable_ready = self._apply_scal_check_caltable(scal_targets_json) # we verify the existances of required caltables in-flight as the file search progresses. if len(caltable_ready) < len(caltable_list): LOG.warning( 'The required selfcal caltable(s) is missing if we use scal_targets from the json file %s', check_file) else: scal_targets = scal_targets_json if scal_targets is not None: break return scal_targets def _check_restore_from_context(self): """Check if we can do selfcal restore from scal_targets saved in the context.""" scal_targets = None if self.inputs.context.selfcal_targets: scal_targets_last = self.inputs.context.selfcal_targets LOG.info( 'Found selfcal results in the context. Looking for the required caltables for applying the selfcal solutions.' ) caltable_list, caltable_ready = self._apply_scal_check_caltable(scal_targets_last) if len(caltable_ready) < len(caltable_list): LOG.warning('The required selfcal caltable(s) is missing if we use scal_targets from the context.') else: scal_targets = scal_targets_last return scal_targets
[docs] def prepare(self): inputs = self.inputs if inputs.vis in (None, [], ''): # If no suitable datatype, by-pass the hif_selfcal stage. required_data_type_desc = ', '.join(dt.name for dt in SelfcalInputs.processing_data_type) LOG.warning('No data matching any of the required datatypes: %s; ' 'please review the registered MS datatype information.', required_data_type_desc) return SelfcalResults([], None, None, None, False) if not isinstance(inputs.vis, list): inputs.vis = [inputs.vis] scal_targets_last = scal_targets_json = None # Check if we can use a scal_targets list from the Pipeline context scal_targets_last = self._check_restore_from_context() # Check if we can use a scal_targets list from restore_resources scal_targets_json = self._check_restore_from_resources() obs_run = self.inputs.context.observing_run mses_columns_regcal_contline, _ = obs_run.get_measurement_sets_of_type( [DataType.REGCAL_CONT_SCIENCE, DataType.REGCAL_CONTLINE_SCIENCE], msonly=False) mses_columns_selfcal_contline, _ = obs_run.get_measurement_sets_of_type( [DataType.SELFCAL_CONT_SCIENCE, DataType.SELFCAL_CONTLINE_SCIENCE], msonly=False) mses_regcal_contline = list(mses_columns_regcal_contline) mses_selfcal_contline = list(mses_columns_selfcal_contline) mses_regcal_line = obs_run.get_measurement_sets_of_type([DataType.REGCAL_LINE_SCIENCE], msonly=True) mses_selfcal_line = obs_run.get_measurement_sets_of_type([DataType.SELFCAL_LINE_SCIENCE], msonly=True) scal_targets = [] if not self.inputs.recal: # only sideload the selfcal restore information from the context or json if recal=False if scal_targets_last is not None: scal_targets = scal_targets_last LOG.info('Staging the previous selfcal solutions from the pipeline context.') if scal_targets_json is not None: scal_targets = scal_targets_json LOG.info('Staging the previous selfcal solutions from selfcal.json.') # if applycal_result_contline is None, then contline applycal is not triggered. # if applycal_result_line is None, then line applycal is not triggered. # if selfcal_resources is None, then the selfcal solver is not triggered, even selfcal solution could be applied. applycal_result_contline = applycal_result_line = selfcal_resources = None is_restore = True if not scal_targets: if self.inputs.restore_only: LOG.info('No valid selfcal restoration resource is found and restore_only=True; skip a new self-calibration sequence.') return SelfcalResults( scal_targets, applycal_result_contline, applycal_result_line, selfcal_resources, is_restore) if self.inputs.recal: LOG.info('recal=True, override any existing selfcal solution in context or json, ' 'and alway execute the selfcal solver.') LOG.info('Execute the selfcal solver.') scal_targets = self._solve_selfcal() is_restore = False selfcal_json = self.inputs.context.name+'.selfcal.json' self._scal_targets_to_json(scal_targets, filename=selfcal_json) scal_caltable, _ = self._apply_scal_check_caltable(scal_targets, mses_regcal_contline+mses_regcal_line) selfcal_resources = [selfcal_json] + scal_caltable LOG.debug('selfcal resources list: %r', selfcal_resources) if not scal_targets: LOG.info('No valid selfcal field returned by the selfcal solver; skip the selfcal apply step.') return SelfcalResults( scal_targets, applycal_result_contline, applycal_result_line, selfcal_resources, is_restore) if self.inputs.apply: if mses_regcal_contline: if mses_selfcal_contline: LOG.warning( 'Found DataType:SELFCAL_CONT*_SCIENCE before attempting to apply selfcal solutions.' ) for ms in mses_selfcal_contline: LOG.debug(' %s: %s', ms.basename, ms.data_column) else: LOG.info( 'No DataType:SELFCAL_CONT*_SCIENCE found before attempting to apply selfcal solutions.' ) if mses_selfcal_contline and not self.inputs.overwrite: LOG.warning( 'Skip applying selfcal solutions to the REGCAL_CONT*_SCIENCE MS(es) due to ' 'the presence of DataType:SELFCAL_CONT*_SCIENCE and recal/overwrite=False.' ) else: LOG.info('Applying selfcal solutions to the REGCAL_CONT*_SCIENCE data.') applycal_result_contline = self._apply_scal(scal_targets, mses_regcal_contline) LOG.info( 'DataType info for he REGCAL_CONT*_SCIENCE MS(es) after applying selfcal solutions:' ) for ms in mses_regcal_contline: LOG.debug(' %s: %s', ms.basename, ms.data_column) if mses_regcal_line: if mses_selfcal_line: LOG.warning('Found DataType:SELFCAL_LINE_SCIENCE before attempting to apply selfcal solutions.') for ms in mses_selfcal_line: LOG.debug(' %s: %s', ms.basename, ms.data_column) else: LOG.info('No DataType:SELFCAL_LINE_SCIENCE found before attempting to apply selfcal solutions.') if mses_selfcal_line and not self.inputs.overwrite: LOG.warning( 'Skip applying selfcal solutions to the REGCAL_LINE_SCIENCE MS(es). due to ' 'the presence of DataType:SELFCAL_CONT*_SCIENCE and recal=False.' ) else: LOG.info('Attempt to apply any selfcal solutions to the REGCAL_LINE_SCIENCE MS(es)') applycal_result_line = self._apply_scal(scal_targets, mses_regcal_line) LOG.info('DataType info for the REGCAL_LINE_SCIENCE MS(es) after applying selfcal solutions:') for ms in mses_regcal_line: LOG.debug(' %s: %s', ms.basename, ms.data_column) return SelfcalResults( scal_targets, applycal_result_contline, applycal_result_line, selfcal_resources, is_restore )
def _solve_selfcal(self): # collect the target list scal_targets = self._get_scaltargets(scal=True) scal_targets = self._check_scaltargets(scal_targets) if not scal_targets: return scal_targets # split per-cleantarget MSes with optional spectral line flagging self._split_scaltargets(scal_targets) # start the selfcal sequence. if self.inputs.inf_EB_gaincal_combine: inf_EB_gaincal_combine = 'scan,spw' else: inf_EB_gaincal_combine = 'scan' parallel = mpihelpers.parse_parallel_input_parameter(self.inputs.parallel) taskqueue_parallel_request = len(scal_targets) > 1 and parallel self.inputs.refantignore with TaskQueue(parallel=taskqueue_parallel_request) as tq: for target in scal_targets: target['sc_parallel'] = (parallel and mpihelpers.is_mpi_ready() and not tq.is_async()) tq.add_functioncall(self._run_selfcal_sequence, target, gaincal_minsnr=self.inputs.gaincal_minsnr, minsnr_to_proceed=self.inputs.minsnr_to_proceed, delta_beam_thresh=self.inputs.delta_beam_thresh, apply_cal_mode_default=self.inputs.apply_cal_mode_default, rel_thresh_scaling=self.inputs.rel_thresh_scaling, dividing_factor=self.inputs.dividing_factor, refantignore=self.inputs.refantignore, check_all_spws=self.inputs.check_all_spws, n_solints=self.inputs.n_solints, do_amp_selfcal=self.inputs.amplitude_selfcal, inf_EB_gaincal_combine=inf_EB_gaincal_combine, executor=self._executor, use_pickle=True) tq_results = tq.get_results() for idx, target in enumerate(scal_targets): scal_library, _ = tq_results[idx] sc_exception = False if scal_library is None: sc_exception = True if not sc_exception: try: # scal_library here is expected to contain only a single target/band combination. sc_field = list(scal_library.keys())[0] # the selfcal heuristics target name sc_band = list(scal_library[sc_field].keys())[0] # the selfcal heuristics band name if sc_field != target['sc_field']: # note scal_library is keyed by field name without quotes at this moment. # see. https://casadocs.readthedocs.io/en/stable/notebooks/visibility_data_selection.html#The-field-Parameter # utils.fieldname_for_casa() and # utils.dequote() LOG.warning( 'The field name of the selfcal library is different from the clean target dequoted field name: %s != %s', sc_field, target['sc_field']) target['sc_band'] = sc_band target['sc_lib'] = scal_library[sc_field][sc_band] target['sc_solints'] = target['sc_lib']['solints'] target['sc_rms_scale'] = target['sc_lib']['RMS_final'] / target['sc_lib']['theoretical_sensitivity'] target['sc_success'] = target['sc_lib']['SC_success'] except Exception as err: traceback_msg = traceback.format_exc() LOG.debug('Exception: %s', err) LOG.info(traceback_msg) sc_exception = True if sc_exception: LOG.warning( 'An exception was triggered during the self-calibration sequence for target=%r spw=%r in the working directory: %s .', target['field'], target['spw'], target['sc_workdir']) target['sc_exception'] = sc_exception return scal_targets @staticmethod def _run_selfcal_sequence(scal_target, **kwargs): """Run the selfcal sequence for a single target. Note that this function is executed in a TaskQueue so potentially on an MPI server process. """ workdir = os.path.abspath('./') selfcal_library = trackback_msg = None try: os.chdir(scal_target['sc_workdir']) LOG.info('') LOG.info('Running auto_selfcal heuristics on target %s spw %s from %s', scal_target['field'], scal_target['spw'], scal_target['sc_workdir']) LOG.info('') selfcal_heuristics = auto_selfcal.SelfcalHeuristics(scal_target, **kwargs) # import pickle # with open('selfcal_heuristics.pickle', 'wb') as handle: # pickle.dump(selfcal_library, handle, protocol=pickle.HIGHEST_PROTOCOL) selfcal_library = selfcal_heuristics() except Exception as err: traceback_msg = traceback.format_exc() LOG.debug('Exception: %s', err) LOG.info(traceback_msg) finally: os.chdir(workdir) if scal_target['sc_parallel']: # sc_parallel=True indicats that we are certainly running tclean(parallel=true) in a sequential TaskQueue. # A side effect of doing this while changing cwd is that the working directory of MPIServers will be "stuck" # to the one where tclean(paralllel=True) started. # As a workaround, we send the chdir command to the MPIServers explicitly. mpihelpers.mpiclient.push_command_request( f'os.chdir({workdir!r})', block=True, target_server=mpihelpers.mpi_server_list) return selfcal_library, trackback_msg def _apply_scal(self, sc_targets, mses): calapps = [] vislist = [] # collect a name list of MSes which we could apply the selfcal solutions to. vislist_calto = [ms.name for ms in mses] for cleantarget in sc_targets: if cleantarget['sc_exception']: continue sc_lib = cleantarget['sc_lib'] if not sc_lib['SC_success']: continue sc_workdir = cleantarget['sc_workdir'] for vis in sc_lib['vislist']: vis_base = os.path.splitext(os.path.basename(vis))[0] if vis_base.endswith('_cont'): vis_base = vis_base[:-5] for idx, gaintable in enumerate(sc_lib[vis]['gaintable_final']): for vis_calto in vislist_calto: if vis_calto.startswith(vis_base): # workaround a potential issue from heuristics.auto_selfcal when gaintable has only one element, when it's not a list of list. spwmap_final = sc_lib[vis]['spwmap_final'] if any(not isinstance(spwmap, list) for spwmap in spwmap_final) or not spwmap_final: spwmap_final = [spwmap_final] gaintable = os.path.join(sc_workdir, sc_lib[vis]['gaintable_final'][idx]) calfrom = callibrary.CalFrom( gaintable=gaintable, interp=sc_lib[vis]['applycal_interpolate_final'][idx], calwt=False, spwmap=spwmap_final[idx], caltype='gaincal') calto = callibrary.CalTo( vis=vis_calto, field=cleantarget['field'], spw=cleantarget['spw_real'][vis]) calapps.append(callibrary.CalApplication(calto, calfrom)) vislist.append(vis_calto) for calapp in calapps: self.inputs.context.callibrary.add(calapp.calto, calapp.calfrom) vislist = sorted(set(vislist)) parallel = mpihelpers.parse_parallel_input_parameter(self.inputs.parallel) taskqueue_parallel_request = len(vislist) > 1 and parallel with TaskQueue(parallel=taskqueue_parallel_request, executor=self._executor) as tq: for vis in vislist: task_args = {'vis': vis, 'applymode': self.inputs.apply_cal_mode_default, 'intent': 'TARGET'} tq.add_pipelinetask(SerialIFApplycal, task_args, self.inputs.context) tq_results = tq.get_results() return tq_results
[docs] def analyse(self, results): return results
def _check_scaltargets(self, scal_targets): """Filter out the sources that the selfcal heuristics should not process. PIPE-1447/PIPE-1915: we do not execute selfcal heuristics for mosaic or ephemeris sources. """ final_scal_target = [] for scal_target in scal_targets: disable_mosaic = False if disable_mosaic and scal_target['is_mosaic']: LOG.warning( 'The self-calibration heuristics do not fully support mosaic yet. Skipping target=%r spw=%r.', scal_target['field'], scal_target['spw']) continue if scal_target['is_eph_obj']: LOG.warning( 'The self-calibration heuristics do not fully support ephemeris sources yet. Skipping target=%r spw=%r.', scal_target['field'], scal_target['spw']) continue final_scal_target.append(scal_target) return final_scal_target def _get_scaltargets(self, scal=True): """Get the cleantarget list from the context. This essenially runs MakeImList and go through all nesscary steps to get the target list. However, it will pick up the selfcal heuristics from imageparams_factory,ImageParamsHeuristicsFactory """ telescope = self.inputs.context.project_summary.telescope if telescope == 'ALMA': repr_ms = self.inputs.ms[0] diameter = min([a.diameter for a in repr_ms.antennas]) if diameter == 7.0: telescope = 'ACA' else: telescope = 'ALMA' makeimlist_inputs = MakeImList.Inputs( self.inputs.context, vis=None, intent='TARGET', specmode='cont', clearlist=True, scal=scal, contfile=self.inputs.contfile, field=self.inputs.field, spw=self.inputs.spw, hm_imsize=self.inputs.hm_imsize, hm_cell=self.inputs.hm_cell, allow_wproject=self.inputs.allow_wproject, datatype='regcal', parallel=self.inputs.parallel, # PIPE-2449: prevent applying imageprecheck customizations during planning # During imaging planning for self-calibration targets, customizations # from the imageprecheck step should not be applied. Here we removes those # customizations from the local context. uvtaper=[''], # disable possible uvtaper specification from context/imageprecheck robust=0.5, # disable potential robust specification from context/imageprecheck ) makeimlist_task = MakeImList(makeimlist_inputs) makeimlist_results = makeimlist_task.execute() scal_targets = makeimlist_results.targets if isinstance(self.inputs.usermask, dict): usermask = self.inputs.usermask else: usermask = {} if isinstance(self.inputs.usermodel, dict): usermodel = self.inputs.userrmodel else: usermodel = {} for scal_target in scal_targets: scal_target['sc_telescope'] = telescope _, repr_source, repr_spw, _, _, repr_real, _, _, _, _ = scal_target['heuristics'].representative_target() if str(repr_spw) in scal_target['spw'].split(',') and repr_source == utils.dequote(scal_target['field']): is_representative = True else: is_representative = False scal_target['is_repr_target'] = is_representative scal_target['is_mosaic'] = scal_target['heuristics'].is_mosaic(scal_target['field'], scal_target['intent']) scal_target['is_eph_obj'] = scal_target['heuristics'].is_eph_obj(scal_target['field']) # Note that scal_library is currently keyed by field name without quotes. # We use the 'field_name' value to retrieve self-calibration results from scal_library generated by the self-cal solver. scal_target['field_name'] = utils.dequote(scal_target['field']) scal_target['sc_field'] = utils.dequote(scal_target['field']) # not supporting the target->band nest dictionary structure yet. scal_target['sc_usermask'] = usermask.get(scal_target['sc_field'], '') scal_target['sc_usermodel'] = usermodel.get(scal_target['sc_field'], '') LOG.debug('scal_targets: %s', scal_targets) return scal_targets def _remove_ms(self, vis): vis_dirs = [vis, vis+'.flagversions'] for vis_dir in vis_dirs: if os.path.isdir(vis_dir): LOG.debug(f'removing {vis_dir}') self._executable.rmtree(vis_dir) def _split_scaltargets(self, scal_targets): """Split the input MSes into smaller MSes per cleantargets effeciently.""" # mt_inputvis_list aggregates input vis argument values of expected mstransform calls # therefore len(mt_inputvis_list) represents the number of ms to be split out mt_inputvis_list = [(vis, '_CONTLINE_' in target['datatype']) for target in scal_targets for vis in target['vis']] parallel = mpihelpers.parse_parallel_input_parameter(self.inputs.parallel) taskqueue_parallel_request = len(mt_inputvis_list) > 1 and parallel inputvis_list = utils.deduplicate([m[0] for m in mt_inputvis_list]) outputvis_list = [] # PIPE-2497: Attempting line flagging for all input visibilities, # regardless of whether the datatype is "CONTLINE" or "CONT". self._flag_lines(inputvis_list) with utils.ignore_pointing(inputvis_list): with TaskQueue(parallel=taskqueue_parallel_request) as tq: for target in scal_targets: vislist = [] band_str = self._get_band_name(target).lower().replace(' ', '_') sc_workdir = filenamer.sanitize(f'sc_workdir_{target["field"]}_{band_str}') if os.path.isdir(sc_workdir): shutil.rmtree(sc_workdir) os.mkdir(sc_workdir) sc_workdir_contfile = os.path.join(sc_workdir, 'cont.dat') if os.path.isfile(self.inputs.contfile) and not os.path.isfile(sc_workdir_contfile): shutil.copy(self.inputs.contfile, sc_workdir_contfile) spw_real = {} field = target['field'] uvrange = target['uvrange'] for vis in target['vis']: # we use virtualspw here for the naming convention (similar to the imaging naming convention). real_spwsel = self.inputs.context.observing_run.get_real_spwsel([target['spw']], [vis])[0] spw_real[vis] = real_spwsel outputvis = os.path.join(sc_workdir, os.path.basename(vis)) self._remove_ms(outputvis) ms = self.inputs.context.observing_run.get_ms(vis) spws = ms.get_spectral_windows(real_spwsel) mean_freq = np.mean([float(spw.mean_frequency.to_units( measures.FrequencyUnits.HERTZ)) for spw in spws]) bwarray = np.array([float(spw.bandwidth.to_units(measures.FrequencyUnits.HERTZ)) for spw in spws]) chanarray = np.array([spw.num_channels for spw in spws]) chanwidth_desired_hz = self.get_desired_width(mean_freq) chanbin = self.get_spw_chanbin(bwarray, chanarray, chanwidth_desired_hz) task_args = {'vis': vis, 'outputvis': outputvis, 'field': field, 'spw': real_spwsel, 'uvrange': uvrange, 'chanaverage': True, 'chanbin': chanbin, 'usewtspectrum': True, 'datacolumn': 'data', 'reindex': False, 'keepflags': True} outputvis_list.append((vis, outputvis)) tq.add_jobrequest(casa_tasks.mstransform, task_args, executor=self._executor) vislist.append(os.path.basename(outputvis)) target['sc_workdir'] = sc_workdir target['spw_real'] = spw_real target['sc_vislist'] = vislist self._restore_flags(inputvis_list) for outputvis in outputvis_list: # Copy across requisite XML files. self._copy_xml_files(outputvis[0], outputvis[1]) return scal_targets def _get_band_name(self, target): """Get the band name for the target.""" spw_virtual = target['spw'].split(',')[0] ms = self.inputs.context.observing_run.get_ms(target['vis'][0]) spw_real = self.inputs.context.observing_run.virtual2real_spw_id(spw_virtual, ms) if self.inputs.context.project_summary.telescope in ('VLA', 'JVLA', 'EVLA'): spw2band = ms.get_vla_spw2band() band_name = spw2band[ms.get_spectral_windows(spw_real)[0].id]+' band' else: band_name = ms.get_spectral_windows(spw_real)[0].band return band_name @staticmethod def _copy_xml_files(vis, outputvis): for xml_filename in ['SpectralWindow.xml', 'DataDescription.xml']: vis_source = os.path.join(vis, xml_filename) outputvis_targets_contline = os.path.join(outputvis, xml_filename) if os.path.exists(vis_source) and os.path.exists(outputvis): LOG.info('Copying %s from original MS to science targets cont+line MS', xml_filename) LOG.trace('Copying %s: %s to %s', xml_filename, vis_source, outputvis_targets_contline) shutil.copyfile(vis_source, outputvis_targets_contline)
[docs] @staticmethod def get_desired_width(meanfreq): """Get the desired channel width for the given mean frequency.""" if meanfreq >= 50.0e9: chanwidth = 15.625e6 elif (meanfreq < 50.0e9) and (meanfreq >= 40.0e9): chanwidth = 16.0e6 elif (meanfreq < 40.0e9) and (meanfreq >= 26.0e9): chanwidth = 8.0e6 elif (meanfreq < 26.0e9) and (meanfreq >= 18.0e9): chanwidth = 16.0e6 elif (meanfreq < 18.0e9) and (meanfreq >= 8.0e9): chanwidth = 8.0e6 elif (meanfreq < 8.0e9) and (meanfreq >= 4.0e9): chanwidth = 4.0e6 elif (meanfreq < 4.0e9) and (meanfreq >= 2.0e9): chanwidth = 4.0e6 elif (meanfreq < 4.0e9): chanwidth = 2.0e6 return chanwidth
[docs] @staticmethod def get_spw_chanbin(bwarray, chanarray, chanwidth=15.625e6): """Calculate the number of channels to average over for each spw. note: mstransform only accept chanbin as integer. """ avgarray = [1]*len(bwarray) for idx, bw in enumerate(bwarray): nchan = bw/chanwidth nchan = max(np.round(nchan), 1.0) avgarray[idx] = int(chanarray[idx]/nchan) if avgarray[idx] < 1.0: avgarray[idx] = 1 return avgarray
def _restore_flags(self, vislist): """Restore the before lineflagging flag state, after splitting per_cleantarget tmp MS.""" for vis in vislist: # restore to the starting flags # self._executable.initweights(vis=vis, wtmode='delwtsp') # remove channelized weights if os.path.exists(vis+".flagversions/flags.before_hif_selfcal"): self._executable.flagmanager(vis=vis, mode='restore', versionname='before_hif_selfcal', comment='Flag states before hif_selfcal') def _flag_lines(self, vislist): """Flag the lines when cont.dat is present, before splitting per_cleantarget tmp MS.""" for vis in vislist: # self._executable.initweights(vis=vis, wtmode='weight', dowtsp=True) # initialize channelized weights # save starting flags or restore to the starting flags if os.path.exists(vis+".flagversions/flags.before_hif_selfcal"): self._executable.flagmanager(vis=vis, mode='restore', versionname='before_hif_selfcal', comment='Flag states before hif_selfcal') else: self._executable.flagmanager(vis=vis, mode='save', versionname='before_hif_selfcal') # note that contfile_to_chansel will do the virtual2real spw translation automatically. lines_sel_dict = contfile_to_chansel( vis, self.inputs.context, contfile=self.inputs.contfile, excludechans=True) for field, lines_sel in lines_sel_dict.items(): LOG.info("Flagging lines in field {} with the spw selection {}".format(field, lines_sel)) # lines_sel_dict is keyed with original field names, which might not be safe for CASA/field # selections. self._executable.flagdata(vis=vis, field=utils.fieldname_for_casa(field), mode='manual', spw=lines_sel, flagbackup=False, action='apply')