import os
import numpy
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.vdp as vdp
from pipeline.h.tasks.common import calibrationtableaccess as caltableaccess
from pipeline.hifa.heuristics import snr as snr_heuristics
from pipeline.infrastructure import casa_tools
from pipeline.infrastructure import task_registry
LOG = infrastructure.get_logger(__name__)
class BpSolintInputs(vdp.StandardInputs):
@vdp.VisDependentProperty
def field(self):
# Return field names in the current ms that have been
# observed with the desired intent
fields = self.ms.get_fields(intent=self.intent)
fieldids = set(sorted([f.id for f in fields]))
fieldnames = []
for fieldid in fieldids:
field = self.ms.get_fields(field_id=fieldid)
fieldnames.append(field[0].name)
field_names = set(fieldnames)
return ','.join(field_names)
intent = vdp.VisDependentProperty(default='BANDPASS')
@vdp.VisDependentProperty
def spw(self):
# Get the science spw ids
sci_spws = {spw.id for spw in self.ms.get_spectral_windows(science_windows_only=True)}
# Get the bandpass spw ids
bandpass_spws = []
for scan in self.ms.get_scans(scan_intent=self.intent):
bandpass_spws.extend(spw.id for spw in scan.spws)
bandpass_spws = set(bandpass_spws).intersection(sci_spws)
# Get science target spw ids
target_spws = []
for scan in self.ms.get_scans(scan_intent='TARGET'):
target_spws.extend([spw.id for spw in scan.spws])
target_spws = set(target_spws).intersection(sci_spws)
# Compute the intersection of the bandpass and science target spw
# ids
spws = list(bandpass_spws.intersection(target_spws))
spws = [str(spw) for spw in sorted(spws)]
return ','.join(spws)
phaseupsnr = vdp.VisDependentProperty(default=20.0)
minphaseupints = vdp.VisDependentProperty(default=2)
evenbpints = vdp.VisDependentProperty(default=False)
bpsnr = vdp.VisDependentProperty(default=50.0)
minbpsnr = vdp.VisDependentProperty(default=20.0)
minbpnchan = vdp.VisDependentProperty(default=8)
hm_nantennas = vdp.VisDependentProperty(default='unflagged')
maxfracflagged = vdp.VisDependentProperty(default=0.90)
# docstring and type hints: supplements hifa_bpsolint
def __init__(self, context, output_dir=None, vis=None, field=None,
intent=None, spw=None, phaseupsnr=None, minphaseupints=None,
evenbpints=None, bpsnr=None, minbpsnr=None, minbpnchan=None, hm_nantennas=None, maxfracflagged=None):
"""Initialize Inputs.
Args:
context: Pipeline context object containing state information.
output_dir: Output directory.
Defaults to None, which corresponds to the current working directory.
vis: The list of input MeasurementSets. Defaults to the list of
MeasurementSets specified in the pipeline context.
Example: ``vis=['M82A.ms', 'M82B.ms']``
field: The list of field names of sources to be used for
signal-to-noise estimation. Defaults to all fields with the
standard intent.
Example: ``field='3C279'``
intent: A string containing a comma delimited list of intents against
which the selected fields are matched. Defaults to
'BANDPASS'.
Example: ``intent='PHASE'``
spw: The list of spectral windows and channels for which gain
solutions are computed. Defaults to all the science spectral
windows for which there are both 'intent' and TARGET intents.
Example: ``spw='13,15'``
phaseupsnr: The required phase-up gain time interval solution
signal-to-noise.
Example: ``phaseupsnr=10.0``
minphaseupints: The minimum number of time intervals in the phase-up gain
solution.
Example: ``minphaseupints=4``
evenbpints: Use a bandpass frequency solint that is an integer divisor of
the spw bandwidth, to prevent the occurrence of one narrower
fractional frequency interval.
bpsnr: The required bandpass frequency interval solution
signal-to-noise.
Example: ``bpsnr=30.0``
minbpsnr: The minimum required bandpass frequency interval solution
signal-to-noise when strong atmospheric lines exist in Tsys
spectra.
Example: ``minbpsnr=10.0``
minbpnchan: The minimum number of frequency intervals in the bandpass
solution.
Example: ``minbpnchan=16``
hm_nantennas: The heuristics for determines the number of antennas to use
in the signal-to-noise estimate. The options are ``'all'`` and
``'unflagged'``. The 'unflagged' options is not currently
supported.
Example: ``hm_nantennas='unflagged'``
maxfracflagged: The maximum fraction of an antenna that can be flagged before
it is excluded from the signal-to-noise estimate.
Example: ``maxfracflagged=0.80``
"""
super().__init__()
self.context = context
self.vis = vis
self.output_dir = output_dir
self.field = field
self.intent = intent
self.spw = spw
self.phaseupsnr = phaseupsnr
self.minphaseupints = minphaseupints
self.evenbpints = evenbpints
self.bpsnr = bpsnr
self.minbpsnr = minbpsnr
self.minbpnchan = minbpnchan
self.hm_natennas = hm_nantennas
self.maxfracflagged = maxfracflagged
[docs]
@task_registry.set_equivalent_casa_task('hifa_bpsolint')
@task_registry.set_casa_commands_comment('Compute the best per spw bandpass solution intervals.')
class BpSolint(basetask.StandardTaskTemplate):
Inputs = BpSolintInputs
[docs]
def prepare(self, **parameters):
# Simplify the inputs
inputs = self.inputs
# Turn the CASA field name and spw id lists into Python lists
fieldlist = inputs.field.split(',')
spwlist = [int(spw) for spw in inputs.spw.split(',')]
# Setup BP SNR
bpsnr = inputs.bpsnr if 'bpsnr' not in parameters else parameters['bpsnr']
# Log the data selection choices
LOG.info('Estimating bandpass solution intervals for MS %s' % inputs.ms.basename)
LOG.info(' Setting bandpass intent to %s ' % inputs.intent)
LOG.info(' Selecting bandpass fields %s ' % fieldlist)
LOG.info(' Selecting bandpass spws %s ' % spwlist)
LOG.info(' Setting requested phaseup snr to %0.1f ' % inputs.phaseupsnr)
LOG.info(' Setting requested bandpass snr to %0.1f ' % bpsnr)
if len(fieldlist) <= 0 or len(spwlist) <= 0:
LOG.info(' No bandpass data')
return BpSolintResults(vis=inputs.vis)
# Compute the bandpass solint parameters and return a solution
# dictionary
solint_dict = snr_heuristics.estimate_bpsolint(inputs.ms, fieldlist,
inputs.intent, spwlist, inputs.hm_nantennas,
inputs.maxfracflagged,
inputs.phaseupsnr, inputs.minphaseupints, bpsnr,
inputs.minbpnchan, evenbpsolints=inputs.evenbpints)
if not solint_dict:
LOG.info('No solution interval dictionary')
return BpSolintResults(vis=inputs.vis)
# Check the existence of strong atmospheric line and recalculate
# solution interval with lower bpsnr if necessary
if bpsnr > inputs.minbpsnr and \
self._get_max_solint_channels(solint_dict) >= 2:
# check if Tsys caltable exists in context
caltable = self._get_tsys_caltable(inputs.ms.name)
if caltable is None:
LOG.warning("No Tsys calibration table found for MS %s. Skipping strong atmospheric line check." %
inputs.ms.basename)
elif check_strong_atm_lines(inputs.ms, fieldlist, inputs.intent, spwlist, solint_dict, caltable):
LOG.info("Strong atmospheric line(s) detected in Tsys spectra. Recalculating solution interval with bpsnr = %f" % inputs.minbpsnr)
return self.prepare(bpsnr=inputs.minbpsnr)
# Construct the results object
result = self._get_results(inputs.vis, spwlist, solint_dict)
# Return the results
return result
[docs]
def analyse(self, result):
return result
# Get final results from the spw dictionary
@staticmethod
def _get_results(vis, spwidlist, solint_dict):
# Initialize the lists
phsolints = []
phintsolints = []
nphsolutions = []
phsensitivities = []
phintsnrs = []
exptimes = []
bpsolints = []
bpchansolints = []
nbpsolutions = []
bpsensitivities = []
bpchansnrs = []
bandwidths = []
# Loop over the spws. Values for spws with
# not dictionary entries are set to None
for spwid in spwidlist:
if spwid not in solint_dict:
phsolints.append(None)
phintsolints.append(None)
nphsolutions.append(None)
phsensitivities.append(None)
phintsnrs.append(None)
exptimes.append(None)
bpsolints.append(None)
bpchansolints.append(None)
nbpsolutions.append(None)
bpsensitivities.append(None)
bpchansnrs.append(None)
bandwidths.append(None)
else:
phsolints.append(solint_dict[spwid]['phaseup_solint'])
phintsolints.append(solint_dict[spwid]['nint_phaseup_solint'])
nphsolutions.append(solint_dict[spwid]['nphaseup_solutions'])
phsensitivities.append('%fmJy' % solint_dict[spwid]['sensitivity_per_integration_mJy'])
phintsnrs.append(solint_dict[spwid]['snr_per_integration'])
exptimes.append('%fs' % (60*solint_dict[spwid]['exptime_minutes']))
bpsolints.append(solint_dict[spwid]['bpsolint'])
bpchansolints.append(solint_dict[spwid]['nchan_bpsolint'])
nbpsolutions.append(solint_dict[spwid]['nbandpass_solutions'])
bpsensitivities.append('%fmJy' % solint_dict[spwid]['sensitivity_per_channel_mJy'])
bpchansnrs.append(solint_dict[spwid]['snr_per_channel'])
bandwidths.append('%fHz' % solint_dict[spwid]['bandwidth'])
# Initialize result structure.
result = BpSolintResults(vis=vis, spwids=spwidlist)
# Populate the result.
result.phsolints = phsolints
result.phintsolints = phintsolints
result.nphsolutions = nphsolutions
result.phsensitivities = phsensitivities
result.phintsnrs = phintsnrs
result.exptimes = exptimes
result.bpsolints = bpsolints
result.bpchansolints = bpchansolints
result.nbpsolutions = nbpsolutions
result.bpchansensitivities = bpsensitivities
result.bpchansnrs = bpchansnrs
result.bandwidths = bandwidths
result.low_channel_solutions = solint_dict['low_channel_solutions']
return result
@staticmethod
def _get_max_solint_channels(solint_dict):
"""
The method returns maximum BP solution interval of all SPWs
in solution interval dictionary (solint_dict)
Inputs: solution interval dictionary returned by snr.estimate_bpsolint
"""
# the original code assumed a dict of int spw IDs to data, but the
# introduction of low_channel_solutions in PIPE-1760 breaks that
# assumption so we identify that case and skip it.
return max(
(spw_solint['nchan_bpsolint']
for key, spw_solint in solint_dict.items()
if key != 'low_channel_solutions'),
default=0,
)
def _get_tsys_caltable(self, vis):
caltables = self.inputs.context.callibrary.active.get_caltable(
caltypes='tsys')
# return just the tsys table that matches the vis being handled
result = None
for name in caltables:
# Get the tsys table name
tsystable_vis = caltableaccess.CalibrationTableDataFiller._readvis(name)
if tsystable_vis in vis:
result = name
break
return result
class BpSolintResults(basetask.Results):
def __init__(self, vis=None, spwids=None,
phsolints=None, phintsolints=None, nphsolutions=None,
phsensitivities=None, phintsnrs=None, exptimes=None,
bpsolints=None, bpchansolints=None, nbpsolutions=None,
bpsensitivities=None, bpchansnrs=None, bandwidths=None,
low_channel_solutions=None):
"""
Initialise the results object.
"""
super().__init__()
if spwids is None:
spwids = []
if phsolints is None:
phsolints = []
if phintsolints is None:
phintsolints = []
if nphsolutions is None:
nphsolutions = []
if phsensitivities is None:
phsensitivities = []
if phintsnrs is None:
phintsnrs = []
if exptimes is None:
exptimes = []
if bpsolints is None:
bpsolints = []
if bpchansolints is None:
bpchansolints = []
if nbpsolutions is None:
nbpsolutions = []
if bpsensitivities is None:
bpsensitivities = []
if bpchansnrs is None:
bpchansnrs = []
if bandwidths is None:
bandwidths = []
if low_channel_solutions is None:
low_channel_solutions = []
self.vis = vis
# Spw list
self.spwids = spwids
# Phaseup solutions
self.phsolints = phsolints
self.phintsolints = phintsolints
self.nphsolutions = nphsolutions
self.phsensitivities = phsensitivities
self.phintsnrs = phintsnrs
self.exptimes = exptimes
# Bandpass solutions
self.bpsolints = bpsolints
self.bpchansolints = bpchansolints
self.nbpsolutions = nbpsolutions
self.bpchansensitivities = bpsensitivities
self.bpchansnrs = bpchansnrs
self.bandwidths = bandwidths
# PIPE-1760: processed as a QA score in hifa_bandpass
self.low_channel_solutions = low_channel_solutions
def __repr__(self):
if self.vis is None or not self.spwids:
return ('BpSolintResults:\n'
'\tNo bandpass solution intervals computed')
else:
line = 'BpSolintResults:\nvis %s\n' % (self.vis)
line = line + 'Phaseup solution time intervals\n'
for i in range(len(self.spwids)):
line = line + \
" spwid %2d solint '%s' intsolint %2d sensitivity %s intsnr %0.1f\n" % \
(self.spwids[i], self.phsolints[i], self.phintsolints[i],
self.phsensitivities[i], self.phintsnrs[i])
line = line + 'Bandpass frequency solution intervals\n'
for i in range(len(self.spwids)):
line = line + \
" spwid %2d solint '%s' channels %2d sensitivity %s chansnr %0.1f\n" % \
(self.spwids[i], self.bpsolints[i], self.bpchansolints[i],
self.bpchansensitivities[i], self.bpchansnrs[i])
return line
def check_strong_atm_lines(ms, fieldlist, intent, spwidlist, solint_dict, tsysname, lineStrengthThreshold=0.1,
minAdjacantChannels=3, nSigma=8):
"""
This function tests if existence of strong atmospheric lines in Tsys spectra
(see CAS-11951).
Inputs:
ms: Measurementset object
fieldlist: a list of field names
intents: an intent string
spwidlist: a list of spw IDs
solint_dict: solution interval dictionary returned by snr.estimate_bpsolint
tsysname: the name of Tsys caltable
Conditions of strong atmospheric line:
(a) the peak of atmospheric line components is larger than a threshold, AND
(b) there is at least an atmospheric line with it's width larger than a threshold
Returns: True if any of SpW meets condition of strong atmospheric lines
"""
LOG.info('Check for strong atmospheric line in Tsys spectra of fields, %s, in %s' % \
(str(fieldlist), ms.basename))
# make sure MS and Tsys caltable corresponds
tsystable_vis = caltableaccess.CalibrationTableDataFiller._readvis(tsysname)
if tsystable_vis != ms.name:
raise RuntimeError("Input MS ({}) and Tsys caltable ({}) does not correspond."
"".format(os.path.basename(ms.name), os.path.basename(tsysname)))
tsys_info = snr_heuristics.get_tsysinfo(ms, fieldlist, intent, spwidlist)
strong_atm_lines = False
for spw in spwidlist:
if spw not in tsys_info or spw not in solint_dict:
continue
tsys_spw = solint_dict[spw]['tsys_spw']
LOG.info('Investigating Tsys spectra for spw %d (Tsys spw %d)' % (spw, tsys_spw))
# Obtain median Tsys spectrum
scanobj = ms.get_scans(scan_id = tsys_info[spw]['tsys_scan'])[0]
atmfields = ms.get_fields(intent='ATMOSPHERE')
fieldids = [fobj.id for fobj in scanobj.fields.intersection(frozenset(atmfields))]
median_tsys = get_median_tsys_spectrum_from_caltable(tsysname, tsys_spw, fieldids[0])
if median_tsys is None:
LOG.warning('Unable to define median Tsys spectrum for Tsys spw = %d, scan = %d' %
(tsys_spw, tsys_info[spw]['tsys_scan']))
continue
# Smooth median Tsys spectrum with kernel size, # of Tsys chans /16
kernel_width = len(median_tsys) // 16
kernel = numpy.ones(kernel_width, dtype=float)/float(kernel_width)
LOG.debug("Subtracting smoothed Tsys spectrum (kernel = %d channels)" % kernel_width)
smoothed_tsys = numpy.convolve(median_tsys, kernel, mode='same')
# Define an index range to avoid edge effect of smoothing
idx_offset = kernel_width // 2
idx_range = slice(idx_offset, idx_offset + numpy.abs(len(median_tsys)-kernel_width) + 1, 1)
# Take absolute difference of median Tsys spectum w/ and w/o smoothing
diff_tsys = numpy.abs(median_tsys - smoothed_tsys)[idx_range]
# If peak is smaller than a threshold, no strong line -> continue to the next spw
peak = numpy.max(diff_tsys)
threshold = lineStrengthThreshold * numpy.median(median_tsys[idx_range])
if peak < threshold:
LOG.info('No strong line is found. peak = %f, threshold = %f' % (peak, threshold))
continue
LOG.info('Strong line(s) are found. peak = %f, threshold = %f' % (peak, threshold))
# Scaled MAD of diff_tsys
scaled_mad = 1.4826*numpy.median(numpy.abs(diff_tsys - numpy.median(diff_tsys)))
LOG.debug('*** Scaled MAD of diff_tsys = %f' % scaled_mad)
# Check amplitude and width of diff_tsys (presumably atm lines).
# If both exceeds thresholds -> strong line
LOG.info('Examining line intensities and widths ' + \
'(thresholds: intensity = %f, width = %d channels)' % \
(nSigma * scaled_mad, minAdjacantChannels))
diff_tsys.mask = (diff_tsys.data < nSigma * scaled_mad)
line_slices = numpy.ma.notmasked_contiguous(diff_tsys)
if line_slices is None:
# notmasked_contiguous returns None when all array is masked.
# this happens when diff_tsys does not exceed nSigma threshold.
LOG.info('No line exceeds intensity threshold.')
continue
for line in line_slices:
LOG.debug('*** line channels: start = %d, width=%d' % \
(line.start+idx_offset, line.stop-line.start))
# check line width
if line.stop-line.start < minAdjacantChannels:
continue
else:
# strong atmospheric line
strong_atm_lines = True
LOG.info('*** Found a spectral line with >= %d channels' % minAdjacantChannels)
return strong_atm_lines
LOG.info('*** All lines are < %d channels' % minAdjacantChannels)
return strong_atm_lines
def get_median_tsys_spectrum_from_caltable(tsysname, spwid, fieldid, interpolate_flagged=True):
"""
Returns masked median Tsys spectrum of an SPW and scan combination in Tsys caltable.
Inputs:
tsysname: the path to Tsys caltable
spwid: SPW ID of Tsys to select
fieldid: field ID of Tsys to select
interpolate_flag: if True, operate piecewise linear interpolation of flagged channels
Returns: masked array of median Tsys spectrum
"""
if not os.path.exists(tsysname):
raise ValueError('Could not find Tsys caltable, %s' % tsysname)
with casa_tools.TableReader(tsysname) as tb:
seltb = tb.query('SPECTRAL_WINDOW_ID == %s && FIELD_ID == %s' % (spwid, fieldid))
if seltb.nrows() == 0:
seltb.close()
LOG.warning('No matching Tsys measurement for SPW=%s and field=%s in Tsys caltable %s' %
(spwid, fieldid, tsysname))
return None
try: # axis order: [POL, FREQ, ROW]
tsys = seltb.getcol('FPARAM')
flag = seltb.getcol('FLAG')
finally:
seltb.close()
ma_median = numpy.median(numpy.ma.masked_array(tsys, flag), axis=[0, 2])
if ma_median.count() == 0: # No valid channel
return None
if interpolate_flagged:
ma_median.data[ma_median.mask] = numpy.interp(numpy.where(ma_median.mask==True)[0],
numpy.where(ma_median.mask==False)[0],
ma_median[~ma_median.mask])
# clear-up mask
ma_median.mask = False
return ma_median