Source code for pipeline.hif.tasks.findcont.findcont

import collections
import copy
import os

import numpy as np

import pipeline.domain.measures as measures
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.contfilehandler as contfilehandler
import pipeline.infrastructure.mpihelpers as mpihelpers
import pipeline.infrastructure.utils as utils
import pipeline.infrastructure.vdp as vdp
from pipeline.domain import DataType
from pipeline.hif.heuristics import findcont
from pipeline.infrastructure import casa_tasks, casa_tools, task_registry

from .resultobjects import FindContResult

LOG = infrastructure.get_logger(__name__)


class FindContInputs(vdp.StandardInputs):
    # Must use empty data type list to allow for user override and
    # automatic determination depending on specmode, field and spw.
    processing_data_type = []

    hm_perchanweightdensity = vdp.VisDependentProperty(default=None)
    hm_weighting = vdp.VisDependentProperty(default=None)
    datacolumn = vdp.VisDependentProperty(default='')
    parallel = vdp.VisDependentProperty(default='automatic')

    @vdp.VisDependentProperty(null_input=['', None, {}])
    def target_list(self):
        # Note that the deepcopy is necessary to avoid changing the
        # context's clean_list inadvertently when removing the heuristics
        # objects from the inputs' clean_list.
        return copy.deepcopy(self.context.clean_list_pending)

    # docstring and type hints: supplements hif_findcont
    def __init__(self, context, output_dir=None, vis=None, target_list=None, hm_mosweight=None,
                 hm_perchanweightdensity=None, hm_weighting=None, datacolumn=None, parallel=None):
        """Initialize Inputs.

        Args:
            context: Pipeline context object containing state information.

            output_dir: Output directory.
                Defaults to None, which corresponds to the current working directory.

            vis: The list of input MeasurementSets. Defaults to the list of MeasurementSets specified in the <hifa,hifv>_importdata task.
                '': use all MeasurementSets in the context

                Examples: 'ngc5921.ms', ['ngc5921a.ms', ngc5921b.ms', 'ngc5921c.ms']

            target_list: Dictionary specifying targets to be imaged; blank will read list from context.

            hm_mosweight: Mosaic weighting. Defaults to '' to enable the automatic heuristics calculation.
                Can be set to True or False manually.

            hm_perchanweightdensity: Calculate the weight density for each channel independently.
                Defaults to '' to enable the automatic heuristics calculation. Can be set to True or False manually.

            hm_weighting: Weighting scheme (natural,uniform,briggs,briggsabs[experimental],briggsbwtaper[experimental])

            datacolumn: Data column to image. Only to be used for manual overriding when the automatic choice by data type is not appropriate.

            parallel: Use CASA/tclean built-in parallel imaging when possible.

                Options: ``'automatic'``, ``'true'``, ``'false'``, ``True``, ``False``

                Default: ``'automatic'``
        """
        super().__init__()
        self.context = context
        self.output_dir = output_dir
        self.vis = vis

        self.target_list = target_list
        self.hm_mosweight = hm_mosweight
        self.hm_perchanweightdensity = hm_perchanweightdensity
        self.hm_weighting = hm_weighting
        self.datacolumn = datacolumn
        self.parallel = parallel


