"""Imaging stage."""
from __future__ import annotations
import collections
import functools
import math
import os
from numbers import Number
from typing import TYPE_CHECKING
import numpy
from scipy import interpolate
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.filenamer as filenamer
import pipeline.infrastructure.imageheader as imageheader
import pipeline.infrastructure.utils as utils
import pipeline.infrastructure.vdp as vdp
from pipeline.domain import DataTable, DataType
from pipeline.h.heuristics import fieldnames
from pipeline.h.tasks.common.sensitivity import Sensitivity
from pipeline.hsd.heuristics import rasterscan
from pipeline.hsd.heuristics import SDcalatmcorr as sdatm
from pipeline.hsd.heuristics.rasterscan import RasterScanHeuristicsResult, RasterScanHeuristicsFailure
from pipeline.hsd.tasks import common
from pipeline.hsd.tasks.baseline import baseline
from pipeline.hsd.tasks.common import compress, direction_utils, observatory_policy, rasterutil, sdtyping
from pipeline.hsd.tasks.common import utils as sdutils
from pipeline.hsd.tasks.imaging import (detect_missed_lines,
detectcontamination, gridding,
imaging_params, resultobjects,
sdcombine, worker)
from pipeline.infrastructure import casa_tools, task_registry
if TYPE_CHECKING:
from casatools import coordsys
from pipeline.domain import MeasurementSet
from pipeline.infrastructure import Context
from pipeline.infrastructure.imagelibrary import ImageItem
from resultobjects import SDImagingResults
LOG = infrastructure.logging.get_logger(__name__)
# SensitivityInfo:
# sensitivity: Sensitivity of an image
# frequency_range: frequency ranges from which the sensitivity is calculated
# to_export: True if the sensitivity shall be exported to aqua report. (to avoid exporting NRO sensitivity in K)
SensitivityInfo = collections.namedtuple('SensitivityInfo', 'sensitivity frequency_range to_export')
# RasterInfo: center_ra, center_dec = R.A. and Declination of map center
# width=map extent along scan, height=map extent perpendicular to scan
# angle=scan direction w.r.t. horizontal coordinate, row_separation=separation between raster rows.
RasterInfo = collections.namedtuple('RasterInfo', 'center_ra center_dec width height '
'scan_angle row_separation row_duration')
# Reference MS in combined list
REF_MS_ID = 0
# The minimum limit of integration time (seconds) to be a valid scan duration (0.99 ms)
# The current minimum, 0.99 ms, comes from the typical integration time of fast-scan observation
# by SQLD in ALMA (1 ms) with 1% margin to avoid rejecting the exact 1 ms case (w/ numerical error).
# Adjust the value when Pipeline supports observation modes/instruments with smaller integration time.
MIN_INTEGRATION_SEC = 9.9e-4
class SDImagingInputs(vdp.StandardInputs):
"""Inputs for imaging task class."""
# Search order of input vis
processing_data_type = [DataType.BASELINED, DataType.ATMCORR,
DataType.REGCAL_CONTLINE_ALL, DataType.RAW]
infiles = vdp.VisDependentProperty(default='', null_input=['', None, [], ['']])
spw = vdp.VisDependentProperty(default='')
pol = vdp.VisDependentProperty(default='')
field = vdp.VisDependentProperty(default='')
mode = vdp.VisDependentProperty(default='line')
@field.postprocess
def field(self, unprocessed: str | None) -> str | None:
"""Get fields as a string.
Args:
unprocessed : Unprocessed fields
Returns:
fields as a string
"""
# LOG.info('field.postprocess: unprocessed = "{0}"'.format(unprocessed))
if unprocessed is not None and unprocessed != '':
return unprocessed
# filters field with intents in self.intent
p = fieldnames.IntentFieldnames()
fields = set()
vislist = [self.vis] if isinstance(self.vis, str) else self.vis
for vis in vislist:
# This assumes the same fields in all MSes
msobj = self.context.observing_run.get_ms(vis)
# this will give something like '0542+3243,0343+242'
intent_fields = p(msobj, self.intent)
fields.update(utils.safe_split(intent_fields))
# LOG.info('field.postprocess: fields = "{0}"'.format(fields))
return ','.join(fields)
@property
def antenna(self) -> str:
return ''
@property
def intent(self) -> str:
return 'TARGET'
# Synchronization between infiles and vis is still necessary
@vdp.VisDependentProperty
def vis(self) -> list[str]:
return self.infiles
@property
def is_ampcal(self) -> bool:
return self.mode.upper() == 'AMPCAL'
# TODO: Replace the decorator with @functools.cached_property
# when we completely get rid of python 3.6 support
@property
@functools.lru_cache(1)
def datatype(self) -> DataType:
"""Return datatype enum corresponding to dataset returned by vis/infiles attribute."""
# TODO: It would be ideal to integrate datatype stuff into vdp.
# Since this is urgent fix for PIPE-1480, it is better for now
# to confine the change into single file to avoid unexpected
# side effect.
_, _datatype = self.context.observing_run.get_measurement_sets_of_type(
self.processing_data_type, msonly=False
)
return _datatype
# docstring and type hints: supplements hsd_imaging
def __init__(self, context: Context, mode: str | None=None, restfreq: str | None=None,
infiles: list[str] | None=None, field: str | None=None, spw: str | None=None,
org_direction: sdtyping.Direction | None=None):
"""Initialize an object.
Args:
context : Pipeline context
mode: Imaging mode controls imaging parameters in the task.
Accepts either "line" (spectral line imaging) or "ampcal"
(image settings for amplitude calibrator).
Default: ``None`` (equivalent to ``'line'``)
restfreq: Rest frequency. Defaults to None,
it executes without rest frequency.
infiles: List of data files. These must be a name of
MeasurementSets that are registered to context via
hsd_importdata or hsd_restoredata tasks.
Example: ``vis=['uid___A002_X85c183_X36f.ms', 'uid___A002_X85c183_X60b.ms']``
Default: ``None`` (process all registered MeasurementSets)
field: Data selection by field names or ids.
Example: ``"*Sgr*,M100"``
Default: ``None`` (process all science fields)
spw: Data selection by spw ids.
Example: ``"3,4"`` (generate images for spw 3 and 4)
Default: ``None`` (process all science spws)
org_direction: Directions of the origin for moving targets.
Defaults to ``None``, it doesn't have some moving targets.
"""
super().__init__()
self.context = context
self.restfreq = restfreq
if self.restfreq is None:
self.restfreq = ''
self.mode = mode
self.infiles = infiles
self.field = field
self.mode = mode
self.spw = spw
self.org_direction = org_direction
[docs]
@task_registry.set_equivalent_casa_task('hsd_imaging')
@task_registry.set_casa_commands_comment('Perform single dish imaging.')
class SDImaging(basetask.StandardTaskTemplate):
"""SDImaging processing class."""
Inputs = SDImagingInputs
# stokes to image and requred POLs for it
stokes = 'I'
# for linear feed in ALMA. this affects pols passed to gridding module
required_pols = ['XX', 'YY']
is_multi_vis_task = True
[docs]
def prepare(self) -> resultobjects.SDImagingResults:
"""Execute imaging process. This is the main method of imaging.
Returns:
result object of imaging
"""
cp = self._initialize_common_parameters()
# loop over reduction group (spw and source combination)
for _group_id, _group_desc in cp.reduction_group.items():
rgp = imaging_params.ReductionGroupParameters(_group_id, _group_desc)
if not self._initialize_reduction_group_parameters(cp, rgp):
continue
for rgp.name, rgp.members in rgp.image_group.items():
self._set_image_group_item_into_reduction_group_patameters(cp, rgp)
# Step 1: (removed with PIPE-2491)
# Step 2: imaging
if not self._execute_imaging(cp, rgp):
continue
if self._has_imager_result_outcome(rgp):
# Imaging was successful, proceed following steps
self._add_image_list_to_combine(rgp)
# Additional Step.
# Make grid_table and put rms and valid spectral number array
# to the outcome.
# The rms and number of valid spectra is used to create RMS maps.
self._make_grid_table(cp, rgp)
self._define_rms_range_in_image(cp, rgp)
self._set_asdm_to_outcome_vis_if_imagemode_is_ampcal(cp, rgp)
# NRO doesn't need per-antenna Stokes I images
if cp.is_not_nro():
self._append_result(cp, rgp)
if self._has_nro_imager_result_outcome(rgp):
self._additional_imaging_process_for_nro(cp, rgp)
if self._skip_this_loop(rgp):
continue
self._prepare_for_combine_images(rgp)
# Step 3: imaging of all antennas
self._execute_combine_images(rgp)
pp = imaging_params.PostProcessParameters()
if self._has_imager_result_outcome(rgp):
# Imaging was successful, proceed following steps
# Additional Step.
# Make grid_table and put rms and valid spectral number array
# to the outcome
# The rms and number of valid spectra is used to create RMS maps
self._make_post_grid_table(cp, rgp, pp)
# calculate RMS of line free frequencies in a combined image
try:
self._generate_parameters_for_calculate_sensitivity(cp, rgp, pp)
self._set_representative_flag(rgp, pp)
self._warn_if_early_cycle(rgp)
self._calculate_sensitivity(cp, rgp, pp)
finally:
pp.done()
self._detect_missed_lines( cp, rgp, pp )
self._append_result(cp, rgp)
# NRO specific: generate combined image for each correlation
if cp.is_nro and not self._execute_combine_images_for_nro(cp, rgp, pp):
continue
return cp.results
@classmethod
def _finalize_worker_result(cls,
context: Context,
result: SDImagingResults,
session: str,
sourcename: str,
spwlist: list[int],
antenna: str,
specmode: str,
imagemode: str,
stokes: str,
datatype: DataType,
datamin: float | None,
datamax: float | None,
datarms: float | None,
validsp: list[list[int]],
rms: list[list[float]],
edge: list[int],
reduction_group_id: int,
file_index: list[int],
assoc_antennas: list[int],
assoc_fields: list[int],
assoc_spws: list[int],
sensitivity_info: SensitivityInfo | None=None,
theoretical_rms: dict | None=None,
effbw: float | None=None):
"""
Fanalize the worker result.
Args:
context : Pipeline context
result : SDImagingResults instance
session : Session name
sourcename : Name of the source
spwlist : List of SpWs
antenna : Antenna name
specmode : Specmode for tsdimaging
imagemode : Image mode
stokes : Stokes parameter
datatype : Datatype enum
datamin : Minimum value of the image
datamax : Maximum value of the image
datarms : Rms of the image
validsp : # of combined spectra
rms : Rms values
edge : Edge channels
reduction_group_id : Reduction group ID
file_index : MS file index
assoc_antennas : List of associated antennas
assoc_fields : List of associated fields
assoc_spws : List of associated SpWs
sensitivity_info : Sensitivity information
theoretical_rms : Theoretical RMS
effbw : Effective channel bandwidth in Hz
Returns:
(none)
"""
# override attributes for image item
# the following attribute is currently hard-coded
sourcetype = 'TARGET'
_locals = locals()
image_keys = ('sourcename', 'spwlist', 'antenna', 'ant_name', 'specmode', 'sourcetype')
for x in image_keys:
if x in _locals:
setattr(result.outcome['image'], x, _locals[x])
# fill outcomes
outcome_keys = ('imagemode', 'stokes', 'validsp', 'rms', 'edge', 'reduction_group_id',
'file_index', 'assoc_antennas', 'assoc_fields', 'assoc_spws', 'assoc_spws')
for x in outcome_keys:
if x in _locals:
result.outcome[x] = _locals[x]
# attach sensitivity_info if available
if sensitivity_info is not None:
result.sensitivity_info = sensitivity_info
# attach theoretical RMS if available
if theoretical_rms is not None:
result.theoretical_rms = theoretical_rms
# set some information to image header
image_item = result.outcome['image']
imagename = image_item.imagename
# Virtual spws are not applicable for NRO data.
# So if is_nro() returns True, the 'virtspw' flag is set to False.
virtspw = not sdutils.is_nro(context)
for name in (imagename, imagename + '.weight'):
imageheader.set_miscinfo(name=name,
spw=','.join(map(str, spwlist)),
virtspw=virtspw,
field=image_item.sourcename,
nfield=1,
datatype=datatype.name,
type='singledish',
iter=1, # nominal
intent=sourcetype,
specmode=specmode,
is_per_eb=False,
context=context)
# update miscinfo
# TODO: Eventually, the following code together with
# Tclean.update_miscinfo should be merged into
# imageheader.set_miscinfo.
with casa_tools.ImageReader(name) as image:
info = image.miscinfo()
if '.weight' not in name:
if datamin:
info['datamin'] = datamin
if datamax:
info['datamax'] = datamax
if datarms:
info['datarms'] = datarms
info['stokes'] = stokes
if effbw:
info['effbw'] = effbw
info['level'] = 'member'
info['obspatt'] = 'sd'
info['arrays'] = 'TP'
info['modifier'] = ''
# PIPE-2148, limiting 'sessionX' keyword length to 68 characters
# due to FITS header keyword string length limit.
info = imageheader.wrap_key(info, 'sessio', session)
image.setmiscinfo(info)
# finally replace task attribute with the top-level one
result.task = cls
def _get_edge(self) -> list[int]:
"""
Search results and retrieve edge parameter from the most recent SDBaselineResults if it exists.
Returns:
A list of edge
"""
_getresult = lambda r: r.read() if hasattr(r, 'read') else r
_registered_results = [_getresult(r) for r in self.inputs.context.results]
_baseline_stage = -1
for _stage in range(len(_registered_results) - 1, -1, -1):
if isinstance(_registered_results[_stage], baseline.SDBaselineResults):
_baseline_stage = _stage
if _baseline_stage > 0:
ret = list(_registered_results[_baseline_stage].outcome['edge'])
LOG.info('Retrieved edge information from SDBaselineResults: {}'.format(ret))
else:
LOG.info('No SDBaselineResults available. Set edge as [0,0]')
ret = [0, 0]
return ret
def _initialize_common_parameters(self) -> imaging_params.CommonParameters:
"""Initialize common parameters of prepare().
Returns:
common parameters object of prepare()
"""
return imaging_params.initialize_common_parameters(
reduction_group=self.inputs.context.observing_run.ms_reduction_group,
infiles=self.inputs.infiles,
restfreq_list=self.inputs.restfreq,
ms_list=self.inputs.ms,
ms_names=[msobj.name for msobj in self.inputs.ms],
session_names=[msobj.session for msobj in self.inputs.ms],
args_spw=sdutils.convert_spw_virtual2real(self.inputs.context, self.inputs.spw),
in_field=self.inputs.field,
imagemode=self.inputs.mode.upper(),
is_nro=sdutils.is_nro(self.inputs.context),
results=resultobjects.SDImagingResults(),
edge=self._get_edge(),
dt_dict=dict((_ms.basename, DataTable(sdutils.get_data_table_path(self.inputs.context, _ms)))
for _ms in self.inputs.ms)
)
def _get_correlations_if_nro(self, cp: imaging_params.CommonParameters,
rgp: imaging_params.ReductionGroupParameters) -> str | None:
"""If data is from NRO, then get correlations.
Args:
cp : Common parameters object of prepare()
rgp : Reduction group parameter object of prepare()
Returns:
joined list of correlations
"""
if cp.is_nro:
_correlations = []
for c in rgp.pols_list:
if c not in _correlations:
_correlations.append(c)
assert len(_correlations) == 1
return ''.join(_correlations[0])
else:
return None
def _get_rgp_image_group(self, cp: imaging_params.CommonParameters,
rgp: imaging_params.ReductionGroupParameters) -> dict[str, list[list[str]]]:
"""Get image group of reduction group.
Args:
cp : Common parameters object of prepare()
rgp : Reduction group parameter object of prepare()
Returns:
image group dictionary, value is list of [ms, antenna, spwid,
fieldid, pollist, channelmap]
"""
_image_group = {}
for _msobj, _ant, _spwid, _fieldid, _pollist, _chanmap in \
zip(cp.ms_list, rgp.antenna_list, rgp.spwid_list, rgp.fieldid_list, rgp.pols_list,
rgp.channelmap_range_list):
_identifier = _msobj.fields[_fieldid].name
_antenna = _msobj.antennas[_ant].name
_identifier += '.' + _antenna
# create image per asdm and antenna for ampcal
if self.inputs.is_ampcal:
_asdm_name = common.asdm_name_from_ms(_msobj)
_identifier += '.' + _asdm_name
if _identifier in _image_group:
_image_group[_identifier].append([_msobj, _ant, _spwid, _fieldid, _pollist, _chanmap])
else:
_image_group[_identifier] = [[_msobj, _ant, _spwid, _fieldid, _pollist, _chanmap]]
return _image_group
def _initialize_reduction_group_parameters(self, cp: imaging_params.CommonParameters,
rgp: imaging_params.ReductionGroupParameters) -> bool:
"""Set default values into the instance of imaging_params.ReductionGroupParameters.
Note: cp.ms_list is set in this function.
Args:
cp : Common parameter object of prepare()
rgp : Reduction group parameter object of prepare()
"""
LOG.debug('Processing Reduction Group {}'.format(rgp.group_id))
LOG.debug('Group Summary:')
for _group in rgp.group_desc:
LOG.debug('\t{}: Antenna {:d} ({}) Spw {:d} Field {:d} ({})'.format(_group.ms.basename, _group.antenna_id,
_group.antenna_name, _group.spw_id,
_group.field_id, _group.field_name))
# Which group in group_desc list should be processed
# fix for CAS-9747
# There may be the case that observation didn't complete so that some of
# target fields are missing in MS. In this case, directly pass in_field
# to get_valid_ms_members causes trouble. As a workaround, ad hoc pre-selection
# of field name is applied here.
# 2017/02/23 TN
_field_sel = ''
if len(cp.in_field) == 0:
# fine, just go ahead
_field_sel = cp.in_field
elif rgp.group_desc.field_name in [x.strip('"') for x in cp.in_field.split(',')]:
# pre-selection of the field name
_field_sel = rgp.group_desc.field_name
else:
LOG.info('Skip reduction group {:d}'.format(rgp.group_id))
return False # no field name is included in in_field, skip
rgp.member_list = list(common.get_valid_ms_members(rgp.group_desc, cp.ms_names, self.inputs.antenna,
_field_sel, cp.args_spw))
LOG.trace('group {}: member_list={}'.format(rgp.group_id, rgp.member_list))
# skip this group if valid member list is empty
if len(rgp.member_list) == 0:
LOG.info('Skip reduction group {:d}'.format(rgp.group_id))
return False
rgp.member_list.sort() # list of group_desc IDs to image
rgp.antenna_list = [rgp.group_desc[i].antenna_id for i in rgp.member_list]
rgp.spwid_list = [rgp.group_desc[i].spw_id for i in rgp.member_list]
cp.ms_list = [rgp.group_desc[i].ms for i in rgp.member_list]
rgp.fieldid_list = [rgp.group_desc[i].field_id for i in rgp.member_list]
_temp_dd_list = [cp.ms_list[i].get_data_description(spw=rgp.spwid_list[i])
for i in range(len(rgp.member_list))]
rgp.channelmap_range_list = [rgp.group_desc[i].channelmap_range for i in rgp.member_list]
# this becomes list of list [[poltypes for ms0], [poltypes for ms1], ...]
# polids_list = [[ddobj.get_polarization_id(corr) for corr in ddobj.corr_axis \
# if corr in self.required_pols ] for ddobj in temp_dd_list]
rgp.pols_list = [[_corr for _corr in _ddobj.corr_axis if
_corr in self.required_pols] for _ddobj in _temp_dd_list]
# NRO specific
rgp.correlations = self._get_correlations_if_nro(cp, rgp)
LOG.debug('Members to be processed:')
for i in range(len(rgp.member_list)):
LOG.debug('\t{}: Antenna {} Spw {} Field {}'.format(cp.ms_list[i].basename, rgp.antenna_list[i],
rgp.spwid_list[i], rgp.fieldid_list[i]))
# image is created per antenna (science) or per asdm and antenna (ampcal)
rgp.image_group = self._get_rgp_image_group(cp, rgp)
LOG.debug('image_group={}'.format(rgp.image_group))
rgp.combined = imaging_params.CombinedImageParameters()
rgp.tocombine = imaging_params.ToCombineImageParameters()
return True
def _pick_restfreq_from_restfreq_list(self, cp: imaging_params.CommonParameters,
rgp: imaging_params.ReductionGroupParameters):
"""Pick restfreq from restfreq_list.
Args:
cp : Common parameter object of prepare()
rgp : Reduction group parameter object of prepare()
"""
if isinstance(cp.restfreq_list, list):
_v_spwid = self.inputs.context.observing_run.real2virtual_spw_id(rgp.spwids[0], rgp.msobjs[0])
_v_spwid_list = [
self.inputs.context.observing_run.real2virtual_spw_id(int(i), rgp.msobjs[0]) for
i in cp.args_spw[rgp.msobjs[0].name].split(',')]
_v_idx = _v_spwid_list.index(_v_spwid)
if len(cp.restfreq_list) > _v_idx:
rgp.restfreq = cp.restfreq_list[_v_idx]
if rgp.restfreq is None:
rgp.restfreq = ''
LOG.info("Picked restfreq = '{}' from {}".format(rgp.restfreq, cp.restfreq_list))
else:
rgp.restfreq = ''
LOG.warning("No restfreq for spw {} in {}. Applying default value.".format(_v_spwid,
cp.restfreq_list))
else:
rgp.restfreq = cp.restfreq_list
LOG.info("Processing with restfreq = {}".format(rgp.restfreq))
def _set_image_name_based_on_virtual_spwid(self, cp: imaging_params.CommonParameters,
rgp: imaging_params.ReductionGroupParameters):
"""Generate image name based on virtual spw id and set it to RGP.
Args:
cp : Common parameter object of prepare()
rgp : Reduction group parameter object of prepare()
"""
_v_spwids_unique = numpy.unique(rgp.v_spwids)
assert len(_v_spwids_unique) == 1
rgp.imagename = self.get_imagename(rgp.source_name, _v_spwids_unique, rgp.ant_name,
rgp.asdm, specmode=rgp.specmode)
LOG.info("Output image name: {}".format(rgp.imagename))
rgp.imagename_nro = None
if cp.is_nro:
rgp.imagename_nro = self.get_imagename(rgp.source_name, _v_spwids_unique, rgp.ant_name, rgp.asdm,
stokes=rgp.correlations, specmode=rgp.specmode)
LOG.info("Output image name for NRO: {}".format(rgp.imagename_nro))
def _set_image_group_item_into_reduction_group_patameters(self, cp: imaging_params.CommonParameters,
rgp: imaging_params.ReductionGroupParameters):
"""Set values for imaging into RGP.
This method does (1)get parameters from image group in RGP to do gridding(imaging) and set them into RGP,
and (2)generate an image name and pick the rest frequency.
Args:
cp : Common parameter object of prepare()
rgp : Reduction group parameter object of prepare()
"""
rgp.msobjs = [x[0] for x in rgp.members]
rgp.antids = [x[1] for x in rgp.members]
rgp.spwids = [x[2] for x in rgp.members]
rgp.fieldids = [x[3] for x in rgp.members]
rgp.polslist = [x[4] for x in rgp.members]
rgp.chanmap_range_list = [x[5] for x in rgp.members]
LOG.info("Processing image group: {}".format(rgp.name))
for idx in range(len(rgp.msobjs)):
LOG.info(
"\t{}: Antenna {:d} ({}) Spw {} Field {:d} ({})"
"".
format(rgp.msobjs[idx].basename, rgp.antids[idx], rgp.msobjs[idx].antennas[rgp.antids[idx]].name,
rgp.spwids[idx], rgp.fieldids[idx], rgp.msobjs[idx].fields[rgp.fieldids[idx]].name))
# reference data is first MS
rgp.ref_ms = rgp.msobjs[0]
rgp.ant_name = rgp.ref_ms.antennas[rgp.antids[0]].name
# for ampcal
rgp.asdm = None
if self.inputs.is_ampcal:
rgp.asdm = common.asdm_name_from_ms(rgp.ref_ms)
# source name
rgp.source_name = rgp.group_desc.field_name.replace(' ', '_')
# specmode
_ref_field = rgp.fieldids[0]
_is_eph_obj = rgp.ref_ms.get_fields(field_id=_ref_field)[0].source.is_eph_obj
rgp.specmode = 'cubesource' if _is_eph_obj else 'cube'
# filenames for gridding
cp.infiles = [_ms.name for _ms in rgp.msobjs]
LOG.debug('infiles={}'.format(cp.infiles))
# virtual spw ids
rgp.v_spwids = [self.inputs.context.observing_run.real2virtual_spw_id(s, m)
for s, m in zip(rgp.spwids, rgp.msobjs)]
# image name
self._set_image_name_based_on_virtual_spwid(cp, rgp)
# restfreq
self._pick_restfreq_from_restfreq_list(cp, rgp)
def _initialize_coord_set(self, cp: imaging_params.CommonParameters,
rgp: imaging_params.ReductionGroupParameters) -> bool:
"""Initialize coordinate set of MS.
if initialize is fault, current loop of reduction group goes to next loop immediately.
Args:
cp : Common parameter object of prepare()
rgp : Reduction group parameter object of prepare()
Returns:
A flag initialize succeeded or not
"""
# PIPE-313: evaluate map extent using pointing data from all the antenna in the data
_dummyids = [None for _ in rgp.antids]
_image_coord = worker.ImageCoordinateUtil(self.inputs.context, cp.infiles,
_dummyids, rgp.spwids, rgp.fieldids)
if not _image_coord: # No valid data is found
return False
rgp.coord_set = True
rgp.phasecenter, rgp.cellx, rgp.celly, rgp.nx, rgp.ny, rgp.org_direction = _image_coord
return True
def _execute_imaging_worker(self, cp: imaging_params.CommonParameters,
rgp: imaging_params.ReductionGroupParameters):
"""Execute imaging worker.
Args:
cp : Common parameter object of prepare()
rgp : Reduction group parameter object of prepare()
"""
# register data for combining
rgp.combined.extend(cp, rgp)
rgp.stokes_list = [self.stokes]
_imagename_list = [rgp.imagename]
if cp.is_nro:
rgp.stokes_list.append(rgp.correlations)
_imagename_list.append(rgp.imagename_nro)
_imager_results = []
for _stokes, _imagename in zip(rgp.stokes_list, _imagename_list):
_imager_inputs = worker.SDImagingWorker.Inputs(self.inputs.context, cp.infiles,
outfile=_imagename,
mode=cp.imagemode,
antids=rgp.antids,
spwids=rgp.spwids,
fieldids=rgp.fieldids,
restfreq=rgp.restfreq,
stokes=_stokes,
edge=cp.edge,
phasecenter=rgp.phasecenter,
cellx=rgp.cellx,
celly=rgp.celly,
nx=rgp.nx, ny=rgp.ny,
org_direction=rgp.org_direction)
_imager_task = worker.SDImagingWorker(_imager_inputs)
_imager_result = self._executor.execute(_imager_task)
_imager_results.append(_imager_result)
# per-antenna image (usually Stokes I)
rgp.imager_result = _imager_results[0]
# per-antenna correlation image (XXYY/RRLL)
rgp.imager_result_nro = _imager_results[1] if cp.is_nro else None
def _make_grid_table(self, cp: imaging_params.CommonParameters, rgp: imaging_params.ReductionGroupParameters):
"""Make grid table for gridding.
Args:
cp : Common parameter object of prepare()
rgp : Reduction group parameter object of prepare()
"""
LOG.info('Additional Step. Make grid_table')
rgp.imagename = rgp.imager_result.outcome['image'].imagename
with casa_tools.ImageReader(rgp.imagename) as ia:
_cs = ia.coordsys()
_dircoords = [i for i in range(_cs.naxes()) if _cs.axiscoordinatetypes()[i] == 'Direction']
_cs.done()
rgp.nx = ia.shape()[_dircoords[0]]
rgp.ny = ia.shape()[_dircoords[1]]
_observing_pattern = rgp.msobjs[0].observing_pattern[rgp.antids[0]][rgp.spwids[0]][rgp.fieldids[0]]
_grid_task_class = gridding.gridding_factory(_observing_pattern)
rgp.validsps = []
rgp.rmss = []
_grid_input_dict = {}
for _msobj, _antid, _spwid, _fieldid, _poltypes, _ in rgp.members:
_msname = _msobj.name # Use parent ms
for p in _poltypes:
if p not in _grid_input_dict:
_grid_input_dict[p] = [[_msname], [_antid], [_fieldid], [_spwid]]
else:
_grid_input_dict[p][0].append(_msname)
_grid_input_dict[p][1].append(_antid)
_grid_input_dict[p][2].append(_fieldid)
_grid_input_dict[p][3].append(_spwid)
# Generate grid table for each POL in image (per ANT,
# FIELD, and SPW, over all MSes)
for _pol, _member in _grid_input_dict.items():
_mses = _member[0]
_antids = _member[1]
_fieldids = _member[2]
_spwids = _member[3]
_pols = [_pol for i in range(len(_mses))]
_gridding_inputs = _grid_task_class.Inputs(self.inputs.context, infiles=_mses,
antennaids=_antids,
fieldids=_fieldids,
spwids=_spwids,
poltypes=_pols,
nx=rgp.nx, ny=rgp.ny)
_gridding_task = _grid_task_class(_gridding_inputs)
_gridding_result = self._executor.execute(_gridding_task, merge=False,
datatable_dict=cp.dt_dict)
# Extract RMS and number of spectra from grid_tables
if isinstance(_gridding_result.outcome, compress.CompressedObj):
_grid_table = _gridding_result.outcome.decompress()
else:
_grid_table = _gridding_result.outcome
rgp.validsps.append([r[6] for r in _grid_table])
rgp.rmss.append([r[8] for r in _grid_table])
def _add_image_list_to_combine(self, rgp: imaging_params.ReductionGroupParameters):
"""Add image list to combine.
Args:
rgp : Reduction group parameter object of prepare()
"""
if os.path.exists(rgp.imagename) and os.path.exists(rgp.imagename + '.weight'):
rgp.tocombine.images.append(rgp.imagename)
rgp.tocombine.org_directions.append(rgp.org_direction)
rgp.tocombine.specmodes.append(rgp.specmode)
def _define_rms_range_in_image(self, cp: imaging_params.CommonParameters,
rgp: imaging_params.ReductionGroupParameters):
"""Define RMS range and finalize worker result.
Args:
cp : Common parameter object of prepare()
rgp : Reduction group parameter object of prepare()
"""
_cqa = casa_tools.quanta
LOG.info("Calculate spectral line and deviation mask frequency ranges in image.")
with casa_tools.ImageReader(rgp.imagename) as ia:
_cs = ia.coordsys()
_frequency_frame = _cs.getconversiontype('spectral')
_cs.done()
_rms_exclude_freq = self._get_rms_exclude_freq_range_image(_frequency_frame, cp, rgp)
LOG.info("The spectral line and deviation mask frequency ranges = {}".format(str(_rms_exclude_freq)))
rgp.combined.rms_exclude.extend(_rms_exclude_freq)
_file_index = [common.get_ms_idx(self.inputs.context, name) for name in cp.infiles]
_spwid = str(rgp.combined.v_spws[REF_MS_ID])
_spwobj = rgp.ref_ms.get_spectral_window(_spwid)
_effective_bw = _cqa.quantity(_spwobj.channels.chan_effbws[0], 'Hz')
effbw = float(_cqa.getvalue(_effective_bw)[0])
self._finalize_worker_result(self.inputs.context, rgp.imager_result, session=','.join(cp.session_names), sourcename=rgp.source_name,
spwlist=rgp.v_spwids, antenna=rgp.ant_name, specmode=rgp.specmode,
imagemode=cp.imagemode, stokes=self.stokes,
datatype=self.inputs.datatype, datamin=None, datamax=None, datarms=None,
validsp=rgp.validsps,
rms=rgp.rmss, edge=cp.edge, reduction_group_id=rgp.group_id,
file_index=_file_index, assoc_antennas=rgp.antids, assoc_fields=rgp.fieldids,
assoc_spws=rgp.v_spwids, effbw=effbw)
def _additional_imaging_process_for_nro(self, cp: imaging_params.CommonParameters,
rgp: imaging_params.ReductionGroupParameters):
"""Add image list to combine and finalize worker result.
Args:
cp : Common parameter object of prepare()
rgp : Reduction group parameter object of prepare()
"""
_cqa = casa_tools.quanta
# Imaging was successful, proceed following steps
# add image list to combine
if os.path.exists(rgp.imagename_nro) and os.path.exists(rgp.imagename_nro + '.weight'):
rgp.tocombine.images_nro.append(rgp.imagename_nro)
rgp.tocombine.org_directions_nro.append(rgp.org_direction)
rgp.tocombine.specmodes.append(rgp.specmode)
_file_index = [common.get_ms_idx(self.inputs.context, name) for name in cp.infiles]
_spwid = str(rgp.combined.v_spws[REF_MS_ID])
_spwobj = rgp.ref_ms.get_spectral_window(_spwid)
_effective_bw = _cqa.quantity(_spwobj.channels.chan_effbws[0], 'Hz')
effbw = float(_cqa.getvalue(_effective_bw)[0])
self._finalize_worker_result(self.inputs.context, rgp.imager_result_nro, session=','.join(cp.session_names), sourcename=rgp.source_name,
spwlist=rgp.v_spwids, antenna=rgp.ant_name, specmode=rgp.specmode,
imagemode=cp.imagemode, stokes=rgp.stokes_list[1],
datatype=self.inputs.datatype, datamin=None, datamax=None, datarms=None,
validsp=rgp.validsps,
rms=rgp.rmss, edge=cp.edge, reduction_group_id=rgp.group_id,
file_index=_file_index, assoc_antennas=rgp.antids, assoc_fields=rgp.fieldids,
assoc_spws=rgp.v_spwids, effbw=effbw)
cp.results.append(rgp.imager_result_nro)
def _make_post_grid_table(self, cp: imaging_params.CommonParameters,
rgp: imaging_params.ReductionGroupParameters,
pp: imaging_params.PostProcessParameters):
"""Make grid table on post process.
Args:
cp : Common parameter object of prepare()
rgp : Reduction group parameter object of prepare()
pp : Imaging post process parameters of prepare()
"""
LOG.info('Additional Step. Make grid_table')
pp.imagename = rgp.imager_result.outcome['image'].imagename
pp.org_direction = rgp.imager_result.outcome['image'].org_direction
with casa_tools.ImageReader(pp.imagename) as ia:
_cs = ia.coordsys()
_dircoords = [i for i in range(_cs.naxes()) if _cs.axiscoordinatetypes()[i] == 'Direction']
_cs.done()
pp.nx = ia.shape()[_dircoords[0]]
pp.ny = ia.shape()[_dircoords[1]]
_antid = rgp.combined.antids[REF_MS_ID]
_spwid = rgp.combined.spws[REF_MS_ID]
_fieldid = rgp.combined.fieldids[REF_MS_ID]
_observing_pattern = rgp.ref_ms.observing_pattern[_antid][_spwid][_fieldid]
_grid_task_class = gridding.gridding_factory(_observing_pattern)
pp.validsps = []
pp.rmss = []
_grid_input_dict = {}
for _msname, _antid, _spwid, _fieldid, _poltypes in zip(rgp.combined.infiles,
rgp.combined.antids,
rgp.combined.spws,
rgp.combined.fieldids,
rgp.combined.pols):
for p in _poltypes:
if p not in _grid_input_dict:
_grid_input_dict[p] = [[_msname], [_antid], [_fieldid], [_spwid]]
else:
_grid_input_dict[p][0].append(_msname)
_grid_input_dict[p][1].append(_antid)
_grid_input_dict[p][2].append(_fieldid)
_grid_input_dict[p][3].append(_spwid)
for _pol, _member in _grid_input_dict.items():
_mses = _member[0]
_antids = _member[1]
_fieldids = _member[2]
_spwids = _member[3]
_pols = [_pol for i in range(len(_mses))]
_gridding_inputs = _grid_task_class.Inputs(self.inputs.context, infiles=_mses, antennaids=_antids,
fieldids=_fieldids, spwids=_spwids, poltypes=_pols,
nx=pp.nx, ny=pp.ny)
_gridding_task = _grid_task_class(_gridding_inputs)
_gridding_result = self._executor.execute(_gridding_task, merge=False, datatable_dict=cp.dt_dict)
# Extract RMS and number of spectra from grid_tables
if isinstance(_gridding_result.outcome, compress.CompressedObj):
_grid_table = _gridding_result.outcome.decompress()
else:
_grid_table = _gridding_result.outcome
pp.validsps.append([r[6] for r in _grid_table])
pp.rmss.append([r[8] for r in _grid_table])
def _generate_parameters_for_calculate_sensitivity(self, cp: imaging_params.CommonParameters,
rgp: imaging_params.ReductionGroupParameters,
pp: imaging_params.PostProcessParameters):
"""Generate parameters to calculate sensitivity.
Note: If it fails to calculate image statistics for some reason, it sets the RMS value to -1.0.
-1.0 is the special value in image statistics calculation of all tasks.
Args:
cp : Common parameter object of prepare()
rgp : Reduction group parameter object of prepare()
pp : Imaging post process parameters of prepare()
"""
LOG.info('Calculate sensitivity of combined image')
with casa_tools.ImageReader(pp.imagename) as ia:
pp.cs = ia.coordsys()
pp.faxis = pp.cs.findaxisbyname('spectral')
pp.chan_width = pp.cs.increment()['numeric'][pp.faxis]
pp.brightnessunit = ia.brightnessunit()
pp.beam = ia.restoringbeam()
pp.qcell = list(pp.cs.increment(format='q', type='direction')['quantity'].values())
# cs.increment(format='s', type='direction')['string']
# Define image channels to calculate statistics
pp.include_channel_range = self._get_stat_chans(pp.imagename, rgp.combined.rms_exclude, cp.edge)
pp.stat_chans = convert_range_list_to_string(pp.include_channel_range)
# Define region to calculate statistics
pp.raster_infos = self.get_raster_info_list(cp, rgp)
pp.region = self._get_stat_region(pp)
# Image statistics
if pp.region is None:
LOG.warning('Could not get valid region of interest to calculate image statistics.')
pp.image_rms = -1.0
else:
_statval = calc_image_statistics(pp.imagename, pp.stat_chans, pp.region)
if len(_statval['rms']):
pp.image_rms = _statval['rms'][0]
LOG.info("Statistics of line free channels ({}): RMS = {:f} {}, Stddev = {:f} {}, "
"Mean = {:f} {}".format(pp.stat_chans, _statval['rms'][0], pp.brightnessunit,
_statval['sigma'][0], pp.brightnessunit,
_statval['mean'][0], pp.brightnessunit))
else:
LOG.warning('Could not get image statistics. Potentially no valid pixel in region of interest.')
pp.image_rms = -1.0
for _stat_name in ['max', 'min']:
_val = _statval.get(_stat_name, [])
setattr(pp, f'image_{_stat_name}', _val[0] if _val else -1.0)
# Theoretical RMS
LOG.info('Calculating theoretical RMS of image, {}'.format(pp.imagename))
pp.theoretical_rms = self.calculate_theoretical_image_rms(cp, rgp, pp)
def _execute_combine_images(self, rgp: imaging_params.ReductionGroupParameters):
"""Combine images.
Args:
rgp : Reduction group parameter object of prepare()
"""
LOG.info('Combine images of Source {} Spw {:d}'.format(rgp.source_name, rgp.combined.v_spws[REF_MS_ID]))
_combine_inputs = sdcombine.SDImageCombineInputs(self.inputs.context, inimages=rgp.tocombine.images,
outfile=rgp.imagename,
org_directions=rgp.tocombine.org_directions,
specmodes=rgp.tocombine.specmodes)
_combine_task = sdcombine.SDImageCombine(_combine_inputs)
_freq_chan_reversed = False
if isinstance(rgp.imager_result, resultobjects.SDImagingResultItem):
_freq_chan_reversed = rgp.imager_result.frequency_channel_reversed
rgp.imager_result = self._executor.execute(_combine_task)
rgp.imager_result.frequency_channel_reversed = _freq_chan_reversed
def _set_representative_flag(self,
rgp: imaging_params.ReductionGroupParameters,
pp: imaging_params.PostProcessParameters):
"""Set is_representative_source_and_spw flag.
Args:
rgp : Reduction group parameter object of prepare()
pp : Imaging post process parameters of prepare()
"""
_rep_source_name, _rep_spw_id = rgp.ref_ms.get_representative_source_spw()
pp.is_representative_source_and_spw = \
_rep_spw_id == rgp.combined.spws[REF_MS_ID] and \
_rep_source_name == utils.dequote(rgp.source_name)
def _warn_if_early_cycle(self, rgp: imaging_params.ReductionGroupParameters):
"""Warn when it processes MeasurementSet of ALMA Cycle 2 and earlier.
Args:
rgp (imaging_params.ReductionGroupParameters): Reduction group parameter object of prepare()
"""
_cqa = casa_tools.quanta
if rgp.ref_ms.antenna_array.name == 'ALMA' and \
_cqa.time(rgp.ref_ms.start_time['m0'], 0, ['ymd', 'no_time'])[0] < '2015/10/01':
LOG.warning("ALMA Cycle 2 and earlier project does not have a valid effective bandwidth. "
"Therefore, a nominal value of channel separation loaded from the MS "
"is used as an effective bandwidth for RMS estimation.")
def _calculate_sensitivity(self, cp: imaging_params.CommonParameters,
rgp: imaging_params.ReductionGroupParameters,
pp: imaging_params.PostProcessParameters):
"""Calculate channel and frequency ranges of line free channels.
Args:
cp : Common parameter object of prepare()
rgp : Reduction group parameter object of prepare()
pp : Imaging post process parameters of prepare()
"""
_ref_pixel = pp.cs.referencepixel()['numeric']
_freqs = []
_cqa = casa_tools.quanta
for _ichan in pp.include_channel_range:
_ref_pixel[pp.faxis] = _ichan
_freqs.append(pp.cs.toworld(_ref_pixel)['numeric'][pp.faxis])
if len(_freqs) > 1 and _freqs[0] > _freqs[1]: # LSB
_freqs.reverse()
pp.stat_freqs = str(', ').join(['{:f}~{:f}GHz'.format(_freqs[_iseg] * 1.e-9, _freqs[_iseg + 1] * 1.e-9)
for _iseg in range(0, len(_freqs), 2)])
_file_index = [common.get_ms_idx(self.inputs.context, name) for name in rgp.combined.infiles]
_bw = _cqa.quantity(pp.chan_width, 'Hz')
_spwid = str(rgp.combined.v_spws[REF_MS_ID])
_spwobj = rgp.ref_ms.get_spectral_window(_spwid)
_effective_bw = _cqa.quantity(_spwobj.channels.chan_effbws[0], 'Hz')
_theoretical_rms_aqua = pp.theoretical_rms if pp.brightnessunit == 'Jy/beam' else None # for some telescopes, the unit might be different from 'Jy/beam'
_sensitivity = Sensitivity(array='TP', intent='TARGET', field=rgp.source_name,
spw=_spwid, is_representative=pp.is_representative_source_and_spw,
bandwidth=_bw, bwmode='cube', beam=pp.beam, cell=pp.qcell,
observed_sensitivity=_cqa.quantity(pp.image_rms, pp.brightnessunit),
effective_bw=_effective_bw, imagename=rgp.imagename,
datatype=self.inputs.datatype.name, theoretical_sensitivity=_theoretical_rms_aqua)
_theoretical_noise = Sensitivity(array='TP', intent='TARGET', field=rgp.source_name,
spw=_spwid, is_representative=pp.is_representative_source_and_spw,
bandwidth=_bw, bwmode='cube', beam=pp.beam, cell=pp.qcell,
theoretical_sensitivity=pp.theoretical_rms, observed_sensitivity=None)
_sensitivity_info = SensitivityInfo(_sensitivity, pp.stat_freqs, (cp.is_not_nro()))
effbw = float(_cqa.getvalue(_effective_bw)[0])
self._finalize_worker_result(self.inputs.context, rgp.imager_result, session=','.join(cp.session_names), sourcename=rgp.source_name,
spwlist=rgp.combined.v_spws, antenna='COMBINED', specmode=rgp.specmode,
imagemode=cp.imagemode, stokes=self.stokes,
datatype=self.inputs.datatype, datamin=pp.image_min, datamax=pp.image_max,
datarms=pp.image_rms, validsp=pp.validsps, rms=pp.rmss,
edge=cp.edge, reduction_group_id=rgp.group_id, file_index=_file_index,
assoc_antennas=rgp.combined.antids, assoc_fields=rgp.combined.fieldids,
assoc_spws=rgp.combined.v_spws, sensitivity_info=_sensitivity_info,
theoretical_rms=_theoretical_noise, effbw=effbw)
def _execute_combine_images_for_nro(self, cp: imaging_params.CommonParameters,
rgp: imaging_params.ReductionGroupParameters,
pp: imaging_params.PostProcessParameters) -> bool:
"""Combine images for NRO data.
Args:
cp : Common parameter object of prepare()
rgp : Reduction group parameter object of prepare()
pp : Imaging post process parameters of prepare()
Returns:
False if a valid image to combine does not exist for a specified source or spw.
"""
_cqa = casa_tools.quanta
if len(rgp.tocombine.images_nro) == 0:
LOG.warning("No valid image to combine for Source {}, Spw {:d}".format(rgp.source_name, rgp.spwids[0]))
return False
# image name
# image name should be based on virtual spw id
pp.imagename = self.get_imagename(rgp.source_name, rgp.combined.v_spws_unique,
stokes=rgp.correlations, specmode=rgp.specmode)
# Imaging of all antennas
LOG.info('Combine images of Source {} Spw {:d}'.format(rgp.source_name, rgp.combined.v_spws[REF_MS_ID]))
_combine_inputs = sdcombine.SDImageCombineInputs(self.inputs.context, inimages=rgp.tocombine.images_nro,
outfile=pp.imagename,
org_directions=rgp.tocombine.org_directions_nro,
specmodes=rgp.tocombine.specmodes)
_combine_task = sdcombine.SDImageCombine(_combine_inputs)
rgp.imager_result = self._executor.execute(_combine_task)
if rgp.imager_result.outcome is not None:
# Imaging was successful, proceed following steps
_file_index = [common.get_ms_idx(self.inputs.context, name) for name in rgp.combined.infiles]
_spwid = str(rgp.combined.v_spws[REF_MS_ID])
_spwobj = rgp.ref_ms.get_spectral_window(_spwid)
_effective_bw = _cqa.quantity(_spwobj.channels.chan_effbws[0], 'Hz')
effbw = float(_cqa.getvalue(_effective_bw)[0])
self._finalize_worker_result(self.inputs.context, rgp.imager_result, session=','.join(cp.session_names), sourcename=rgp.source_name,
spwlist=rgp.combined.v_spws, antenna='COMBINED', specmode=rgp.specmode,
imagemode=cp.imagemode, stokes=rgp.stokes_list[1],
datatype=self.inputs.datatype, datamin=pp.image_min, datamax=pp.image_max,
datarms=pp.image_rms, validsp=pp.validsps,
rms=pp.rmss, edge=cp.edge, reduction_group_id=rgp.group_id,
file_index=_file_index, assoc_antennas=rgp.combined.antids,
assoc_fields=rgp.combined.fieldids, assoc_spws=rgp.combined.v_spws, effbw=effbw)
cp.results.append(rgp.imager_result)
return True
def _prepare_for_combine_images(self, rgp: imaging_params.ReductionGroupParameters):
"""Prepare before combining images.
Args:
rgp : Reduction group parameter object of prepare()
"""
# reference MS
rgp.ref_ms = self.inputs.context.observing_run.get_ms(name=rgp.combined.infiles[REF_MS_ID])
# image name
# image name should be based on virtual spw id
rgp.combined.v_spws_unique = numpy.unique(rgp.combined.v_spws)
assert len(rgp.combined.v_spws_unique) == 1
rgp.imagename = self.get_imagename(rgp.source_name, rgp.combined.v_spws_unique, specmode=rgp.specmode)
def _skip_this_loop(self, rgp: imaging_params.ReductionGroupParameters) -> bool:
"""Determine whether combine should be skipped.
Args:
rgp : Reduction group parameter object of prepare()
Returns:
A boolean flag of determination whether the loop should be skipped or not.
"""
if self.inputs.is_ampcal:
LOG.info("Skipping combined image for the amplitude calibrator.")
return True
# Make combined image
if len(rgp.tocombine.images) == 0:
LOG.warning("No valid image to combine for Source {}, Spw {:d}".format(rgp.source_name, rgp.spwids[0]))
return True
return False
def _execute_imaging(self, cp: imaging_params.CommonParameters,
rgp: imaging_params.ReductionGroupParameters) -> bool:
"""Execute imaging per antenna, source.
Args:
cp : Common parameter object of prepare()
rgp : Reduction group parameter object of prepare()
Returns:
False if coordinate setting fails before imaging is executed.
"""
LOG.info('Imaging Source {}, Ant {} Spw {:d}'.format(rgp.source_name, rgp.ant_name, rgp.spwids[0]))
# map coordinate (use identical map coordinate per spw)
if not rgp.coord_set and not self._initialize_coord_set(cp, rgp):
return False
self._execute_imaging_worker(cp, rgp)
return True
def _set_asdm_to_outcome_vis_if_imagemode_is_ampcal(self, cp: imaging_params.CommonParameters,
rgp: imaging_params.ReductionGroupParameters):
"""Set ASDM to vis.outcome if imagemode of the vis is AMPCAL.
Args:
cp : Common parameter object of prepare()
rgp : Reduction group parameter object of prepare()
"""
if self.inputs.is_ampcal:
if len(cp.infiles) == 1 and (rgp.asdm not in ['', None]):
rgp.imager_result.outcome['vis'] = rgp.asdm
def _has_imager_result_outcome(self, rgp: imaging_params.ReductionGroupParameters) -> bool:
"""Check whether imager_result.outcome has some value or not.
Args:
rgp : Reduction group parameter object of prepare()
Returns:
True if imager_result.outcome has some value.
"""
return rgp.imager_result.outcome is not None
def _has_nro_imager_result_outcome(self, rgp: imaging_params.ReductionGroupParameters) -> bool:
"""Check whether imager_result_nro.outcome has some value or not.
Args:
rgp : Reduction group parameter object of prepare()
Returns:
True if imager_result_nro.outcome has some value.
"""
return rgp.imager_result_nro is not None and rgp.imager_result_nro.outcome is not None
def _detect_missed_lines(self,
cp: imaging_params.CommonParameters,
rgp: imaging_params.ReductionGroupParameters,
pp: imaging_params.PostProcessParameters ):
"""
Detect lines that are possibly missed to be identified
Args:
cp : Common parameter object of prepare()
rgp : Reduction group parameter object of prepare()
pp : Imaging post process parameters of prepare()
Raises:
ValueError if unknown mask mode is returned DetectMissedLines.analyze()
"""
do_plot = not basetask.DISABLE_WEBLOG
# pick valid_lines
valid_lines = [ ll[0:2] for ll in rgp.channelmap_range_list[0] if ll[2] is True ]
# get linefree ranges from pp
linefree_ranges = convert_range_list_to_ranges( pp.include_channel_range )
# initialize
missed_lines = detect_missed_lines.DetectMissedLines(
self.inputs.context,
rgp.msobjs,
rgp.spwid_list,
rgp.fieldid_list,
rgp.imager_result.outcome['image'],
rgp.imager_result.frequency_channel_reversed,
cp.edge,
do_plot
)
# analyze
detections = missed_lines.analyze( valid_lines, linefree_ranges )
# register the results
rgp.imager_result.outcome['line_emission_off_range_at_peak'] = detections['single_beam']
rgp.imager_result.outcome['line_emission_off_range_extended'] = detections['moment_mask']
def _detect_contamination(
self,
image_item: ImageItem,
is_lsb: bool,
edge_channels: tuple[int, int] = (0, 0),
atm_channels: numpy.ndarray | list[bool] = []
) -> bool:
"""Detect contamination of image.
Args:
image_item: Image item to be checked for contamination
is_lsb: A boolean flag whether the image is for lower sideband
(LSB) spw or not
edge_channels: Two tuple of integers for the number of
edge channels to be excluded from the analysis
atm_channels: Channel mask for ATM feature. True means that the
channel is a part of ATM feature.
Returns:
A boolean flag whether the image is contaminated or not
"""
# PIPE-251: detect contamination
do_plot = not basetask.DISABLE_WEBLOG
# exclude ATM feature
channel_mask = numpy.logical_not(atm_channels)
# exclude edge channels
edge_count_lower, edge_count_upper = edge_channels
if edge_count_lower > 0:
channel_mask[:edge_count_lower] = False
if edge_count_upper > 0:
channel_mask[-edge_count_upper:] = False
contaminated = detectcontamination.detect_contamination(
self.inputs.context, image_item, is_lsb, do_plot, channel_mask
)
return contaminated
def _append_result(self, cp: imaging_params.CommonParameters, rgp: imaging_params.ReductionGroupParameters):
"""Append result to RGP.
Args:
rgp : Reduction group parameter object of prepare()
"""
cp.results.append(rgp.imager_result)
[docs]
def analyse(self, results: SDImagingResults) -> SDImagingResults:
"""Analyse results generated by prepare.
Analysis includes a detection of edge channels as well as
channel range for ATM feature. Based on these information,
contamination of the image is examined.
Args:
results: Results object
Returns:
Results object
"""
ctx = self.inputs.context
# edge channel detection for combined images
results_for_edge_channel_detection = [
r for r in results if r.outcome["image"].antenna == "COMBINED"
]
for r in results_for_edge_channel_detection:
image_item = r.outcome['image']
# detect edge channels
# edge_channels is a two tuple of integers
edge_channels = detect_edge_channels(image_item.imagename)
LOG.debug(
"edge channels for %s are %s",
image_item.imagename,
edge_channels
)
r.outcome["edge_channels"] = edge_channels
# contamination analysis for Stokes I combined images
results_for_contamination_analysis = filter(
lambda r: r.outcome["image"].stokes == "I",
results_for_edge_channel_detection
)
for r in results_for_contamination_analysis:
image_item = r.outcome['image']
edge_channels = r.outcome["edge_channels"]
with casa_tools.ImageReader(image_item.imagename) as ia:
image_shape = ia.shape()
cs = ia.coordsys()
freq_axis = cs.findaxisbyname("spectral")
cs.done()
nchan = image_shape[freq_axis]
# detect range of ATM feature
ms_index_list, unique_indices = numpy.unique(
r.outcome['file_index'], return_index=True
)
vspw_list = numpy.asarray(
r.outcome['assoc_spws']
)[unique_indices]
atm_channels = numpy.zeros(nchan, dtype=bool)
for ms_id, vspw in zip(ms_index_list, vspw_list):
ms = ctx.observing_run.measurement_sets[ms_id]
spw_id = ctx.observing_run.virtual2real_spw_id(vspw, ms)
# detect channels overlapped with ATM feature
_detected_chans = detect_atm_channels(ms, spw_id)
if _detected_chans is not None and len(_detected_chans) == nchan:
atm_channels = numpy.logical_or(
atm_channels, _detected_chans
)
is_lsb = r.frequency_channel_reversed
if is_lsb:
# mask needs to be reversed
atm_channels = atm_channels[::-1]
LOG.debug(
"ATM feature range: %s",
numpy.where(atm_channels)[0].tolist()
)
contaminated = self._detect_contamination(
image_item,
is_lsb,
edge_channels=edge_channels,
atm_channels=atm_channels
)
r.outcome['contaminated'] = contaminated
return results
def _get_rms_exclude_freq_range_image(self, to_frame: str, cp: imaging_params.CommonParameters,
rgp: imaging_params.ReductionGroupParameters) -> list[tuple[Number, Number]]:
"""
Return a combined list of frequency ranges.
This method combines deviation mask, channel map ranges, and edges.
Args
to_frame : The frequency frame of output
cp : Common parameter object of prepare()
rgp : Reduction group parameter object of prepare()
Returns:
a list of combined frequency ranges in output frequency frame (to_frame),
e.g., [ [minfreq0,maxfreq0], [minfreq1,maxfreq1], ...]
"""
image_rms_freq_range = []
channelmap_range = []
# LOG.info("#####Raw chanmap_range={}".format(str(rgp.chanmap_range_list)))
for chanmap_range in rgp.chanmap_range_list:
for map_range in chanmap_range:
if map_range[2]:
min_chan = int(map_range[0] - map_range[1] * 0.5)
max_chan = int(numpy.ceil(map_range[0] + map_range[1] * 0.5))
channelmap_range.append([min_chan, max_chan])
LOG.debug("#####CHANNEL MAP RANGE = {}".format(str(channelmap_range)))
for i in range(len(rgp.msobjs)):
# define channel ranges of lines and deviation mask for each MS
msobj = rgp.msobjs[i]
fieldid = rgp.fieldids[i]
antid = rgp.antids[i]
spwid = rgp.spwids[i]
spwobj = msobj.get_spectral_window(spwid)
deviation_mask = getattr(msobj, 'deviation_mask', {})
exclude_range = deviation_mask.get((fieldid, antid, spwid), [])
LOG.debug("#####{} : DEVIATION MASK = {}".format(msobj.basename, str(exclude_range)))
if len(exclude_range) == 1 and exclude_range[0] == [0, spwobj.num_channels - 1]:
# deviation mask is full channel range when all data are flagged
LOG.warning("Ignoring DEVIATION MASK of {} (SPW {:d}, FIELD {:d}, ANT {:d}). "
"Possibly all data flagged".format(msobj.basename, spwid, fieldid, antid))
exclude_range = []
if cp.edge[0] > 0:
exclude_range.append([0, cp.edge[0] - 1])
if cp.edge[1] > 0:
exclude_range.append([spwobj.num_channels - cp.edge[1], spwobj.num_channels - 1])
if len(channelmap_range) > 0:
exclude_range.extend(channelmap_range)
# check the validity of channel number and fix it when out of range
min_chan = 0
max_chan = spwobj.num_channels - 1
exclude_channel_range = [[max(min_chan, x[0]), min(max_chan, x[1])]
for x in merge_ranges(exclude_range)]
LOG.info("{} : channel map and deviation mask channel ranges "
"in MS frame = {}".format(msobj.basename, str(exclude_channel_range)))
# define frequency ranges of RMS
exclude_freq_range = numpy.zeros(2 * len(exclude_channel_range))
for jseg in range(len(exclude_channel_range)):
(lfreq, rfreq) = (spwobj.channels.chan_freqs[jchan] for jchan in exclude_channel_range[jseg])
# handling of LSB
exclude_freq_range[2 * jseg: 2 * jseg + 2] = [min(lfreq, rfreq), max(lfreq, rfreq)]
LOG.debug("#####CHANNEL MAP AND DEVIATION MASK FREQ RANGE = {}".format(str(exclude_freq_range)))
if len(exclude_freq_range) == 0:
continue # no ranges to add
# convert MS freqency ranges to image frame
field = msobj.fields[fieldid]
direction_ref = field.mdirection
start_time = msobj.start_time
end_time = msobj.end_time
me = casa_tools.measures
qa = casa_tools.quanta
qmid_time = qa.quantity(start_time['m0'])
qmid_time = qa.add(qmid_time, end_time['m0'])
qmid_time = qa.div(qmid_time, 2.0)
time_ref = me.epoch(rf=start_time['refer'],
v0=qmid_time)
position_ref = msobj.antennas[antid].position
if to_frame == 'REST':
mse = casa_tools.ms
mse.open(msobj.name)
obstime = qa.time(qmid_time, form='ymd')[0]
v_to = mse.cvelfreqs(spwids=[spwid], obstime=obstime, outframe='SOURCE')
v_from = mse.cvelfreqs(spwids=[spwid], obstime=obstime, outframe=spwobj.frame)
mse.close()
_to_imageframe = interpolate.interp1d(v_from, v_to, kind='linear',
bounds_error=False, fill_value='extrapolate')
else:
# initialize
me.done()
me.doframe(time_ref)
me.doframe(direction_ref)
me.doframe(position_ref)
def _to_imageframe(x):
m = me.frequency(rf=spwobj.frame, v0=qa.quantity(x, 'Hz'))
converted = me.measure(v=m, rf=to_frame)
qout = qa.convert(converted['m0'], outunit='Hz')
return qout['value']
image_rms_freq_range.extend(map(_to_imageframe, exclude_freq_range))
me.done()
# LOG.info("#####Overall LINE CHANNELS IN IMAGE FRAME = {}".format(str(image_rms_freq_range)))
if len(image_rms_freq_range) == 0:
return image_rms_freq_range
return merge_ranges(numpy.reshape(image_rms_freq_range, (len(image_rms_freq_range) // 2, 2), 'C'))
[docs]
def get_imagename(self, source: str, spwids: list[int],
antenna: str=None, asdm: str=None, stokes: str=None, specmode: str='cube') -> str:
"""Generate a filename of the image.
Args:
source : Source name
spwids : SpW IDs
antenna : Antenna name. Defaults to None.
asdm : ASDM. Defaults to None.
stokes : Stokes parameter. Defaults to None.
specmode : specmode for tsdimaging. Defaults to 'cube'.
Raises:
ValueError: if asdm is not provided for ampcal
Returns:
A filename of the image
"""
context = self.inputs.context
is_nro = sdutils.is_nro(context)
if is_nro:
namer = filenamer.Image(virtspw=False)
else:
namer = filenamer.Image()
if self.inputs.is_ampcal:
nameroot = asdm
if nameroot is None:
raise ValueError('ASDM uid must be provided to construct ampcal image name')
elif is_nro:
nameroot = ''
else:
nameroot = context.project_structure.ousstatus_entity_id
if nameroot == 'unknown':
nameroot = 'oussid'
nameroot = filenamer.sanitize(nameroot)
namer._associations.asdm(nameroot)
# output_dir = context.output_dir
# if output_dir:
# namer.output_dir(output_dir)
if not is_nro:
namer.stage(context.stage)
namer.source(source)
if self.inputs.is_ampcal:
namer.intent(self.inputs.mode.lower())
elif is_nro:
pass
else:
namer.science()
namer.spectral_window(spwids[0])
if stokes is None:
stokes = self.stokes
namer.polarization(stokes)
namer.specmode(specmode)
# so far we always create native resolution, full channel image
# namer.spectral_image()
namer._associations.format('image.sd')
# namer.single_dish()
namer.antenna(antenna)
# iteration is necessary for exportdata
namer.iteration(0)
imagename = namer.get_filename()
return imagename
def _get_stat_chans(self, imagename: str,
combined_rms_exclude: list[tuple[float, float]],
edge: tuple[int, int]=(0, 0)) -> list[int]:
"""Return a list of channel ranges to calculate image statistics.
Args:
imagename : A filename of the image
combined_rms_exclude : A list of frequency ranges to exclude
edge : The left and right edge channels to exclude. Defaults to (0, 0).
Retruns:
A 1-d list of channel ranges to INCLUDE in calculation of image
statistics, e.g., [imin0, imax0, imin0, imax0, ...]
"""
with casa_tools.ImageReader(imagename) as ia:
try:
cs = ia.coordsys()
faxis = cs.findaxisbyname('spectral')
num_chan = ia.shape()[faxis]
exclude_chan_ranges = convert_frequency_ranges_to_channels(combined_rms_exclude, cs, num_chan)
finally:
cs.done()
LOG.info("Merged spectral line channel ranges of combined image = {}".format(str(exclude_chan_ranges)))
include_chan_ranges = invert_ranges(exclude_chan_ranges, num_chan, edge)
LOG.info("Line free channel ranges of image to calculate RMS = {}".format(str(include_chan_ranges)))
return include_chan_ranges
def _get_stat_region(self, pp: imaging_params.PostProcessParameters) -> str | None:
"""
Retrun region to calculate statistics.
Median width, height, and position angle is adopted as a reference
map extent and then the width and height will be shrinked by 2 beam
size in each direction.
Arg:
pp : Imaging post process parameters of prepare()
Retruns:
Region expression string of a rotating box.
Returns None if no valid region of interest is defined.
"""
cqa = casa_tools.quanta
beam_unit = cqa.getunit(pp.beam['major'])
assert cqa.getunit(pp.beam['minor']) == beam_unit
beam_size = numpy.sqrt(cqa.getvalue(pp.beam['major'])[0] * cqa.getvalue(pp.beam['minor'])[0])
center_unit = 'deg'
angle_unit = None
for r in pp.raster_infos:
if r is None:
continue
angle_unit = cqa.getunit(r.scan_angle)
break
if angle_unit is None:
LOG.warning('No valid raster information available.')
return None
def _value_in_unit(quantity: dict, unit: str) -> float:
# Get value(s) of quantity in a specified unit
return cqa.getvalue(cqa.convert(quantity, unit))
def _extract_values(value: str, unit: str) -> Number:
# Extract valid values of specified attributes in list
return [_value_in_unit(getattr(r, value), unit) for r in pp.raster_infos if r is not None]
rep_width = numpy.nanmedian(_extract_values('width', beam_unit))
rep_height = numpy.nanmedian(_extract_values('height', beam_unit))
rep_angle = numpy.nanmedian([cqa.getvalue(r.scan_angle) for r in pp.raster_infos if r is not None])
center_ra = numpy.nanmedian(_extract_values('center_ra', center_unit))
center_dec = numpy.nanmedian(_extract_values('center_dec', center_unit))
width = rep_width - beam_size
height = rep_height - beam_size
if width <= 0 or height <= 0: # No valid region selected.
return None
if pp.org_direction is not None:
(center_ra, center_dec) = direction_utils.direction_recover(center_ra,
center_dec,
pp.org_direction)
center = [cqa.tos(cqa.quantity(center_ra, center_unit)),
cqa.tos(cqa.quantity(center_dec, center_unit))]
region = "rotbox[{}, [{}{}, {}{}], {}{}]".format(center,
width, beam_unit,
height, beam_unit,
rep_angle, angle_unit)
return region
[docs]
def get_raster_info_list(self, cp: imaging_params.CommonParameters,
rgp: imaging_params.ReductionGroupParameters) -> list[RasterInfo]:
"""
Retrun a list of raster information.
Each raster infromation is analyzed for element wise combination of
infile, antenna, field, and SpW IDs in input parameter lists.
Args:
cp : Common parameter object of prepare()
rgp : Reduction group parameter object of prepare()
Returns:
A list of RasterInfo. If raster information could not be obtained,
the corresponding elements in the list will be None.
"""
assert len(rgp.combined.infiles) == len(rgp.combined.antids)
assert len(rgp.combined.infiles) == len(rgp.combined.fieldids)
assert len(rgp.combined.infiles) == len(rgp.combined.spws)
raster_info_list = []
for (infile, antid, fieldid, spwid) in \
zip(rgp.combined.infiles, rgp.combined.antids, rgp.combined.fieldids, rgp.combined.spws):
msobj = self.inputs.context.observing_run.get_ms(name=infile)
# Non raster data set.
if msobj.observing_pattern[antid][spwid][fieldid] != 'RASTER':
f = msobj.get_fields(field_id=fieldid)[0]
LOG.warning('Not a raster map: field {} in {}'.format(f.name, msobj.basename))
raster_info_list.append(None)
dt = cp.dt_dict[msobj.basename]
try:
raster_info_list.append(_analyze_raster_pattern(dt, msobj, fieldid, spwid, antid, rgp))
except Exception:
f = msobj.get_fields(field_id=fieldid)[0]
a = msobj.get_antenna(antid)[0]
LOG.info('Could not get raster information of field {}, Spw {}, Ant {}, MS {}. '
'Potentially be because all data are flagged.'.format(f.name, spwid, a.name, msobj.basename))
raster_info_list.append(None)
assert len(rgp.combined.infiles) == len(raster_info_list)
return raster_info_list
[docs]
def calculate_theoretical_image_rms(self, cp: imaging_params.CommonParameters,
rgp: imaging_params.ReductionGroupParameters,
pp: imaging_params.PostProcessParameters) -> dict[str, float]:
"""Calculate theoretical RMS of an image (PIPE-657).
Args:
cp : Common parameter object of prepare()
rgp : Reduction group parameter object of prepare()
pp : Imaging post process parameters of prepare()
Note: the number of elements in rgp.combined.antids, fieldids, spws, and pols should be equal
to that of infiles
Returns:
A quantum value of theoretical image RMS.
The value of quantity will be negative when calculation is aborted, i.e., -1.0 Jy/beam
"""
tirp = imaging_params.TheoreticalImageRmsParameters(pp, self.inputs.context)
if len(rgp.combined.infiles) == 0:
LOG.error('No MS given to calculate a theoretical RMS. Aborting calculation of theoretical thermal noise.')
return tirp.failed_rms
assert len(rgp.combined.infiles) == len(rgp.combined.antids)
assert len(rgp.combined.infiles) == len(rgp.combined.fieldids)
assert len(rgp.combined.infiles) == len(rgp.combined.spws)
assert len(rgp.combined.infiles) == len(rgp.combined.pols)
assert len(rgp.combined.infiles) == len(pp.raster_infos)
for (tirp.infile, tirp.antid, tirp.fieldid, tirp.spwid, tirp.pol_names, tirp.raster_info) in \
zip(rgp.combined.infiles, rgp.combined.antids, rgp.combined.fieldids,
rgp.combined.spws, rgp.combined.pols, pp.raster_infos):
halt, skip = self._loop_initializer_of_theoretical_image_rms(cp, rgp, tirp)
if halt:
return tirp.failed_rms
if skip:
continue
# effective BW
self._obtain_effective_BW(tirp)
# obtain average Tsys
self._obtain_average_tsys(tirp)
# obtain Wx, and Wy
self._obtain_wx_and_wy(tirp)
# obtain T_ON
self._obtain_t_on_actual(tirp)
if tirp.t_on_act < MIN_INTEGRATION_SEC:
continue
# obtain calibration tables applied
self._obtain_calibration_tables_applied(tirp)
# obtain Tsub,on, Tsub,off (average ON and OFF integration duration per raster row)
if not self._obtain_t_sub_on_off(tirp):
return tirp.failed_rms
if tirp.t_sub_on < MIN_INTEGRATION_SEC or \
tirp.t_sub_off < MIN_INTEGRATION_SEC:
continue
# obtain factors by convolution function
# (THIS ASSUMES SF kernel with either convsupport = 6 (ALMA) or 3 (NRO)
# TODO: Ggeneralize factor for SF, and Gaussian convolution function
if not self._obtain_and_set_factors_by_convolution_function(pp, tirp):
return tirp.failed_rms
if tirp.weight_sum == 0:
LOG.warning('No rms estimate is available.')
return tirp.failed_rms
_theoretical_rms = numpy.sqrt(tirp.sq_rms) / tirp.weight_sum
LOG.info('Theoretical RMS of image = {} {}'.format(_theoretical_rms, pp.brightnessunit))
return tirp.cqa.quantity(_theoretical_rms, pp.brightnessunit)
def _obtain_t_sub_on_off(self, tirp: imaging_params.TheoreticalImageRmsParameters) -> bool:
"""Obtain Tsub,on and Tsub,off. A sub method of calculate_theoretical_image_rms().
Args:
tirp : Parameter object of calculate_theoretical_image_rms()
Returns:
False if it cannot get Tsub,on/off values by some error.
Raises:
BaseException : raises when it cannot find a sky caltable applied.
"""
tirp.t_sub_on = tirp.cqa.getvalue(tirp.cqa.convert(tirp.raster_info.row_duration, tirp.time_unit))[0]
_sky_field = tirp.calmsobj.calibration_strategy['field_strategy'][tirp.fieldid]
try:
_skytab = ''
_caltabs = tirp.context.callibrary.applied.get_caltable('ps')
# For some reasons, sky caltable is not registered to calstate
for _cto, _cfrom in tirp.context.callibrary.applied.merged().items():
if _cto.vis == tirp.calmsobj.name and (_cto.field == '' or
tirp.fieldid in
[f.id for f in tirp.calmsobj.get_fields(name=_cto.field)]):
for _cf in _cfrom:
if _cf.gaintable in _caltabs:
_skytab = _cf.gaintable
break
except BaseException:
LOG.error('Could not find a sky caltable applied. ' + tirp.error_msg)
raise
if not os.path.exists(_skytab):
LOG.warning('Could not find a sky caltable applied. ' + tirp.error_msg)
return False
LOG.info('Searching OFF scans in {}'.format(os.path.basename(_skytab)))
with casa_tools.TableReader(_skytab) as tb:
_interval_unit = tb.getcolkeyword('INTERVAL', 'QuantumUnits')[0]
_t = tb.query('SPECTRAL_WINDOW_ID=={}&&ANTENNA1=={}&&FIELD_ID=={}'.format(tirp.spwid, tirp.antid,
_sky_field),
columns='INTERVAL')
if _t.nrows == 0:
LOG.warning('No sky caltable row found for spw {}, antenna {}, field {} in {}. {}'.format(
tirp.spwid, tirp.antid, _sky_field, os.path.basename(_skytab), tirp.error_msg))
_t.close()
return False
try:
_interval = _t.getcol('INTERVAL')
finally:
_t.close()
tirp.t_sub_off = tirp.cqa.getvalue(tirp.cqa.convert(tirp.cqa.quantity(_interval.mean(),
_interval_unit), tirp.time_unit))[0]
LOG.info('Subscan Time ON = {} {}, OFF = {} {}'.format(tirp.t_sub_on, tirp.time_unit,
tirp.t_sub_off, tirp.time_unit))
return True
def _obtain_jy_per_k(self, pp: imaging_params.PostProcessParameters,
tirp: imaging_params.TheoreticalImageRmsParameters) -> float | bool:
"""Obtain Jy/K. A sub method of calculate_theoretical_image_rms().
Args:
pp : Imaging post process parameters of prepare()
tirp : Parameter object of calculate_theoretical_image_rms()
Returns:
Jy/K value or failure flag
Raises:
BaseException : raises when it cannot find a Jy/K caltable applied.
"""
if pp.brightnessunit == 'K':
_jy_per_k = 1.0
LOG.info('No Jy/K conversion was performed to the image.')
else:
try:
_k2jytab = ''
_caltabs = tirp.context.callibrary.applied.get_caltable(('amp', 'gaincal'))
_found = _caltabs.intersection(tirp.calst.get_caltable(('amp', 'gaincal')))
if len(_found) == 0:
LOG.warning('Could not find a Jy/K caltable applied. ' + tirp.error_msg)
return False
if len(_found) > 1:
LOG.warning('More than one Jy/K caltables are found.')
_k2jytab = _found.pop()
LOG.info('Searching Jy/K factor in {}'.format(os.path.basename(_k2jytab)))
except BaseException:
LOG.error('Could not find a Jy/K caltable applied. ' + tirp.error_msg)
raise
if not os.path.exists(_k2jytab):
LOG.warning('Could not find a Jy/K caltable applied. ' + tirp.error_msg)
return False
with casa_tools.TableReader(_k2jytab) as tb:
_t = tb.query('SPECTRAL_WINDOW_ID=={}&&ANTENNA1=={}'.format(tirp.spwid, tirp.antid),
columns='CPARAM')
if _t.nrows == 0:
LOG.warning('No Jy/K caltable row found for spw {}, antenna {} in {}. {}'.format(tirp.spwid,
tirp.antid, os.path.basename(_k2jytab), tirp.error_msg))
_t.close()
return False
try:
tc = _t.getcol('CPARAM')
finally:
_t.close()
_jy_per_k = (1. / tc.mean(axis=-1).real ** 2).mean()
LOG.info('Jy/K factor = {}'.format(_jy_per_k)) # obtain Jy/k factor
return _jy_per_k
def _obtain_and_set_factors_by_convolution_function(self, pp: imaging_params.PostProcessParameters,
tirp: imaging_params.TheoreticalImageRmsParameters) -> bool:
"""Obtain factors by convolution function, and set it into TIRP. A sub method of calculate_theoretical_image_rms().
Args:
pp : Imaging post process parameters of prepare()
tirp : Parameter object of calculate_theoretical_image_rms()
Returns:
False if it cannot get Jy/K
"""
policy = observatory_policy.get_imaging_policy(tirp.context)
jy_per_k = self._obtain_jy_per_k(pp, tirp)
if jy_per_k is False:
return False
ang = tirp.cqa.getvalue(tirp.cqa.convert(tirp.raster_info.scan_angle, 'rad'))[0] + 0.5 * numpy.pi
c_proj = numpy.sqrt((tirp.cy_val * numpy.sin(ang)) ** 2 + (tirp.cx_val * numpy.cos(ang)) ** 2)
inv_variant_on = tirp.effBW * numpy.abs(tirp.cx_val * tirp.cy_val) * \
tirp.t_on_act / tirp.width / tirp.height
inv_variant_off = tirp.effBW * c_proj * tirp.t_sub_off * tirp.t_on_act / tirp.t_sub_on / tirp.height
weight = tirp.t_on_act ** 2 * tirp.t_sub_off / tirp.t_sub_on
for ipol in tirp.polids:
tirp.sq_rms += (jy_per_k * tirp.mean_tsys_per_pol[ipol] * weight) ** 2 * \
(policy.get_conv2d() ** 2 / inv_variant_on + policy.get_conv1d() ** 2 / inv_variant_off)
tirp.weight_sum += weight
return True
def _obtain_t_on_actual(self, tirp: imaging_params.TheoreticalImageRmsParameters):
"""Obtain T_on actual. A sub method of calculate_theoretical_image_rms().
Args:
tirp : Parameter object of calculate_theoretical_image_rms()
"""
unit = tirp.dt.getcolkeyword('EXPOSURE', 'UNIT')
# Total time on source of data not online flagged for all polarizations.
t_on_tot = tirp.cqa.getvalue(tirp.cqa.convert(tirp.cqa.quantity(
tirp.dt.getcol('EXPOSURE').take(tirp.index_list, axis=-1).sum(), unit), tirp.time_unit))[0]
# Additional flag fraction
flag_summary = tirp.dt.getcol('FLAG_SUMMARY').take(tirp.index_list, axis=-1)
(num_pol, num_data) = flag_summary.shape
# PIPE-2508: fraction of data where any of polarization is flagged. (FLAG_SUMMARY is 0 for flagged data)
# TODO: This logic should be improved in future when full polarization is supported.
num_flagged = numpy.count_nonzero(flag_summary.sum(axis=0) < num_pol)
frac_flagged = num_flagged / num_data
LOG.debug('Per polarization flag summary (# of integrations): total=%d, flagged per pol=%s, any pol flagged=%d',
num_data, num_data - flag_summary.sum(axis=1), num_flagged)
# the actual time on source
tirp.t_on_act = t_on_tot * (1.0 - frac_flagged)
LOG.info('The actual on source time = {} {}'.format(tirp.t_on_act, tirp.time_unit))
LOG.info('- total time on source (excl. online flagged integrations) = {} {}'.format(t_on_tot, tirp.time_unit))
LOG.info('- addtional flag fraction = {} %'.format(100 * frac_flagged))
def _obtain_calibration_tables_applied(self, tirp: imaging_params.TheoreticalImageRmsParameters):
"""Obtain calibration tables applied. A sub method of calculate_theoretical_image_rms().
Args:
tirp : Parameter object of calculate_theoretical_image_rms()
"""
_calto = callibrary.CalTo(vis=tirp.calmsobj.name, field=str(tirp.fieldid))
tirp.calst = tirp.context.callibrary.applied.trimmed(tirp.context, _calto)
def _obtain_wx_and_wy(self, tirp: imaging_params.TheoreticalImageRmsParameters):
"""Obtain Wx and Wy. A sub method of calculate_theoretical_image_rms().
Args:
tirp : Parameter object of calculate_theoretical_image_rms()
"""
tirp.width = tirp.cqa.getvalue(tirp.cqa.convert(tirp.raster_info.width, tirp.ang_unit))[0]
tirp.height = tirp.cqa.getvalue(tirp.cqa.convert(tirp.raster_info.height, tirp.ang_unit))[0]
def _obtain_average_tsys(self, tirp: imaging_params.TheoreticalImageRmsParameters):
"""Obtain average Tsys. A sub method of calculate_theoretical_image_rms().
Args:
tirp : Parameter object of calculate_theoretical_image_rms()
"""
tirp.mean_tsys_per_pol = tirp.dt.getcol('TSYS').take(tirp.index_list, axis=-1).mean(axis=-1)
LOG.info('Mean Tsys = {} K'.format(str(tirp.mean_tsys_per_pol)))
def _obtain_effective_BW(self, tirp: imaging_params.TheoreticalImageRmsParameters):
"""Obtain effective BW. A sub method of calculate_theoretical_image_rms().
Args:
tirp : Parameter object of calculate_theoretical_image_rms()
"""
with casa_tools.MSMDReader(tirp.infile) as _msmd:
tirp.effBW = _msmd.chaneffbws(tirp.spwid).mean()
LOG.info('Using an MS effective bandwidth, {} kHz'.format(tirp.effBW * 0.001))
def _loop_initializer_of_theoretical_image_rms(self, cp: imaging_params.CommonParameters,
rgp: imaging_params.ReductionGroupParameters,
tirp: imaging_params.TheoreticalImageRmsParameters) -> tuple[bool]:
"""Initialize imaging_params.TheoreticalImageRmsParameters for the loop of calculate_theoretical_image_rms().
Args:
cp : Common parameter object of prepare()
rgp : Reduction group parameter object of prepare()
tirp : Parameter object of calculate_theoretical_image_rms()
Returns:
Tupled flag to describe the loop action [go|halt|skip].
GO : (False, False)
HALT : (True, True)
SKIP : (False, True)
"""
tirp.msobj = tirp.context.observing_run.get_ms(name=tirp.infile)
_callist = tirp.context.observing_run.get_measurement_sets_of_type([DataType.REGCAL_CONTLINE_ALL])
tirp.calmsobj = sdutils.match_origin_ms(_callist, tirp.msobj.origin_ms)
_dd_corrs = tirp.msobj.get_data_description(spw=tirp.spwid).corr_axis
tirp.polids = [_dd_corrs.index(p) for p in tirp.pol_names if p in _dd_corrs]
_field_name = tirp.msobj.get_fields(field_id=tirp.fieldid)[0].name
tirp.error_msg = 'Aborting calculation of theoretical thermal noise of ' + \
'Field {} and SpW {}'.format(_field_name, rgp.combined.spws)
HALT = (True, True)
SKIP = (False, True)
GO = (False, False)
if tirp.msobj.observing_pattern[tirp.antid][tirp.spwid][tirp.fieldid] != 'RASTER':
LOG.warning('Unable to calculate RMS of non-Raster map. ' + tirp.error_msg)
return HALT
LOG.info(
'Processing MS {}, Field {}, SpW {}, '
'Antenna {}, Pol {}'.
format(tirp.msobj.basename, _field_name, tirp.spwid,
tirp.msobj.get_antenna(tirp.antid)[0].name, str(tirp.pol_names)))
if tirp.raster_info is None:
_rsres = RasterScanHeuristicsResult(tirp.msobj)
rgp.imager_result.rasterscan_heuristics_results_incomp \
.setdefault(tirp.msobj.origin_ms, []) \
.append(_rsres)
_rsres.set_result_fail(tirp.antid, tirp.spwid, tirp.fieldid)
LOG.debug(f'Raster scan analysis incomplete. Skipping calculation of theoretical image RMS : EB:{tirp.msobj.execblock_id}:{tirp.msobj.antennas[tirp.antid].name}')
return SKIP
tirp.dt = cp.dt_dict[tirp.msobj.basename]
# Note: index_list is a list of DataTable row IDs for selected data EXCLUDING rows where all pols are flagged online.
tirp.index_list = common.get_index_list_for_ms(tirp.dt, [tirp.msobj.origin_ms],
[tirp.antid], [tirp.fieldid], [tirp.spwid])
if len(tirp.index_list) == 0: # this happens when permanent flag is set to all selection.
LOG.info('No unflagged row in DataTable. Skipping further calculation.')
return SKIP
return GO
def _analyze_raster_pattern(datatable: DataTable, msobj: MeasurementSet,
fieldid: int, spwid: int, antid: int, rgp: imaging_params.ReductionGroupParameters) -> RasterInfo:
"""Analyze raster scan pattern from pointing in DataTable.
Args:
datatable : DataTable instance
msobj : MS class instance to process
fieldid : A field ID to process
spwid : An SpW ID to process
antid : An antenna ID to process
rgp : Reduction group parameter object of prepare()
Returns:
A named Tuple of RasterInfo
"""
origin_basename = os.path.basename(msobj.origin_ms)
metadata = rasterutil.read_datatable(datatable)
pflag = metadata.pflag
ra = metadata.ra
dec = metadata.dec
exposure = datatable.getcol('EXPOSURE')
timetable = datatable.get_timetable(ant=antid, spw=spwid, field_id=fieldid, ms=origin_basename, pol=None)
# dtrow_list is a list of numpy array holding datatable rows separated by raster rows
# [[r00, r01, r02, ...], [r10, r11, r12, ...], ...]
dtrow_list_nomask = rasterutil.extract_dtrow_list(timetable)
dtrow_list = [rows[pflag[rows]] for rows in dtrow_list_nomask if numpy.any(pflag[rows] == True)]
radec_unit = datatable.getcolkeyword('OFS_RA', 'UNIT')
assert radec_unit == datatable.getcolkeyword('OFS_DEC', 'UNIT')
exp_unit = datatable.getcolkeyword('EXPOSURE', 'UNIT')
_rsres = RasterScanHeuristicsResult(msobj)
rgp.imager_result.rasterscan_heuristics_results_rgap \
.setdefault(msobj.origin_ms, []) \
.append(_rsres)
try:
gap_r = rasterscan.find_raster_gap(ra, dec, dtrow_list)
except Exception as e:
if isinstance(e, RasterScanHeuristicsFailure):
_rsres.set_result_fail(antid, spwid, fieldid)
LOG.debug('{} : EB:{}:{}'.format(e, msobj.execblock_id, msobj.antennas[antid].name))
try:
dtrow_list_large = rasterutil.extract_dtrow_list(timetable, for_small_gap=False)
se_small = [(v[0], v[-1]) for v in dtrow_list]
se_large = [(v[0], v[-1]) for v in dtrow_list_large]
gap_r = []
for sl, el in se_large:
for i, (ss, es) in enumerate(se_small):
if ss == sl:
gap_r.append(i)
break
gap_r.append(len(dtrow_list))
except Exception:
LOG.warning('Could not find gaps between raster scans. No result is produced.')
return None
cqa = casa_tools.quanta
idx_all = numpy.concatenate(dtrow_list)
mean_dec = numpy.mean(datatable.getcol('DEC')[idx_all])
dec_unit = datatable.getcolkeyword('DEC', 'UNIT')
map_center_dec = cqa.getvalue(
cqa.convert(cqa.quantity(mean_dec, dec_unit), 'rad')
)[0]
dec_factor = numpy.abs(numpy.cos(map_center_dec))
ndata = len(dtrow_list)
duration = numpy.fromiter(
map(lambda x: exposure[x].sum(), dtrow_list),
dtype=float, count=ndata
)
num_integration = numpy.fromiter(
map(len, dtrow_list), dtype=int, count=ndata
)
center_ra = numpy.fromiter(
map(lambda x: ra[x].mean(), dtrow_list),
dtype=float, count=ndata
)
center_dec = numpy.fromiter(
map(lambda x: dec[x].mean(), dtrow_list),
dtype=float, count=ndata
)
delta_ra = numpy.fromiter(
map(lambda x: (ra[x[-1]] - ra[x[0]]) * dec_factor, dtrow_list),
dtype=float, count=ndata
)
delta_dec = numpy.fromiter(
map(lambda x: dec[x[-1]] - dec[x[0]], dtrow_list),
dtype=float, count=ndata
)
height_list = []
for s, e in zip(gap_r[:-1], gap_r[1:]):
start_ra = center_ra[s]
start_dec = center_dec[s]
end_ra = center_ra[e - 1]
end_dec = center_dec[e - 1]
height_list.append(
numpy.hypot((start_ra - end_ra) * dec_factor, start_dec - end_dec)
)
LOG.debug('REFACTOR: ant %s, spw %s, field %s', antid, spwid, fieldid)
LOG.debug('REFACTOR: map_center_dec=%s, dec_factor=%s', map_center_dec, dec_factor)
LOG.debug('REFACTOR: duration=%s', list(duration))
LOG.debug('REFACTOR: num_integration=%s', list(num_integration))
LOG.debug('REFACTOR: delta_ra=%s', list(delta_ra))
LOG.debug('REFACTOR: delta_dec=%s', list(delta_dec))
LOG.debug('REFACTOR: center_ra=%s', list(center_ra))
LOG.debug('REFACTOR: center_dec=%s', list(center_dec))
LOG.debug('REFACTOR: height_list=%s', list(height_list))
LOG.debug('REFACTOR: dtrows')
for rows in dtrow_list:
LOG.debug('REFACTOR: %s', list(rows))
center_ra = numpy.array(center_ra)
center_dec = numpy.array(center_dec)
row_sep_ra = (center_ra[1:] - center_ra[:-1]) * dec_factor
row_sep_dec = center_dec[1:] - center_dec[:-1]
row_separation = numpy.median(numpy.hypot(row_sep_ra, row_sep_dec))
# find complate raster
num_row_int = rasterutil.find_most_frequent(num_integration)
complete_idx = numpy.where(num_integration >= num_row_int)
# raster scan parameters
row_duration = numpy.array(duration)[complete_idx].mean()
assert row_duration > 0
row_delta_ra = numpy.abs(delta_ra)[complete_idx].mean()
row_delta_dec = numpy.abs(delta_dec)[complete_idx].mean()
width = numpy.hypot(row_delta_ra, row_delta_dec)
assert width > 0
sign_ra = +1.0 if delta_ra[complete_idx[0][0]] >= 0 else -1.0
sign_dec = +1.0 if delta_dec[complete_idx[0][0]] >= 0 else -1.0
scan_angle = math.atan2(sign_dec * row_delta_dec, sign_ra * row_delta_ra)
height = numpy.max(height_list)
assert height > 0
center = (cqa.quantity(0.5 * (center_ra.min() + center_ra.max()), radec_unit),
cqa.quantity(0.5 * (center_dec.min() + center_dec.max()), radec_unit))
raster_info = RasterInfo(center[0], center[1],
cqa.quantity(width, radec_unit), cqa.quantity(height, radec_unit),
cqa.quantity(scan_angle, 'rad'), cqa.quantity(row_separation, radec_unit),
cqa.quantity(row_duration, exp_unit))
LOG.info('Raster Information')
LOG.info('- Map Center: [{}, {}]'.format(cqa.angle(raster_info.center_ra, prec=8, form='time')[0],
cqa.angle(raster_info.center_dec, prec=8)[0]))
LOG.info('- Scan Extent: [{}, {}] (scan direction: {})'.format(cqa.tos(raster_info.width),
cqa.tos(raster_info.height),
cqa.tos(raster_info.scan_angle)))
LOG.info('- Raster row separation = {}'.format(cqa.tos(raster_info.row_separation)))
LOG.info('- Raster row scan duration = {}'.format(cqa.tos(cqa.convert(raster_info.row_duration, 's'))))
return raster_info
def calc_image_statistics(imagename: str, chans: str, region: str) -> dict:
"""Return image statistics with channel and region selection.
Args:
imagename : Path to image to calculate statistics
chans : Channel range selection string, e.g., '0~110;240~300'
region : Region definition string.
Returns:
A dictionary of statistic values returned by ia.statistics.
"""
LOG.info("Calculateing image statistics of chans='{}', region='{}' in {}".format(chans, region, imagename))
rg = casa_tools.regionmanager
with casa_tools.ImageReader(imagename) as ia:
cs = ia.coordsys()
try:
chan_sel = rg.frombcs(csys=cs.torecord(), shape=ia.shape(), chans=chans)
finally:
cs.done()
rg.done()
subim = ia.subimage(region=chan_sel)
try:
stat = subim.statistics(region=region)
finally:
subim.close()
return stat
# Utility methods to calcluate channel ranges
def convert_frequency_ranges_to_channels(range_list: list[tuple[float, float]],
cs: coordsys, num_chan: int) -> list[tuple[int, int]]:
"""Convert frequency ranges to channel ones.
Args:
range_list : A list of min/max frequency ranges,
e.g., [[fmin0,fmax0],[fmin1, fmax1],...]
cs : A coordinate system to convert world values to pixel one
num_chan : The number of channels in frequency axis
Returns:
A list of min/max channels, e.g., [[imin0, imax0],[imin1,imax1],...]
"""
faxis = cs.findaxisbyname('spectral')
ref_world = cs.referencevalue()['numeric']
LOG.info("Aggregated spectral line frequency ranges of combined image = {}".format(str(range_list)))
channel_ranges = [] # should be list for sort
for segment in range_list:
ref_world[faxis] = segment[0]
start_chan = cs.topixel(ref_world)['numeric'][faxis]
ref_world[faxis] = segment[1]
end_chan = cs.topixel(ref_world)['numeric'][faxis]
# handling of LSB
min_chan = min(start_chan, end_chan)
max_chan = max(start_chan, end_chan)
# LOG.info("#####Freq to Chan: [{:f}, {:f}] -> [{:f}, {:f}]".format(segment[0], segment[1], min_chan, max_chan))
if max_chan < -0.5 or min_chan > num_chan - 0.5: # out of range
# LOG.info("#####Omitting channel range [{:f}, {:f}]".format(min_chan, max_chan))
continue
channel_ranges.append([max(int(min_chan), 0),
min(int(max_chan), num_chan - 1)])
channel_ranges.sort()
return merge_ranges(channel_ranges)
def convert_range_list_to_string(range_list: list[int]) -> str:
"""Convert a list of index ranges to string.
Args:
range_list : A list of ranges, e.g., [imin0, imax0, imin1, imax1, ...]
Returns:
A string in form, e.g., 'imin0~imax0;imin1~imax1'
Examples:
>>> convert_range_list_to_string( [5, 10, 15, 20] )
'5~10;15~20'
"""
stat_chans = str(';').join(['{:d}~{:d}'.format(range_list[iseg], range_list[iseg + 1])
for iseg in range(0, len(range_list), 2)])
return stat_chans
def convert_range_list_to_ranges(range_list: list[int]) -> list[list[int]]:
"""
Convert a list of index ranges to list of signle ranges
Args:
range_list : A list of ranges, e.g., [imin0, imax0, imin1, imax1, ...]
Returns:
A list of single ranges, e.g. '[ [imin0, imax0], [imin1, imax1], ...]
Example:
>>> convert_range_list_to_string( [5, 10, 15, 20] )
'[[5, 10], [15, 20]]'
"""
ranges = [ [range_list[i], range_list[i+1]] for i in range(0, len(range_list), 2) ]
return ranges
def merge_ranges(range_list: list[tuple[Number, Number]]) -> list[tuple[Number, Number]]:
"""Merge overlapping ranges in range_list.
Args:
range_list : A list of ranges to merge, e.g., [ [min0,max0], [min1,max1], .... ]
Each range in the list should be in ascending order (min0 <= max0)
There is no assumption in the order of ranges, e.g., min0 w.r.t min1
Raises:
ValueError: too few elements in the range description, such as [] or [min0]
Returns:
A list of merged ranges
e.g., [[min_merged0,max_marged0], [min_merged1,max_merged1], ....]
"""
# LOG.info("#####Merge ranges: {}".format(str(range_list)))
num_range = len(range_list)
if num_range == 0:
return []
merged = [range_list[0][0:2]]
for i in range(1, num_range):
segment = range_list[i]
if len(segment) < 2:
raise ValueError("segments in range list must have 2 elements")
overlap = -1
for j in range(len(merged)):
if segment[1] < merged[j][0] or segment[0] > merged[j][1]: # no overlap
continue
else:
overlap = j
break
if overlap < 0:
merged.append(segment[0:2])
else:
merged[j][0] = min(merged[j][0], segment[0])
merged[j][1] = max(merged[j][1], segment[1])
# Check if further merge is necessary
while len(merged) < num_range:
num_range = len(merged)
merged = merge_ranges(merged)
# LOG.info("#####Merged: {}".format(str(merged)))
return merged
def invert_ranges(id_range_list: list[tuple[int, int]],
num_ids: int, edge: tuple[int, int]) -> list[int]:
"""Return inverted ID ranges.
Args:
id_range_list : A list of min/max ID ranges to invert. The list should
be sorted in the ascending order of min IDs.
num_ids : Number of IDs to consider
edge : The left and right edges to exclude
Returns:
A 1-d list of inverted ranges
Examples:
>>> id_range_list = [[5,10],[15,20]]
>>> num_ids = 30
>>> edge = (2,3)
>>> invert_ranges(id_range_list, num_ids, edge)
[2, 4, 11, 14, 21, 26]
"""
inverted_list = []
if len(id_range_list) == 0:
inverted_list = [edge[0], num_ids - 1 - edge[1]]
else:
if id_range_list[0][0] > edge[0]:
inverted_list.extend([edge[0], id_range_list[0][0] - 1])
for j in range(len(id_range_list) - 1):
start_include = id_range_list[j][1] + 1
end_include = id_range_list[j + 1][0] - 1
if start_include <= end_include:
inverted_list.extend([start_include, end_include])
if id_range_list[-1][1] + 1 < num_ids - 1 - edge[1]:
inverted_list.extend([id_range_list[-1][1] + 1, num_ids - 1 - edge[1]])
return inverted_list
def detect_edge_channels(imagename: str) -> tuple[int, int]:
"""Detect list of edge channels to be excluded from QA evaluation.
There are a few edge channels that have less valid spatial pixels
than other spectral channels, probably due to the effect of frame
conversion from TOPO to LSRK with progressively varying time stamp.
Raster OTF scan without frequency tracking can cause this effect.
Other possible reason is edge channel flagging by hsd_flagdata
and/or hsd_tsysflag.
This function detects such edge channels by calculating the median
number of valid spatial pixels in each channel. Consecutive
channels with less valid spatial pixels than the median are
regarded as edge channels.
Args:
imagename: Name of the image
Returns:
Number of edge channels excluded from lower and upper
side of the spectral axis
"""
with casa_tools.ImageReader(imagename) as ia:
mask = ia.getchunk(getmask=True)
num_valid_pixels = numpy.sum(mask, axis=(0, 1, 2))
nchan = len(num_valid_pixels)
# PIPE-1727 threshold for edge channels should be strict,
# no tolerance using stddev nor MAD
median_num_valid_pixels = numpy.median(num_valid_pixels)
LOG.debug(
"typical number of valid pixels %s",
median_num_valid_pixels
)
threshold = median_num_valid_pixels
edge_count_lower = 0
for i in range(nchan):
if num_valid_pixels[i] < threshold:
edge_count_lower += 1
else:
break
edge_count_upper = 0
for i in range(nchan - 1, -1, -1):
if num_valid_pixels[i] < threshold:
edge_count_upper += 1
else:
break
return edge_count_lower, edge_count_upper
def detect_atm_channels(ms: MeasurementSet, spw: int) -> numpy.ndarray | None:
if ms.antenna_array.name == 'NRO':
return None
spwlist = [spw]
spwsetup = sdatm.getSpecSetup(ms.basename, spwlist=spwlist)
tau = sdatm.getCalAtmData(ms.basename, spwlist, spwsetup)[-2]
skylines = sdatm.getskylines(tau[spw], spw, spwsetup, fraclevel=0.3, minpeaklevel=0.05)
atm_masks = sdatm.skysel(skylines, linestouse='all')
return atm_masks