import operator
import os
import shutil
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.sessionutils as sessionutils
import pipeline.infrastructure.tablereader as tablereader
import pipeline.infrastructure.vdp as vdp
from pipeline.domain import DataType
from pipeline.hif.heuristics.auto_selfcal.selfcal_helpers import get_calinfo_from_ms
from pipeline.infrastructure import casa_tasks, task_registry
LOG = infrastructure.get_logger(__name__)
# Define the minimum set of parameters required to split out
# the TARGET data from the complete and fully calibrated
# original MS. Other parameters will be added here as more
# capabilities are added to hif_mstransform.
class MstransformInputs(vdp.StandardInputs):
# Search order of input vis
processing_data_type = [DataType.REGCAL_CONTLINE_ALL, DataType.RAW]
@vdp.VisDependentProperty
def outputvis(self):
vis_root = os.path.splitext(self.vis)[0]
return vis_root + '_targets.ms'
# By default find all the fields with TARGET intent
@vdp.VisDependentProperty
def field(self):
# Find fields in the current ms that have been observed
# with the desired intent
fields = self.ms.get_fields(intent=self.intent)
# When an observation is terminated the final scan can be a TARGET scan but may
# not contain any TARGET data for the science spectral windows, e.g. there is square
# law detector data but nothing else. The following code removes fields that do not contain
# data for the requested spectral windows - the science spectral windows by default.
if fields:
fields_by_id = sorted(fields, key=operator.attrgetter('id'))
last_field = fields_by_id[-1]
# While we're here, remove any fields that are not linked with a source. This should
# not occur since the CAS-9499 tablereader bug was fixed, but check anyway.
if getattr(last_field, 'source', None) is None:
fields.remove(last_field)
LOG.info('Truncated observation detected (no source for field): '
'removing Field {!s}'.format(last_field.id))
else:
# .. and then any fields that do not contain the
# requested spectral windows. This should prevent
# aborted scans from being split into the TARGET
# measurement set.
requested_spws = set(self.ms.get_spectral_windows(self.spw))
if last_field.valid_spws.isdisjoint(requested_spws):
LOG.info('Truncated observation detected (missing spws): '
'removing Field {!s}'.format(last_field.id))
fields.remove(last_field)
unique_field_names = {f.name for f in fields}
field_ids = {f.id for f in fields}
# Fields with different intents may have the same name. Check for this
# and return the ids rather than the names if necessary to resolve any
# ambiguities
if len(unique_field_names) == len(field_ids):
return ','.join(unique_field_names)
else:
return ','.join([str(i) for i in field_ids])
# Select TARGET data by default
intent = vdp.VisDependentProperty(default='TARGET')
# Find all the spws with TARGET intent. These may be a subset of the
# science spws which include calibration spws.
@vdp.VisDependentProperty
def spw(self):
science_target_intents = set(self.intent.split(','))
science_target_spws = []
science_spws = [spw for spw in self.ms.get_spectral_windows(science_windows_only=True)]
for spw in science_spws:
if spw.intents.intersection(science_target_intents):
science_target_spws.append(spw)
return ','.join([str(spw.id) for spw in science_target_spws])
@spw.convert
def spw(self, value):
science_target_intents = set(self.intent.split(','))
science_target_spws = []
science_spws = [spw for spw in self.ms.get_spectral_windows(task_arg=value, science_windows_only=True)]
for spw in science_spws:
if spw.intents.intersection(science_target_intents):
science_target_spws.append(spw)
return ','.join([str(spw.id) for spw in science_target_spws])
chanbin = vdp.VisDependentProperty(default=1)
timebin = vdp.VisDependentProperty(default='0s')
parallel = sessionutils.parallel_inputs_impl(default=False)
# docstring and type hints: supplements hif_mstransform
def __init__(self, context, output_dir=None, vis=None, outputvis=None, field=None, intent=None, spw=None,
chanbin=None, timebin=None, parallel=None):
"""Initialize Inputs.
Args:
context: Pipeline context object containing state information.
output_dir: Output directory.
Defaults to None, which corresponds to the current working directory.
vis: The list of input MeasurementSets. Defaults to the list of MeasurementSets specified in the <hifa,hifv>_importdata task.
'': use all MeasurementSets in the context
Examples: ``'ngc5921.ms'``, ``['ngc5921a.ms', ngc5921b.ms', 'ngc5921c.ms']``
outputvis: A list of output MeasurementSets for line detection and imaging,. This list must have
the same length as the input list.
Default Naming: By default, an input MS named `<msrootname>.ms`
will produce an output named `<msrootname>_targets.ms`.
Examples:
- ``outputvis='ngc5921_targets.ms'``
- ``outputvis=['ngc5921a_targets.ms', 'ngc5921b_targets.ms', 'ngc5921c_targets.ms']``
field: Select fields name(s) or id(s) to transform. Only fields with data matching the intent will be selected.
Examples: ``'3C279'``, ``'Centaurus*'``, ``'3C279,J1427-421'``
intent: Select intents for which associated fields will be imaged. By default only TARGET data is selected.
Examples: ``'PHASE,BANDPASS'``
spw: Select spectral window/channels to image. By default all science spws for which the specified intent is valid are
selected.
chanbin: Width (bin) of input channels to average to form an output channel. If chanbin > 1 then chanaverage is automatically
switched to True.
timebin: Bin width for time averaging. If timebin > 0s then timeaverage is automatically switched to True.
parallel: Process multiple MeasurementSets in parallel using the casampi parallelization framework.
Options: ``'automatic'``, ``'true'``, ``'false'``, ``True``, ``False``
Default: ``None`` (equivalent to ``False``)
"""
super().__init__()
# set the properties to the values given as input arguments
self.context = context
self.vis = vis
self.output_dir = output_dir
self.outputvis = outputvis
self.field = field
self.intent = intent
self.spw = spw
self.chanbin = chanbin
self.timebin = timebin
self.parallel = parallel
def to_casa_args(self):
# Get parameter dictionary.
d = super().to_casa_args()
# Force the data column to be 'corrected' and the
# new (with casa 4.6) reindex parameter to be False
d['datacolumn'] = 'corrected'
d['reindex'] = False
if self.chanbin > 1:
d['chanaverage'] = True
else:
d['chanaverage'] = False
if self.timebin > '0s':
d['timeaverage'] = True
else:
d['timeaverage'] = False
# Remove parallel if it exists, as mstransform does not support it.
d.pop('parallel', None)
return d
class SerialMstransform(basetask.StandardTaskTemplate):
Inputs = MstransformInputs
def prepare(self):
inputs = self.inputs
# Create the results structure
result = MstransformResults(vis=inputs.vis, outputvis=inputs.outputvis)
# Run CASA task
mstransform_args = inputs.to_casa_args()
mstransform_job = casa_tasks.mstransform(**mstransform_args)
try:
self._executor.execute(mstransform_job)
except OSError as ee:
LOG.warning(f"Caught mstransform exception: {ee}")
# Copy across requisite XML files.
self._copy_xml_files(inputs.vis, inputs.outputvis)
# Update output MS history.
self._update_history(inputs.vis, inputs.outputvis)
return result
def analyse(self, result):
# Check for existence of the output vis.
if not os.path.exists(result.outputvis):
LOG.debug('Error creating science targets cont+line MS %s' % (os.path.basename(result.outputvis)))
return result
# Import the new measurement set.
to_import = os.path.relpath(result.outputvis)
observing_run = tablereader.ObservingRunReader.get_observing_run(to_import)
# Adopt same session as source measurement set
for ms in observing_run.measurement_sets:
LOG.debug('Setting session to %s for %s', self.inputs.ms.session, ms.basename)
ms.session = self.inputs.ms.session
LOG.debug('Setting data_column and origin_ms.')
ms.origin_ms = self.inputs.ms.origin_ms
ms.set_data_column(DataType.REGCAL_CONTLINE_SCIENCE, 'DATA')
result.mses.extend(observing_run.measurement_sets)
return result
@staticmethod
def _copy_xml_files(vis, outputvis):
for xml_filename in ['SpectralWindow.xml', 'DataDescription.xml']:
vis_source = os.path.join(vis, xml_filename)
outputvis_targets_contline = os.path.join(outputvis, xml_filename)
if os.path.exists(vis_source) and os.path.exists(outputvis):
LOG.info('Copying %s from original MS to science targets cont+line MS', xml_filename)
LOG.trace('Copying %s: %s to %s', xml_filename, vis_source, outputvis_targets_contline)
shutil.copyfile(vis_source, outputvis_targets_contline)
@staticmethod
def _update_history(vis, outputvis):
get_calinfo_from_ms(vis, save_to_ms=outputvis)
class MstransformResults(basetask.Results):
def __init__(self, vis, outputvis):
super().__init__()
self.vis = vis
self.outputvis = outputvis
self.mses = []
def merge_with_context(self, context):
# Check for an output vis
if not self.mses:
LOG.error('No hif_mstransform results to merge')
return
target = context.observing_run
# Adding mses to context
for ms in self.mses:
LOG.info('Adding {} to context'.format(ms.name))
target.add_measurement_set(ms)
# Create targets flagging template file if it does not already exist
for ms in self.mses:
template_flagsfile = os.path.join(
self.inputs['output_dir'], os.path.splitext(os.path.basename(self.vis))[0] + '.flagtargetstemplate.txt')
self._make_template_flagfile(template_flagsfile, 'User flagging commands file for the imaging pipeline')
# Initialize callibrary
for ms in self.mses:
calto = callibrary.CalTo(vis=ms.name)
LOG.info('Registering {} with callibrary'.format(ms.name))
context.callibrary.add(calto, [])
def _make_template_flagfile(self, outfile, titlestr):
# Create a new file if overwrite is true and the file
# does not already exist.
if not os.path.exists(outfile):
template_text = FLAGGING_TEMPLATE_HEADER.replace('___TITLESTR___', titlestr)
with open(outfile, 'w') as f:
f.writelines(template_text)
def __str__(self):
# Format the Mstransform results.
s = 'MstransformResults:\n'
s += '\tOriginal MS {vis} transformed to {outputvis}\n'.format(
vis=os.path.basename(self.vis),
outputvis=os.path.basename(self.outputvis))
return s
def __repr__(self):
return 'MstranformResults({}, {})'.format(os.path.basename(self.vis), os.path.basename(self.outputvis))
FLAGGING_TEMPLATE_HEADER = '''#
# ___TITLESTR___
#
# Examples
# Note: Do not put spaces inside the reason string !
#
# mode='manual' correlation='YY' antenna='DV01;DV08;DA43;DA48&DV23' spw='21:1920~2880' autocorr=False reason='bad_channels'
# mode='manual' spw='25:0~3;122~127' reason='stage8_2'
# mode='manual' antenna='DV07' timerange='2013/01/31/08:09:55.248~2013/01/31/08:10:01.296' reason='quack'
#
'''