[docs] @task_registry.set_equivalent_casa_task('hif_findcont') class FindCont(basetask.StandardTaskTemplate): Inputs = FindContInputs is_multi_vis_task = True
[docs] def prepare(self): inputs = self.inputs context = self.inputs.context # Check if this stage should be skipped if self._skip_findcont(): # only triggered for VLA-PI pieplein (not for ALMA) result = FindContResult({}, {}, '', 0, 0, [], {}) return result # Check for size mitigation errors. if 'status' in inputs.context.size_mitigation_parameters and \ inputs.context.size_mitigation_parameters['status'] == 'ERROR': result = FindContResult({}, {}, '', 0, 0, [], {}) result.mitigation_error = True return result qaTool = casa_tools.quanta # make sure inputs.vis is a list, even if it is one that contains a # single measurement set if not isinstance(inputs.vis, list): inputs.vis = [inputs.vis] if inputs.datacolumn not in (None, ''): datacolumn = inputs.datacolumn else: datacolumn = '' # Select the correct vis list if inputs.vis in ('', [''], [], None): datatypes = [DataType.SELFCAL_CONTLINE_SCIENCE, DataType.REGCAL_CONTLINE_SCIENCE, DataType.REGCAL_CONTLINE_ALL, DataType.RAW] ms_objects_and_columns, selected_datatype = context.observing_run.get_measurement_sets_of_type(dtypes=datatypes, msonly=False) if ms_objects_and_columns == collections.OrderedDict(): LOG.error('No data found for continuum finding.') result = FindContResult({}, {}, '', 0, 0, [], {}) return result LOG.info(f'Using data type {str(selected_datatype).split(".")[-1]} for continuum finding.') if selected_datatype == DataType.RAW: LOG.warning('Falling back to raw data for continuum finding.') columns = list(ms_objects_and_columns.values()) if not all(column == columns[0] for column in columns): LOG.warning( f'Data type based column selection changes among MSes: {",".join(f"{k.basename}: {v}" for k, v in ms_objects_and_columns.items())}.') if datacolumn != '': LOG.info( f'Manual override of datacolumn to {datacolumn}. Data type based datacolumn would have been "{"data" if columns[0] == "DATA" else "corrected"}".') else: if columns[0] == 'DATA': datacolumn = 'data' elif columns[0] == 'CORRECTED_DATA': datacolumn = 'corrected' else: LOG.warning(f'Unknown column name {columns[0]}') datacolumn = '' inputs.vis = [k.basename for k in ms_objects_and_columns.keys()] findcont_heuristics = findcont.FindContHeuristics(context) contfile_handler = contfilehandler.ContFileHandler(context.contfile) cont_ranges = contfile_handler.read() result_cont_ranges = {} joint_mask_names = {} momDiffSNRs = {} num_found = 0 num_total = 0 single_range_channel_fractions = [] for i, target in enumerate(inputs.target_list): for spwid in target['spw'].split(','): source_name = utils.dequote(target['field']) spw_name = context.observing_run.virtual_science_spw_ids[int(spwid)] # get continuum ranges dict for this source, also setting it if accessed for first time source_continuum_ranges = result_cont_ranges.setdefault(source_name, {}) # get continuum ranges list for this source and spw, also setting them if accessed for first time cont_ranges_source_spw = cont_ranges['fields'].setdefault(source_name, {}).setdefault(spwid, {'spwname': spw_name, 'ranges': [], 'flags': []}) if len(cont_ranges_source_spw['ranges']) > 0: LOG.info('Using existing selection {!r} for field {!s}, ' 'spw {!s}'.format(cont_ranges_source_spw['ranges'], source_name, spwid)) source_continuum_ranges[spwid] = { 'cont_ranges': cont_ranges_source_spw, 'plotfile': 'none', 'status': 'OLD' } if cont_ranges_source_spw['ranges'] != ['NONE']: num_found += 1 else: LOG.info('Determining continuum ranges for field %s, spw %s' % (source_name, spwid)) findcont_basename = '%s.I.findcont' % (os.path.basename(target['imagename']).replace( 'spw%s' % (target['spw'].replace(',', '_')), 'spw%s' % spwid ).replace('STAGENUMBER', str(context.stage))) # Determine the gridder mode image_heuristics = target['heuristics'] gridder = image_heuristics.gridder(target['intent'], target['field']) if inputs.hm_mosweight not in (None, ''): mosweight = inputs.hm_mosweight elif target['mosweight'] not in (None, ''): mosweight = target['mosweight'] else: mosweight = image_heuristics.mosweight(target['intent'], target['field']) # Determine weighting and perchanweightdensity parameters if inputs.hm_weighting in (None, ''): weighting = image_heuristics.weighting('cube') perchanweightdensity = image_heuristics.perchanweightdensity('cube') else: weighting = inputs.hm_weighting perchanweightdensity = inputs.hm_perchanweightdensity # Usually the inputs value takes precedence over the one from the target list. # For PIPE-557 it was necessary to fill target['vis'] in hif_makeimlist to filter # out fully flagged selections. Using the default vislist one would have to # re-determine all dependendent parameters such as "antenna", etc. here. Most # likely the use case of defining an explicit vislist for hif_findcont will # happen very rarely if at all. Thus changing the paradigm here for now. if target['vis']: vislist = target['vis'] scanidlist, _ = image_heuristics.get_scanidlist(target['vis'], target['field'], target['intent']) else: scanidlist, visindexlist = image_heuristics.get_scanidlist(inputs.vis, target['field'], target['intent']) # Remove MSs that do not contain data for the given field(s) vislist = [inputs.vis[i] for i in visindexlist] # Need to make an LSRK or SOURCE/REST (ephemeris sources) cube to get # the real ranges in the source frame. # The LSRK or SOURCE/REST ranges will need to be translated to the # individual TOPO ranges for the involved MSs in hif_tclean. if image_heuristics.is_eph_obj(target['field']): frame = 'REST' else: frame = 'LSRK' # To avoid noisy edge channels, use only the LSRK or SOURCE/REST frequency # intersection and skip one channel on either end. # Use only the current spw ID here ! if0, if1, channel_width = image_heuristics.freq_intersection(vislist, target['field'], target['intent'], spwid, frame) if (if0 == -1) or (if1 == -1): LOG.error('No %s frequency intersect among selected MSs for Field %s ' 'SPW %s' % (frame, target['field'], spwid)) cont_ranges['fields'][source_name][spwid]['spwname'] = spw_name cont_ranges['fields'][source_name][spwid]['ranges'] = ['NONE'] cont_ranges['fields'][source_name][spwid]['flags'] = [] result_cont_ranges[source_name][spwid] = { 'cont_ranges': cont_ranges['fields'][source_name][spwid], 'plotfile': 'none', 'status': 'NEW' } continue # Check for manually supplied values if0_auto = if0 if1_auto = if1 channel_width_auto = channel_width if target['start'] != '': if0 = qaTool.convert(target['start'], 'Hz')['value'] if if0 < if0_auto: LOG.error('Supplied start frequency (%s GHz) < f_low_native (%s GHz) for Field %s ' 'SPW %s' % (if0/1e9, if0_auto/1e9, target['field'], target['spw'])) continue LOG.info('Using supplied start frequency %s' % (target['start'])) if target['width'] != '' and target['nbin'] not in (None, -1): LOG.error('Field %s SPW %s: width and nbin are mutually exclusive' % (target['field'], target['spw'])) continue if target['width'] != '': channel_width_manual = qaTool.convert(target['width'], 'Hz')['value'] if channel_width_manual < channel_width_auto: LOG.error( 'User supplied channel width (%s MHz) is smaller than the native value (%s MHz) ' 'for Field %s SPW %s', channel_width_manual / 1e6, channel_width_auto / 1e6, target['field'], target['spw'], ) continue LOG.info('Using supplied width %s' % (target['width'])) channel_width = channel_width_manual if channel_width > channel_width_auto: target['nbin'] = int(utils.round_half_up(channel_width / channel_width_auto) + 0.5) elif target['nbin'] not in (None, -1): LOG.info('Applying binning factor %d' % (target['nbin'])) channel_width *= target['nbin'] # Get real spwid ref_ms = context.observing_run.get_ms(vislist[0]) real_spwid = context.observing_run.virtual2real_spw_id(int(spwid), ref_ms) real_spwid_obj = ref_ms.get_spectral_window(real_spwid) if image_heuristics.is_eph_obj(target['field']): # Determine extra channels to skip for ephemeris objects to # account for fast moving objects. centre_frequency_TOPO = float(real_spwid_obj.centre_frequency.to_units(measures.FrequencyUnits.HERTZ)) channel_width_freq_TOPO = float(real_spwid_obj.channels[0].getWidth().to_units(measures.FrequencyUnits.HERTZ)) freq0 = qaTool.quantity(centre_frequency_TOPO, 'Hz') freq1 = qaTool.quantity(centre_frequency_TOPO + channel_width_freq_TOPO, 'Hz') channel_width_velo_TOPO = float(qaTool.getvalue(qaTool.convert(utils.frequency_to_velocity(freq1, freq0), 'km/s'))[0]) # Skip 1 km/s extra_skip_channels = int(np.ceil(1.0 / abs(channel_width_velo_TOPO))) else: extra_skip_channels = 0 if target['nchan'] not in (None, -1): if1 = if0 + channel_width * target['nchan'] if if1 > if1_auto: LOG.error('Calculated stop frequency (%s GHz) > f_high_native (%s GHz) for Field %s ' 'SPW %s' % (if1/1e9, if1_auto/1e9, target['field'], target['spw'])) continue LOG.info('Using supplied nchan %d' % (target['nchan'])) nchan = target['nchan'] else: # Skip edge channels and extra channels if no nchan is supplied. # Adjust to binning since the normal nchan heuristics already includes it. if target['nbin'] not in (None, -1): nchan = int(utils.round_half_up((if1 - if0) / channel_width - 2)) - 2 * int(extra_skip_channels // target['nbin']) else: nchan = int(utils.round_half_up((if1 - if0) / channel_width - 2)) - 2 * extra_skip_channels if target['start'] == '': # tclean interprets the start frequency as the center of the # first channel. We have, however, an edge to edge range. # Thus shift by 0.5 channels if no start is supplied. # Additionally skipping the edge channel (cf. "- 2" above) # means a correction of 1.5 channels. if target['nbin'] not in (None, -1): start = '%.10fGHz' % ((if0 + (1.5 + extra_skip_channels) * channel_width / target['nbin']) / 1e9) else: start = '%.10fGHz' % ((if0 + (1.5 + extra_skip_channels) * channel_width) / 1e9) else: start = target['start'] width = '%.7fMHz' % (channel_width / 1e6) parallel = mpihelpers.parse_parallel_input_parameter(inputs.parallel) real_spwsel = context.observing_run.get_real_spwsel([str(spwid)]*len(vislist), vislist) # Set special phasecenter, frame and specmode for ephemeris objects. # Needs to be done here since the explicit coordinates are # used in heuristics methods upstream. if image_heuristics.is_eph_obj(target['field']): phasecenter = 'TRACKFIELD' psf_phasecenter = None # 'REST' does not yet work (see CAS-8965, CAS-9997) #outframe = 'REST' outframe = '' specmode = 'cubesource' else: phasecenter = target['phasecenter'] if gridder == 'mosaic' and target['psf_phasecenter'] != target['phasecenter']: psf_phasecenter = target['psf_phasecenter'] else: psf_phasecenter = None outframe = 'LSRK' specmode = 'cube' # PIPE-107 requests using a fixed robust value of 1.0. robust = 1.0 if target['uvrange'] not in (None, [], ''): uvrange = target['uvrange'] else: uvrange = None if target['uvtaper'] not in ([], None): uvtaper = target['uvtaper'] else: uvtaper = None if target['antenna'] not in ([], None): antenna = target['antenna'] else: antenna = None if target['usepointing'] not in (None,): usepointing = target['usepointing'] else: usepointing = None job = casa_tasks.tclean(vis=vislist, imagename=findcont_basename, datacolumn=datacolumn, antenna=antenna, spw=real_spwsel, intent=utils.to_CASA_intent(inputs.ms[0], target['intent']), field=target['field'], start=start, width=width, nchan=nchan, outframe=outframe, scan=scanidlist, specmode=specmode, gridder=gridder, mosweight=mosweight, perchanweightdensity=perchanweightdensity, pblimit=0.2, niter=0, threshold='0mJy', deconvolver='hogbom', interactive=False, imsize=target['imsize'], cell=target['cell'], phasecenter=phasecenter, psfphasecenter=psf_phasecenter, stokes='I', weighting=weighting, robust=robust, uvrange=uvrange, uvtaper=uvtaper, npixels=0, restoration=False, restoringbeam=[], pbcor=False, usepointing=usepointing, savemodel='none', parallel=parallel, fullsummary=False) self._executor.execute(job) # Try detecting continuum frequency ranges # Determine the representative source name and spwid for the ms repsource_name, repsource_spwid = ref_ms.get_representative_source_spw() # Determine reprBW mode repr_target, _, repr_spw, _, reprBW_mode, real_repr_target, _, _, _, _ = image_heuristics.representative_target() real_repr_spw = context.observing_run.virtual2real_spw_id(int(repr_spw), ref_ms) real_repr_spw_obj = ref_ms.get_spectral_window(real_repr_spw) if reprBW_mode in ['nbin', 'repr_spw']: # Approximate reprBW with nbin physicalBW_of_1chan = float(real_repr_spw_obj.channels[0].getWidth().convert_to(measures.FrequencyUnits.HERTZ).value) reprBW_nbin = int(qaTool.getvalue(qaTool.convert(repr_target[2], 'Hz'))[0]/physicalBW_of_1chan + 0.5) else: reprBW_nbin = 1 spw_transitions = ref_ms.get_spectral_window(real_spwid).transitions single_continuum = any(['Single_Continuum' in t for t in spw_transitions]) # PIPE-1855: use spectralDynamicRangeBandWidth from SBSummary if available dynrange_bw = ref_ms.science_goals.get('spectralDynamicRangeBandWidth', None) if dynrange_bw is not None: # None means that a value was not provided, and it should remain None dynrange_bw = qaTool.tos(dynrange_bw) (cont_ranges_and_flags, png, single_range_channel_fraction, warning_strings, joint_mask_name, momDiffSNR) = \ findcont_heuristics.find_continuum(dirty_cube='%s.residual' % findcont_basename, pb_cube='%s.pb' % findcont_basename, psf_cube='%s.psf' % findcont_basename, single_continuum=single_continuum, is_eph_obj=image_heuristics.is_eph_obj(target['field']), ref_ms_name=ref_ms.name, nbin=reprBW_nbin, dynrange_bw=dynrange_bw) joint_mask_names[(source_name, spwid)] = joint_mask_name momDiffSNRs[(source_name, spwid)] = momDiffSNR # PIPE-74 if single_range_channel_fraction < 0.05: LOG.warning('Only a single narrow range of channels was found for continuum in ' '{field} in spw {spw}, so the continuum subtraction ' 'may be poor for that spw.'.format(field=target['field'], spw=spwid)) # Internal findContinuum warnings for warning_msg in warning_strings: # PIPE-2158: Exclude empty strings if warning_msg: LOG.warning('Field {field}, spw {spw}: {warning_msg}'.format(field=target['field'], spw=spwid, warning_msg=warning_msg)) is_repsource = (repsource_name == target['field']) and (repsource_spwid == spwid) chanfrac = {'fraction' : single_range_channel_fraction, 'field' : target['field'], 'spw' : spwid, 'is_repsource': is_repsource} single_range_channel_fractions.append(chanfrac) cont_ranges['fields'][source_name][spwid].update(cont_ranges_and_flags) source_continuum_ranges[spwid] = { 'cont_ranges': cont_ranges_and_flags, 'flags': [], 'plotfile': png, 'status': 'NEW' } if cont_ranges_and_flags['ranges'] not in [['NONE'], [''], []]: num_found += 1 num_total += 1 result = FindContResult(result_cont_ranges, cont_ranges, joint_mask_names, num_found, num_total, single_range_channel_fractions, momDiffSNRs) return result
[docs] def analyse(self, result): return result
def _skip_findcont(self) -> bool: """Check if we can proceed with the continuum finding heuristics. Note: this is only relevant for VLA to detect if we should proceed with VLA cube imaging sequence """ findcont_datatypes = [ DataType.SELFCAL_CONTLINE_SCIENCE, DataType.REGCAL_CONTLINE_SCIENCE, ] ms_list = self.inputs.context.observing_run.get_measurement_sets_of_type(findcont_datatypes, msonly=True) telescope = self.inputs.context.project_summary.telescope return 'VLA' in telescope.upper() and not ms_list