Source code for pipeline.hif.tasks.makeimages.makeimages

import os
import tempfile
from inspect import signature

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.daskhelpers as daskhelpers
import pipeline.infrastructure.mpihelpers as mpihelpers
import pipeline.infrastructure.pipelineqa as pqa
import pipeline.infrastructure.utils as utils
import pipeline.infrastructure.vdp as vdp

from pipeline.domain import DataType
from pipeline.h.tasks.common.sensitivity import Sensitivity
from pipeline.infrastructure import casa_tools, exceptions, task_registry

from ..tclean import Tclean
from ..tclean.resultobjects import TcleanResult
from .resultobjects import MakeImagesResult
import numpy as np

LOG = infrastructure.get_logger(__name__)


class MakeImagesInputs(vdp.StandardInputs):
    # Search order of input vis
    processing_data_type = [DataType.SELFCAL_LINE_SCIENCE, DataType.REGCAL_LINE_SCIENCE, DataType.SELFCAL_CONTLINE_SCIENCE, DataType.REGCAL_CONTLINE_SCIENCE, DataType.REGCAL_CONTLINE_ALL, DataType.RAW]

    calcsb = vdp.VisDependentProperty(default=False)
    cleancontranges = vdp.VisDependentProperty(default=False)
    hm_cleaning = vdp.VisDependentProperty(default='')
    hm_cyclefactor = vdp.VisDependentProperty(default=-999.0)
    hm_nmajor = vdp.VisDependentProperty(default=None)
    hm_dogrowprune = vdp.VisDependentProperty(default=None)
    hm_growiterations = vdp.VisDependentProperty(default=-999)
    hm_lownoisethreshold = vdp.VisDependentProperty(default=-999.0)
    hm_masking = vdp.VisDependentProperty(default=None)
    hm_minbeamfrac = vdp.VisDependentProperty(default=-999.0)
    hm_minpercentchange = vdp.VisDependentProperty(default=-999.0)
    hm_minpsffraction = vdp.VisDependentProperty(default=-999.0)
    hm_maxpsffraction = vdp.VisDependentProperty(default=-999.0)
    hm_fastnoise = vdp.VisDependentProperty(default=None)
    hm_nsigma = vdp.VisDependentProperty(default=0.0)
    hm_perchanweightdensity = vdp.VisDependentProperty(default=None)
    hm_npixels = vdp.VisDependentProperty(default=0)
    hm_negativethreshold = vdp.VisDependentProperty(default=-999.0)
    hm_noisethreshold = vdp.VisDependentProperty(default=-999.0)
    hm_sidelobethreshold = vdp.VisDependentProperty(default=-999.0)
    hm_weighting = vdp.VisDependentProperty(default=None)
    masklimit = vdp.VisDependentProperty(default=2.0)
    parallel = vdp.VisDependentProperty(default='automatic')
    tlimit = vdp.VisDependentProperty(default=2.0)
    drcorrect = vdp.VisDependentProperty(default=-999.0)
    overwrite_on_export = vdp.VisDependentProperty(default=True)
    vlass_plane_reject_im = vdp.VisDependentProperty(default=True)

    @vlass_plane_reject_im.postprocess
    def vlass_plane_reject_im(self, unprocessed):
        """Convert the allowed argument input datatype to the dictionary form used by the task."""
        vlass_plane_reject_dict = {
            'apply': True, 'exclude_spw': '', 'flagpct_thresh': 0.8, 'beamdev_thresh': 0.2}
        if isinstance(unprocessed, dict):
            vlass_plane_reject_dict.update(unprocessed)
        if isinstance(unprocessed, bool):
            vlass_plane_reject_dict['apply'] = unprocessed
        LOG.debug("convert the task input of 'vlass_plane_reject_im' from %r to %r.",
                  unprocessed, vlass_plane_reject_dict)
        return vlass_plane_reject_dict

    @vdp.VisDependentProperty(null_input=['', None, {}])
    def target_list(self):
        return self.context.clean_list_pending

    @tlimit.convert
    def tlimit(self, tlimit):
        if tlimit <= 0.0:
            raise ValueError('tlimit values must be larger than 0.0')
        else:
            return tlimit

    # docstring and type hints: supplements hif_makeimages
    def __init__(self, context, output_dir=None, vis=None, target_list=None,
                 hm_masking=None, hm_sidelobethreshold=None, hm_noisethreshold=None,
                 hm_lownoisethreshold=None, hm_negativethreshold=None, hm_minbeamfrac=None, hm_growiterations=None,
                 hm_dogrowprune=None, hm_minpercentchange=None, hm_fastnoise=None, hm_nsigma=None,
                 hm_perchanweightdensity=None, hm_npixels=None, hm_cyclefactor=None, hm_nmajor=None, hm_minpsffraction=None,
                 hm_maxpsffraction=None, hm_weighting=None, hm_cleaning=None, tlimit=None, drcorrect=None, masklimit=None,
                 cleancontranges=None, calcsb=None, hm_mosweight=None, overwrite_on_export=None, vlass_plane_reject_im=None,
                 parallel=None,
                 # Extra parameters
                 ):
        """Initialize Inputs.

        Args:
            context: Pipeline context object containing state information.

            output_dir: Output directory.
                Defaults to None, which corresponds to the current working directory.

            vis: The list of input MeasurementSets. Defaults to the list of MeasurementSets specified in the <hifa,hifv>_importdata task.
                '': use all MeasurementSets in the context
                Examples: 'ngc5921.ms', ['ngc5921a.ms', ngc5921b.ms', 'ngc5921c.ms']

            target_list: Dictionary specifying targets to be imaged; blank will read list from context

            hm_masking: Clean masking mode. Options are 'centralregion', 'auto', 'manual' and 'none'

            hm_sidelobethreshold: sidelobethreshold * the max sidelobe level

            hm_noisethreshold: noisethreshold * rms in residual image

            hm_lownoisethreshold: lownoisethreshold * rms in residual image

            hm_negativethreshold: negativethreshold * rms in residual image

            hm_minbeamfrac: Minimum beam fraction for pruning

            hm_growiterations: Number of binary dilation iterations for growing the mask

            hm_dogrowprune: Do pruning on the grow mask. Defaults to '' to enable the automatic heuristics calculation.
                Can be set to True or False manually.

            hm_minpercentchange: Mask size change threshold

            hm_fastnoise: Faster noise calculation for automask or nsigma stopping. Defaults to '' to enable the automatic heuristics calculation.
                Can be set to True or False manually.

            hm_nsigma: Multiplicative factor for rms-based threshold stopping

            hm_perchanweightdensity: Calculate the weight density for each channel independently. Defaults to '' to enable the automatic heuristics calculation.
                Can be set to True or False manually.

            hm_npixels: Number of pixels to determine uv-cell size for super-uniform weighting

            hm_cyclefactor: Scaling on PSF sidelobe level to compute the minor-cycle stopping threshold

            hm_nmajor: Controls the maximum number of major cycles to evaluate.

            hm_minpsffraction: PSF fraction that marks the max depth of cleaning in the minor cycle

            hm_maxpsffraction: PSF fraction that marks the minimum depth of cleaning in the minor cycle

            hm_weighting: Weighting scheme (natural,uniform,briggs,briggsabs[experimental],briggsbwtaper[experimental])

            hm_cleaning: Pipeline cleaning mode

            tlimit: Times the sensitivity limit for cleaning

            drcorrect: Override the default heuristics-based DR correction (for ALMA data only)

            masklimit: Times good mask pixels for cleaning

            cleancontranges: Clean continuum frequency ranges in cubes

            calcsb: Force (re-)calculation of sensitivities and beams

            hm_mosweight: Mosaic weighting Defaults to '' to enable the automatic heuristics calculation.
                Can be set to True or False manually.

            overwrite_on_export: Replace existing image products when h/hifa/hifv_exportdata is called.
                If False, images that would have the same FITS name on export,
                are amended to include a version number.  For example, if
                oussid.J1248-4559_ph.spw21.mfs.I.pbcor.fits would already be
                exported by a previous call to hif_makeimags, then
                'oussid.J1248-4559_ph.spw21.mfs.I.pbcor.v2.fits' would also be
                exported to the products/ directory. The first exported
                product retains the same name.  Additional products start
                counting with 'v2', 'v3', etc.

            vlass_plane_reject_im: Only used for the 'VLASS-SE-CUBE' imaging mode.

                Default: True

                If True, reject VLASS Coarse Cube planes with high flagging percentages or outlier beam sizes (see the heuristics details below)

                If False, do not perform the post-imaging VLASS Coarse Cube plane rejection.
                If the input value is a dictionary, the plane rejection heuristics will be performed with custom thresholds.
                The optional keys could be:

                - exclude_spw, default: ''
                  Spectral windows to be excluded from the VLASS Coarse Cube post-imaging plane rejection consideration, i.e. always preserve.
                - flagpct_thresh, default: 0.8
                  The flagging percentage across the entire mosaic to be considered to be high flagging level for the plane rejection.
                - beamdev_thresh: default: 0.2
                  Threshold for the fractional beam deviation from the expected value required for the plane rejection.

            parallel: Use CASA/tclean built-in parallel imaging for individual scientific targets, or, perform continuum imaging
                of multiple target (calibrators) concurrently without the CASA/tclean built-in parallelization.

                Options: ``'automatic'``, ``'true'``, ``'false'``, ``True``, ``False``

                Default: ``'automatic'`` - optimizes the parallelization mode based on the imaging target type
                    (scientific targets vs. calibrators) and the specific operation.
        """
        self.context = context
        self.output_dir = output_dir
        self.vis = vis

        self.target_list = target_list
        self.hm_masking = hm_masking
        self.hm_sidelobethreshold = hm_sidelobethreshold
        self.hm_noisethreshold = hm_noisethreshold
        self.hm_lownoisethreshold = hm_lownoisethreshold
        self.hm_negativethreshold = hm_negativethreshold
        self.hm_minbeamfrac = hm_minbeamfrac
        self.hm_growiterations = hm_growiterations
        self.hm_dogrowprune = hm_dogrowprune
        self.hm_minpercentchange = hm_minpercentchange
        self.hm_fastnoise = hm_fastnoise
        self.hm_nsigma = hm_nsigma
        self.hm_perchanweightdensity = hm_perchanweightdensity
        self.hm_npixels = hm_npixels
        self.hm_cleaning = hm_cleaning
        self.hm_cyclefactor = hm_cyclefactor
        self.hm_nmajor = hm_nmajor
        self.hm_minpsffraction = hm_minpsffraction
        self.hm_maxpsffraction = hm_maxpsffraction
        self.hm_weighting = hm_weighting
        self.tlimit = tlimit
        self.drcorrect = drcorrect
        self.masklimit = masklimit
        self.cleancontranges = cleancontranges
        self.calcsb = calcsb
        self.hm_mosweight = hm_mosweight
        self.parallel = parallel
        self.overwrite_on_export = overwrite_on_export
        self.vlass_plane_reject_im = vlass_plane_reject_im


