from __future__ import annotations
import collections
import copy
import os
from typing import TYPE_CHECKING
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.sessionutils as sessionutils
import pipeline.infrastructure.utils as utils
import pipeline.infrastructure.vdp as vdp
from pipeline.domain import DataType
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import task_registry
from ...heuristics.fieldnames import IntentFieldnames
if TYPE_CHECKING:
from collections.abc import Callable
from pipeline.infrastructure.callibrary import IntervalCalState
__all__ = [
'Applycal',
'SerialApplycal',
'ApplycalInputs',
'ApplycalResults',
]
LOG = infrastructure.logging.get_logger(__name__)
class ApplycalInputs(vdp.StandardInputs):
"""
ApplycalInputs defines the inputs for the Applycal pipeline task.
"""
# PIPE-1691: hif_applycal is now implicitly a parallel task, but by default
# running with parallel=False.
parallel = sessionutils.parallel_inputs_impl(default=False)
@vdp.VisDependentProperty
def antenna(self):
return ''
@antenna.convert
def antenna(self, value):
antennas = self.ms.get_antenna(value)
# if all antennas are selected, return ''
if len(antennas) == len(self.ms.antennas):
return ''
return utils.find_ranges([a.id for a in antennas])
applymode = vdp.VisDependentProperty(default='calflagstrict')
@vdp.VisDependentProperty
def field(self):
# this will give something like '0542+3243,0343+242'
field_finder = IntentFieldnames()
intent_fields = field_finder.calculate(self.ms, self.intent)
# run the answer through a set, just in case there are duplicates
fields = set()
fields.update(utils.safe_split(intent_fields))
return ','.join(fields)
flagbackup = vdp.VisDependentProperty(default=True)
flagdetailedsum = vdp.VisDependentProperty(default=False)
flagsum = vdp.VisDependentProperty(default=True)
intent = vdp.VisDependentProperty(default='TARGET,PHASE,BANDPASS,AMPLITUDE,CHECK')
parang = vdp.VisDependentProperty(default=False)
@vdp.VisDependentProperty
def spw(self):
science_spws = self.ms.get_spectral_windows(with_channels=True)
return ','.join([str(spw.id) for spw in science_spws])
# docstring and type hints: supplements h_applycal
def __init__(self, context, output_dir=None, vis=None, field=None, spw=None, antenna=None, intent=None,
parang=None, applymode=None, flagbackup=None, flagsum=None, flagdetailedsum=None,
parallel=None):
"""Initialize Inputs.
Args:
context: Pipeline context object containing state information.
output_dir: Output directory.
Defaults to None, which corresponds to the current working directory.
vis: The list of input MeasurementSets. Defaults to the list of MeasurementSets in the pipeline context.
example: ``['X227.ms']``
field: A string containing the list of field names or field ids to which the calibration will be applied. Defaults to all fields in the pipeline
context.
example: ``'3C279'``, ``'3C279,M82'``
spw: The list of spectral windows and channels to which the calibration will be applied. Defaults to all science windows in the pipeline
context.
example: ``'17'``, ``'11,15'``
antenna: The selection of antennas to which the calibration will be applied. Defaults to all antennas. Not currently supported.
intent: A string containing the list of intents against which the selected fields will be matched. Defaults to all supported intents
in the pipeline context.
example: ``'*TARGET*'``
parang: Apply parallactic angle correction
applymode: Calibration apply mode
- 'calflag': calibrate data and apply flags from solutions
- 'calflagstrict': (default) same as above except flag spws for which calibration is
unavailable in one or more tables (instead of allowing them to pass
uncalibrated and unflagged)
- 'trial': report on flags from solutions, dataset entirely unchanged
- 'flagonly': apply flags from solutions only, data not calibrated
- 'flagonlystrict': same as above except flag spws for which calibration is
unavailable in one or more tables
- 'calonly': calibrate data only, flags from solutions NOT applied
flagbackup: Backup the flags before the apply
flagsum: Compute before and after flagging summary statistics
flagdetailedsum: Compute detailed before and after flagging statistics summaries. Parameter available only when if flagsum is ``True``.
parallel: Process multiple MeasurementSets in parallel using the casampi parallelization framework.
Options: ``'automatic'``, ``'true'``, ``'false'``, ``True``, ``False``
Default: ``None`` (equivalent to ``False``)
"""
super().__init__()
# pipeline inputs
self.context = context
# vis must be set first, as other properties may depend on it
self.vis = vis
self.output_dir = output_dir
# data selection arguments
self.field = field
self.spw = spw
self.antenna = antenna
self.intent = intent
# solution parameters
self.parang = parang
self.applymode = applymode
self.flagbackup = flagbackup
self.flagsum = flagsum
self.flagdetailedsum = flagdetailedsum
self.parallel = parallel
def to_casa_args(self):
casa_args = super().to_casa_args()
del casa_args['flagsum']
del casa_args['flagdetailedsum']
del casa_args['parallel']
return casa_args
class ApplycalResults(basetask.Results):
"""
ApplycalResults is the results class for the pipeline Applycal task.
"""
def __init__(self, applied=None, callib_map: dict[str, str]=None,
data_type: DataType | None=None):
"""
Construct and return a new ApplycalResults.
The resulting object should be initialized with a list of
CalibrationTables corresponding to the caltables applied by this task.
:param applied: caltables applied by this task
:type applied: List of :class:`~pipeline.domain.caltable.CalibrationTable`
"""
if applied is None:
applied = []
if callib_map is None:
callib_map = {}
super().__init__()
self.applied = set()
self.applied.update(applied)
self.callib_map = dict(callib_map)
self.data_type = data_type
def merge_with_context(self, context):
"""
Merges these results with the given context by examining the context
and marking any applied caltables, so removing them from subsequent
on-the-fly calibration calculations.
See :method:`~pipeline.Results.merge_with_context`
"""
if not self.applied:
LOG.error('No results to merge')
for calapp in self.applied:
LOG.trace('Marking %s as applied' % calapp.as_applycal())
context.callibrary.mark_as_applied(calapp.calto, calapp.calfrom)
# Update data_column
if self.data_type is not None:
msobj = context.observing_run.get_ms(self.inputs['vis'])
colname = 'CORRECTED_DATA'
# Temporal workaround: restoredata merges context twice
if msobj.get_data_column(self.data_type) != colname:
msobj.set_data_column(self.data_type, colname)
def __repr__(self):
s = 'ApplycalResults:\n'
for caltable in self.applied:
if isinstance(caltable.gaintable, list):
basenames = [os.path.basename(x) for x in caltable.gaintable]
s += '\t{name} applied to {vis} spw #{spw}\n'.format(
spw=caltable.spw, vis=os.path.basename(caltable.vis),
name=','.join(basenames))
else:
s += '\t{name} applied to {vis} spw #{spw}\n'.format(
name=caltable.gaintable, spw=caltable.spw,
vis=os.path.basename(caltable.vis))
return s
class SerialApplycal(basetask.StandardTaskTemplate):
"""
Applycal executes CASA applycal tasks for the current active context
state, applying calibrations registered with the pipeline context to the
target measurement set.
Applying the results from this task to the context marks the referred
tables as applied. As a result, they will not be included in future
on-the-fly calibration arguments.
"""
Inputs = ApplycalInputs
# DataType to be set for a new column
applied_data_type = DataType.REGCAL_CONTLINE_ALL
def __init__(self, inputs):
super().__init__(inputs)
def modify_task_args(self, task_args):
task_args['antenna'] = '*&*'
return task_args
def _get_flagsum_arg(self, args):
return args
def _tweak_flagkwargs(self, template):
return template
def prepare(self):
inputs = self.inputs
# Get the target data selection for this task as a CalTo object
calto = callibrary.get_calto_from_inputs(inputs)
# Now get the calibration state for that CalTo data selection. The
# returned dictionary of CalTo:CalFroms specifies the calibrations to
# be applied and the data selection to apply them to. While the CASA
# callibrary could use the full, untrimmed calibration state for the
# whole MS, we need the trimmed version for 1) the web log, where it
# is used to state what specific calibrations were applied, and 2),
# the pipeline callibrary, where we mark the target data selection as
# having calibration applied.
#
# Note that no 'ignore' argument is given to get_calstate
# (specifically, we don't say ignore=['calwt'] like many other tasks)
# as applycal is a task that can handle calwt and so different values
# of calwt should in this case result in different tasks.
calstate = inputs.context.callibrary.get_calstate(calto)
calstate = callibrary.fix_cycle0_data_selection(self.inputs.context, calstate)
# The 'hide_empty=True' is important here. The working calstate
# contains empty state for MSes outside the scope of this task, i.e.,
# MSes in the context that are not specified in inputs.vis for this
# task. These extra MSes cause problems in weblog code downstream,
# which expects results.applied to refer to just the MSes specified in
# the inputs. Using hide_empty gives the data the expected shape.
merged = calstate.merged(hide_empty=True)
# Read callibrary toggle: config.yaml default, env var overrides
from pipeline.config import get_pipeline_config
_disable_callib = get_pipeline_config(
'heuristics.disable_casa_callibrary', False,
env_var='DISABLE_CASA_CALLIBRARY', as_type=bool)
if _disable_callib:
LOG.info('CASA callibrary disabled: reverting to non-callibrary applycal call')
jobs = jobs_without_calapply(merged, inputs, self.modify_task_args)
elif contains_uvcont_table(merged):
LOG.info('Calibration state contains uvcont tables: reverting to non-callibrary applycal call')
jobs = jobs_without_calapply(merged, inputs, self.modify_task_args)
else:
LOG.info('No uvcont tables in calibration state: using CASA callibrary applycal.')
jobs = jobs_with_calapply(calstate, inputs, self.modify_task_args)
# if requested, schedule additional flagging tasks to determine
# statistics
if inputs.flagsum:
summary_args = dict(vis=inputs.vis, mode='summary')
# give subclasses a chance to tweak flag summary arguments
summary_args = self._get_flagsum_arg(summary_args)
# schedule a flagdata summary jobs either side of the applycal jobs
jobs.insert(0, casa_tasks.flagdata(name='before', **summary_args))
jobs.append(casa_tasks.flagdata(name='applycal', **summary_args))
if inputs.flagdetailedsum:
ms = inputs.context.observing_run.get_ms(inputs.vis)
# Schedule a flagdata job to determine flagging stats per spw
# and per field
flagkwargs = ["spw='{!s}' fieldcnt=True mode='summary' name='AntSpw{:0>3}'".format(spw.id, spw.id)
for spw in ms.get_spectral_windows()]
# give subclasses a change to tweak flag arguments
flagkwargs = self._tweak_flagkwargs(flagkwargs)
jobs.append(casa_tasks.flagdata(vis=inputs.vis, mode='list', inpfile=flagkwargs, flagbackup=False))
# execute the jobs and capture the output
job_results = [self._executor.execute(job) for job in jobs]
flagdata_results = [job_result for job, job_result in zip(jobs, job_results) if job.fn_name == 'flagdata']
applied_calapps = [callibrary.CalApplication(calto, calfroms) for calto, calfroms in merged.items()]
# give a dict like {'abc123.ms': 'path/to/callibrary'}. The use of
# dict assumes that there is only one jobrequest per MS, which is true
# when the CASA callibrary is used.
vis_to_callib = {job.kw['vis']: job.kw['callib'] for job in jobs
if job.fn_name == 'applycal' and 'callib' in job.kw}
result = ApplycalResults(applied_calapps, callib_map=vis_to_callib,
data_type=self.applied_data_type)
# add and reshape the flagdata results if required
if inputs.flagsum:
result.summaries = [flagdata_results[0], flagdata_results[1]]
if inputs.flagdetailedsum:
reshaped_flagsummary = reshape_flagdata_summary(flagdata_results[2])
processed_flagsummary = self.process_flagsummary(reshaped_flagsummary)
result.flagsummary = processed_flagsummary
return result
def analyse(self, result):
return result
def process_flagsummary(self, flagsummary):
"""
Template entry point for processing flagdata summary dicts. Override
this function to filter or otherwise process the flagdata summary
results.
:param flagsummary: the unfiltered, unprocessed flagsummary dict
:return:
"""
return flagsummary
def reshape_flagdata_summary(flagdata_result):
"""
Reshape a flagdata result so that results are grouped by field.
:param flagdata_result:
:return:
"""
# Set into single dictionary report (single spw) if only one dict returned
if not all([key.startswith('report') for key in flagdata_result]):
flagdata_result = {'report0': flagdata_result}
flagsummary = collections.defaultdict(dict)
for report_level, report in flagdata_result.items():
report_name = report['name']
report_type = report['type']
# report keys are all fieldnames with the exception of 'name' and
# 'type', which are in there too.
for field_name in [key for key in report if key not in ('name', 'type')]:
# deepcopy to avoid modifying the results dict
flagsummary[field_name][report_level] = copy.deepcopy(report[field_name])
flagsummary[field_name][report_level]['name'] = '{!s}Field_{!s}'.format(report_name, field_name)
flagsummary[field_name][report_level]['type'] = report_type
return flagsummary
[docs]
@task_registry.set_equivalent_casa_task('h_applycal')
@task_registry.set_casa_commands_comment('Calibrations are applied to the data. Final flagging summaries are computed')
class Applycal(sessionutils.ParallelTemplate):
Inputs = ApplycalInputs
Task = SerialApplycal
def jobs_without_calapply(merged, inputs, mod_fn):
jobs = []
# sort for a stable applycal order, to make diffs easier to parse
for calto, calfroms in sorted(merged.items()):
# if there's nothing to apply for this data selection, continue. This
# should never be seen as merged is called with hide_empty=True
if not calfroms:
LOG.info('There is no calibration information for field %s intent %s spw %s in %s' %
(str(calto.field), str(calto.intent), str(calto.spw), inputs.ms.basename))
continue
# arrange a calibration job for the unique data selection
inputs.spw = calto.spw
inputs.field = calto.field
inputs.intent = calto.intent
task_args = inputs.to_casa_args()
# set the on-the-fly calibration state for the data selection.
calapp = callibrary.CalApplication(calto, calfroms)
task_args['gaintable'] = calapp.gaintable
task_args['gainfield'] = calapp.gainfield
task_args['spwmap'] = calapp.spwmap
task_args['interp'] = calapp.interp
task_args['calwt'] = calapp.calwt
task_args['applymode'] = inputs.applymode
# give subclasses a chance to modify the task arguments
task_args = mod_fn(task_args)
jobs.append(casa_tasks.applycal(**task_args))
return jobs
def jobs_with_calapply(calstate: IntervalCalState, inputs: ApplycalInputs, mod_fn: Callable):
callibrary_file = '{}.s{}.{}.callibrary'.format(inputs.vis,
inputs.context.task_counter,
inputs.context.subtask_counter)
ms = inputs.context.observing_run.get_ms(inputs.vis)
calstate.export_to_casa_callibrary(ms, callibrary_file)
# No callibrary file will be created when the merged calstate does not
# require the application of calibrations.
if not os.path.exists(callibrary_file):
LOG.info('No applycal job required for CASA callibrary: {}'.format(callibrary_file))
return []
calstate_file = '{}.s{}.{}.calstate'.format(inputs.vis,
inputs.context.task_counter,
inputs.context.subtask_counter)
with open(calstate_file, "w") as applyfile:
applyfile.write('# Apply file for %s\n' % (os.path.basename(inputs.vis)))
applyfile.write(calstate.as_applycal())
task_args = inputs.to_casa_args()
# Don't delete spw, field, or intents as the inputs may request
# calibration of a subset of the total MS. The CASA callibrary can
# still define calibration for the whole MS, that's not a problem.
for a in ['gaintable', 'gainfield', 'spwmap', 'interp', 'calwt']:
if a in task_args:
del task_args[a]
task_args['applymode'] = inputs.applymode
task_args['docallib'] = True
task_args['callib'] = callibrary_file
mod_fn(task_args)
return [casa_tasks.applycal(**task_args)]
def contains_uvcont_table(merged):
return 'uvcont' in [calfrom.caltype for calfroms in merged.values() for calfrom in calfroms]