import enum
import functools
import itertools
import math
import os
import numpy
import pipeline.domain.measures as measures
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.sessionutils as sessionutils
import pipeline.infrastructure.utils as utils
import pipeline.infrastructure.vdp as vdp
from pipeline.hif.tasks import gaincal
from pipeline.hif.tasks.bandpass import bandpassmode, bandpassworker
from pipeline.hif.tasks.bandpass.common import BandpassResults, SolintAdjustment
from pipeline.hifa.heuristics import phasespwmap
from pipeline.hifa.tasks.bpsolint import bpsolint, BpSolintResults
from pipeline.infrastructure import callibrary
from pipeline.infrastructure import casa_tools
from pipeline.infrastructure import exceptions
from pipeline.infrastructure import task_registry
from pipeline.infrastructure.pipelineqa import TargetDataSelection
from pipeline.infrastructure.utils.math import round_up
LOG = infrastructure.get_logger(__name__)
__all__ = [
'ALMAPhcorBandpassInputs',
'SerialALMAPhcorBandpass',
'ALMAPhcorBandpass',
'SessionALMAPhcorBandpass',
'SessionALMAPhcorBandpassInputs'
]
class LowSNRPhaseupSolintOrigin(enum.Enum):
"""
Enumeration of all possible hierarchy values for SolintAdjustment.
These values represent the different scenarios where phase-up solution
intervals may be adjusted during bandpass calibration.
"""
# When no SNR results are available
NO_SNR_RESULT = "bandpass.solint.no_snr_result"
# When SNR result does not contain data for expected spws
SPWS_MISSING_DATA = "bandpass.solint.insufficient_data"
# When insufficient points are present in the solution
INSUFFICIENT_POINTS = "bandpass.solint.insufficient_points"
# When insufficient channels to use smoothing
TOO_FEW_CHANNELS ="bandpass.solint.too_few_channels"
# When solint is less than integration time
LT_INTEGRATION_TIME = "bandpass.solint.lt_integration_time"
# Regular phaseup solint adjustments
UNCOMBINED_GT_PHASEUPMAXSOLINT = "phaseup.solint.uncombined.gt_phaseupmaxsolint"
UNCOMBINED_LE_PHASEUPMAXSOLINT = "phaseup.solint.uncombined.le_phaseupmaxsolint"
# Combined SpW phaseup solint adjustments
COMBINED_MIN_BEST_LE_INTEGRATION_TIME = (
"phaseup.solint.combined.min_best_le_integration_time"
)
COMBINED_LE_INTEGRATION_TIME = "phaseup.solint.combined.le_integration_time"
COMBINED_GT_PHASEUPMAXSOLINT = "phaseup.solint.combined.gt_phaseupmaxsolint"
COMBINED_LT_PHASEUPMAXSOLINT = "phaseup.solint.combined.lt_phaseupmaxsolint"
class ALMAPhcorBandpassInputs(bandpassmode.BandpassModeInputs):
bpnsols = vdp.VisDependentProperty(default=8)
bpsnr = vdp.VisDependentProperty(default=50.0)
minbpsnr = vdp.VisDependentProperty(default=20.0)
evenbpints = vdp.VisDependentProperty(default=True)
# Boolean declaring how to populate the bandpass solution parameter
# "fillgaps" (PIPE-2116, also depends on hm_bandpass modes).
hm_auto_fillgaps = vdp.VisDependentProperty(default=True)
# Bandpass heuristics, options are 'fixed', 'smoothed', and 'snr'
hm_bandpass = vdp.VisDependentProperty(default='snr')
@hm_bandpass.convert
def hm_bandpass(self, value):
allowed = ('fixed', 'smoothed', 'snr')
if value not in allowed:
m = ', '.join(('{!r}'.format(i) for i in allowed))
raise ValueError('Value not in allowed value set ({!s}): {!r}'.format(m, value))
return value
# PIPE-2442: SpW combination heuristic for phase-up in bandpass task.
# Options are 'snr', 'always', and 'never'.
hm_phaseup_combine = vdp.VisDependentProperty(default='snr')
@hm_phaseup_combine.convert
def hm_phaseup_combine(self, value):
allowed = ('snr', 'always', 'never')
if value not in allowed:
m = ', '.join(('{!r}'.format(i) for i in allowed))
raise ValueError('Value not in allowed value set ({!s}): {!r}'.format(m, value))
return value
# Phaseup heuristics, options are '', 'manual' and 'snr'
hm_phaseup = vdp.VisDependentProperty(default='snr')
@hm_phaseup.convert
def hm_phaseup(self, value):
allowed = ('', 'manual', 'snr')
if value not in allowed:
m = ', '.join(('{!r}'.format(i) for i in allowed))
raise ValueError('Value not in allowed value set ({!s}): {!r}'.format(m, value))
return value
maxchannels = vdp.VisDependentProperty(default=240)
phaseupbw = vdp.VisDependentProperty(default='')
phaseupmaxsolint = vdp.VisDependentProperty(default=60.0)
phaseupnsols = vdp.VisDependentProperty(default=2)
phaseupsnr = vdp.VisDependentProperty(default=20.0)
phaseupsolint = vdp.VisDependentProperty(default='int')
solint = vdp.VisDependentProperty(default='inf')
# PIPE-628: new parameter to unregister existing bcals before appending to callibrary
unregister_existing = vdp.VisDependentProperty(default=False)
parallel = sessionutils.parallel_inputs_impl(default=False)
def __init__(self, context, output_dir=None, vis=None, mode='channel', hm_phaseup=None, phaseupbw=None,
phaseupmaxsolint=None, phaseupsolint=None, phaseupsnr=None, phaseupnsols=None, hm_phaseup_combine=None,
hm_bandpass=None, solint=None, maxchannels=None, evenbpints=None, bpsnr=None, minbpsnr=None,
bpnsols=None, unregister_existing=None, hm_auto_fillgaps=None, parallel=None, **parameters):
"""Initialize Inputs.
Args:
context: Pipeline context object containing state information.
output_dir: Output directory.
Defaults to None, which corresponds to the current working directory.
vis: List of input MeasurementSets. Defaults to the list of
MeasurementSets specified in the pipeline context.
Example: ``vis=['ngc5921.ms']``
mode: Type of bandpass solution. Currently only supports the
default value of 'channel' (corresponding to bandtype='B' in
CASA bandpass) to perform a channel-by-channel solution for each
spw.
hm_phaseup: The pre-bandpass solution phaseup gain heuristics. The
options are:
- ``'snr'``: compute solution required to achieve the specified SNR
- ``'manual'``: use manual solution parameters
- ``''``: skip phaseup
Example: ``hm_phaseup='manual'``
phaseupbw: Bandwidth to be used for phaseup. Used when
``hm_phaseup= 'manual'``.
Example:
- ``phaseupbw=''`` to use entire bandpass
- ``phaseupbw='500MHz'`` to use central 500MHz
phaseupmaxsolint: Maximum phase correction solution interval (in
seconds) allowed in very low-SNR cases. Used only when
``hm_phaseup='snr'``.
Example: ``phaseupmaxsolint=60.0``
phaseupsolint: The phase correction solution interval in CASA syntax.
Used when ``hm_phaseup='manual'`` or as a default if
the ``hm_phaseup='snr'`` heuristic computation fails.
Example: ``phaseupsolint='300s'``
phaseupsnr: The required SNR for the phaseup solution. Used to calculate
the phaseup time solint, and only if ``hm_phaseup='snr'``.
Example: ``phaseupsnr=10.0``
phaseupnsols: The minimum number of phaseup gain solutions. Used only
if ``hm_phaseup='snr'``.
Example: ``phaseupnsols=4``
hm_phaseup_combine: The spw combination heuristic for the phase-up
solution. Accepts one of following 3 options:
- ``'snr'``, default: heuristics will use combine='spw' in phase-up
gaincal, when SpWs have SNR < phaseupsnr (default = 20).
- ``'always'``: heuristic will force combine='spw' in the phase-up
gaincal.
- ``'never'``: heuristic will not use spw combination; this was the
default logic for Pipeline release 2024 and prior.
Example: ``hm_phaseup_combine='always'``
hm_bandpass: The bandpass solution heuristics. The options are:
``'snr'``: compute the solution required to achieve the specified SNR
``'smoothed'``: simple 'smoothing' i.e. spectral solint>1chan
``'fixed'``: use the user defined parameters for all spws
Example: ``hm_bandpass='snr'``
solint: Time and channel solution intervals in CASA syntax.
Default is solint='inf', which is used when
``hm_bandpass='fixed'``.
If ``hm_bandpass='snr'``, then the task will attempt
to compute and use an optimal SNR-based solint (and warn
if this solint is not good enough).
If ``hm_bandpass='smoothed'``, the task will override
the spectral solint with bandwidth/maxchannels.
Example: ``solint='int'``
maxchannels: The bandpass solution 'smoothing' factor in channels,
i.e. spectral solint will be set to bandwidth / maxchannels
Set to 0 for no smoothing. Used if
``hm_bandpass='smoothed'``.
Example: ``maxchannels=240``
evenbpints: Force the per spw frequency solint to be evenly divisible
into the spw bandpass if ``hm_bandpass='snr'``.
Example: ``evenbpints=False``
bpsnr: The required SNR for the bandpass solution. Used only if
``hm_bandpass='snr'``.
Example: ``bpsnr=30.0``
minbpsnr: The minimum required SNR for the bandpass solution
when strong atmospheric lines exist in Tsys spectra.
Used only if ``hm_bandpass='snr'``.
Example: ``minbpsnr=10.0``
bpnsols: The minimum number of bandpass solutions. Used only if
``hm_bandpass= 'snr'``.
Example: bpnsols=8
unregister_existing: Unregister all bandpass calibrations from the pipeline
context before registering the new bandpass calibrations
from this task. Defaults to False.
Example: ``unregister_existing=True``
hm_auto_fillgaps: If True, then the ``hm_bandpass`` = 'snr' or 'smoothed'
modes, that solve bandpass per SpW, are performed with
CASA bandpass task parameter 'fillgaps' set to a quarter
of the respective SpW bandwidth (in channels).
If False, then these bandpass solves will use
fillgaps=0.
The ``hm_bandpass='fixed'`` mode is unaffected by
``hm_auto_fillgaps`` and always uses fillgaps=0.
caltable: List of names for the output calibration tables. Defaults
to the standard pipeline naming convention.
Example: ``caltable=['ngc5921.gcal']``
field: The list of field names or field ids for which
bandpasses are computed. Set to field='' by default,
which means the task will select all fields.
Example: ``field='3C279'``, ``field='3C279,M82'``
intent: A string containing a comma delimited list of intents
against which the selected fields are matched. Set to
intent='' by default, which means the task will select
all data with the BANDPASS intent.
Example: ``intent='*PHASE*'``
spw: The list of spectral windows and channels for which
bandpasses are computed. Set to spw='' by default, which
means the task will select all science spectral windows.
Example: ``spw='11,13,15,17'``
antenna: Set of data selection antenna IDs
combine: Data axes to combine for solving. Axes are ``''``, ``'scan'``,
``'spw'``, ``'field'`` or any comma-separated combination.
Example: ``combine='scan,field'``
refant: List of reference antenna names. Defaults to the
value(s) stored in the pipeline context. If undefined in
the pipeline context defaults to the CASA reference
antenna naming scheme.
Example: ``refant='DV06,DV07'``
solnorm: Normalise the bandpass solution; defaults to ``True``.
Example: ``solnorm=False``
minblperant: Minimum number of baselines required per antenna for
each solve. Antennas with fewer baselines are excluded
from solutions.
Example: minblperant=4
minsnr: Solutions below this SNR are rejected in the phaseup and
bandpass solves.
Example: minsnr=3.0
parallel: Process multiple MeasurementSets in parallel using the casampi parallelization framework.
Options: ``'automatic'``, ``'true'``, ``'false'``, ``True``, ``False``
Default: ``None`` (equivalent to ``False``)
"""
super().__init__(context, output_dir=output_dir, vis=vis, mode=mode, **parameters)
self.bpnsols = bpnsols
self.bpsnr = bpsnr
self.minbpsnr = minbpsnr
self.evenbpints = evenbpints
self.hm_auto_fillgaps = hm_auto_fillgaps
self.hm_bandpass = hm_bandpass
self.hm_phaseup_combine = hm_phaseup_combine
self.hm_phaseup = hm_phaseup
self.maxchannels = maxchannels
self.phaseupbw = phaseupbw
self.phaseupmaxsolint = phaseupmaxsolint
self.phaseupnsols = phaseupnsols
self.phaseupsnr = phaseupsnr
self.phaseupsolint = phaseupsolint
self.solint = solint
self.unregister_existing = unregister_existing
self.parallel = parallel
class SerialALMAPhcorBandpass(bandpassworker.BandpassWorker):
Inputs = ALMAPhcorBandpassInputs
def prepare(self, **parameters):
inputs = self.inputs
solint_adjustments: list[SolintAdjustment] = []
if inputs.unregister_existing:
# Unregister old bandpass calibrations to stop them from being preapplied
# when calculating phase-ups, etc.
# predicate function that triggers when a bandpass caltable is detected
def bandpass_matcher(_: callibrary.CalToArgs, calfrom: callibrary.CalFrom) -> bool:
return 'bandpass' in calfrom.caltype
LOG.info('Temporarily unregistering all previous bandpass calibrations while task executes')
inputs.context.callibrary.unregister_calibrations(bandpass_matcher)
# Call the SNR estimater if either
# hm_phaseup='snr'
# or
# hm_bandpass='snr'
if inputs.hm_phaseup == 'snr' or inputs.hm_bandpass == 'snr':
snr_result = self._compute_bpsolints()
else:
snr_result = None
# If requested, execute a phaseup job. This will add the resulting
# caltable to the on-the-fly calibration context, so we don't need any
# subsequent gaintable manipulation
if inputs.hm_phaseup != '':
# If requested and available, use the SNR results to determine the
# optimal values for combine and solution interval.
if inputs.hm_phaseup == 'snr' and snr_result.spwids:
phaseup_solint, phaseup_combine, phaseup_snr_expected, adjustments = self._get_best_phaseup_solint(snr_result)
solint_adjustments.extend(adjustments)
# Otherwise, skip determination of optimal values, and stick with
# default values, i.e. use the input phase solint, and use the
# default of no spw combination unless explicitly forced by user.
else:
# Log warning if optimal determination was requested but not
# possible due to missing SNR results.
if inputs.hm_phaseup == 'snr':
LOG.warning('SNR based phaseup solint estimates are unavailable for MS %s' % inputs.ms.basename)
phaseup_solint = inputs.phaseupsolint
phaseup_combine = 'spw' if inputs.hm_phaseup_combine == 'always' else ''
phaseup_snr_expected = None
# Report choice for solint and combine (when applicable), and
# compute temporary spw-to-spw phase offset caltable if necessary.
if phaseup_combine == '':
LOG.info("Using phaseup solint of '%s' in MS %s" % (phaseup_solint, inputs.ms.basename))
phasediff_result = None
else:
LOG.info("Using combine='spw' and phaseup solint of '%s' in MS %s" % (phaseup_solint, inputs.ms.basename))
# Compute the spw-to-spw phase offsets cal table and accept into
# local context.
LOG.info(f'{inputs.ms.basename}: since phaseup will use spw combination, first computing temporary'
f' spw-to-spw phase offsets table to use in pre-apply.')
phasediff_result = self._do_phaseup(solint='inf')
# Run the phase-up task.
phaseup_result = self._do_phaseup(combine=phaseup_combine, solint=phaseup_solint)
# Now perform the bandpass
if inputs.hm_bandpass == 'snr':
if len(snr_result.spwids) <= 0:
LOG.warning('SNR based bandpass solint estimates are unavailable for MS %s' % inputs.ms.basename)
else:
LOG.info('Using SNR based solint estimates')
result = self._do_snr_bandpass(snr_result)
elif inputs.hm_bandpass == 'smoothed':
LOG.info('Using simple bandpass smoothing solint estimates')
result = self._do_smoothed_bandpass()
else:
LOG.info('Using fixed solint estimates')
result = self._do_bandpass()
# Attach the preparatory results to the final result so we have a
# complete log of all the executed tasks.
if inputs.hm_phaseup != '':
if phasediff_result:
result.preceding.append(phasediff_result.final)
result.preceding.append(phaseup_result.final)
# PIPE-2442: add phase-up SNR to the final result for QA scoring.
result.phaseup_snr_expected = phaseup_snr_expected
# PIPE-1624: Store bandpass phaseup caltable table name so it
# can be saved into the context. Do not use the version in
# preceding.append (above), as it is labeled "deprecated"
for cal in phaseup_result.final:
result.phaseup_caltable_for_phase_rms.append(cal.gaintable)
# PIPE-628: set whether we should unregister old bandpass calibrators
# on results acceptance
result.unregister_existing = inputs.unregister_existing
# PIPE-1760: pull bpsolint messages etc. up into result for optional printing as a QA warning
result.solint_adjustments.extend(solint_adjustments)
if snr_result:
result.low_channel_solutions = snr_result.low_channel_solutions
return result
# Compute the solints required to match the SNR
def _compute_bpsolints(self) -> BpSolintResults:
inputs = self.inputs
# Note currently the phaseup bandwidth is not supported
bpsolint_inputs = bpsolint.BpSolint.Inputs(inputs.context,
vis = inputs.vis,
field = inputs.field,
intent = inputs.intent,
spw = inputs.spw,
phaseupsnr = inputs.phaseupsnr,
minphaseupints = inputs.phaseupnsols,
evenbpints = inputs.evenbpints,
bpsnr = inputs.bpsnr,
minbpsnr = inputs.minbpsnr,
minbpnchan = inputs.bpnsols
)
bpsolint_task = bpsolint.BpSolint(bpsolint_inputs)
return self._executor.execute(bpsolint_task)
# Returns the best solint, spw mapping, expected SNR, and solint
# adjustments for the bandpass phase-up solution that will be pre-applied
# during 'bandpass'.
def _get_best_phaseup_solint(
self,
snr_result: bpsolint.BpSolintResults
) -> tuple[str, str, float | None, list[SolintAdjustment]]:
"""
Use SNR results (incl. optimal bandpass solution intervals) to select
the best solution interval and spw combination setting to use in the
phase-up, and the expected phase-up SNR.
Args:
snr_result: dictionary with SNR results for each SpW.
Returns:
4-tuple containing:
- phase-up solution interval
- phase-up combine setting
- expected phase-up SNR
- list of solint adjustments
"""
inputs = self.inputs
quanta = casa_tools.quanta
# holds adjustments made to solint. For hifa_bandpassflag these become QA scores
adjustments: list[SolintAdjustment] = []
# If the SNR results are empty, then no optimal solint can be
# determined. In this case, log a warning, and return early with the
# default input phaseup solint and phaseup combine.
if not snr_result.spwids:
LOG.info(f"{inputs.ms.basename}: no SNR results available, therefore unable to determine optimal phaseup"
f" solint. Reverting to phaseup solint default {inputs.phaseupsolint}.")
phase_combine = 'spw' if inputs.hm_phaseup_combine == 'always' else ''
adjustments.append(
SolintAdjustment(
applies_to=TargetDataSelection(vis={inputs.ms.basename}),
original="",
adjusted=inputs.phaseupsolint,
threshold="",
origin=LowSNRPhaseupSolintOrigin.NO_SNR_RESULT.value,
reason=f"SNR results expected but unavailable",
)
)
return inputs.phaseupsolint, phase_combine, None, adjustments
# PIPE-2442: select which SpWs to consider in determination of best
# solint, and retrieve the corresponding indices of those SpWs in the
# SNR result structure.
if inputs.ms.is_band_to_band:
# For Band-to-Band datasets, only refine solint based on the TARGET
# (high frequency) spectral windows that are expected to have the
# lowest SNRs.
LOG.info(
'%s is a Band-to-Band dataset: selecting high-frequency SpWs for solint refinement.', inputs.ms.basename
)
spwindex = [
snr_result.spwids.index(s.id)
for s in inputs.ms.get_spectral_windows(inputs.spw, intent='TARGET')
]
else:
# For all other datasets, restrict the refinement to the SpWs of a
# single SpectralSpec. Start with retrieving mapping of SpectralSpec
# to science spectral windows.
spspec_to_spwid = utils.get_spectralspec_to_spwid_map(inputs.ms.get_spectral_windows(inputs.spw))
# If there is only 1 SpectralSpec, then use that one.
if len(spspec_to_spwid) == 1:
spwindex = [snr_result.spwids.index(s) for s in next(iter(spspec_to_spwid.values()))]
# Otherwise, with 2+ SpectralSpec, select which SpectralSpec to use.
# PIPE-2442: in this case, it is assumed that the representative
# SpectralSpec to use is the one that has the lowest value of
# maximum SNR for its SpWs.
else:
# Identify the maximum SNR for the SpWs in each SpectralSpec.
max_snr_spspec = []
for spwids in spspec_to_spwid.values():
# Get the indices of these SpWs into the SNR result.
spwi = [snr_result.spwids.index(s) for s in spwids]
# Identify SNRs for all SpWs in this SpectralSpec. It is
# possible for "phintsnrs" in the SNR result to contain None
# and those are immediately rejected.
snrs = [snr_result.phintsnrs[i] for i in spwi if snr_result.phintsnrs[i] is not None]
# If at least one valid SNR was found, then compute the
# maximum SNR, and store the outcome for this SpectralSpec
# as the corresponding spw index and max SNR. If no valid
# SNRs were found (i.e. all were None), then this
# SpectralSpec and its SpWs are rejected from consideration.
if snrs:
max_snr_spspec.append((spwi, max(snrs)))
# If a maximum SNR was found for at least one SpectralSpec, then
# proceed to identify the SpectralSpec with the lowest max SNR,
# and use its corresponding spw index.
if max_snr_spspec:
spwindex = min(max_snr_spspec, key=lambda x: x[1])[0]
# Otherwise, in the unlikely scenario that no valid SNRs were
# found for any SpW of any SpectralSpec, proceed with an empty
# spw index.
else:
spwindex = []
# If there are no valid SpWs, or the SNR results are missing optimal
# solint values for all selected SpWs, then no optimal solint can be
# determined. In this case, log a warning, and return early with the
# default input phaseup solint and phaseup combine.
if not spwindex or not any(snr_result.phsolints[i] for i in spwindex):
LOG.info(
f"{inputs.ms.basename}: no SNR results available for any of the expected SpWs, therefore unable"
f" to determine optimal phaseup solint. Reverting to phaseup solint default {inputs.phaseupsolint}."
)
phase_combine = 'spw' if inputs.hm_phaseup_combine == 'always' else ''
adjustments.append(
SolintAdjustment(
applies_to=TargetDataSelection(vis={inputs.ms.basename}),
original="",
adjusted=inputs.phaseupsolint,
threshold="",
origin=LowSNRPhaseupSolintOrigin.SPWS_MISSING_DATA.value,
reason=f"SNR results not available for any of the expected spws",
)
)
return inputs.phaseupsolint, phase_combine, None, adjustments
# Since SNR results are not empty, then use the first available SpW to
# determine the scan integration time as a timedelta object. It is
# assumed that all scans for all SpWs are taken with the exact same
# integration interval.
scans = inputs.ms.get_scans(scan_intent=inputs.intent)
mean_intervals = {scan.mean_interval(snr_result.spwids[0])
for scan in scans if snr_result.spwids[0] in [spw.id for spw in scan.spws]}
timedelta = mean_intervals.pop()
timedelta_integration_time = timedelta.total_seconds()
# int time is needed to calculate QA score and does not change from here on in
adjustment_cls = functools.partial(SolintAdjustment, integration_time=timedelta_integration_time)
# Determine best solution interval based on SNR results for each SpW
# under consideration.
spwids = []
phintsnrs = []
bestsolint = []
for i in spwindex:
# No solution available for this SpW.
if not snr_result.phsolints[i]:
# This per-spw warning is distinct from the early return on
# L585, which is a global warning for all spws
LOG.warning('No phaseup solint estimate for spw %s in MS %s' %
(snr_result.spwids[i], inputs.ms.basename))
continue
# Otherwise, keep this SpW under consideration and store its SNR
# estimate.
spwids.append(snr_result.spwids[i])
phintsnrs.append(snr_result.phintsnrs[i])
# If the number of phase solutions in the SNR result was below the
# minimum number of phaseup gain solution points:
if snr_result.nphsolutions[i] < inputs.phaseupnsols:
# The solint in the SNR result is not usable, and instead the
# best solint for this SpW is set to achieve the minimum number
# of phase solutions (dividing exposure time by phaseupnsols).
# Note: exposure time in SNR result is a string including unit,
# so using quanta here to get the value as a float.
newsolint = quanta.quantity(snr_result.exptimes[i])['value'] / inputs.phaseupnsols
bestsolint.append(newsolint)
# Depending on whether spw combination is an option, either log
# a message or solint adjustment (=QA warning) that there were
# too few phaseup solution points.
msg = (
f"{inputs.ms.basename}: phaseup solution for spw {snr_result.spwids[i]} has only"
f" {snr_result.nphsolutions[i]} points; reducing estimated phaseup solint from"
f" {snr_result.phsolints[i]:0.3f}s to {newsolint:0.3f}s."
)
if inputs.hm_phaseup_combine == "never":
adjustments.append(
adjustment_cls(
applies_to=TargetDataSelection(vis={inputs.ms.basename}, spw={snr_result.spwids[i]}),
original=snr_result.phsolints[i],
adjusted=newsolint,
threshold=inputs.phaseupnsols,
origin=LowSNRPhaseupSolintOrigin.INSUFFICIENT_POINTS.value,
reason=f"Insufficient phase-up solution points for spw {snr_result.spwids[i]}, adjusting solint to {newsolint:0.3f}",
)
)
else:
msg += f" However, the option to combine spw will trigger a recalculation of solint."
LOG.info(msg)
# Otherwise, adopt for this SpW the solint from the SNR result.
else:
# Using quanta here to cast to same type as above.
bestsolint.append(quanta.quantity(snr_result.phsolints[i])['value'])
max_bestsolint = max(bestsolint)
min_bestsolint = min(bestsolint)
spw_of_max_bestsolint = spwids[bestsolint.index(max_bestsolint)]
# If the best solution interval for all evaluated SpWs are all
# smaller/equal to the integration time, then the required SNR can be
# reached by setting solint to "int". No SpW combination is necessary
# but may have been explicitly forced by user input. Return early with
# this outcome:
if max_bestsolint <= timedelta_integration_time:
LOG.info(f"{inputs.ms.basename}: the largest optimal solint, {max_bestsolint:.3f}s, is smaller than"
f" the integration time, {timedelta_integration_time:.3f}s, setting solint to 'int'.")
if inputs.hm_phaseup_combine == 'always':
phase_combine = 'spw'
snr_expected = numpy.linalg.norm(phintsnrs)
else:
phase_combine = ''
snr_expected = min(phintsnrs)
adjustments.append(
adjustment_cls(
applies_to=TargetDataSelection(vis={inputs.ms.basename}, spw={spw_of_max_bestsolint}),
original=max_bestsolint,
adjusted="int",
threshold=timedelta_integration_time,
origin=LowSNRPhaseupSolintOrigin.LT_INTEGRATION_TIME.value,
reason=f"Largest optimal solint in spw {spw_of_max_bestsolint} still less than integration time {timedelta_integration_time:.3f}, adjusting solint to 'int'",
)
)
return 'int', phase_combine, snr_expected, adjustments
# Otherwise, the maximum best solution interval was above the
# integration time, and it may be possible to use SpW combination to
# improve the SNR.
LOG.info(f"{inputs.ms.basename}: the largest optimal solint, {max_bestsolint:.3f}s, is larger than"
f" the integration time, {timedelta_integration_time:.3f}s.")
# these variables are used to annotate the SolintAdjustment with the
# appropriate reasons and threshold information.
original_solint: float
decision_threshold: float
metric_identifier: str
reason: str
# this will be mutated as required by the metric
applies_to = TargetDataSelection(vis={inputs.ms.basename})
# If the task input explicitly disabled the option of combining SpWs,
# then stick to the solint selection heuristic from PL2024 and earlier,
# to pick the largest optimal solint that was determined for the SpWs
# but capped to maximum set by inputs.phaseupmaxsolint.
if inputs.hm_phaseup_combine == 'never':
# Combine is explicitly disabled for phase-up.
LOG.info(f"{inputs.ms.basename}: SpW combination for phase-up is explicitly set to 'never' in task inputs"
f" and will therefore not be considered as an option for SNR improvement.")
phaseup_combine = ''
# If the largest optimal solint is greater than the maximum allowed
# value:
if max_bestsolint > inputs.phaseupmaxsolint:
# Set the solution interval to the maximum allowed value, but
# slightly adjust the value to round it to the nearest integer
# multiple of the integration time. Use quanta to turn into a
# string with unit, and restrict to 3 significant digits.
finalfactor = round_up(inputs.phaseupmaxsolint / timedelta_integration_time)
finalsolint = quanta.tos(quanta.quantity(timedelta_integration_time * finalfactor, 's'), 3)
snr_expected = numpy.sqrt(finalfactor) * min(phintsnrs)
LOG.info(
f"{inputs.ms.basename}: the largest optimal solint, {max(bestsolint):.3f}s, is greater than"
f" {inputs.phaseupmaxsolint}s, adjusting to {finalsolint}."
)
applies_to.spw |= {spw_of_max_bestsolint}
metric_identifier = LowSNRPhaseupSolintOrigin.UNCOMBINED_GT_PHASEUPMAXSOLINT.value
original_solint = max_bestsolint
decision_threshold = inputs.phaseupmaxsolint
reason = f"Largest optimal solint greater than {decision_threshold}s, adjusting to {finalsolint}"
# Identify for which SpWs the derived optimal solint was larger
# than the maximum allowed value, and then warn that those SpWs
# will have a low phaseup SNR.
low_snr_spws = [str(spwid) for bsi, spwid in zip(bestsolint, spwids) if bsi > inputs.phaseupmaxsolint]
if low_snr_spws:
LOG.info(
f"{inputs.ms.basename}: spw(s) {utils.commafy(low_snr_spws, False)} will have a"
f" low phaseup gaincal SNR = {snr_expected}, which is < input phase-up SNR threshold"
f" ({inputs.phaseupsnr})."
)
# Otherwise, the largest derived optimal solint must be above the
# integration time but below the maximum allowed value:
else:
# Set the solution interval to the largest derived optimal
# solint but slightly adjust the value to round it to the
# nearest integer multiple of the integration time. Use quanta
# to turn into a string with unit, and restrict to 3 significant
# digits.
finalfactor = round_up(max_bestsolint / timedelta_integration_time)
finalsolint = quanta.tos(quanta.quantity(timedelta_integration_time * finalfactor, 's'), 3)
snr_expected = numpy.sqrt(finalfactor) * min(phintsnrs)
metric_identifier = LowSNRPhaseupSolintOrigin.UNCOMBINED_LE_PHASEUPMAXSOLINT.value
original_solint = max_bestsolint
decision_threshold = inputs.phaseupmaxsolint
reason = f"Largest optimal solint less than {decision_threshold}s, adjusting to {finalsolint}"
LOG.info(f"{inputs.ms.basename}: setting solint to {finalsolint}.")
# Otherwise, the optimal derived solint for at least one SpW was larger
# than the integration time, and SpW combination is available as an
# option, so continue with the new PL2025 heuristic (PIPE-2442) to find
# an optimal solution interval for SpW combination, based on aggregate
# bandwidth.
else:
LOG.info(f"{inputs.ms.basename}: combining SpWs to improve phaseup SNR.")
phaseup_combine = 'spw'
# If the smallest optimal solint, to achieve the required SNR, was
# smaller than the integration time for at least one SpW, then the
# SNR will only improve with more bandwidth after combining SpWs,
# so "int" will be the minimum optimal solint for combine='spw'.
if min_bestsolint <= timedelta_integration_time:
LOG.info(f"{inputs.ms.basename}: optimal solint based on aggregate bandwidth is 'int'.")
finalsolint = 'int'
snr_expected = numpy.linalg.norm(phintsnrs)
metric_identifier = LowSNRPhaseupSolintOrigin.COMBINED_MIN_BEST_LE_INTEGRATION_TIME.value
original_solint = min_bestsolint
decision_threshold = timedelta_integration_time
reason = f"Optimal solint based on aggregate bandwidth is {finalsolint}"
# Otherwise, a new optimal solint based on the aggregate bandwidth
# through SpW combination needs to be computed here.
else:
LOG.info(f"{inputs.ms.basename}: computing optimal solint based on aggregate bandwidth.")
# Identify SpW with smallest optimal solint.
spwid_min_solint = spwids[bestsolint.index(min_bestsolint)]
# Determine the aggregate bandwidth for all SpWs originally
# under consideration, including any for which there may not
# have been a valid entry in SNR result, hence use origina
# spwindex instead of spwids.
sum_bw = sum(inputs.ms.get_spectral_window(snr_result.spwids[i]).bandwidth.value for i in spwindex)
# Set bandwidth scaling factor to bandwidth of SpW with the
# smallest optimal solint divided by the aggregate bandwidth of
# the SpWs under consideration.
bwfactor = float(inputs.ms.get_spectral_window(spwid_min_solint).bandwidth.value / sum_bw)
# Set the solint for the SpW combination gaincal by scaling the
# smallest optimal solint with the bandwidth scaling factor,
# and ensuring this is rounded up (ceil) to next nearest integer
# multiple of the integration time.
combfactor = math.ceil(min_bestsolint * bwfactor / timedelta_integration_time)
combinesolint = timedelta_integration_time * combfactor
# Check how the newly determined optimal solint for combining
# SpWs compares to the integration time and the maximum allowed
# solint.
#
# If the optimal combined solint is smaller/equal to the
# integration time, then set the solint to 'int'.
if combinesolint <= timedelta_integration_time:
finalsolint = 'int'
snr_expected = numpy.linalg.norm(phintsnrs)
metric_identifier = LowSNRPhaseupSolintOrigin.COMBINED_LE_INTEGRATION_TIME.value
original_solint = combinesolint
decision_threshold = timedelta_integration_time
reason = f"Optimal combined solint <= integration time ({timedelta_integration_time})"
# If the optimal combined solint is still greater than the
# maximum allowed value (i.e. really low SNR scenario):
elif combinesolint > inputs.phaseupmaxsolint:
# Set the solution interval to the maximum allowed value,
# but slightly adjust the value to round it to the nearest
# integer multiple of the integration time. Use quanta to
# turn into a string with unit, and restrict to 3
# significant digits.
finalfactor = round_up(inputs.phaseupmaxsolint / timedelta_integration_time)
finalsolint = quanta.tos(quanta.quantity(timedelta_integration_time * finalfactor, 's'), 3)
snr_expected = numpy.sqrt(finalfactor) * numpy.linalg.norm(phintsnrs)
LOG.info(
f"{inputs.ms.basename}: the combined spw solution interval, {combinesolint:.3f}s, is"
f" greater than {inputs.phaseupmaxsolint}s, adjusting to {finalsolint}, solution SNR"
f" = {snr_expected:.2f}, < {inputs.phaseupsnr}."
)
metric_identifier = LowSNRPhaseupSolintOrigin.COMBINED_GT_PHASEUPMAXSOLINT.value
original_solint = combinesolint
decision_threshold = inputs.phaseupmaxsolint
reason = f"Combined solution interval greater than upper limit ({inputs.phaseupmaxsolint})"
# Otherwise, proceed with the optimal combined solint.
else:
# Use quanta to turn into a string with unit, and restrict
# to 3 significant digits.
finalsolint = quanta.tos(quanta.quantity(combinesolint), 3)
snr_expected = numpy.sqrt(combfactor) * numpy.linalg.norm(phintsnrs)
metric_identifier = LowSNRPhaseupSolintOrigin.COMBINED_LT_PHASEUPMAXSOLINT.value
original_solint = combinesolint
decision_threshold = inputs.phaseupmaxsolint
reason = f"Combined solution interval less than upper limit ({inputs.phaseupmaxsolint})"
adjustments.append(
adjustment_cls(
applies_to=applies_to,
original=original_solint,
adjusted=finalsolint,
threshold=decision_threshold,
origin=metric_identifier,
reason=reason,
)
)
return finalsolint, phaseup_combine, snr_expected, adjustments
# Compute the phaseup solution.
def _do_phaseup(self, combine='', solint='int'):
inputs = self.inputs
# Set input parameters for phase gaincal.
phaseup_inputs = gaincal.GTypeGaincal.Inputs(
inputs.context,
vis=inputs.vis,
field=inputs.field,
spw=self._get_phaseup_spw(),
antenna=inputs.antenna,
intent=inputs.intent,
solint=solint,
combine=combine,
refant=inputs.refant,
minblperant=inputs.minblperant,
calmode='p',
minsnr=inputs.minsnr
)
# Create and run gaincal task.
phaseup_task = gaincal.GTypeGaincal(phaseup_inputs)
result = self._executor.execute(phaseup_task, merge=False)
# Log warning if gaincal did not return a caltable.
if not result.final:
LOG.warning(f"No bandpass {'phase offsets' if solint == 'inf' else 'phaseup'} solution for "
f" {inputs.ms.basename}.")
else:
# If the phaseup uses SpW combination, then first update the
# CalApplication to add in a spectral window mapping.
if combine == 'spw':
# Retrieve a mapping for combining the science SpWs.
scispws = inputs.ms.get_spectral_windows(task_arg=inputs.spw)
spwmap = phasespwmap.combine_spwmap(scispws)
LOG.info(f"{inputs.ms.basename} - combined spw map for phaseup solution: {spwmap}.")
# There should be only a single CalApplication, so replace that
# one with the modified CalApplication that includes the SpW
# mapping.
modified_calapp = callibrary.copy_calapplication(result.pool[0], spwmap=spwmap)
result.pool[0] = modified_calapp
result.final[0] = modified_calapp
# Register the new phase caltable in the local context, to ensure it
# is pre-applied in subsequent gaincal calls.
result.accept(inputs.context)
return result
# Compute a standard bandpass
def _do_bandpass(self) -> BandpassResults:
bandpass_task = bandpassmode.BandpassMode(self.inputs)
return self._executor.execute(bandpass_task)
# Compute the smoothed bandpass
def _do_smoothed_bandpass(self) -> BandpassResults | None:
inputs = self.inputs
# Store original values of some parameters.
orig_spw = inputs.spw
orig_solint = inputs.solint
orig_append = inputs.append
try:
# initialize the caltable and list of spws
inputs.caltable = inputs.caltable
spwlist = inputs.ms.get_spectral_windows(orig_spw)
# will hold the CalAppOrigins that record how each CalApp was
# generate. Ideally this would be a list on the CalApp itself,
# but we don't have time to do that right now.
calapp_origins = []
# Loop through the spw appending the results of each spw
# to the results of the previous one.
for spw in spwlist:
# TDM or FDM
dd = inputs.ms.get_data_description(spw=spw)
if dd is None:
LOG.debug('Missing data description for spw %s' % spw.id)
continue
ncorr = len(dd.corr_axis)
if ncorr not in [1, 2, 4]:
LOG.debug('Wrong number of correlations %s for spw %s' %
(ncorr, spw.id))
continue
# Smooth if FDM and if it makes sense
if ncorr * spw.num_channels > 256:
if (spw.num_channels // inputs.maxchannels) < 1:
LOG.info(f"{inputs.ms.basename}: Too few channels ({spw.num_channels}) in SpW {spw.id} to use"
f" smoothing (maxchannels={inputs.maxchannels}), reverting to default bandpass solint"
f" {orig_solint}.")
inputs.solint = orig_solint
else:
# PIPE-2036: work-around for potential issue caused by:
# * PL defined solint as a frequency interval, and
# this may correspond to an exact nr. of channels.
# * the frequency interval is passed with limited
# precision (typically in MHz with 6 decimals, i.e.
# a precision of Hz)
# * CASA's bandpass converts the frequency interval
# back to nr. of channels and then take the floor
#
# This can result in e.g. a required nr. of channels of
# 5 corresponding to 4.8828125 MHz but getting passed as
# 4.882812 MHz, then converted back to 4.999999
# channels, and floored to 4.
#
# As a work-around, check whether the frequency interval
# would trigger this, and if so, then round *up* the
# frequency interval to nearest Hz.
bandwidth = spw.bandwidth.to_units(otherUnits=measures.FrequencyUnits.HERTZ)
newsolint = bandwidth / inputs.maxchannels
chanwidth = bandwidth / spw.num_channels
if round(newsolint) / chanwidth < math.floor(newsolint / chanwidth):
inputs.solint = f"{orig_solint},{round_up(newsolint) * 1.e-6:f}MHz"
else:
inputs.solint = f"{orig_solint},{float(newsolint) * 1.e-6:f}MHz"
LOG.info(f"{inputs.ms.basename}: using smoothed bandpass solint {inputs.solint} for SpW"
f" {spw.id}.")
else:
inputs.solint = orig_solint
LOG.warning(f"Reverting to default bandpass solint {inputs.solint} for spw {spw.id} in MS"
f" {inputs.ms.basename}")
# PIPE-2116: if requested, set fillgaps to a quarter of the
# number of channels of current SpW.
if inputs.hm_auto_fillgaps:
inputs.fillgaps = int(spw.num_channels / 4)
else:
inputs.fillgaps = 0
# Compute and append bandpass solution
inputs.spw = spw.id
bandpass_task = bandpassmode.BandpassMode(inputs)
result = self._executor.execute(bandpass_task)
if os.path.exists(self.inputs.caltable):
self.inputs.append = True
self.inputs.caltable = result.final[-1].gaintable
calapp_origins.extend(result.final[-1].origin)
# Reset the calto spw list
result.pool[0].calto.spw = orig_spw
if result.final:
result.final[0].calto.spw = orig_spw
result.final[0].origin = calapp_origins
return result
finally:
inputs.spw = orig_spw
inputs.solint = orig_solint
inputs.append = orig_append
# Compute the bandpass using SNR estimates
def _do_snr_bandpass(self, snr_result) -> BandpassResults | None:
inputs = self.inputs
quanta = casa_tools.quanta
# Store original values of some parameters.
orig_spw = inputs.spw
orig_solint = inputs.solint
orig_append = inputs.append
adjustments: list[SolintAdjustment] = []
try:
# initialize the caltable and list of spws
inputs.caltable = inputs.caltable
spwlist = inputs.ms.get_spectral_windows(orig_spw)
# will hold the CalAppOrigins that record how each CalApp was
# generate. Ideally this would be a list on the CalApp itself,
# but we don't have time to do that right now.
calapp_origins = []
for spw in spwlist:
# TDM or FDM
dd = inputs.ms.get_data_description(spw=spw)
if dd is None:
LOG.debug('Missing data description for spw %s' % spw.id)
continue
ncorr = len(dd.corr_axis)
if ncorr not in [1, 2, 4]:
LOG.debug('Wrong number of correlations %s for spw %s' %
(ncorr, spw.id))
continue
# Find the best solint for that window
try:
solindex = snr_result.spwids.index(spw.id)
except:
solindex = -1
# Use the optimal solution if it is good enough otherwise
# revert to the default smoothing algorithm
if solindex >= 0:
if snr_result.nbpsolutions[solindex] < inputs.bpnsols:
# Recompute the solution interval to force the minimum
# number of solution channels
factor = 1.0 / inputs.bpnsols
newsolint = quanta.tos(quanta.mul(snr_result.bandwidths[solindex], factor))
inputs.solint = orig_solint + ',' + newsolint
original = str(to_frequency(snr_result.bpsolints[solindex]))
adjusted = f'{orig_solint},{str(to_frequency(newsolint))}'
vis = os.path.basename(inputs.vis)
adjustments.append(
SolintAdjustment(
applies_to=TargetDataSelection(vis={vis}, spw={spw.id}),
original=original,
threshold=inputs.bpnsols,
adjusted=adjusted,
origin=LowSNRPhaseupSolintOrigin.TOO_FEW_CHANNELS.value,
reason=f'Too few channels: changing recommended bandpass solint from {original} to {adjusted} for spw {spw.id}'
)
)
else:
inputs.solint = orig_solint + ',' + \
snr_result.bpsolints[solindex]
LOG.info('Setting bandpass solint to %s for spw %s' % (inputs.solint, spw.id))
elif ncorr * spw.num_channels > 256:
LOG.warning(f"{inputs.ms.basename}: no SNR based bandpass solint was found for spw {spw.id},"
f" reverting to smoothing algorithm.")
if (spw.num_channels // inputs.maxchannels) < 1:
LOG.info(f"{inputs.ms.basename}: Too few channels ({spw.num_channels}) in SpW {spw.id} to use"
f" smoothing (maxchannels={inputs.maxchannels}), reverting to default bandpass solint"
f" {orig_solint}.")
inputs.solint = orig_solint
else:
# PIPE-2036: work-around for potential issue caused by:
# * PL defined solint as a frequency interval, and
# this may correspond to an exact nr. of channels.
# * the frequency interval is passed with limited
# precision (typically in MHz with 6 decimals, i.e.
# a precision of Hz)
# * CASA's bandpass converts the frequency interval
# back to nr. of channels and then take the floor
#
# This can result in e.g. a required nr. of channels of
# 5 corresponding to 4.8828125 MHz but getting passed as
# 4.882812 MHz, then converted back to 4.999999
# channels, and floored to 4.
#
# As a work-around, check whether the frequency interval
# would trigger this, and if so, then round *up* the
# frequency interval to nearest Hz.
bandwidth = spw.bandwidth.to_units(otherUnits=measures.FrequencyUnits.HERTZ)
newsolint = bandwidth / inputs.maxchannels
chanwidth = bandwidth / spw.num_channels
if round(newsolint) / chanwidth < math.floor(newsolint / chanwidth):
inputs.solint = f"{orig_solint},{round_up(newsolint) * 1.e-6:f}MHz"
else:
inputs.solint = f"{orig_solint},{float(newsolint) * 1.e-6:f}MHz"
LOG.info(f"{inputs.ms.basename}: using smoothed bandpass solint {inputs.solint} for SpW"
f" {spw.id}.")
else:
inputs.solint = orig_solint
LOG.warning("Reverting to default bandpass solint %s for spw %s in MS %s" %
(inputs.solint, spw.id, inputs.ms.basename))
# PIPE-2116: if requested, set fillgaps to a quarter of the
# number of channels of current SpW.
if inputs.hm_auto_fillgaps:
inputs.fillgaps = int(spw.num_channels / 4)
else:
inputs.fillgaps = 0
# Compute and append bandpass solution
inputs.spw = spw.id
bandpass_task = bandpassmode.BandpassMode(inputs)
result = self._executor.execute(bandpass_task)
if os.path.exists(self.inputs.caltable):
self.inputs.append = True
self.inputs.caltable = result.final[-1].gaintable
calapp_origins.extend(result.final[-1].origin)
# Reset the calto spw list
result.pool[0].calto.spw = orig_spw
if result.final:
result.final[0].calto.spw = orig_spw
result.final[0].origin = calapp_origins
result.solint_adjustments.extend(adjustments)
return result
finally:
inputs.spw = orig_spw
inputs.solint = orig_solint
inputs.append = orig_append
# Compute spws using bandwidth parameters
def _get_phaseup_spw(self):
"""
ms -- measurement set object
spwstr -- comma delimited list of spw ids
bandwidth -- bandwidth in Hz of central channels used to
phaseup
"""
inputs = self.inputs
# Add the channel ranges in. Note that this currently assumes no prior
# channel selection.
if inputs.phaseupbw == '':
return inputs.spw
# Convert bandwidth input to CASA quantity and then on to pipeline
# domain Frequency object
quanta = casa_tools.quanta
bw_quantity = quanta.convert(quanta.quantity(inputs.phaseupbw), 'Hz')
bandwidth = measures.Frequency(quanta.getvalue(bw_quantity)[0],
measures.FrequencyUnits.HERTZ)
# Loop over the spws creating a new list with channel ranges
outspw = []
for spw in self.inputs.ms.get_spectral_windows(self.inputs.spw):
cen_freq = spw.centre_frequency
lo_freq = cen_freq - bandwidth / 2.0
hi_freq = cen_freq + bandwidth / 2.0
minchan, maxchan = spw.channel_range(lo_freq, hi_freq)
cmd = '{0}:{1}~{2}'.format(spw.id, minchan, maxchan)
outspw.append(cmd)
return ','.join(outspw)
[docs]
@task_registry.set_equivalent_casa_task('hifa_bandpass')
@task_registry.set_casa_commands_comment(
'The spectral response of each antenna is calibrated. A short-solint phase gain is calculated to remove '
'decorrelation of the bandpass calibrator before the bandpass is calculated.'
)
class ALMAPhcorBandpass(sessionutils.ParallelTemplate):
Inputs = ALMAPhcorBandpassInputs
Task = SerialALMAPhcorBandpass
class SessionALMAPhcorBandpassInputs(ALMAPhcorBandpassInputs):
# We want to apply bandpass calibrations from BANDPASS scans, not
# fall back to calibration against PHASE or AMPLITUDE scans
intent = vdp.VisDependentProperty(default='BANDPASS')
# use common implementation for parallel inputs argument
parallel = sessionutils.parallel_inputs_impl()
def __init__(self, context, mode=None, hm_phaseup=None, phaseupbw=None, phaseupsolint=None, phaseupsnr=None,
phaseupnsols=None, hm_bandpass=None, solint=None, maxchannels=None, evenbpints=None, bpsnr=None,
minbpsnr=None, bpnsols=None, parallel=None, **parameters):
super().__init__(context, mode=mode, hm_phaseup=hm_phaseup,
phaseupbw=phaseupbw, phaseupsolint=phaseupsolint,
phaseupsnr=phaseupsnr, phaseupnsols=phaseupnsols,
hm_bandpass=hm_bandpass, solint=solint,
maxchannels=maxchannels, evenbpints=evenbpints,
bpsnr=bpsnr, minbpsnr=minbpsnr,
bpnsols=bpnsols, **parameters)
self.parallel = parallel
BANDPASS_MISSING = '___BANDPASS_MISSING___'
[docs]
@task_registry.set_equivalent_casa_task('session_bandpass')
class SessionALMAPhcorBandpass(basetask.StandardTaskTemplate):
Inputs = SessionALMAPhcorBandpassInputs
is_multi_vis_task = True
[docs]
def prepare(self):
inputs = self.inputs
vis_list = sessionutils.as_list(inputs.vis)
assessed = []
with sessionutils.VDPTaskFactory(inputs, self._executor, SerialALMAPhcorBandpass) as factory:
task_queue = [(vis, factory.get_task(vis)) for vis in vis_list]
for (vis, (task_args, task)) in task_queue:
# only launch jobs for MSes with bandpass calibrators.
# The analyse() method will subsequently adopt the
# appropriate bandpass calibration table from one of
# the completed jobs.
ms = inputs.context.observing_run.get_ms(vis)
if 'BANDPASS' not in ms.intents:
assessed.append(sessionutils.VisResultTuple(vis, task_args, BANDPASS_MISSING))
continue
try:
worker_result = task.get_result()
except exceptions.PipelineException as e:
assessed.append(sessionutils.VisResultTuple(vis, task_args, e))
else:
assessed.append(sessionutils.VisResultTuple(vis, task_args, worker_result))
return assessed
[docs]
def analyse(self, assessed):
# all results will be added to this object
final_result = basetask.ResultsList()
context = self.inputs.context
session_groups = sessionutils.group_into_sessions(context, assessed)
for session_id, session_results in session_groups.items():
# get a list of MeasurementSet domain objects for all MSes
# in this session
session_mses = [context.observing_run.get_ms(vis_result.vis) for vis_result in session_results]
# get a list of all bandpass scans in this session,
# flattening the list of lists to form a single list of
# scan domain objects
bandpass_scans = list(itertools.chain(*[ms.get_scans(scan_intent='BANDPASS') for ms in session_mses]))
# create a dict of scan objects to name of the MS
# containing that scan
scans_to_vis = {scan: ms.name
for scan in bandpass_scans
for ms in session_mses
if scan in ms.scans}
# create a dict of scan object to results object for the MS
# containing that scan
scan_to_result = {scan: result
for vis, _, result in session_results
for scan in bandpass_scans
if vis == scans_to_vis[scan]}
for vis, task_args, vis_result in session_results:
if vis_result == BANDPASS_MISSING:
# No bandpass calibrator for this MS, so adopt the
# nearest bandpass calibration in time
# get bandpass closest in time, identified from the
# centre of the bandpass scan to the centre of this
# observation
no_bandpass_ms = context.observing_run.get_ms(vis)
centre_time = centre_datetime_from_epochs(no_bandpass_ms.start_time, no_bandpass_ms.end_time)
closest_scan = min(bandpass_scans, key=lambda scan: get_time_delta_seconds(centre_time, scan))
# identify which result contains this closest scan
# and adopt its CalApplications
result_to_add = scan_to_result[closest_scan]
adopted_ms = context.observing_run.get_ms(result_to_add.inputs['vis'])
LOG.info('Adopting calibrations from {!s} for {!s}'
''.format(adopted_ms.basename, no_bandpass_ms.basename))
fake_result = BandpassResults(applies_adopted=True)
fake_result.inputs = task_args
fake_result.stage_number = result_to_add.stage_number
for calapp in result_to_add.final:
session_calto = calapp.calto
session_calfrom = calapp.calfrom
# remap spectral windows to apply calibration
# to
my_spw = sessionutils.remap_spw_str(adopted_ms, no_bandpass_ms, session_calto.spw)
my_calto = callibrary.CalTo(vis=vis, field='', spw=my_spw, antenna='', intent='')
for cf in session_calfrom:
# remap spectral windows to take
# calibration from
my_spwmap = sessionutils.remap_spw_int(adopted_ms, no_bandpass_ms, cf.spwmap)
my_calfrom = callibrary.CalFrom(gaintable=cf.gaintable,
gainfield=cf.gainfield,
interp=cf.interp,
spwmap=my_spwmap,
caltype=cf.caltype,
calwt=cf.calwt)
remapped_calapp = callibrary.CalApplication(my_calto, my_calfrom, origin=calapp.origin)
fake_result.final.append(remapped_calapp)
final_result.append(fake_result)
elif isinstance(vis_result, Exception):
LOG.error('No bandpass solution created for {!s}'.format(os.path.basename(vis)))
fake_result = BandpassResults()
fake_result.inputs = task_args
final_result.append(fake_result)
else:
# the bandpass job for an individual MS is wrapped
# in a ResultsList, hence [0].
final_result.append(vis_result)
return final_result
def centre_datetime_from_epochs(epoch1, epoch2):
"""
Get the time midpoint between two epochs.
:param epoch1: epoch 1
:param epoch2: epoch 2
:return: time between epoch1 and epoch2
:rtype: datetime.datetime
"""
time1 = utils.get_epoch_as_datetime(epoch1)
time2 = utils.get_epoch_as_datetime(epoch2)
# use min & max so it doesn't depend on correct time ordering of
# arguments
start = min([time1, time2])
end = max([time1, time2])
return start + (end - start) / 2
def get_time_delta_seconds(time, scan):
"""
Get the absolute time difference between a time t and a scan's time
of observation. The time difference is calculated as the difference
between t and the midpoint of the scan.
:param time: time point
:type time: datetime.datetime
:param scan: scan to measure time to
:type scan: Scan domain object
:return: time between time and scan midpoint
:rtype: datetime.timedelta
"""
scan_centre = centre_datetime_from_epochs(scan.start_time, scan.end_time)
dt = time - scan_centre
return abs(dt.total_seconds())
def to_frequency(val: str) -> measures.Frequency:
"""
Converts a string representation of a frequency value to a Frequency object.
:param val: The string representation of the frequency value (e.g., '100MHz', '1GHz').
:return: A Frequency object representing the frequency value.
"""
qa = casa_tools.quanta
as_quantity = qa.quantity(val)
hz_quantity = qa.convert(as_quantity, 'Hz')
hz_val = hz_quantity['value']
as_frequency = measures.Frequency(hz_val, units=measures.FrequencyUnits.HERTZ)
return as_frequency