import os
import re
import inspect
import numpy as np
from scipy.ndimage import label
import pipeline.domain.measures as measures
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.imageheader as imageheader
import pipeline.infrastructure.mpihelpers as mpihelpers
import pipeline.infrastructure.pipelineqa as pipelineqa
import pipeline.infrastructure.utils as utils
import pipeline.infrastructure.vdp as vdp
from pipeline.domain import DataType
from pipeline.hif.heuristics import imageparams_factory
from pipeline.infrastructure import casa_tasks, casa_tools, task_registry
from . import cleanbase
from .automaskthresholdsequence import AutoMaskThresholdSequence
from .autoscalthresholdsequence import AutoScalThresholdSequence
from .imagecentrethresholdsequence import ImageCentreThresholdSequence
from .manualmaskthresholdsequence import ManualMaskThresholdSequence
from .reusemaskthresholdsequence import ReuseMaskThresholdSequence
from .nomaskthresholdsequence import NoMaskThresholdSequence
from .resultobjects import TcleanResult
from .vlaautomaskthresholdsequence import VlaAutoMaskThresholdSequence
from .vlassmaskthresholdsequence import VlassMaskThresholdSequence
LOG = infrastructure.logging.get_logger(__name__)
class TcleanInputs(cleanbase.CleanBaseInputs):
# Search order of input vis
# This is just an initial default to get any vis. The real selection is
# usually made in hif_makeimlist and passed on as explicit parameter
# via hif_makeimages.
processing_data_type = [DataType.SELFCAL_LINE_SCIENCE, DataType.REGCAL_LINE_SCIENCE, DataType.SELFCAL_CONT_SCIENCE, DataType.REGCAL_CONT_SCIENCE,
DataType.SELFCAL_CONTLINE_SCIENCE, DataType.REGCAL_CONTLINE_SCIENCE, DataType.REGCAL_CONTLINE_ALL, DataType.RAW]
# simple properties ------------------------------------------------------------------------------------------------
calcsb = vdp.VisDependentProperty(default=False)
cleancontranges = vdp.VisDependentProperty(default=False)
datacolumn = vdp.VisDependentProperty(default=None)
datatype = vdp.VisDependentProperty(default=None)
datatype_info = vdp.VisDependentProperty(default=None)
hm_cleaning = vdp.VisDependentProperty(default='rms')
masklimit = vdp.VisDependentProperty(default=4.0)
mosweight = vdp.VisDependentProperty(default=None)
parallel = vdp.VisDependentProperty(default='automatic')
reffreq = vdp.VisDependentProperty(default=None)
restfreq = vdp.VisDependentProperty(default=None)
tlimit = vdp.VisDependentProperty(default=None)
usepointing = vdp.VisDependentProperty(default=None)
weighting = vdp.VisDependentProperty(default=None)
pblimit = vdp.VisDependentProperty(default=None)
cfcache = vdp.VisDependentProperty(default=None)
cfcache_nowb = vdp.VisDependentProperty(default=None)
pbmask = vdp.VisDependentProperty(default=None)
# override CleanBaseInputs default value of 'auto'
hm_masking = vdp.VisDependentProperty(default='centralregion')
# properties requiring some logic ----------------------------------------------------------------------------------
@vdp.VisDependentProperty
def image_heuristics(self):
image_heuristics_factory = imageparams_factory.ImageParamsHeuristicsFactory()
return image_heuristics_factory.getHeuristics(
vislist=self.vis,
spw=self.spw,
observing_run=self.context.observing_run,
imagename_prefix=self.context.project_structure.ousstatus_entity_id,
proj_params=self.context.project_performance_parameters,
contfile=self.context.contfile,
linesfile=self.context.linesfile,
imaging_params=self.context.imaging_parameters,
processing_intents=self.context.processing_intents,
# TODO: imaging_mode should not be fixed
imaging_mode='ALMA'
)
imagename = vdp.VisDependentProperty(default='')
@imagename.convert
def imagename(self, value):
return value.replace('STAGENUMBER', str(self.context.stage))
specmode = vdp.VisDependentProperty(default=None)
@specmode.convert
def specmode(self, value):
if value == 'repBW':
self.hm_specmode = 'repBW'
return 'cube'
self.hm_specmode = value
return value
@vdp.VisDependentProperty
def spwsel_lsrk(self):
# mutable object, so should not use VisDependentProperty(default={})
return {}
@vdp.VisDependentProperty
def spwsel_topo(self):
# mutable object, so should not use VisDependentProperty(default=[])
return []
@vdp.VisDependentProperty(null_input=[None, -999, -999.0])
def robust(self):
# Fallback value if undefined in the imaging target and in the
# imaging parameters. TODO: Use heuristic
return 0.5
@vdp.VisDependentProperty
def uvtaper(self):
# Fallback value if undefined in the imaging target and in the
# imaging parameters. TODO: Use heuristic
return []
# class methods ----------------------------------------------------------------------------------------------------
def __init__(self, context, output_dir=None, vis=None, imagename=None, intent=None, field=None, spw=None,
spwsel_lsrk=None, spwsel_topo=None, uvrange=None, specmode=None, gridder=None, deconvolver=None,
nterms=None, outframe=None, imsize=None, cell=None, phasecenter=None, psf_phasecenter=None, stokes=None, nchan=None,
start=None, width=None, nbin=None, datacolumn=None, datatype=None, datatype_info=None, pblimit=None,
cfcache=None, restoringbeam=None, hm_masking=None, hm_sidelobethreshold=None, hm_noisethreshold=None,
hm_lownoisethreshold=None, hm_negativethreshold=None, hm_minbeamfrac=None, hm_growiterations=None,
hm_dogrowprune=None, hm_minpercentchange=None, hm_fastnoise=None, hm_nsigma=None,
hm_perchanweightdensity=None, hm_npixels=None, hm_cleaning=None,
iter=None, mask=None, niter=None, threshold=None, tlimit=None, drcorrect=None, masklimit=None,
calcsb=None, cleancontranges=None, parallel=None,
# Extra parameters not in the CLI task interface
weighting=None, robust=None, uvtaper=None, scales=None, cycleniter=None, cyclefactor=None, nmajor=None, wprojplanes=None,
hm_minpsffraction=None, hm_maxpsffraction=None,
sensitivity=None, reffreq=None, restfreq=None, conjbeams=None, is_per_eb=None, antenna=None,
usepointing=None, mosweight=None, spwsel_all_cont=None, spwsel_low_bandwidth=None,
spwsel_low_spread=None, num_all_spws=None, num_good_spws=None, bl_ratio=None, cfcache_nowb=None,
# End of extra parameters
heuristics=None, pbmask=None):
super().__init__(context, output_dir=output_dir, vis=vis, imagename=imagename, antenna=antenna,
datacolumn=datacolumn, datatype=datatype, datatype_info=datatype_info,
intent=intent, field=field, spw=spw, uvrange=uvrange, specmode=specmode,
gridder=gridder, deconvolver=deconvolver, uvtaper=uvtaper, nterms=nterms,
cycleniter=cycleniter, cyclefactor=cyclefactor, nmajor=nmajor, wprojplanes=wprojplanes,
hm_minpsffraction=hm_minpsffraction, hm_maxpsffraction=hm_maxpsffraction,
scales=scales, outframe=outframe, imsize=imsize, cell=cell, phasecenter=phasecenter,
psf_phasecenter=psf_phasecenter, nchan=nchan, start=start, width=width, stokes=stokes,
weighting=weighting, robust=robust, restoringbeam=restoringbeam, pblimit=pblimit,
iter=iter, mask=mask, hm_masking=hm_masking, cfcache=cfcache,
hm_sidelobethreshold=hm_sidelobethreshold,
hm_noisethreshold=hm_noisethreshold,
hm_lownoisethreshold=hm_lownoisethreshold,
hm_negativethreshold=hm_negativethreshold, hm_minbeamfrac=hm_minbeamfrac,
hm_growiterations=hm_growiterations, hm_dogrowprune=hm_dogrowprune,
hm_minpercentchange=hm_minpercentchange, hm_fastnoise=hm_fastnoise,
hm_nsigma=hm_nsigma, hm_perchanweightdensity=hm_perchanweightdensity,
hm_npixels=hm_npixels, niter=niter, threshold=threshold,
sensitivity=sensitivity, conjbeams=conjbeams, is_per_eb=is_per_eb,
usepointing=usepointing, mosweight=mosweight,
parallel=parallel, heuristics=heuristics)
self.calcsb = calcsb
self.cleancontranges = cleancontranges
self.hm_cleaning = hm_cleaning
self.image_heuristics = heuristics
self.masklimit = masklimit
self.nbin = nbin
self.reffreq = reffreq
self.restfreq = restfreq
self.spwsel_lsrk = spwsel_lsrk
self.spwsel_topo = spwsel_topo
self.spwsel_all_cont = spwsel_all_cont
self.spwsel_low_bandwidth = spwsel_low_bandwidth
self.spwsel_low_spread = spwsel_low_spread
self.num_all_spws = num_all_spws
self.num_good_spws = num_good_spws
self.bl_ratio = bl_ratio
self.tlimit = tlimit
self.drcorrect = drcorrect
# For MOM0/8_FC and cube RMS we need the LSRK frequency ranges in
# various places
self.cont_freq_ranges = ''
self.is_per_eb = is_per_eb
self.antenna = antenna
self.usepointing = usepointing
self.mosweight = mosweight
self.datacolumn = datacolumn
self.datatype = datatype
self.datatype_info = datatype_info
self.pblimit = pblimit
self.cfcache = cfcache
self.cfcache_nowb = cfcache_nowb
self.pbmask = pbmask
# tell the infrastructure to give us mstransformed data when possible by
# registering our preference for imaging measurement sets
#api.ImagingMeasurementSetsPreferred.register(TcleanInputs)
[docs]
@task_registry.set_equivalent_casa_task('hif_tclean')
@task_registry.set_casa_commands_comment('A single target source is cleaned.')
class Tclean(cleanbase.CleanBase):
Inputs = TcleanInputs
is_multi_vis_task = True
[docs]
def rm_stage_files(self, imagename, stokes=''):
filenames = utils.glob_ordered('%s*.%s.iter*' % (imagename, stokes))
for filename in filenames:
try:
if os.path.isfile(filename):
os.remove(filename)
elif os.path.isdir(filename):
rmtree_job = casa_tasks.rmtree(filename, ignore_errors=False)
self._executor.execute(rmtree_job)
else:
raise Exception('Cannot remove %s' % filename)
except Exception as e:
LOG.warning('Exception while deleting %s: %s' % (filename, e))
[docs]
def rm_iter_files(self, rootname, iteration):
# Delete any old files with this naming root
filenames = utils.glob_ordered('%s.iter%s*' % (rootname, iteration))
for filename in filenames:
try:
rmtree_job = casa_tasks.rmtree(filename)
self._executor.execute(rmtree_job)
except Exception as e:
LOG.warning('Exception while deleting %s: %s' % (filename, e))
[docs]
def copy_products(self, old_pname, new_pname, ignore=None):
imlist = utils.glob_ordered('%s.*' % (old_pname))
imlist = [xx for xx in imlist if ignore is None or ignore not in xx]
for image_name in imlist:
newname = image_name.replace(old_pname, new_pname)
LOG.info('Copying {} to {}'.format(image_name, newname))
job = casa_tasks.copytree(image_name, newname)
self._executor.execute(job)
[docs]
def move_products(self, old_pname, new_pname, ignore_list=None, remove_list=None, copy_list=None):
"""Move imaging products of one iteration to another.
Certain image types can be excluded from the default "move" operation using the following keywords (in the precedence order):
ignore_list: do nothing (no 'remove', 'copy', or 'move'), if any string from the list is in the image name.
remove_list: remove without 'move' or 'copy', if any string from the list in the image name.
copy_list: copy instead move, if any string from the list is in the image name.
"""
imlist = utils.glob_ordered('%s.*' % (old_pname))
for image_name in imlist:
newname = image_name.replace(old_pname, new_pname)
if isinstance(ignore_list, (list, tuple)):
if any([ignore_pattern in image_name for ignore_pattern in ignore_list]):
continue
if isinstance(remove_list, (list, tuple)):
if any([remove_pattern in image_name for remove_pattern in remove_list]):
LOG.info('Remove {}'.format(image_name))
job = casa_tasks.rmtree(image_name)
self._executor.execute(job)
continue
if isinstance(copy_list, (list, tuple)):
if any([copy_pattern in image_name for copy_pattern in copy_list]):
LOG.info('Copying {} to {}'.format(image_name, newname))
job = casa_tasks.copytree(image_name, newname)
self._executor.execute(job)
continue
LOG.info('Moving {} to {}'.format(image_name, newname))
job = casa_tasks.move(image_name, newname)
self._executor.execute(job)
return
[docs]
def prepare(self):
inputs = self.inputs
context = self.inputs.context
self.known_synthesized_beams = self.inputs.context.synthesized_beams
LOG.info('Start imaging for intent %s, field %s, spw %s', inputs.intent, inputs.field, inputs.spw)
per_spw_cont_sensitivities_all_chan = context.per_spw_cont_sensitivities_all_chan
qaTool = casa_tools.quanta
# if 'start' or 'width' are defined in velocity units, track these
# for conversion to frequency and back before and after tclean call. Added
# to support SRDP ALMA optimized imaging.
self.start_as_velocity = None
self.width_as_velocity = None
self.start_as_frequency = None
self.width_as_frequency = None
self.aggregate_lsrk_bw = None
# delete any old files with this naming root. One of more
# of these (don't know which) will interfere with this run.
if inputs.stokes:
LOG.info('deleting {}*.{}.iter*'.format(inputs.imagename, inputs.stokes))
self.rm_stage_files(inputs.imagename, inputs.stokes)
else:
LOG.info('deleting {}*.iter*'.format(inputs.imagename))
self.rm_stage_files(inputs.imagename)
# Get the image parameter heuristics
self.image_heuristics = inputs.image_heuristics
# Set initial masking limits
self.pblimit_image, self.pblimit_cleanmask = self.image_heuristics.pblimits(None, inputs.specmode)
if not inputs.pblimit:
inputs.pblimit = self.pblimit_image
# Remove MSs that do not contain data for the given field(s)
_, visindexlist = self.image_heuristics.get_scanidlist(inputs.vis, inputs.field, inputs.intent)
filtered_vislist = [inputs.vis[i] for i in visindexlist]
if filtered_vislist != inputs.vis:
inputs.vis = filtered_vislist
# Also need to reset any antenna list to trigger recalculation below.
inputs.antenna = None
# Generate the image name if one is not supplied.
if inputs.imagename in (None, ''):
inputs.imagename = self.image_heuristics.imagename(intent=inputs.intent,
field=inputs.field,
spwspec=inputs.spw,
specmode=inputs.specmode)
# Determine the default gridder
if inputs.gridder in (None, ''):
inputs.gridder = self.image_heuristics.gridder(inputs.intent, inputs.field)
# Determine deconvolver
if inputs.deconvolver in (None, ''):
inputs.deconvolver = self.image_heuristics.deconvolver(inputs.specmode, inputs.spw, inputs.intent, inputs.stokes)
# Determine weighting
if inputs.weighting in (None, ''):
inputs.weighting = self.image_heuristics.weighting(inputs.specmode)
# Determine perchanweightdensity
if inputs.hm_perchanweightdensity in (None, ''):
inputs.hm_perchanweightdensity = self.image_heuristics.perchanweightdensity(inputs.specmode)
# Determine nterms
if (inputs.nterms in ('', None)) and (inputs.deconvolver == 'mtmfs'):
inputs.nterms = self.image_heuristics.nterms(inputs.spw)
# Determine antennas to be used
if inputs.antenna in (None, [], ''):
antenna_ids = self.image_heuristics.antenna_ids(inputs.intent)
# PIPE-964: The '&' at the end of the antenna input was added to not to consider the cross baselines by
# default. The cross baselines with antennas not listed (for TARGET images the antennas with the minority
# antenna sizes are not listed) could be added in some future configurations by removing this character.
inputs.antenna = [','.join(map(str, antenna_ids.get(os.path.basename(v), '')))+'&' for v in inputs.vis]
# If imsize not set then use heuristic code to calculate the
# centers for each field / spw
imsize = inputs.imsize
cell = inputs.cell
if imsize in (None, [], '') or cell in (None, [], ''):
# The heuristics cell size is always the same for x and y as
# the value derives from a single value returned by imager.advise
synthesized_beam, self.known_synthesized_beams = \
self.image_heuristics.synthesized_beam(field_intent_list=[(inputs.field, inputs.intent)],
spwspec=inputs.spw,
robust=inputs.robust,
uvtaper=inputs.uvtaper,
parallel=inputs.parallel,
known_beams=self.known_synthesized_beams,
force_calc=inputs.calcsb,
shift=True)
cell = self.image_heuristics.cell(beam=synthesized_beam)
if inputs.cell in (None, [], ''):
inputs.cell = cell
LOG.info('Heuristic cell: %s' % cell)
field_ids = self.image_heuristics.field(inputs.intent, inputs.field)
largest_primary_beam = self.image_heuristics.largest_primary_beam_size(spwspec=inputs.spw,
intent=inputs.intent)
# spw dependent imsize (FOV) in continuum spectral mode
imsize_spwlist = inputs.spw if inputs.specmode == 'cont' else None
imsize = self.image_heuristics.imsize(fields=field_ids,
cell=inputs.cell,
primary_beam=largest_primary_beam,
spwspec=imsize_spwlist,
intent=inputs.intent,specmode=inputs.specmode)
if inputs.imsize in (None, [], ''):
inputs.imsize = imsize
LOG.info('Heuristic imsize: %s', imsize)
if inputs.specmode == 'cube':
# To avoid noisy edge channels, use only the frequency
# intersection and skip one channel on either end.
if self.image_heuristics.is_eph_obj(inputs.field):
frame = 'REST'
else:
frame = 'LSRK'
if0, if1, channel_width = self.image_heuristics.freq_intersection(inputs.vis, inputs.field, inputs.intent,
inputs.spw, frame)
if (if0 == -1) or (if1 == -1):
LOG.error('No frequency intersect among selected MSs for Field %s SPW %s' % (inputs.field, inputs.spw))
error_result = TcleanResult(vis=inputs.vis,
sourcename=inputs.field,
intent=inputs.intent,
spw=inputs.spw,
specmode=inputs.specmode,
imaging_mode=self.image_heuristics.imaging_mode)
error_result.error = '%s/%s/spw%s clean error: No frequency intersect among selected MSs' % (inputs.field, inputs.intent, inputs.spw)
return error_result
# Check for manually supplied values
if0_auto = if0
if1_auto = if1
channel_width_auto = channel_width
if inputs.start != '':
# if specified in velocity units, convert to frequency
# then back again before the tclean call
if 'm/' in inputs.start:
self.start_as_velocity = qaTool.quantity(inputs.start)
inputs.start = utils.velocity_to_frequency(inputs.start, inputs.restfreq)
self.start_as_frequency = inputs.start
if0 = qaTool.convert(inputs.start, 'Hz')['value']
if if0 < if0_auto:
LOG.error('Supplied start frequency (%s GHz) < f_low_native (%s GHz) for Field %s '
'SPW %s' % (if0/1e9, if0_auto/1e9, inputs.field, inputs.spw))
error_result = TcleanResult(vis=inputs.vis,
sourcename=inputs.field,
intent=inputs.intent,
spw=inputs.spw,
specmode=inputs.specmode,
imaging_mode=self.image_heuristics.imaging_mode)
error_result.error = '%s/%s/spw%s clean error: f_start < f_low_native' % (inputs.field,
inputs.intent, inputs.spw)
return error_result
LOG.info('Using supplied start frequency %s' % inputs.start)
if (inputs.width != '') and (inputs.nbin not in (None, -1)):
LOG.error('Field %s SPW %s: width and nbin are mutually exclusive' % (inputs.field, inputs.spw))
error_result = TcleanResult(vis=inputs.vis,
sourcename=inputs.field,
intent=inputs.intent,
spw=inputs.spw,
specmode=inputs.specmode,
imaging_mode=self.image_heuristics.imaging_mode)
error_result.error = '%s/%s/spw%s clean error: width and nbin are mutually exclusive' % (inputs.field,
inputs.intent,
inputs.spw)
return error_result
if inputs.width != '':
# if specified in velocity units, convert to frequency
# then back again before the tclean call
if 'm/' in inputs.width:
self.width_as_velocity = qaTool.quantity(inputs.width)
start_plus_width = qaTool.add(self.start_as_velocity, inputs.width)
start_plus_width_freq = utils.velocity_to_frequency(start_plus_width, inputs.restfreq)
inputs.width = qaTool.sub(start_plus_width_freq, inputs.start)
self.width_as_frequency = inputs.width
channel_width_manual = qaTool.convert(inputs.width, 'Hz')['value']
# PIPE-1984: add tolerance acceptance when comparing user-specified chanwidths with
# the intrinsic vis chanwidths.
channel_width_tolerance = 0.05
if abs(channel_width_manual) < channel_width_auto*(1-channel_width_tolerance):
LOG.error(
'User supplied channel width (%s MHz) is smaller than the native value (%s MHz) '
'for Field %s SPW %s',
channel_width_manual / 1e6, # manual width in MHz
channel_width_auto / 1e6, # native width in MHz
inputs.field,
inputs.spw,
)
error_result = TcleanResult(vis=inputs.vis,
sourcename=inputs.field,
intent=inputs.intent,
spw=inputs.spw,
specmode=inputs.specmode,
imaging_mode=self.image_heuristics.imaging_mode)
error_result.error = '%s/%s/spw%s clean error: user channel width too small' % (inputs.field,
inputs.intent,
inputs.spw)
return error_result
else:
if abs(channel_width_manual) < channel_width_auto:
LOG.warning(
'User supplied channel width (%s MHz) is smaller than native value (%s MHz) '
'for Field %s SPW %s, but within tolerance of %f MHz; using native value.',
channel_width_manual / 1e6, # user‑supplied width in MHz
channel_width_auto / 1e6, # native width in MHz
inputs.field,
inputs.spw,
channel_width_tolerance,
)
channel_width = channel_width_auto
else:
LOG.info('Using supplied width %s' % inputs.width)
channel_width = channel_width_manual
if abs(channel_width) > channel_width_auto:
inputs.nbin = int(utils.round_half_up(abs(channel_width) / channel_width_auto) + 0.5)
elif inputs.nbin not in (None, -1):
LOG.info('Applying binning factor %d' % inputs.nbin)
channel_width *= inputs.nbin
if self.image_heuristics.is_eph_obj(inputs.field):
# Determine extra channels to skip for ephemeris objects to
# account for fast moving objects.
ref_ms = context.observing_run.get_ms(inputs.vis[0])
real_spw = context.observing_run.virtual2real_spw_id(inputs.spw, ref_ms)
real_spw_obj = ref_ms.get_spectral_window(real_spw)
centre_frequency_TOPO = float(real_spw_obj.centre_frequency.to_units(measures.FrequencyUnits.HERTZ))
channel_width_freq_TOPO = float(real_spw_obj.channels[0].getWidth().to_units(measures.FrequencyUnits.HERTZ))
freq0 = qaTool.quantity(centre_frequency_TOPO, 'Hz')
freq1 = qaTool.quantity(centre_frequency_TOPO + channel_width_freq_TOPO, 'Hz')
channel_width_velo_TOPO = float(qaTool.getvalue(qaTool.convert(utils.frequency_to_velocity(freq1, freq0), 'km/s'))[0])
# Skip 1 km/s
extra_skip_channels = int(np.ceil(1.0 / abs(channel_width_velo_TOPO)))
else:
extra_skip_channels = 0
if inputs.nchan not in (None, -1):
if1 = if0 + channel_width * inputs.nchan
if if1 > if1_auto:
nchan_use = int(np.floor((if1_auto - if0) / channel_width))
LOG.warning('Calculated stop frequency (%s GHz) > f_high_native (%s GHz) for Field %s '
'SPW %s, adjusting nchan from %d to %d to avoid imaging out of the SPW coverage.',
if1/1e9, if1_auto/1e9, inputs.field, inputs.spw, inputs.nchan, nchan_use)
if nchan_use < 1:
LOG.error('No coverage overlap with the requested imaging frequency range for Field %s '
'SPW %s', inputs.field, inputs.spw)
error_result = TcleanResult(vis=inputs.vis,
sourcename=inputs.field,
intent=inputs.intent,
spw=inputs.spw,
specmode=inputs.specmode,
imaging_mode=self.image_heuristics.imaging_mode)
error_result.error = (
f'{inputs.field}/{inputs.intent}/spw{inputs.spw} '
'clean error: invalid specification for imaging channel grid'
)
return error_result
inputs.nchan = nchan_use
LOG.info('Using nchan=%d', inputs.nchan)
else:
# Skip edge channels and extra channels if no nchan is supplied.
# Adjust to binning since the normal nchan heuristics already includes it.
if inputs.nbin not in (None, -1):
inputs.nchan = int(utils.round_half_up((if1 - if0) / channel_width - 2)) - \
2 * int(extra_skip_channels // inputs.nbin)
else:
inputs.nchan = int(utils.round_half_up((if1 - if0) / channel_width - 2)) - 2 * extra_skip_channels
if inputs.start == '':
# tclean interprets the start frequency as the center of the
# first channel. We have, however, an edge to edge range.
# Thus shift by 0.5 channels if no start is supplied.
# Additionally skipping the edge channel (cf. "- 2" above)
# means a correction of 1.5 channels.
if inputs.nbin not in (None, -1):
inputs.start = '%.10fGHz' % ((if0 + (1.5 + extra_skip_channels) * channel_width / inputs.nbin) / 1e9)
else:
inputs.start = '%.10fGHz' % ((if0 + (1.5 + extra_skip_channels) * channel_width) / 1e9)
# Always adjust width to apply possible binning
inputs.width = '%.7fMHz' % (channel_width / 1e6)
# Make sure there are LSRK selections if cont.dat/lines.dat exist.
# For ALMA this is already done at the hif_makeimlist step. For VLASS
# this does not (yet) happen in hif_editimlist.
sourcename = self.image_heuristics.get_sourcename(inputs.vis, inputs.field, inputs.intent)
if inputs.spwsel_lsrk == {}:
all_continuum = True
low_bandwidth = True
low_spread = True
for spwid in inputs.spw.split(','):
cont_ranges_spwsel, all_continuum_spwsel, low_bandwidth_spwsel, low_spread_spwsel = self.image_heuristics.cont_ranges_spwsel()
spwsel_spwid = cont_ranges_spwsel.get(sourcename, {}).get(spwid, 'NONE')
all_continuum = all_continuum and all_continuum_spwsel.get(sourcename, {}).get(spwid, False)
low_bandwidth = low_bandwidth and low_bandwidth_spwsel.get(sourcename, {}).get(spwid, False)
low_spread = low_spread and low_spread_spwsel.get(sourcename, {}).get(spwid, False)
if inputs.intent == 'TARGET':
if (spwsel_spwid == 'NONE') and self.image_heuristics.warn_missing_cont_ranges():
LOG.warning('No continuum frequency range information detected for %s, spw %s.' % (inputs.field,
spwid))
if spwsel_spwid in ('ALL', 'ALLCONT', '', 'NONE'):
if self.image_heuristics.is_eph_obj(inputs.field):
spwsel_spwid_refer = 'SOURCE'
else:
spwsel_spwid_refer = 'LSRK'
else:
_, spwsel_spwid_refer = spwsel_spwid.split()
if spwsel_spwid_refer not in ('LSRK', 'SOURCE'):
LOG.warning('Frequency selection is specified in %s but must be in LSRK or SOURCE' %
spwsel_spwid_refer)
inputs.spwsel_lsrk['spw%s' % spwid] = spwsel_spwid
inputs.spwsel_all_cont = all_continuum
inputs.spwsel_low_bandwidth = low_bandwidth
inputs.spwsel_low_spread = low_spread
# Get TOPO frequency ranges for all MSs
(spw_topo_freq_param, _, _, spw_topo_chan_param_dict, _, _, self.aggregate_lsrk_bw) = self.image_heuristics.calc_topo_ranges(inputs)
# Save continuum frequency ranges for later.
if (inputs.specmode == 'cube') and (inputs.spwsel_lsrk.get('spw%s' % inputs.spw, None) not in (None,
'NONE', '')):
self.cont_freq_ranges = inputs.spwsel_lsrk['spw%s' % inputs.spw].split()[0]
else:
self.cont_freq_ranges = ''
# Get sensitivity
sens_reffreq = None
if inputs.sensitivity is not None:
# Override with manually set value
sensitivity = qaTool.convert(inputs.sensitivity, 'Jy')['value']
eff_ch_bw = 1.0
else:
# Get a noise estimate from the CASA sensitivity calculator
(sensitivity, eff_ch_bw, _, sens_reffreq, per_spw_cont_sensitivities_all_chan) = \
self.image_heuristics.calc_sensitivities(inputs.vis, inputs.field, inputs.intent, inputs.spw,
inputs.nbin, spw_topo_chan_param_dict, inputs.specmode,
inputs.gridder, inputs.cell, inputs.imsize, inputs.weighting,
inputs.robust, inputs.uvtaper,
known_sensitivities=per_spw_cont_sensitivities_all_chan,
force_calc=inputs.calcsb, calc_reffreq=True)
if sensitivity is None:
LOG.error('Could not calculate the sensitivity for Field %s Intent %s SPW %s' % (inputs.field,
inputs.intent, inputs.spw))
error_result = TcleanResult(vis=inputs.vis,
sourcename=inputs.field,
intent=inputs.intent,
spw=inputs.spw,
specmode=inputs.specmode,
imaging_mode=self.image_heuristics.imaging_mode)
error_result.error = '%s/%s/spw%s clean error: no sensitivity' % (inputs.field, inputs.intent, inputs.spw)
return error_result
# PIPE-2130: for the VLA-PI workflow only, determine the optimal reffreq value from spw center frequencies
# weighted by predicted per-spw sensitivity.
# This is only triggered if all below conditions meet:
# * reffreq is not specified in the Tclean/input (from MakeImList or Editimlist)
# * deconvolver is mtmfs
# * nterms>=2 or CASA default nterms=None
if (
self.image_heuristics.imaging_mode in {"VLA", "VLA-SCAL"}
and inputs.specmode == "cont"
and inputs.deconvolver == "mtmfs"
and (inputs.nterms is None or inputs.nterms >= 2)
and inputs.reffreq is None
and sens_reffreq is not None
):
# Set reffreq in GHz
inputs.reffreq = f"{sens_reffreq/1e9}GHz"
# Choose TOPO frequency selections
if inputs.specmode != 'cube':
inputs.spwsel_topo = spw_topo_freq_param
else:
inputs.spwsel_topo = ['%s' % inputs.spw] * len(inputs.vis)
if inputs.tlimit:
tlimit = inputs.tlimit
else:
# Initial tlimit for iter0
tlimit = self.image_heuristics.tlimit(0, inputs.field, inputs.intent, inputs.specmode, 0.0)
# Determine threshold
if inputs.hm_cleaning == 'manual':
threshold = inputs.threshold
elif inputs.hm_cleaning == 'sensitivity':
raise Exception('sensitivity threshold not yet implemented')
elif inputs.hm_cleaning == 'rms':
if inputs.threshold not in (None, '', 0.0):
threshold = inputs.threshold
else:
threshold = '%.3gJy' % (tlimit * sensitivity)
else:
raise Exception('hm_cleaning mode {} not recognized. '
'Threshold not set.'.format(inputs.hm_cleaning))
multiterm = inputs.nterms if inputs.deconvolver == 'mtmfs' else None
# Choose sequence manager
# Central mask based on PB
if inputs.hm_masking == 'centralregion':
sequence_manager = ImageCentreThresholdSequence(multiterm=multiterm,
gridder=inputs.gridder, threshold=threshold,
sensitivity=sensitivity, niter=inputs.niter)
# Auto-boxing
elif inputs.hm_masking == 'auto' and self.image_heuristics.imaging_mode == 'VLA':
sequence_manager = VlaAutoMaskThresholdSequence(multiterm=multiterm,
gridder=inputs.gridder, threshold=threshold,
sensitivity=sensitivity, niter=inputs.niter)
# Auto-boxing-selfcal
elif inputs.hm_masking == 'auto' and '-SCAL' in self.image_heuristics.imaging_mode:
sequence_manager = AutoScalThresholdSequence(multiterm=multiterm,
gridder=inputs.gridder, threshold=threshold,
sensitivity=sensitivity, niter=inputs.niter)
# Auto-boxing
elif inputs.hm_masking == 'auto':
sequence_manager = AutoMaskThresholdSequence(multiterm=multiterm,
gridder=inputs.gridder, threshold=threshold,
sensitivity=sensitivity, niter=inputs.niter)
# VLASS-SE masking
# Intended to cover VLASS-SE-CONT, VLASS-SE-CONT-AWP-P001, VLASS-SE-CONT-AWP-P032 as of 01.03.2021
elif inputs.hm_masking == 'manual' and \
(self.image_heuristics.imaging_mode.startswith('VLASS-SE-CONT') or self.image_heuristics.imaging_mode.startswith('VLASS-SE-CUBE')):
sequence_manager = VlassMaskThresholdSequence(multiterm=multiterm, mask=inputs.mask,
gridder=inputs.gridder, threshold=threshold,
sensitivity=sensitivity, niter=inputs.niter, executor=self._executor)
# Manually supplied mask
elif inputs.hm_masking == 'manual':
sequence_manager = ManualMaskThresholdSequence(multiterm=multiterm, mask=inputs.mask,
gridder=inputs.gridder, threshold=threshold,
sensitivity=sensitivity, niter=inputs.niter)
# Re-used Stokes I mask
elif inputs.hm_masking == 're-use':
sequence_manager = ReuseMaskThresholdSequence(multiterm=multiterm, mask=inputs.mask,
gridder=inputs.gridder, threshold=threshold,
sensitivity=sensitivity, niter=inputs.niter)
# No mask
elif inputs.hm_masking == 'none':
sequence_manager = NoMaskThresholdSequence(multiterm=multiterm,
gridder=inputs.gridder, threshold=threshold,
sensitivity=sensitivity, niter=inputs.niter)
else:
raise Exception('hm_masking mode {} not recognized. '
'Sequence manager not set.'.format(inputs.hm_masking))
# VLASS-SE-CONT mode currently needs a non-standard cleaning workflow
# due to limitations in CASA: PSFs for an awproject mosaic image is
# not optimal. Thus, PSFs need to be created with the tclean parameter
# wbawp set to False. The awproject mosaic cleaning then continued
# with this PSF. CASA is expected to handle this with version 6.2.
if self.image_heuristics.imaging_mode.startswith('VLASS-SE-'):
result = self._do_iterative_vlass_se_imaging(sequence_manager=sequence_manager)
elif '-SCAL' in self.image_heuristics.imaging_mode:
result = self._do_scal_imaging(sequence_manager=sequence_manager)
else:
result = self._do_iterative_imaging(sequence_manager=sequence_manager)
# The updated sensitivity dictionary needs to be transported via the
# result object so that one can update the context later on in the
# merge_with_context method (direct updates of the context do not
# work since we are working on copies and since the HPC case will
# have multiple instances which would overwrite each others results).
# This result object is, however, only created in cleanbase.py while
# we have the dictionary already here in tclean.py. Thus one has to
# set this property only after getting the final result object.
result.per_spw_cont_sensitivities_all_chan = per_spw_cont_sensitivities_all_chan
# Record aggregate LSRK bandwidth and mosaic field sensitivities for weblog
# TODO: Record total bandwidth as opposed to range
# Save channel selection in result for weblog.
result.set_aggregate_bw(self.aggregate_lsrk_bw)
result.set_eff_ch_bw(eff_ch_bw)
result.synthesized_beams = self.known_synthesized_beams
result.bl_ratio = inputs.bl_ratio
return result
[docs]
def analyse(self, result):
# Perform QA here if this is a sub-task
context = self.inputs.context
pipelineqa.qa_registry.do_qa(context, result)
return result
def _do_iterative_vlass_se_imaging(self, sequence_manager):
"""VLASS-SE-CONT imaging mode specific cleaning and imaging cycle
This method implements the following workflow:
1a) Compute PSF with cfcache_nowb (wbapw=False)
1b) Create a copy of the PSF, delete products from 1a
2) Initialise TClean with cfcache (wbapw=True)
3) Replace 2) PSF with 1) PSF
4) Continue imaging, clean according heuristics
The method is base on _do_iterative_imaging(), but cube imaging support
is removed.
"""
inputs = self.inputs
qaTool = casa_tools.quanta
# Items for image header for manifest
obspatt = None
arrays = self.image_heuristics.arrays()
if inputs.nbin not in (None, -1):
modifier = f'binned{inputs.nbin}'
else:
modifier = ''
session = ','.join(inputs.context.observing_run.get_ms(_vis).session for _vis in inputs.vis)
# Local list from input masks
if type(inputs.mask) is list:
vlass_masks = inputs.mask
else:
vlass_masks = [inputs.mask]
cfcache = inputs.cfcache
# awproject gridder specific case: replace iter0 PSF with a PSF computed with wbawp=False
if inputs.gridder == 'awproject':
# Store CFCache references in local variables
if inputs.cfcache_nowb:
cfcache_nowb = inputs.cfcache_nowb
else:
raise Exception("wbapw=True CFCache is necessary in this imaging mode "
"but it was not defined.")
# Compute PSF only
LOG.info('Computing PSF with wbawp=False')
iteration = 0
# Replace main CFCache reference with the wbawp=False cache
inputs.cfcache = cfcache_nowb
result_psf = self._do_clean(iternum=iteration, cleanmask='', niter=0, threshold='0.0mJy',
sensitivity=sequence_manager.sensitivity, result=None, calcres=False,
wbawp=False)
# Rename PSF before they are overwritten in the text TClean call
self._replace_psf(result_psf.psf, result_psf.psf.replace('iter%s' % iteration, 'tmp'))
# Delete any old files with this naming root
rootname, _ = os.path.splitext(result_psf.psf)
rootname, _ = os.path.splitext(rootname)
self.rm_iter_files(rootname, iteration)
# Compute the dirty image
LOG.info('Initialise tclean iter 0')
iteration = 0
if hasattr(self.image_heuristics, 'restore_startmodel') and isinstance(self.image_heuristics.restore_startmodel, (str, list)):
restore_startmodel = self.image_heuristics.restore_startmodel
restore_imagename = self.image_heuristics.restore_imagename
calcres_iter0 = False
LOG.info(f'set calcres=False for the tclean initilization call because we are going to restore the model column.')
LOG.info(f'Will use startmodel: {restore_startmodel}, for the final modelcolumn prediction.')
else:
restore_imagename = restore_startmodel = calcres_iter0 = None
# Restore main CFCache reference / initialise tclean
inputs.cfcache = cfcache
result = self._do_clean(iternum=iteration, cleanmask='', niter=0, threshold='0.0mJy',
sensitivity=sequence_manager.sensitivity, result=None, calcres=calcres_iter0)
if inputs.gridder == 'awproject':
# only run for AWPproject, see VLASS memo 15 Sec2.2.
LOG.info('Replacing PSF with wbawp=False PSF')
# Remove *psf.tmp.* files with clear_origin=True argument
self._replace_psf(result_psf.psf.replace('iter%s' % iteration, 'tmp'), result.psf, clear_origin=False)
del result_psf # Not needed in further steps
if restore_imagename is not None:
# PIPE-1354: this block will only be executed for the VLASS selfcal modelcolumn restoration (from hifv_restorepims)
#
# As of CASA 6.4.1, if selfcal model images are made with mpicasa and later fed to a predict-only serial tclean call
# using "startmodel", the csys latpoles mismatch from CAS-13338 will cause the input model images to be regridded
# in-flight: a side effect not desired in the VLASS-SE-CUBE case.
# for now we make a copy of models under the tclean-predict imagename, and
# you should see a resetting warning instead:
# WARN SIImageStore::Open existing Images (file src/code/synthesis/ImagerObjects/SIImageStore.cc, line 568)
# Mismatch in Csys latpoles between existing image on disk ([350.792, 50.4044, 180, 50.4044])
# The DirectionCoordinates have differing latpoles -- Resetting to match image on disk
# note: we preassume that restore_imagename and new_pname are different.
rootname, _ = os.path.splitext(result.psf)
rootname, _ = os.path.splitext(rootname)
LOG.info('Copying model images for the modelcolumn prediction tclean call.')
new_pname = f'{rootname}.iter{iteration}'
self.copy_products(restore_imagename, os.path.basename(new_pname))
restore_startmodel = None
if calcres_iter0 is None or calcres_iter0:
# tclean results assessment only happen when calcres=True or implicit default (None) for iter0
# Determine masking limits depending on PB
extension = '.tt0' if result.multiterm else ''
self.pblimit_image, self.pblimit_cleanmask = self.image_heuristics.pblimits(result.flux + extension)
# Keep pblimits for mom8_fc QA statistics and score (PIPE-704)
result.set_pblimit_image(self.pblimit_image)
result.set_pblimit_cleanmask(self.pblimit_cleanmask)
# Give the result to the sequence_manager for analysis
(residual_cleanmask_rms, # printed
residual_non_cleanmask_rms, # printed
residual_min, # printed
residual_max, # USED
nonpbcor_image_non_cleanmask_rms_min, # added to result, later used in Weblog under name 'image_rms_min'
nonpbcor_image_non_cleanmask_rms_max, # added to result, later used in Weblog under name 'image_rms_max'
nonpbcor_image_non_cleanmask_rms, # printed added to result, later used in Weblog under name 'image_rms'
pbcor_image_min, # added to result, later used in Weblog under name 'image_min'
pbcor_image_max, # added to result, later used in Weblog under name 'image_max'
# USED
residual_robust_rms,
nonpbcor_image_robust_rms_and_spectra,
pbcor_image_min_iquv,
pbcor_image_max_iquv,
nonpbcor_image_non_cleanmask_rms_min_iquv,
nonpbcor_image_non_cleanmask_rms_max_iquv,
nonpbcor_image_non_cleanmask_rms_iquv) = \
sequence_manager.iteration_result(model=result.model,
restored=result.image, residual=result.residual,
flux=result.flux, cleanmask=None,
pblimit_image=self.pblimit_image,
pblimit_cleanmask=self.pblimit_cleanmask,
cont_freq_ranges=self.cont_freq_ranges)
LOG.info('Dirty image stats')
LOG.info(' Residual rms: %s', residual_non_cleanmask_rms)
LOG.info(' Residual max: %s', residual_max)
LOG.info(' Residual min: %s', residual_min)
LOG.info(' Residual scaled MAD: %s', residual_robust_rms)
LOG.info('Continue cleaning')
# Continue iterating (calcpsf and calcres are set automatically false)
iteration = 1
# Do not reuse mask from earlier iteration stage. <imagename>.iter<n>.mask (copy of mask from iteration n-1)
# would lead to collision with new_cleanmask of nth iteration. VLASS-SE-CONT uses two different masks.
do_not_copy_mask = True
for mask in vlass_masks:
# Create the name of the next clean mask from the root of the
# previous residual image.
rootname, ext = os.path.splitext(result.residual)
rootname, ext = os.path.splitext(rootname)
# Delete any old files with this naming root
self.rm_iter_files(rootname, iteration)
if self.image_heuristics.imaging_mode == 'VLASS-SE-CUBE':
# PIPE-1401: copy input user masks (vlass-se-cont tier1/tier2) for imaging of each spw group.
# Because various metadata (e.g., spw list) are written into headers of mask images and later used by the
# weblog display/renderer class, we have to differentiate in-use mask files of individual spw groups, even
# they are identical. Here, we make mask copy per clean_target (i.e. a spw group of VLASS-SE-CUBE.)
# under its distinct name.
if mask in ['', None, 'pb']:
new_cleanmask = mask
else:
# note: this "new_cleanmask" name must be different from imagename+'.mask'.
# 1) tclean() doesn't support specifying usemask='user' and mask=imagename+'.mask' (pre-exists) at the same time.
# 2) the vlass-se-cont mask is Stokes-I only and needs to be "regridded" (i.e. broadcasted) in the Pol. axis.
# Therefore a "restart" by copying the user mask under imagename+'.mask' is also not supported in tclean() as of ver 6.4.1.
new_cleanmask = f'{rootname}.iter{iteration}.cleanmask'
else:
# Determine stage mask name and replace stage substring place holder with actual stage number.
# Special cases when mask is an empty string, None, or when it is set to 'pb'.
new_cleanmask = mask if mask in ['', None, 'pb'] else 's{:d}_0.{}'.format(
self.inputs.context.task_counter, re.sub('s[0123456789]+_[0123456789]+.', '', mask, 1))
threshold = self.image_heuristics.threshold(iteration, sequence_manager.threshold, inputs.hm_masking)
nsigma = self.image_heuristics.nsigma(iteration, inputs.hm_nsigma, inputs.hm_masking)
seq_result = sequence_manager.iteration(new_cleanmask, self.pblimit_image,
self.pblimit_cleanmask, iteration=iteration)
# Use previous iterations's products as starting point
old_pname = '%s.iter%s' % (rootname, iteration - 1)
new_pname = '%s.iter%s' % (rootname, iteration)
if self.image_heuristics.imaging_mode == 'VLASS-SE-CUBE':
# PIPE-1401: We move (instead of copy) most imaging products from one iteration to the next,
# to reduce the disk I/O operations and storage use.
self.move_products(os.path.basename(old_pname), os.path.basename(new_pname),
ignore_list=['.cleanmask'] if do_not_copy_mask else [],
copy_list=['.residual', '.image'],
remove_list=['.mask'] if do_not_copy_mask else [])
else:
self.copy_products(os.path.basename(old_pname), os.path.basename(new_pname),
ignore='mask' if do_not_copy_mask else None)
LOG.info('Iteration %s: Clean control parameters' % iteration)
LOG.info(' Mask %s', new_cleanmask)
LOG.info(' Threshold %s', threshold)
LOG.info(' Niter %s', seq_result.niter)
# The user_cycleniter_final_image_nomask ImageParamsHeuristicsVlassSeCont class variable should be
# prioritised over cycleniter parameter set by the user when cleaning without a user mask (see PIPE-978
# and PIPE-977). The heuristic method cycleniter() takes care of this, but in order to be triggered, the
# cycleniter parameter should be reset to None for the specific cases.
if mask == 'pb':
cycleniter = inputs.cycleniter
inputs.cycleniter = None
result = self._do_clean(iternum=iteration, cleanmask=new_cleanmask, niter=seq_result.niter, nsigma=nsigma,
threshold=threshold, sensitivity=sequence_manager.sensitivity, result=result)
# Restore cycleniter if needed
if mask == 'pb':
inputs.cycleniter = cycleniter
# Give the result to the clean 'sequencer'
(residual_cleanmask_rms,
residual_non_cleanmask_rms,
residual_min,
residual_max,
nonpbcor_image_non_cleanmask_rms_min,
nonpbcor_image_non_cleanmask_rms_max,
nonpbcor_image_non_cleanmask_rms,
pbcor_image_min,
pbcor_image_max,
residual_robust_rms,
nonpbcor_image_robust_rms_and_spectra,
pbcor_image_min_iquv,
pbcor_image_max_iquv,
nonpbcor_image_non_cleanmask_rms_min_iquv,
nonpbcor_image_non_cleanmask_rms_max_iquv,
nonpbcor_image_non_cleanmask_rms_iquv) = \
sequence_manager.iteration_result(model=result.model,
restored=result.image, residual=result.residual,
flux=result.flux, cleanmask=new_cleanmask,
pblimit_image=self.pblimit_image,
pblimit_cleanmask=self.pblimit_cleanmask,
cont_freq_ranges=self.cont_freq_ranges)
# Center frequency and effective bandwidth in Hz for the image header (and thus the manifest).
ctrfrq = 0.5 * (float(qaTool.getvalue(qaTool.convert(nonpbcor_image_robust_rms_and_spectra['nonpbcor_image_non_cleanmask_freq_ch1'], 'Hz'))[0])
+ float(qaTool.getvalue(qaTool.convert(nonpbcor_image_robust_rms_and_spectra['nonpbcor_image_non_cleanmask_freq_chN'], 'Hz'))[0]))
if inputs.specmode in ('mfs', 'cont'):
effbw = float(qaTool.getvalue(qaTool.convert(self.aggregate_lsrk_bw, 'Hz'))[0])
else:
msobj = self.inputs.context.observing_run.get_ms(name=inputs.vis[0])
nbin = inputs.nbin if inputs.nbin is not None and inputs.nbin > 0 else 1
_, _, _, effbw = self.image_heuristics.get_bw_corr_factor(msobj, inputs.spw, nbin)
self._update_miscinfo(imagename=result.image.replace('.image', '.image' + extension),
nfield=max([len(field_ids.split(',')) for field_ids in self.image_heuristics.field(inputs.intent, inputs.field)]),
datamin=pbcor_image_min, datamax=pbcor_image_max, datarms=nonpbcor_image_non_cleanmask_rms, stokes=inputs.stokes,
effbw=effbw, ctrfrq=ctrfrq)
# Keep image cleanmask area min and max and non-cleanmask area RMS for weblog and QA
result.set_image_min(pbcor_image_min)
result.set_image_min_iquv(pbcor_image_min_iquv)
result.set_image_max(pbcor_image_max)
result.set_image_max_iquv(pbcor_image_max_iquv)
result.set_image_rms(nonpbcor_image_non_cleanmask_rms)
result.set_image_rms_iquv(nonpbcor_image_non_cleanmask_rms_iquv)
result.set_image_rms_min(nonpbcor_image_non_cleanmask_rms_min)
result.set_image_rms_min_iquv(nonpbcor_image_non_cleanmask_rms_min_iquv)
result.set_image_rms_max(nonpbcor_image_non_cleanmask_rms_max)
result.set_image_rms_max_iquv(nonpbcor_image_non_cleanmask_rms_max_iquv)
result.set_image_robust_rms_and_spectra(nonpbcor_image_robust_rms_and_spectra)
# Determine fractional flux outside of mask for final image (only VLASS-SE-CONT imaging stage 1)
outmaskratio = self.image_heuristics.get_outmaskratio(iteration, result.image + extension,
re.sub(r'\.image$', '.pb', result.image) + extension,
new_cleanmask)
result.set_outmaskratio(iteration, outmaskratio)
LOG.info('Clean image iter %s stats' % iteration)
LOG.info(' Clean image annulus area rms: %s', nonpbcor_image_non_cleanmask_rms)
LOG.info(' Clean image min: %s', pbcor_image_min)
LOG.info(' Clean image max: %s', pbcor_image_max)
LOG.info(' Residual annulus area rms: %s', residual_non_cleanmask_rms)
LOG.info(' Residual cleanmask area rms: %s', residual_cleanmask_rms)
LOG.info(' Residual max: %s', residual_max)
LOG.info(' Residual min: %s', residual_min)
# Up the iteration counter
iteration += 1
# Save predicted model visibility columns to MS at the end of the first imaging stage (see heuristic method).
savemodel = self.image_heuristics.savemodel(iteration-1)
if savemodel:
LOG.info("Saving predicted model visibilities to MeasurementSet after last iteration (iter %s)" %
(iteration-1))
_ = self._do_clean(iternum=iteration-1, cleanmask='', niter=0, threshold='0.0mJy',
sensitivity=sequence_manager.sensitivity, savemodel=savemodel, startmodel=restore_startmodel,
result=None, calcpsf=False, calcres=False, parallel=False)
return result
def _replace_psf(self, origin: str, target: str, clear_origin: bool = False):
"""Replace multi-term CASA image.
The <origin>.tt0, <origin>.tt1 and <origin>.tt2 images are copied or moved
to <target>.tt0, <target>.tt1 and <target>.tt2 images.
If clear_origin argument is True then after copying the <origin>.tt* files
they will be deleted.
"""
for tt in ['tt0', 'tt1', 'tt2']:
target_psf = f'{target}.{tt}'
origin_psf = f'{origin}.{tt}'
if os.path.isdir(target_psf):
self._executor.execute(casa_tasks.rmtree(target_psf, ignore_errors=False))
if clear_origin:
LOG.info(f'Moving {origin_psf} to {target_psf}')
self._executor.execute(casa_tasks.move(origin_psf, target_psf))
else:
LOG.info(f'Copying {origin_psf} to {target_psf}')
self._executor.execute(casa_tasks.copytree(origin_psf, target_psf))
def _do_scal_imaging(self, sequence_manager):
"""Do self-calibration imaging sequence.
This method produces the optimal selfcal solution via the iterative imaging-selfcal loop.
It also generate before-scal/after-scal. The input MS is assumed to be calibrated via
standard calibration procedures but they have been splitted from the original *_targets.ms
and rebinned in frequency.
"""
raise NotImplementedError("The self-calibration imaging/gaincal loop is not implemented yet!")
def _do_iterative_imaging(self, sequence_manager):
inputs = self.inputs
qaTool = casa_tools.quanta
# Items for image header for manifest
obspatt = 'mos' if self.image_heuristics.is_mosaic(inputs.field, inputs.intent) else 'sf'
arrays = self.image_heuristics.arrays()
if inputs.nbin not in (None, -1):
modifier = f'binned{inputs.nbin}'
else:
modifier = ''
session = ','.join(inputs.context.observing_run.get_ms(_vis).session for _vis in inputs.vis)
# Compute the dirty image
LOG.info('Compute the dirty image')
iteration = 0
result = self._do_clean(iternum=iteration, cleanmask='', niter=0, threshold='0.0mJy',
sensitivity=sequence_manager.sensitivity, result=None)
# PIPE-1790: skip any further processing in case of errors (i.e. tclean failed to produce an image)
if result.error:
return result
# Check for bad PSF fit
bad_psf_channels = None
if inputs.specmode == 'cube':
bad_psf_fit = self.image_heuristics.check_psf(result.psf, inputs.field, inputs.spw)
if bad_psf_fit:
restoringbeam_sugguested, bad_psf_channels = self.image_heuristics.restoringbeam_from_psf(
result.psf, inputs.field, inputs.spw
)
if restoringbeam_sugguested is not None:
restoringbeam_major_arcsec = qaTool.getvalue(
qaTool.convert(restoringbeam_sugguested['major'], 'arcsec'))[0]
restoringbeam_minor_arcsec = qaTool.getvalue(
qaTool.convert(restoringbeam_sugguested['minor'], 'arcsec'))[0]
restoringbeam_pa_deg = qaTool.getvalue(qaTool.convert(restoringbeam_sugguested['pa'], 'deg'))[0]
LOG.info(
'Adopting the restoring beam recommended by restoringbeam_from_psf() for Field %s SPW %s with %#.3g x %#.3g arcsec @ %.1f deg',
inputs.field,
inputs.spw,
restoringbeam_major_arcsec,
restoringbeam_minor_arcsec,
restoringbeam_pa_deg,
)
inputs.restoringbeam = [
f'{restoringbeam_major_arcsec:#.3g}arcsec',
f'{restoringbeam_minor_arcsec:#.3g}arcsec',
f'{restoringbeam_pa_deg:.1f}deg',
]
# Determine masking limits depending on PB
extension = '.tt0' if result.multiterm else ''
self.pblimit_image, self.pblimit_cleanmask = self.image_heuristics.pblimits(result.flux+extension, specmode=inputs.specmode)
# Keep pblimits for mom8_fc QA statistics and score (PIPE-704)
result.set_pblimit_image(self.pblimit_image)
result.set_pblimit_cleanmask(self.pblimit_cleanmask)
# Give the result to the sequence_manager for analysis
(residual_cleanmask_rms, # printed
residual_non_cleanmask_rms, # printed
residual_min, # printed
residual_max, # USED
nonpbcor_image_non_cleanmask_rms_min, # added to result, later used in Weblog under name 'image_rms_min'
nonpbcor_image_non_cleanmask_rms_max, # added to result, later used in Weblog under name 'image_rms_max'
nonpbcor_image_non_cleanmask_rms, # printed added to result, later used in Weblog under name 'image_rms'
pbcor_image_min, # added to result, later used in Weblog under name 'image_min'
pbcor_image_max, # added to result, later used in Weblog under name 'image_max'
# USED
residual_robust_rms,
nonpbcor_image_robust_rms_and_spectra,
pbcor_image_min_iquv,
pbcor_image_max_iquv,
nonpbcor_image_non_cleanmask_rms_min_iquv,
nonpbcor_image_non_cleanmask_rms_max_iquv,
nonpbcor_image_non_cleanmask_rms_iquv) = \
sequence_manager.iteration_result(model=result.model,
restored=result.image, residual=result.residual,
flux=result.flux, cleanmask=None,
pblimit_image=self.pblimit_image,
pblimit_cleanmask=self.pblimit_cleanmask,
cont_freq_ranges=self.cont_freq_ranges)
LOG.info('Dirty image stats')
LOG.info(' Residual rms: %s', residual_non_cleanmask_rms)
LOG.info(' Residual max: %s', residual_max)
LOG.info(' Residual min: %s', residual_min)
LOG.info(' Residual scaled MAD: %s', residual_robust_rms)
# Determine if we keep iterating past the dirty image. This is currently the case for
# 1) All continuum case
# 2) TARGET IQUV imaging without previous auto-mask from a stokes I imaging stage
if (inputs.specmode == 'cube' and inputs.spwsel_all_cont) or \
(inputs.intent == 'TARGET' and inputs.stokes == 'IQUV' and inputs.mask in (None, '')):
# Center frequency and effective bandwidth in Hz for the image header (and thus the manifest).
ctrfrq = 0.5 * (float(qaTool.getvalue(qaTool.convert(nonpbcor_image_robust_rms_and_spectra['nonpbcor_image_non_cleanmask_freq_ch1'], 'Hz'))[0])
+ float(qaTool.getvalue(qaTool.convert(nonpbcor_image_robust_rms_and_spectra['nonpbcor_image_non_cleanmask_freq_chN'], 'Hz'))[0]))
if inputs.specmode in ('mfs', 'cont'):
effbw = float(qaTool.getvalue(qaTool.convert(self.aggregate_lsrk_bw, 'Hz'))[0])
else:
msobj = self.inputs.context.observing_run.get_ms(name=inputs.vis[0])
nbin = inputs.nbin if inputs.nbin is not None and inputs.nbin > 0 else 1
_, _, _, effbw = self.image_heuristics.get_bw_corr_factor(msobj, inputs.spw, nbin)
self._update_miscinfo(imagename=result.image.replace('.image', '.image'+extension),
nfield=max([len(field_ids.split(',')) for field_ids in self.image_heuristics.field(inputs.intent, inputs.field)]),
datamin=pbcor_image_min, datamax=pbcor_image_max, datarms=nonpbcor_image_non_cleanmask_rms, stokes=inputs.stokes,
effbw=effbw, level='member', ctrfrq=ctrfrq, obspatt=obspatt, arrays=arrays, modifier=modifier, session=session)
# PIPE-2211: Update some keywords for manifest usage on all other imaging products
for im_name in result.im_names.values():
if os.path.exists(im_name):
self._update_miscinfo(imagename=im_name, stokes=inputs.stokes, level='member', obspatt=obspatt, arrays=arrays, modifier=modifier, session=session)
result.set_image_min(pbcor_image_min)
result.set_image_min_iquv(pbcor_image_min_iquv)
result.set_image_max(pbcor_image_max)
result.set_image_max_iquv(pbcor_image_max_iquv)
result.set_image_rms(nonpbcor_image_non_cleanmask_rms)
result.set_image_rms_iquv(nonpbcor_image_non_cleanmask_rms_iquv)
result.set_image_rms_min(nonpbcor_image_non_cleanmask_rms_min)
result.set_image_rms_min_iquv(nonpbcor_image_non_cleanmask_rms_min_iquv)
result.set_image_rms_max(nonpbcor_image_non_cleanmask_rms_max)
result.set_image_rms_max_iquv(nonpbcor_image_non_cleanmask_rms_max_iquv)
result.set_image_robust_rms_and_spectra(nonpbcor_image_robust_rms_and_spectra)
if inputs.specmode == 'cube' and inputs.spwsel_all_cont:
result.cube_all_cont = True
keep_iterating = False
else:
keep_iterating = True
if inputs.hm_cleaning in ('manual', 'rms'):
# Adjust threshold based on the dirty image Stokes I residual maximum.
dirty_dynamic_range = None if sequence_manager.sensitivity == 0.0 else residual_max / sequence_manager.sensitivity
tlimit = self.image_heuristics.tlimit(1, inputs.field, inputs.intent, inputs.specmode, dirty_dynamic_range)
new_threshold, DR_correction_factor, maxEDR_used = \
self.image_heuristics.dr_correction(sequence_manager.threshold, dirty_dynamic_range, residual_max,
inputs.intent, tlimit, inputs.drcorrect)
if inputs.hm_cleaning != 'manual':
sequence_manager.threshold = new_threshold
sequence_manager.dr_corrected_sensitivity = sequence_manager.sensitivity * DR_correction_factor
# Adjust niter based on the dirty image statistics
new_niter = self.image_heuristics.niter_correction(sequence_manager.niter, inputs.cell, inputs.imsize,
residual_max, new_threshold, residual_robust_rms, intent=inputs.intent)
sequence_manager.niter = new_niter
# Save corrected sensitivity in iter0 result object if there is no further iteration.
if not keep_iterating:
result.set_dirty_dynamic_range(dirty_dynamic_range)
result.set_DR_correction_factor(DR_correction_factor)
result.set_maxEDR_used(maxEDR_used)
result.set_dr_corrected_sensitivity(sequence_manager.dr_corrected_sensitivity)
iteration = 1
do_not_copy_mask = False
while keep_iterating:
# Create the name of the next clean mask from the root of the
# previous residual image.
rootname, _ = os.path.splitext(result.residual)
rootname, _ = os.path.splitext(rootname)
# Delete any old files with this naming root
self.rm_iter_files(rootname, iteration)
if inputs.hm_masking == 'auto':
new_cleanmask = '%s.iter%s.mask' % (rootname, iteration)
elif inputs.hm_masking == 'manual':
# Note that this will only work if the name of the manual
# mask does not happen to be '%s.iter%s.mask' % (rootname, iteration)
# which will usually be the case.
new_cleanmask = inputs.mask
elif inputs.hm_masking == 're-use':
# Note the same issue reported in the VLA image iteration above about
# tclean not accepting a user mask of the same name as it would create
# itself. Hence ".cleanmask" instead of ".mask".
new_cleanmask = '%s.iter%s.cleanmask' % (rootname, iteration)
elif inputs.hm_masking == 'none':
new_cleanmask = ''
else:
new_cleanmask = '%s.iter%s.cleanmask' % (rootname, iteration)
# perform an iteration.
if (inputs.specmode == 'cube') and (not inputs.cleancontranges):
seq_result = sequence_manager.iteration(new_cleanmask, self.pblimit_image,
self.pblimit_cleanmask, inputs.spw, inputs.spwsel_lsrk,
iteration=iteration)
else:
seq_result = sequence_manager.iteration(new_cleanmask, self.pblimit_image,
self.pblimit_cleanmask, iteration=iteration)
# Use previous iterations's products as starting point
old_pname = '%s.iter%s' % (rootname, iteration-1)
new_pname = '%s.iter%s' % (rootname, iteration)
self.copy_products(os.path.basename(old_pname), os.path.basename(new_pname),
ignore='mask' if do_not_copy_mask else None)
threshold = self.image_heuristics.threshold(iteration, sequence_manager.threshold, inputs.hm_masking)
savemodel = self.image_heuristics.savemodel(iteration)
niter = self.image_heuristics.niter_by_iteration(iteration, inputs.hm_masking, seq_result.niter)
if inputs.cyclefactor not in (None, -999):
cyclefactor = inputs.cyclefactor
else:
cyclefactor = self.image_heuristics.cyclefactor(
iteration, inputs.field, inputs.intent, inputs.specmode, dirty_dynamic_range)
LOG.info('Iteration %s: Clean control parameters' % iteration)
LOG.info(' Mask %s', new_cleanmask)
LOG.info(' Threshold %s', threshold)
LOG.info(' Niter %s', niter)
result = self._do_clean(iternum=iteration, cleanmask=new_cleanmask, niter=niter, nsigma=inputs.hm_nsigma,
threshold=threshold, sensitivity=sequence_manager.sensitivity, savemodel=savemodel,
result=result, cyclefactor=cyclefactor)
if result.image is None:
if not result.error:
result.error = '%s/%s/spw%s clean error: failed to produce an image' % (inputs.field, inputs.intent, inputs.spw)
return result
# Give the result to the clean 'sequencer'
(residual_cleanmask_rms,
residual_non_cleanmask_rms,
residual_min,
residual_max,
nonpbcor_image_non_cleanmask_rms_min,
nonpbcor_image_non_cleanmask_rms_max,
nonpbcor_image_non_cleanmask_rms,
pbcor_image_min,
pbcor_image_max,
residual_robust_rms,
nonpbcor_image_robust_rms_and_spectra,
pbcor_image_min_iquv,
pbcor_image_max_iquv,
nonpbcor_image_non_cleanmask_rms_min_iquv,
nonpbcor_image_non_cleanmask_rms_max_iquv,
nonpbcor_image_non_cleanmask_rms_iquv) = \
sequence_manager.iteration_result(model=result.model,
restored=result.image, residual=result.residual,
flux=result.flux, cleanmask=new_cleanmask,
pblimit_image=self.pblimit_image,
pblimit_cleanmask=self.pblimit_cleanmask,
cont_freq_ranges=self.cont_freq_ranges)
# Center frequency and effective bandwidth in Hz for the image header (and thus the manifest).
ctrfrq = 0.5 * (float(qaTool.getvalue(qaTool.convert(nonpbcor_image_robust_rms_and_spectra['nonpbcor_image_non_cleanmask_freq_ch1'], 'Hz'))[0])
+ float(qaTool.getvalue(qaTool.convert(nonpbcor_image_robust_rms_and_spectra['nonpbcor_image_non_cleanmask_freq_chN'], 'Hz'))[0]))
if inputs.specmode in ('mfs', 'cont'):
effbw = float(qaTool.getvalue(qaTool.convert(self.aggregate_lsrk_bw, 'Hz'))[0])
else:
msobj = self.inputs.context.observing_run.get_ms(name=inputs.vis[0])
nbin = inputs.nbin if inputs.nbin is not None and inputs.nbin > 0 else 1
_, _, _, effbw = self.image_heuristics.get_bw_corr_factor(msobj, inputs.spw, nbin)
self._update_miscinfo(imagename=result.image.replace('.image', '.image'+extension),
nfield=max([len(field_ids.split(',')) for field_ids in self.image_heuristics.field(inputs.intent, inputs.field)]),
datamin=pbcor_image_min, datamax=pbcor_image_max, datarms=nonpbcor_image_non_cleanmask_rms, stokes=inputs.stokes,
effbw=effbw, level='member', ctrfrq=ctrfrq, obspatt=obspatt, arrays=arrays, modifier=modifier, session=session)
# PIPE-2211: Update some keywords for manifest usage on all other imaging products
for im_name in result.im_names.values():
if os.path.exists(im_name):
self._update_miscinfo(imagename=im_name, stokes=inputs.stokes, level='member', obspatt=obspatt, arrays=arrays, modifier=modifier, session=session)
keep_iterating, hm_masking = self.image_heuristics.keep_iterating(iteration, inputs.hm_masking,
result.tclean_stopcode,
dirty_dynamic_range, residual_max,
residual_robust_rms,
inputs.field, inputs.intent, inputs.spw,
inputs.specmode)
do_not_copy_mask = hm_masking != inputs.hm_masking
inputs.hm_masking = hm_masking
# Keep image cleanmask area min and max and non-cleanmask area RMS for weblog and QA
result.set_image_min(pbcor_image_min)
result.set_image_min_iquv(pbcor_image_min_iquv)
result.set_image_max(pbcor_image_max)
result.set_image_max_iquv(pbcor_image_max_iquv)
result.set_image_rms(nonpbcor_image_non_cleanmask_rms)
result.set_image_rms_iquv(nonpbcor_image_non_cleanmask_rms_iquv)
result.set_image_rms_min(nonpbcor_image_non_cleanmask_rms_min)
result.set_image_rms_min_iquv(nonpbcor_image_non_cleanmask_rms_min_iquv)
result.set_image_rms_max(nonpbcor_image_non_cleanmask_rms_max)
result.set_image_rms_max_iquv(nonpbcor_image_non_cleanmask_rms_max_iquv)
result.set_image_robust_rms_and_spectra(nonpbcor_image_robust_rms_and_spectra)
# Keep dirty DR, correction factor and information about maxEDR heuristic for weblog
result.set_dirty_dynamic_range(dirty_dynamic_range)
result.set_DR_correction_factor(DR_correction_factor)
result.set_maxEDR_used(maxEDR_used)
result.set_dr_corrected_sensitivity(sequence_manager.dr_corrected_sensitivity)
LOG.info('Clean image iter %s stats' % iteration)
LOG.info(' Clean image annulus area rms: %s', nonpbcor_image_non_cleanmask_rms)
LOG.info(' Clean image min: %s', pbcor_image_min)
LOG.info(' Clean image max: %s', pbcor_image_max)
LOG.info(' Residual annulus area rms: %s', residual_non_cleanmask_rms)
LOG.info(' Residual cleanmask area rms: %s', residual_cleanmask_rms)
LOG.info(' Residual max: %s', residual_max)
LOG.info(' Residual min: %s', residual_min)
# Up the iteration counter
iteration += 1
# If specmode is "cube", create from the non-pbcorrected cube
# after continuum subtraction an image of the moment 0 / 8 integrated
# intensity for the line-free channels.
if inputs.specmode == 'cube':
# Moment maps of line-free channels
self._calc_mom0_8_10_fc(result)
# Moment maps of all channels
self._calc_mom0_8(result)
# Record any failed PSF fit channels
result.bad_psf_channels = bad_psf_channels
return result
def _do_clean(self, iternum, cleanmask, niter, threshold, sensitivity, result, nsigma=None, savemodel=None, startmodel=None,
calcres=None, calcpsf=None, wbawp=None, parallel=None, clean_imagename=None, cyclefactor=None):
"""Do basic cleaning."""
inputs = self.inputs
if parallel is None:
parallel = mpihelpers.parse_parallel_input_parameter(inputs.parallel)
# if 'start' or 'width' are defined in velocity units, convert from frequency back
# to velocity before tclean call. Added to support SRDP ALMA optimized imaging.
if self.start_as_velocity:
inputs.start = casa_tools.quanta.tos(self.start_as_velocity)
if self.width_as_velocity:
inputs.width = casa_tools.quanta.tos(self.width_as_velocity)
# optionally override the output image rootname
if isinstance(clean_imagename, str):
imagename = clean_imagename
else:
imagename = inputs.imagename
# Fallback for cases that do not set cyclefactor explicitly like
# for PIPE-1782. Alternatively, one would have to pass cyclefactor
# for all _do_clean calls.
if cyclefactor is None:
cyclefactor = inputs.cyclefactor
clean_inputs = cleanbase.CleanBase.Inputs(inputs.context,
output_dir=inputs.output_dir,
vis=inputs.vis,
is_per_eb=inputs.is_per_eb,
imagename=imagename,
antenna=inputs.antenna,
intent=inputs.intent,
field=inputs.field,
spw=inputs.spw,
spwsel=inputs.spwsel_topo,
spwsel_all_cont=inputs.spwsel_all_cont,
spwsel_low_bandwidth=inputs.spwsel_low_bandwidth,
spwsel_low_spread=inputs.spwsel_low_spread,
reffreq=inputs.reffreq,
restfreq=inputs.restfreq,
conjbeams=inputs.conjbeams,
uvrange=inputs.uvrange,
hm_specmode=inputs.hm_specmode,
specmode=inputs.specmode,
gridder=inputs.gridder,
datacolumn=inputs.datacolumn,
datatype=inputs.datatype,
datatype_info=inputs.datatype_info,
deconvolver=inputs.deconvolver,
nterms=inputs.nterms,
cycleniter=inputs.cycleniter,
cyclefactor=cyclefactor,
nmajor=inputs.nmajor,
wprojplanes=inputs.wprojplanes,
hm_minpsffraction=inputs.hm_minpsffraction,
hm_maxpsffraction=inputs.hm_maxpsffraction,
scales=inputs.scales,
outframe=inputs.outframe,
imsize=inputs.imsize,
cell=inputs.cell,
cfcache=inputs.cfcache,
phasecenter=inputs.phasecenter,
psf_phasecenter=inputs.psf_phasecenter,
stokes=inputs.stokes,
nchan=inputs.nchan,
nbin=inputs.nbin,
start=inputs.start,
width=inputs.width,
weighting=inputs.weighting,
robust=inputs.robust,
mosweight=inputs.mosweight,
uvtaper=inputs.uvtaper,
restoringbeam=inputs.restoringbeam,
iter=iternum,
mask=cleanmask,
savemodel=savemodel,
startmodel=startmodel,
hm_masking=inputs.hm_masking,
hm_sidelobethreshold=inputs.hm_sidelobethreshold,
hm_noisethreshold=inputs.hm_noisethreshold,
hm_lownoisethreshold=inputs.hm_lownoisethreshold,
hm_negativethreshold=inputs.hm_negativethreshold,
hm_minbeamfrac=inputs.hm_minbeamfrac,
hm_growiterations=inputs.hm_growiterations,
hm_dogrowprune=inputs.hm_dogrowprune,
hm_minpercentchange=inputs.hm_minpercentchange,
hm_fastnoise=inputs.hm_fastnoise,
niter=niter,
hm_nsigma=nsigma,
hm_perchanweightdensity=inputs.hm_perchanweightdensity,
hm_npixels=inputs.hm_npixels,
threshold=threshold,
sensitivity=sensitivity,
pblimit=inputs.pblimit,
pbmask=inputs.pbmask,
result=result,
parallel=parallel,
heuristics=inputs.image_heuristics,
calcpsf=calcpsf,
calcres=calcres,
wbawp=wbawp)
clean_task = cleanbase.CleanBase(clean_inputs)
clean_result = self._executor.execute(clean_task)
# if 'start' or 'width' were defined in velocity units, convert from velocity back
# to frequency after tclean call. Added to support SRDP ALMA optimized imaging.
if self.start_as_velocity:
inputs.start = self.start_as_frequency
if self.width_as_velocity:
inputs.width = self.width_as_frequency
return clean_result
# Remove pointing table.
def _empty_pointing_table(self):
# Concerned that simply renaming things directly
# will corrupt the table cache, so do things using only the
# table tool.
for vis in self.inputs.vis:
with casa_tools.TableReader('%s/POINTING' % vis, nomodify=False) as table:
# make a copy of the table
LOG.debug('Making copy of POINTING table')
copy = table.copy('%s/POINTING_COPY' % vis, valuecopy=True)
LOG.debug('Removing all POINTING table rows')
table.removerows(list(range(table.nrows())))
copy.done()
# Restore pointing table
def _restore_pointing_table(self):
for vis in self.inputs.vis:
# restore the copy of the POINTING table
with casa_tools.TableReader('%s/POINTING_COPY' % vis, nomodify=False) as table:
LOG.debug('Copying back into POINTING table')
original = table.copy('%s/POINTING' % vis, valuecopy=True)
original.done()
def _calc_moment_image(self, imagename=None, moments=None, outfile=None, chans=None, iter=None):
'''
Computes moment image, writes it to disk and updates moment image metadata.
This method is used in _calc_mom0_8() and _calc_mom0_8_10_fc().
'''
context = self.inputs.context
# Determine moment image type.
mom_type = ""
if ".mom0" in outfile:
mom_type += "mom0"
elif ".mom8" in outfile:
mom_type += "mom8"
elif ".mom10" in outfile:
mom_type += "mom10"
if "_fc" in outfile:
mom_type += "_fc"
# Execute job to create the MOM0/8/10_FC image.
job = casa_tasks.immoments(imagename=imagename, moments=moments, outfile=outfile, chans=chans, stokes='I')
self._executor.execute(job)
assert os.path.exists(outfile)
# Using virtual spw setups for all interferometry pipelines
virtspw = True
# Update the metadata in the MOM8_FC image.
imageheader.set_miscinfo(name=outfile, spw=self.inputs.spw, virtspw=virtspw,
field=self.inputs.field, iter=iter,
datatype=self.inputs.datatype, type=mom_type,
intent=self.inputs.intent, specmode=self.inputs.hm_specmode,
context=context)
# Calculate a "mom0_fc", "mom8_fc" and "mom10_fc: images: this is a moment
# 0 (integrated value of the spectrum), 8 (maximum value of the spectrum)
# and 10 (minimum value of the spectrum) integration over the line-free
# channels of the non-primary-beam corrected image-cube, after continuum
# subtraction; where the "line-free" channels are taken from those identified
# as continuum channels.
# This is a diagnostic plot representing the residual emission
# in the line-free (aka continuum) channels. If the continuum subtraction
# worked well, then this image should just contain noise.
def _calc_mom0_8_10_fc(self, result):
# Find max iteration that was performed.
maxiter = max(result.iterations.keys())
# Get filename of image from result, and modify to select the
# non-PB-corrected image.
extension = '.tt0' if result.multiterm else ''
imagename = result.iterations[maxiter]['image'].replace('.image', '.image%s' % (extension)).replace('.pbcor', '')
# Set output filename for MOM0_FC image.
mom0fc_name = '%s.mom0_fc' % imagename
# Set output filename for MOM8_FC image.
mom8fc_name = '%s.mom8_fc' % imagename
# Set output filename for MOM10_FC image.
mom10fc_name = '%s.mom10_fc' % imagename
# Convert frequency ranges to channel ranges.
cont_chan_ranges = utils.freq_selection_to_channels(imagename, self.cont_freq_ranges)
# Only continue if there were continuum channel ranges for this spw.
if cont_chan_ranges[0] != 'NONE':
# Create a channel ranges string.
if cont_chan_ranges[0] in ('ALL', 'ALLCONT'):
cont_chan_ranges_str = ''
cont_chan_indices = slice(None)
else:
cont_chan_ranges_str = ";".join(["%s~%s" % (ch0, ch1) for ch0, ch1 in cont_chan_ranges])
cont_chan_indices = np.hstack([np.arange(start, stop + 1) for start, stop in cont_chan_ranges])
# Calculate MOM0_FC image
self._calc_moment_image(imagename=imagename, moments=[0], outfile=mom0fc_name, chans=cont_chan_ranges_str,
iter=maxiter)
# Update the result.
result.set_mom0_fc(maxiter, mom0fc_name)
# Calculate MOM8_FC image
self._calc_moment_image(imagename=imagename, moments=[8], outfile=mom8fc_name, chans=cont_chan_ranges_str,
iter=maxiter)
# Calculate MOM10_FC image
self._calc_moment_image(imagename=imagename, moments=[10], outfile=mom10fc_name, chans=cont_chan_ranges_str,
iter=maxiter)
# Calculate the MOM8_FC peak SNR for the QA
# Create flattened PB over continuum channels
flattened_pb_name = result.flux + extension + '.flattened_mom_0_8_10_fc'
with casa_tools.ImageReader(result.flux + extension) as image:
flattened_pb = image.collapse(function='mean', axes=[2, 3], chans=cont_chan_ranges_str, outfile=flattened_pb_name)
flattened_pb.done()
# If possible, create flattened mask over continuum channels
cleanmask = result.iterations[maxiter].get('cleanmask', '')
if os.path.exists(cleanmask):
if '.mask' in cleanmask:
flattened_mask_name = cleanmask.replace('.mask', '.mask.flattened_mom_0_8_10_fc')
elif '.cleanmask' in cleanmask:
flattened_mask_name = cleanmask.replace('.cleanmask', '.cleanmask.flattened_mom_0_8_10_fc')
else:
raise 'Cannot handle clean mask name %s' % (os.path.basename(cleanmask))
with casa_tools.ImageReader(cleanmask) as image:
flattened_mask_image = image.collapse(function='max', axes=[2, 3], chans=cont_chan_ranges_str, outfile=flattened_mask_name)
flattened_mask_image.done()
else:
flattened_mask_name = None
# The PIPE-704 edge exclusion of 5% fails for smaller than regular PL imsizes because
# the fraction of the PB becomes too small. Thus covering these cases with the actual
# ratio of pblimit_cleanmask/pblimit_image.
pblimit_factor = min(1.05, utils.math.round_down(1.0 + 0.5 * (result.pblimit_cleanmask / result.pblimit_image - 1.0), 2))
# Calculate MOM8_FC statistics
with casa_tools.ImageReader(mom8fc_name) as image:
# Get the min, max, median, MAD and number of pixels of the MOM8 FC image from the area excluding the cleaned area edges (PIPE-704)
mom8_statsmask = '"{:s}" > {:f}'.format(os.path.basename(flattened_pb_name), result.pblimit_image * pblimit_factor)
mom8_stats = image.statistics(mask=mom8_statsmask, axes=[0, 1, 3], robust=True, stretch=True)
mom8_image_median_all = mom8_stats.get('median')[0]
mom8_image_mad = mom8_stats.get('medabsdevmed')[0]
mom8_image_min = mom8_stats.get('min')[0]
mom8_image_max = mom8_stats.get('max')[0]
mom8_n_pixels = int(mom8_stats.get('npts')[0])
# Additionally get the median in the MOM8 FC annulus region for the peak SNR calculation
mom8_statsmask2 = '"{:s}" > {:f} && "{:s}" < {:f}'.format(os.path.basename(flattened_pb_name), result.pblimit_image * pblimit_factor, os.path.basename(flattened_pb_name), result.pblimit_cleanmask)
mom8_stats2 = image.statistics(mask=mom8_statsmask2, axes=[0, 1, 3], robust=True, stretch=True)
mom8_image_median_annulus = mom8_stats2.get('median')[0]
# Calculate MOM10 FC statistics
with casa_tools.ImageReader(mom10fc_name) as image:
# Get the min, max, median, MAD and number of pixels of the MOM10 FC image from the area excluding the cleaned area edges
mom10_statsmask = '"{:s}" > {:f}'.format(os.path.basename(flattened_pb_name), result.pblimit_image * pblimit_factor)
mom10_stats = image.statistics(mask=mom10_statsmask, axes=[0, 1, 3], robust=True)
mom10_image_median_all = mom10_stats.get('median')[0]
mom10_image_mad = mom10_stats.get('medabsdevmed')[0]
mom10_image_min = mom10_stats.get('min')[0]
mom10_image_max = mom10_stats.get('max')[0]
mom10_n_pixels = int(mom10_stats.get('npts')[0])
# Additionally get the median in the MOM8 FC annulus region for the peak SNR calculation
mom10_statsmask2 = '"{:s}" > {:f} && "{:s}" < {:f}'.format(os.path.basename(flattened_pb_name), result.pblimit_image * pblimit_factor, os.path.basename(flattened_pb_name), result.pblimit_cleanmask)
mom10_stats2 = image.statistics(mask=mom10_statsmask2, axes=[0, 1, 3], robust=True)
mom10_image_median_annulus = mom10_stats2.get('median')[0]
# Get sigma and channel scaled MAD from the cube
if flattened_mask_name is not None:
# Mask cleanmask and pblimit_factor * pblimit_image < pb < pblimit_cleanmask
cube_statsmask = '"{:s}" > {:f} && "{:s}" < {:f} && "{:s}" < 0.1'.format(os.path.basename(flattened_pb_name), result.pblimit_image * pblimit_factor, os.path.basename(flattened_pb_name), result.pblimit_cleanmask, flattened_mask_name)
else:
# Mask pblimit_factor * pblimit_image < pb < pblimit_cleanmask
cube_statsmask = '"{:s}" > {:f} && "{:s}" < {:f}'.format(os.path.basename(flattened_pb_name), result.pblimit_image * pblimit_factor, os.path.basename(flattened_pb_name), result.pblimit_cleanmask)
LOG.info(f'No cleanmask available to exclude for MOM8_FC RMS and peak SNR calculation. Calculating sigma, channel scaled MAD and peak SNR from annulus area of {pblimit_factor} x pblimit_image < pb < pblimit_cleanmask.')
with casa_tools.ImageReader(imagename) as image:
if self.inputs.stokes == 'I':
cube_stats_masked = image.statistics(mask=cube_statsmask, stretch=True, robust=True, axes=[0, 1, 2], algorithm='chauvenet', maxiter=5)
cube_sigma_fc_chans = np.median(cube_stats_masked.get('sigma')[cont_chan_indices])
cube_scaledMAD_fc_chans = np.median(cube_stats_masked.get('medabsdevmed')[cont_chan_indices]) / 0.6745
else:
cube_stats_masked = image.statistics(mask=cube_statsmask, stretch=True, robust=True, axes=[0, 1], algorithm='chauvenet', maxiter=5)
cube_sigma_fc_chans = np.median(cube_stats_masked.get('sigma')[0][cont_chan_indices])
cube_scaledMAD_fc_chans = np.median(cube_stats_masked.get('medabsdevmed')[0][cont_chan_indices]) / 0.6745
mom8_fc_peak_snr = (mom8_image_max - mom8_image_median_annulus) / cube_scaledMAD_fc_chans
LOG.info('MOM8_FC image {:s} has a maximum of {:#.5g}, median of {:#.5g} resulting in a Peak SNR of {:#.5g} times the channel scaled MAD of {:#.5g}.'.format(os.path.basename(mom8fc_name), mom8_image_max, mom8_image_median_annulus, mom8_fc_peak_snr, cube_scaledMAD_fc_chans))
# New score based on mom8/mom10 histogram asymmetry and largest mom8 segment needs
# more metrics (PIPE-1232)
histogram_threshold = mom8_image_median_all + 2.0 * mom8_image_mad / 0.6745
# Get the PB image as numpy array
with casa_tools.ImageReader(flattened_pb_name) as image:
flattened_pb_image = image.getchunk()[:,:,0,0]
# Get the MOM8 FC image as numpy array
with casa_tools.ImageReader(mom8fc_name) as image:
mom8fc_image = image.getchunk()[:,:,0,0]
mom8fc_image_summary = image.summary()
mom8fc_masked_image = np.ma.array(mom8fc_image, mask=np.where(flattened_pb_image > result.pblimit_image * pblimit_factor, False, True))
# Get number of pixels per beam
major_radius = casa_tools.quanta.getvalue(casa_tools.quanta.convert(mom8fc_image_summary['restoringbeam']['major'], 'rad'))[0] / 2
minor_radius = casa_tools.quanta.getvalue(casa_tools.quanta.convert(mom8fc_image_summary['restoringbeam']['minor'], 'rad'))[0] / 2
cellx = abs(casa_tools.quanta.getvalue(casa_tools.quanta.convert(casa_tools.quanta.quantity(mom8fc_image_summary['incr'][0], mom8fc_image_summary['axisunits'][0]), 'rad'))[0])
celly = abs(casa_tools.quanta.getvalue(casa_tools.quanta.convert(casa_tools.quanta.quantity(mom8fc_image_summary['incr'][1], mom8fc_image_summary['axisunits'][1]), 'rad'))[0])
num_pixels_in_beam = float(major_radius * minor_radius * np.pi / np.log(2) / cellx / celly)
# Get threshold for maximum segment calculation
cut1 = mom8_image_median_annulus + 3.0 * cube_scaledMAD_fc_chans
cut2 = mom8_image_median_annulus + 0.5 * np.ma.max(mom8fc_masked_image - mom8_image_median_annulus)
cut3 = mom8_image_median_annulus + 2.0 * cube_scaledMAD_fc_chans
segments_threshold = max(min(cut1, cut2), cut3)
# Get largest segment
mom8_segments_image = np.ma.where(mom8fc_masked_image > segments_threshold, 1, 0)
mom8_frac_max_segment = 0.0
mom8_max_segment_beams = 0.0
# Label segments
mom8_label_image, num_labels = label(mom8_segments_image)
if num_labels > 0:
# Calculate largest segment size
mom8_num_pixels_in_largest_segment = np.max([np.ma.sum(np.ma.where(mom8_label_image == i, 1, 0)) for i in range(1, num_labels + 1)])
# Prune small segments
if mom8_num_pixels_in_largest_segment >= 0.1 * num_pixels_in_beam:
mom8_frac_max_segment = mom8_num_pixels_in_largest_segment / mom8_n_pixels
mom8_max_segment_beams = mom8_num_pixels_in_largest_segment / num_pixels_in_beam
# Get the MOM10 FC image as numpy array
with casa_tools.ImageReader(mom10fc_name) as image:
mom10fc_image = image.getchunk()[:,:,0,0]
mom10fc_masked_image = np.ma.array(np.abs(mom10fc_image), mask=np.where(flattened_pb_image > result.pblimit_image * pblimit_factor, False, True))
# Get histogram asymmetry
# Global minimum/maximum of both images
mom8_10fc_min = np.ma.min(np.ma.append(mom8fc_masked_image, mom10fc_masked_image))
mom8_10fc_max = np.ma.max(np.ma.append(mom8fc_masked_image, mom10fc_masked_image))
# Calculate histograms
# np.histogram does not know masked arrays; need to convert to normal array using inverted mask as index
mom8fc_histogram, bins = np.histogram(mom8fc_masked_image[np.invert(mom8fc_masked_image.mask)], range=[mom8_10fc_min, mom8_10fc_max], bins='auto')
mom8fc_flux_bins = 0.5*(bins[:-1]+bins[1:])
# np.histogram does not know masked arrays; need to convert to normal array using inverted mask as index
mom10fc_histogram, bins = np.histogram(mom10fc_masked_image[np.invert(mom10fc_masked_image.mask)], range=[mom8_10fc_min, mom8_10fc_max], bins='auto')
mom10fc_flux_bins = 0.5*(bins[:-1]+bins[1:])
# Interpolating numpix of moment10 at the position of moment8 flux
mom8fc_histogram_interp = np.interp(mom10fc_flux_bins, mom8fc_flux_bins, mom8fc_histogram, left=0.0, right=0.0)
# Interpolating numpix of moment8 at the position of moment10 flux
mom10fc_histogram_interp = np.interp(mom8fc_flux_bins, mom10fc_flux_bins, mom10fc_histogram, left=0.0, right=0.0)
# Compute the deviation between mom8 and mom10 pixel histogram for every flux bin
deviation1 = mom8fc_histogram_interp[mom10fc_flux_bins > histogram_threshold] - mom10fc_histogram[mom10fc_flux_bins > histogram_threshold]
deviation2 = mom10fc_histogram_interp[mom8fc_flux_bins > histogram_threshold] - mom8fc_histogram[mom8fc_flux_bins > histogram_threshold]
# Compute the area of the histogram (whatever miminum)
smallerarea = np.min(np.append(np.sum(mom10fc_histogram[mom10fc_flux_bins > histogram_threshold]), np.sum(mom8fc_histogram[mom8fc_flux_bins > histogram_threshold])))
# histasymm is now comparing the summation of the absolute deviation vs histogram area
mom_8_10_histogram_asymmetry = 0.5 * (np.sum(np.abs(deviation1)) + np.sum(np.abs(deviation2))) / smallerarea
# Update the result.
result.set_cube_sigma_fc_chans(maxiter, cube_sigma_fc_chans)
result.set_cube_scaledMAD_fc_chans(maxiter, cube_scaledMAD_fc_chans)
result.set_mom8_fc(maxiter, mom8fc_name)
result.set_mom8_fc_image_min(maxiter, mom8_image_min)
result.set_mom8_fc_image_max(maxiter, mom8_image_max)
result.set_mom8_fc_image_median_all(maxiter, mom8_image_median_all)
result.set_mom8_fc_image_median_annulus(maxiter, mom8_image_median_annulus)
result.set_mom8_fc_image_mad(maxiter, mom8_image_mad)
result.set_mom8_fc_peak_snr(maxiter, mom8_fc_peak_snr)
result.set_mom8_fc_n_pixels(maxiter, mom8_n_pixels)
result.set_mom8_fc_frac_max_segment(maxiter, mom8_frac_max_segment)
result.set_mom8_fc_max_segment_beams(maxiter, mom8_max_segment_beams)
result.set_mom10_fc(maxiter, mom10fc_name)
result.set_mom10_fc_image_min(maxiter, mom10_image_min)
result.set_mom10_fc_image_max(maxiter, mom10_image_max)
result.set_mom10_fc_image_median_all(maxiter, mom10_image_median_all)
result.set_mom10_fc_image_median_annulus(maxiter, mom10_image_median_annulus)
result.set_mom10_fc_image_mad(maxiter, mom10_image_mad)
result.set_mom10_fc_n_pixels(maxiter, mom10_n_pixels)
result.set_mom8_10_fc_histogram_asymmetry(maxiter, mom_8_10_histogram_asymmetry)
else:
LOG.warning('Cannot create MOM0_FC / MOM8_FC / MOM10_FC images for intent "%s", '
'field %s, spw %s, no continuum ranges found.' %
(self.inputs.intent, self.inputs.field, self.inputs.spw))
def _calc_mom0_8(self, result):
'''
Creates moment 0 (integrated flux) and 8 (peak flux) maps for non--primary-beam corrected
images after continuum subtraction, using *all channels* (may include channels with lines).
See PIPE-558 (https://open-jira.nrao.edu/browse/PIPE-558).
'''
# Find max iteration that was performed.
maxiter = max(result.iterations.keys())
# Get filename of image from result, and modify to select the
# non-PB-corrected image.
imagename = result.iterations[maxiter]['image'].replace('.pbcor', '')
# Set output filename for MOM0 all channel image.
mom0_name = '%s.mom0' % imagename
# Calculate moment image
self._calc_moment_image(imagename=imagename, moments=[0], outfile=mom0_name, chans='', iter=maxiter)
# Update the result.
result.set_mom0(maxiter, mom0_name)
# Set output filename for MOM8 all channel image.
mom8_name = '%s.mom8' % imagename
# Calculate moment image
self._calc_moment_image(imagename=imagename, moments=[8], outfile=mom8_name, chans='', iter=maxiter)
# Update the result.
result.set_mom8(maxiter, mom8_name)
def _update_miscinfo(self, imagename: str, nfield: int | None = None, datamin: float | None = None,
datamax: float | None = None, datarms: float | None = None,
stokes: str | None = None, effbw: float | None = None,
level: str | None = None, ctrfrq: float | None = None,
obspatt: str | None = None, arrays: str | None = None,
modifier: str | None = None, session: str | None = None):
"""
Update image header keywords.
Args:
imagename (str): image name
nfield (int): number of mosaic fields
datamin (float): minimum image value
datamax (float): maximum image value
datarms (float): rms image value
stokes (str): Stokes parameter
effbw (float): effective bandwidth in Hz
level (str): OUS kind (member, group)
obspatt (str): observing pattern (mos, sf, sd)
ctrfrq: Image center frequency in Hz
modifier (str): image name modifier (binnedN?)
session (str): session name
Returns:
None
"""
with casa_tools.ImageReader(imagename) as image:
# Get current header
info = image.miscinfo()
# Update keywords that are not None
frame = inspect.currentframe()
keywords, _, _, values = inspect.getargvalues(frame)
for keyword in keywords:
if keyword not in ('self', 'imagename') and values[keyword] is not None:
if keyword == 'session':
# PIPE-2148, limiting 'sessionX' keyword length to 68 characters
# due to FITS header keyword string length limit.
info = imageheader.wrap_key(info, 'sessio', session)
else:
info[keyword] = values[keyword]
# Save header back to image
image.setmiscinfo(info)