# tell the infrastructure to give us mstransformed data when possible by
# registering our preference for imaging measurement sets
# api.ImagingMeasurementSetsPreferred.register(MakeImagesInputs)


[docs] @task_registry.set_equivalent_casa_task('hif_makeimages') @task_registry.set_casa_commands_comment('A list of target sources is cleaned.') class MakeImages(basetask.StandardTaskTemplate): Inputs = MakeImagesInputs is_multi_vis_task = True
[docs] def prepare(self): inputs = self.inputs result = MakeImagesResult() result.overwrite = inputs.overwrite_on_export # Carry any message from hif_makeimlist (e.g. for missing PI cube target) result.set_info(inputs.context.clean_list_info) # Check for size mitigation errors. if 'status' in inputs.context.size_mitigation_parameters: if inputs.context.size_mitigation_parameters['status'] == 'ERROR': result.mitigation_error = True return result # make sure inputs.vis is a list, even it is one that contains a # single measurement set if not isinstance(inputs.vis, list): inputs.vis = [inputs.vis] with CleanTaskFactory(inputs, self._executor) as factory: task_queue = [(target, factory.get_task(target)) for target in inputs.target_list] for (target, task) in task_queue: try: worker_result = task.get_result() except exceptions.PipelineException as ex: error_msg = ('Cleaning failure for field {!s}, intent {!s}, specmode {!s}, spw {!s}. ' 'Exception from hif_tclean: {!s}'.format( target['field'], target['intent'], target['specmode'], target['spw'], ex)) worker_result = TcleanResult() worker_result.qa.pool.append(pqa.QAScore(0.34, longmsg=error_msg, shortmsg='Cleaning failure')) result.add_result(worker_result, target, outcome='failure') LOG.error(error_msg) else: # Note add_result() removes 'heuristics' from worker_result heuristics = target['heuristics'] result.add_result(worker_result, target, outcome='success') # Export RMS of sources if self._is_target_for_sensitivity(worker_result, heuristics): s = self._get_image_rms_as_sensitivity(worker_result, target, heuristics) if s is not None: result.sensitivities_for_aqua.append(s) del heuristics # set of descriptions if inputs.context.clean_list_info.get('msg', '') != '': target_list = [inputs.context.clean_list_info] else: target_list = inputs.target_list description = { # map specmode to description for every clean target _get_description_map(target['intent'], target['stokes']).get(target['specmode'], 'Calculate clean products') for target in target_list } result.metadata['long description'] = ' / '.join(sorted(description)) sidebar = { # map specmode to description for every clean target _get_sidebar_map(target['intent'], target['stokes']).get(target['specmode'], '') for target in target_list } result.metadata['sidebar suffix'] = '/'.join(sidebar) return result
[docs] def analyse(self, result): if self.inputs.context.imaging_mode is not None and self.inputs.context.imaging_mode.startswith('VLASS-'): result = self._add_vlass_metadata(result) return result
def _add_vlass_metadata(self, result): """Attach extra imaging metadata to Tclean results for the VLASS Coarse Cube imaging.""" if self.inputs.context.imaging_mode != 'VLASS-SE-CUBE': for idx, tclean_result in enumerate(result.results): target = result.targets[idx] tclean_result.imaging_metadata['cutout_imsize'] = (target['misc_vlass'] or {}).get('cutout_imsize') return result vlass_plane_reject_keys_allowed = [ 'apply', 'exclude_spw', 'flagpct_thresh', 'beamdev_thresh'] for k in self.inputs.vlass_plane_reject_im: if k not in vlass_plane_reject_keys_allowed: LOG.warning("The key %r in the 'vlass_plane_reject_im' task input dictionary is not expected and will be ignored.", k) beamdev_thresh = self.inputs.vlass_plane_reject_im['beamdev_thresh'] flagpct_thresh = self.inputs.vlass_plane_reject_im['flagpct_thresh'] bmajor_list = [] bminor_list = [] bpa_list = [] freq_list = [] spwgroup_list = [] flagpct_list = [] ref_idx = None plane_keep = None # loop over all Tclean results to attach the imaging metadata # The imaging metadata will be entiredly merged into context.scimlist and context.calimlist # as the ImageItem instance 'metadata' attribute. for idx, tclean_result in enumerate(result.results): target = result.targets[idx] imaging_metadata = { 'keep': False, # Flagging percentage of a VLASS-SE-CUBE plane within a 1deg^2 box. 'flagpct': target['misc_vlass']['flagpct'], 'spw': target['spw'], 'freq': float(target['reffreq'].replace('GHz', '')), 'beam': [None, None, None], } if isinstance(tclean_result.image, str): ext = '.tt0' if tclean_result.multiterm else '' psf_name = tclean_result.psf+ext if os.path.exists(psf_name): with casa_tools.ImagepolReader(psf_name) as imagepol: img_stokesi = imagepol.stokesi() restoringbeam = img_stokesi.restoringbeam(polarization=0) imaging_metadata['beam'] = [restoringbeam['major']['value'], restoringbeam['minor']['value'], restoringbeam['positionangle']['value']] imaging_metadata['keep'] = True bmajor_list.append(imaging_metadata['beam'][0]) bminor_list.append(imaging_metadata['beam'][1]) bpa_list.append(imaging_metadata['beam'][2]) freq_list.append(imaging_metadata['freq']) spwgroup_list.append(imaging_metadata['spw']) flagpct_list.append(imaging_metadata['flagpct']) tclean_result.imaging_metadata.update(imaging_metadata) # update tclean_result.imaging_metadata['keep'] based on the beam size and flagging percentage if bminor_list: bminor_array = np.array(bminor_list) freq_array = np.array(freq_list) weighted_values = bminor_array * freq_array sorted_indices = np.argsort(weighted_values) ref_idx = sorted_indices[len(bminor_list) // 2] bmajor_expected = ( bmajor_list[ref_idx] * freq_list[ref_idx] / np.array(freq_list) ) bminor_expected = ( bminor_list[ref_idx] * freq_list[ref_idx] / np.array(freq_list) ) c1 = bmajor_expected * (1.0 - beamdev_thresh) < np.array(bmajor_list) c2 = bmajor_expected * (1.0 + beamdev_thresh) > np.array(bmajor_list) c3 = bminor_expected * (1.0 - beamdev_thresh) < np.array(bminor_list) c4 = bminor_expected * (1.0 + beamdev_thresh) > np.array(bminor_list) c5 = (np.array(flagpct_list) < flagpct_thresh) spwgroup_keep = [False]*len(spwgroup_list) for idx, spwgroup in enumerate(spwgroup_list): is_spwgroup_excluded = set(map(str.strip, spwgroup.split(','))) & set( map(str.strip, self.inputs.vlass_plane_reject_im['exclude_spw'].split(','))) if is_spwgroup_excluded or not self.inputs.vlass_plane_reject_im['apply']: spwgroup_keep[idx] = True plane_keep = c1 & c2 & c3 & c4 & c5 | np.array(spwgroup_keep) # create a lookup dict for the plane rejection info plane_keep_dict = {spwgroup: plane_keep[idx] for idx, spwgroup in enumerate(spwgroup_list)} for idx, tclean_result in enumerate(result.results): target_spw = result.targets[idx]['spw'] if target_spw in plane_keep_dict: tclean_result.imaging_metadata['keep'] = plane_keep_dict[target_spw] self._vlass_cube_set_miscinfo(tclean_result) # attched the metadata w.r.t the plane rejection for the plane rejection plot. result.metadata['vlass_cube_metadata'] = {'bmajor_list': np.array(bmajor_list), 'bminor_list': np.array(bminor_list), 'bpa_list': np.array(bpa_list), 'freq_list': np.array(freq_list), 'spwgroup_list': spwgroup_list, 'flagpct_list': np.array(flagpct_list), 'beam_dev': beamdev_thresh, 'ref_idx': ref_idx, 'flagpct_threshold': flagpct_thresh, 'plane_keep': plane_keep} return result def _vlass_cube_set_miscinfo(self, tclean_result): """Add the VLASS cube plane rejection header keyword.""" imagename = tclean_result.image reject = not tclean_result.imaging_metadata['keep'] imlist = utils.glob_ordered(imagename.replace('.image', '.*')) for name in imlist: with casa_tools.ImageReader(name) as image: info = image.miscinfo() info['VLASSRJ'] = reject LOG.info('mark the image %s as reject=%r', name, reject) image.setmiscinfo(info) def _is_target_for_sensitivity(self, clean_result, heuristics): """ Returns True if the clean target is one to export image sensitivity Conditions to export image sensitivities are - cubes generated by SRDP ALMA image cube recipe (specmode='cube') - all target images for ALMA if not SRDP - reprSrc and reprSpw (all specmode) """ # SRDP ALMA optimized cube images if self.inputs.context.project_structure.recipe_name == 'hifa_cubeimage': # only cubes return clean_result.specmode == 'cube' # ALMA pipeline if heuristics.imaging_mode == 'ALMA': return clean_result.intent == 'TARGET' # VLA pipeline # note: Need to check are their any conditions to # export image sensitivities for VLA if heuristics.imaging_mode == 'VLA': return True # Representative source and SpW _, repr_source, repr_spw, _, _, _, _, _, _, _ = heuristics.representative_target() if str(repr_spw) in clean_result.spw.split(',') and repr_source == utils.dequote(clean_result.sourcename): return True # Don't export image sensitivity for the other clean targets return False def _get_image_rms_as_sensitivity(self, result, target, heuristics): if not result.image: return None extension = 'tt0.' if result.multiterm else '' # Needed when nterms=2, see PIPE-1361 # the tt0 needs to be inserted before the ending ".pbcor" in the image name index = result.image.find('pbcor') imname = result.image[:index] + extension + result.image[index:] if not os.path.exists(imname): return None cqa = casa_tools.quanta cell = target['cell'][0:2] if len(target['cell']) >= 2 else (target['cell'][0], target['cell'][0]) # Image beam with casa_tools.ImageReader(imname) as image: restoringbeam = image.restoringbeam() csys = image.coordsys() chanwidth_of_image = csys.increment(format='q', type='spectral')['quantity']['*1'] csys.done() # effectiveBW if result.specmode == 'cube': # use nbin for cube and repBW msobj = self.inputs.context.observing_run.get_ms(name=result.vis[0]) nbin = target['nbin'] if target['nbin'] > 0 else 1 SCF, physicalBW_of_1chan, effectiveBW_of_1chan, _ = heuristics.get_bw_corr_factor(msobj, result.spw, nbin) effectiveBW_of_image = cqa.quantity(nbin / SCF**2 * effectiveBW_of_1chan, 'Hz') else: #continuum mode effectiveBW_of_image = result.aggregate_bw # antenna array (aligned definition with imageprecheck) diameters = list(heuristics.antenna_diameters().keys()) array = ('%dm' % min(diameters)) # Check if this sensitivity is for the representative source and SpW if heuristics.imaging_mode == 'VLA': is_representative = False else: _, repr_source, repr_spw, _, _, _, _, _, _, _ = heuristics.representative_target() if str(repr_spw) in result.spw.split(',') and repr_source == utils.dequote(result.sourcename): is_representative = True else: is_representative = False # Sensitivities are currently reported for Stokes I only. For IQUV imaging the # correct values have to be fetched from the new parameters since the previous # ones will contain mixtures of I, Q, U and V due to the "axes" parameter in # the ia.statistics() calls (PIPE-2464). TODO: Refactor the code to have just # one set of statistical parameters. if result.stokes == 'IQUV': image_rms = result.image_rms_iquv[0] image_min = result.image_min_iquv[0] image_max = result.image_max_iquv[0] else: image_rms = result.image_rms image_min = result.image_min image_max = result.image_max return Sensitivity(array=array, intent=target['intent'], field=target['field'], spw=result.spw, is_representative=is_representative, bandwidth=chanwidth_of_image, effective_bw=effectiveBW_of_image, bwmode=result.hm_specmode, beam=restoringbeam, cell=cell, robust=target['robust'], uvtaper=target['uvtaper'], theoretical_sensitivity=cqa.quantity(result.sensitivity, 'Jy/beam'), observed_sensitivity=cqa.quantity(image_rms, 'Jy/beam'), pbcor_image_min=cqa.quantity(image_min, 'Jy/beam'), pbcor_image_max=cqa.quantity(image_max, 'Jy/beam'), imagename=result.image.replace('.pbcor', ''), datatype=result.datatype)
class CleanTaskFactory: def __init__(self, inputs, executor): self.__inputs = inputs self.__context = inputs.context self.__executor = executor self.__context_path = None def __enter__(self): # If there's a possibility that we'll submit MPI jobs, save the context # to disk ready for import by the MPI servers. if mpihelpers.mpiclient or daskhelpers.daskclient: # Use the tempfile module to generate a unique temporary filename, # which we use as the output path for our pickled context tmpfile = tempfile.NamedTemporaryFile(suffix='.context', dir=self.__context.output_dir, delete=True) self.__context_path = tmpfile.name tmpfile.close() self.__context.save(self.__context_path) return self def __exit__(self, exc_type, exc_val, exc_tb): if self.__context_path and os.path.exists(self.__context_path): os.unlink(self.__context_path) def get_task(self, target): """ Create and return a SyncTask or AsyncTask for the clean job required to produce the clean target. The current algorithm generates Tier 0 clean jobs for calibrator images (=AsyncTask) and Tier 1 clean jobs for target images (=SyncTask). :param target: a clean job definition generated by MakeImList :return: a SyncTask or AsyncTask """ task_args = self.__get_task_args(target) parallel_wanted = mpihelpers.parse_parallel_input_parameter(self.__inputs.parallel) # exclude target imaging from tier0 in general parallel_wanted = parallel_wanted and 'TARGET' not in target['intent'] # PIPE-1923 asks to temporarily turn off Tier-0 mode for # POLARIZATION intent when imaging IQUV because of a # potential CASA bug. This should be undone when this # bug is fixed. if target['intent'] == 'POLARIZATION' and target['stokes'] == 'IQUV': parallel_wanted = False LOG.info('Temporarily turning off Tier-0 parallelization for Stokes IQUV polarization calibrator imaging (PIPE-1923).') # PIPE-1401: turn on the tier0 parallelization for individuals planes in the VLASS coarse cube imaging # Also see the disscussions in PIPE-1357 vlass_se_cube_tier0_wanted = True is_vlass_se_cube = 'TARGET' in target['intent'] and self.__context.imaging_mode == 'VLASS-SE-CUBE' if vlass_se_cube_tier0_wanted and is_vlass_se_cube: parallel_wanted = True # we check/remove the task_arg dictionary keys that are not required by Tclean.Inputs # then clean_targets objects would be free to carry extra metedata without causing problems during # hif_tclean task execuation. task_inputs_args = list(signature(Tclean.Inputs).parameters) task_args = {k: v for k, v in task_args.items() if k in task_inputs_args} LOG.debug('MakeImages Parallelization Config:') LOG.debug('parallel_wanted: %s', parallel_wanted) LOG.debug('daskhelpers.is_dask_ready(): %s', daskhelpers.is_dask_ready()) LOG.debug('mpihelpers.is_mpi_ready() %s', mpihelpers.is_mpi_ready()) if parallel_wanted and daskhelpers.is_dask_ready(): task_args['parallel'] = False executable = mpihelpers.Tier0PipelineTask(Tclean, task_args, self.__context_path) return daskhelpers.FutureTask(executable) elif parallel_wanted and mpihelpers.is_mpi_ready(): task_args['parallel'] = False executable = mpihelpers.Tier0PipelineTask(Tclean, task_args, self.__context_path) return mpihelpers.AsyncTask(executable) else: inputs = Tclean.Inputs(self.__context, **task_args) task = Tclean(inputs) return mpihelpers.SyncTask(task, self.__executor) def __get_task_args(self, target): inputs = self.__inputs parallel_wanted = mpihelpers.parse_parallel_input_parameter(inputs.parallel) # Request Tier 1 tclean parallelisation if the user requested it, this # is science target imaging, and we have either an MPI client or a Dask # cluster available. With Dask, casa_tasks._tclean will dispatch each # tclean call to its own MPI subprocess via run_in_mpicasa; nproc is # derived from the active Dask worker count to avoid oversubscription. parallel = all([ parallel_wanted, 'TARGET' in target['intent'], mpihelpers.is_mpi_ready() or daskhelpers.is_dask_ready(), ]) image_heuristics = target['heuristics'] task_args = dict(target) task_args.update({ 'output_dir': inputs.output_dir, 'vis': inputs.vis, # set the weighting type 'weighting': inputs.hm_weighting, # other vals 'tlimit': inputs.tlimit, 'masklimit': inputs.masklimit, 'cleancontranges': inputs.cleancontranges, 'calcsb': inputs.calcsb, 'parallel': parallel, 'hm_perchanweightdensity': inputs.hm_perchanweightdensity, 'hm_npixels': inputs.hm_npixels, 'restoringbeam': image_heuristics.restoringbeam(), }) if 'hm_nsigma' not in task_args: task_args['hm_nsigma'] = inputs.hm_nsigma if inputs.drcorrect not in (None, -999.0): task_args['drcorrect'] = inputs.drcorrect if target['robust'] not in (None, -999.0): task_args['robust'] = target['robust'] else: task_args['robust'] = image_heuristics.robust(sepecmode=target['specmode']) if target['uvtaper']: task_args['uvtaper'] = target['uvtaper'] else: task_args['uvtaper'] = image_heuristics.uvtaper() # set the imager mode here (temporarily ...) if target['gridder'] is not None: task_args['gridder'] = target['gridder'] else: task_args['gridder'] = image_heuristics.gridder( task_args['intent'], task_args['field']) if inputs.hm_masking in (None, ''): if 'TARGET' in task_args['intent']: if task_args['stokes'] == 'IQUV': if task_args['mask'] not in (None, ''): # "re-use" is a hidden mode just for the special use case # of re-using a previously computed Stokes I mask. task_args['hm_masking'] = 're-use' else: task_args['hm_masking'] = 'none' elif task_args['mask'] not in (None, ''): task_args['hm_masking'] = 'manual' else: task_args['hm_masking'] = 'auto' elif task_args['intent'] == 'POLARIZATION' and task_args['stokes'] == 'IQUV': task_args['hm_masking'] = 'centralregion' else: task_args['hm_masking'] = 'auto' else: if inputs.hm_masking.lower() in ('auto', 'centralregion', 'manual', 'none'): task_args['hm_masking'] = inputs.hm_masking.lower() else: raise Exception(f'Masking mode {inputs.hm_masking} unknown.') if inputs.hm_masking == 'auto': task_args['hm_sidelobethreshold'] = inputs.hm_sidelobethreshold task_args['hm_noisethreshold'] = inputs.hm_noisethreshold task_args['hm_lownoisethreshold'] = inputs.hm_lownoisethreshold task_args['hm_negativethreshold'] = inputs.hm_negativethreshold task_args['hm_minbeamfrac'] = inputs.hm_minbeamfrac task_args['hm_growiterations'] = inputs.hm_growiterations task_args['hm_dogrowprune'] = inputs.hm_dogrowprune task_args['hm_minpercentchange'] = inputs.hm_minpercentchange task_args['hm_fastnoise'] = inputs.hm_fastnoise if inputs.hm_cleaning == '': if task_args['threshold'] not in (None, ''): task_args['hm_cleaning'] = 'manual' else: task_args['hm_cleaning'] = 'rms' else: task_args['hm_cleaning'] = inputs.hm_cleaning if target['vis']: task_args['vis'] = target['vis'] if target['is_per_eb']: task_args['is_per_eb'] = target['is_per_eb'] if inputs.hm_mosweight not in (None, ''): task_args['mosweight'] = inputs.hm_mosweight elif target['mosweight'] not in (None, ''): task_args['mosweight'] = target['mosweight'] else: task_args['mosweight'] = image_heuristics.mosweight(task_args['intent'], task_args['field']) if inputs.hm_cyclefactor not in (None, -999.0): # The tclean task argument was already called "cyclefactor" # before hm_cyclefactor was exposed in hif_makeimages. To # keep compatibility with hif_editimlist and cleantarget.py # we keep the name now. Could be refactored later. task_args['cyclefactor'] = inputs.hm_cyclefactor if inputs.hm_nmajor not in (None, -999.0): task_args['nmajor'] = inputs.hm_nmajor if inputs.hm_minpsffraction not in (None, -999.0): task_args['hm_minpsffraction'] = inputs.hm_minpsffraction if inputs.hm_maxpsffraction not in (None, -999.0): task_args['hm_maxpsffraction'] = inputs.hm_maxpsffraction return task_args def _get_description_map(intent, stokes): if intent in ('PHASE', 'BANDPASS', 'AMPLITUDE'): return { 'mfs': 'Make calibrator images', 'cont': 'Make calibrator images' } elif intent in ('POLARIZATION', 'POLANGLE', 'POLLEAKAGE'): return { 'mfs': 'Make polarization calibrator images', 'cont': 'Make polarization calibrator images' } elif intent in ('DIFFGAINREF', 'DIFFGAINSRC'): return { 'mfs': 'Make diffgain calibrator images', 'cont': 'Make diffgain calibrator images' } elif intent == 'CHECK': return { 'mfs': 'Make check source images', 'cont': 'Make check source images' } elif intent == 'TARGET': if stokes.upper() in ('', 'I'): return { 'mfs': 'Make target per-spw continuum images', 'cont': 'Make target aggregate continuum images', 'cube': 'Make target cubes', 'repBW': 'Make representative bandwidth target cube' } elif stokes.upper() == 'IQUV': return { 'mfs': 'Make target fullpol per-spw continuum images', 'cont': 'Make target fullpol aggregate continuum images', 'cube': 'Make target fullpol cubes', 'repBW': 'Make fullpol representative bandwidth target cube' } else: raise Exception(f'Unknown Stokes value "{stokes}"') else: return {} def _get_sidebar_map(intent, stokes): if intent in ('PHASE', 'BANDPASS', 'AMPLITUDE', 'DIFFGAINREF', 'DIFFGAINSRC'): return { 'mfs': 'cals', 'cont': 'cals' } elif intent in ('POLARIZATION', 'POLANGLE', 'POLLEAKAGE'): return { 'mfs': 'pol', 'cont': 'pol' } elif intent == 'CHECK': return { 'mfs': 'checksrc', 'cont': 'checksrc' } elif intent == 'TARGET': if stokes.upper() in ('', 'I'): return { 'mfs': 'mfs', 'cont': 'cont', 'cube': 'cube', 'repBW': 'cube_repBW' } elif stokes.upper() == 'IQUV': return { 'mfs': 'mfs_fullpol', 'cont': 'cont_fullpol', 'cube': 'cube_fullpol', 'repBW': 'cube_repBW_fullpol' } else: raise Exception(f'Unknown Stokes value "{stokes}"') else: return {}