Source code for pipeline.hifv.tasks.mstransform.mstransform

import os
import traceback

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.tablereader as tablereader
import pipeline.infrastructure.vdp as vdp
from pipeline.domain import DataType
from pipeline.hif.tasks.mstransform import mstransform as mst
from pipeline.infrastructure import casa_tasks, task_registry

LOG = infrastructure.get_logger(__name__)


class VlaMstransformInputs(mst.MstransformInputs):

    @vdp.VisDependentProperty
    def outputvis(self):
        vis_root = os.path.splitext(self.vis)[0]
        return vis_root + '_targets_cont.ms'

    @vdp.VisDependentProperty
    def outputvis_for_line(self):
        vis_root = os.path.splitext(self.vis)[0]
        return vis_root + '_targets.ms'

    spw_line = vdp.VisDependentProperty(default='')
    omit_contline_ms = vdp.VisDependentProperty(default=False)

    # docstring and type hints: supplements hifv_mstransform
    def __init__(self, context, output_dir=None, vis=None, outputvis=None, field=None, intent=None, spw=None,
                 spw_line=None, chanbin=None, timebin=None, outputvis_for_line=None, omit_contline_ms=None):
        """Initialize Inputs.

        Args:
            context: Pipeline context object containing state information.

            output_dir: Output directory.
                Defaults to None, which corresponds to the current working directory.

            vis: The list of input MeasurementSets. Defaults to the list of MeasurementSets specified in the hifv_importdata task.
                '': use all MeasurementSets in the context

                Examples: ``'ngc5921.ms'``, ``['ngc5921a.ms', ngc5921b.ms', 'ngc5921c.ms']``

            outputvis: A list of output MeasurementSets that will contain the transformed and flagged data 
                for continuum imaging. This list must have the same length as the input list.
                
                Default Naming: By default, an input MS named `<msrootname>.ms`
                will produce an output named `<msrootname>_targets_cont.ms`.

                Examples:
                    - ``outputvis='ngc5921_targets_cont.ms'``
                    - ``outputvis=['ngc5921a_targets_cont.ms', 'ngc5921b_targets_cont.ms', 'ngc5921c_targets_cont.ms']``

            field: Select fields name(s) or id(s) to transform. Only fields with data matching the intent will be selected.

                Examples: ``'3C279'``, ``'Centaurus*'``, ``'3C279,J1427-421'``

            intent: Select intents for which associated fields will be imaged. By default only TARGET data is selected.

                Examples: ``'PHASE,BANDPASS'``

            spw: Select spectral window/channels to include for continuum imaging. By default all science spws for which the specified intent is valid are
                selected.

            spw_line: Select spectral window/channels to include for line imaging. If specified, these will override the default, which
                is to use the spws identified as specline_windows in hifv_importdata
                or hifv_restoredata.

            chanbin: Width (bin) of input channels to average to form an output channel. If chanbin > 1 then chanaverage is automatically
                switched to True.

            timebin: Bin width for time averaging. If timebin > 0s then timeaverage is automatically switched to True.

            outputvis_for_line: A list of output MeasurementSets for line detection
                and imaging, created without RFI flagging. This list must have
                the same length as the input list.
                
                Default Naming: By default, an input MS named `<msrootname>.ms`
                will produce an output named `<msrootname>_targets.ms`.

                Examples:
                    - ``outputvis_for_line='ngc5921_targets.ms'``
                    - ``outputvis_for_line=['ngc5921a_targets.ms', 'ngc5921b_targets.ms', 'ngc5921c_targets.ms']``
            
            omit_contline_ms: If ``True``, don't make the contline ms (_targets.ms). Only make cont MS (_targets_cont.ms). Default is ``False``.

        """
        super().__init__(context, output_dir, vis, outputvis, field, intent, spw, chanbin, timebin)
        self.spw_line = spw_line
        self.outputvis = outputvis
        self.outputvis_for_line = outputvis_for_line
        self.omit_contline_ms = omit_contline_ms


[docs] @task_registry.set_equivalent_casa_task('hifv_mstransform') class VlaMstransform(mst.SerialMstransform): Inputs = VlaMstransformInputs
[docs] def prepare(self): inputs = self.inputs # Run CASA task to create the output MS for continuum data mstransform_args = inputs.to_casa_args() # Remove input member variables that don't belong as input to the mstransform task mstransform_args.pop('outputvis_for_line', None) mstransform_args.pop('spw_line', None) mstransform_args.pop('omit_contline_ms', None) mstransform_args.pop('parallel', None) mstransform_job = casa_tasks.mstransform(**mstransform_args) try: self._executor.execute(mstransform_job) except OSError as ee: LOG.warning(f"Caught mstransform exception: {ee}") # Copy across requisite XML files. mst.SerialMstransform._copy_xml_files(inputs.vis, inputs.outputvis) # Update ms history. mst.SerialMstransform._update_history(inputs.vis, inputs.outputvis) if not self.inputs.omit_contline_ms: # Create output MS for line data (_target.ms) self._create_targets_ms(inputs, mstransform_args) # Create the results structure result = VlaMstransformResults(vis=inputs.vis, outputvis=inputs.outputvis, outputvis_for_line=inputs.outputvis_for_line) return result
[docs] def analyse(self, result): # Check for existence of the output vis. if not os.path.exists(result.outputvis): LOG.debug('Could not create science targets cont+line MS for continuum: %s' % (os.path.basename(result.outputvis))) return result # Check for existence of the output vis for line processing. if not os.path.exists(result.outputvis_for_line): LOG.info('Did not create science targets cont+line MS for line imaging: %s. Subsequent stages will not do line imaging.' % (os.path.basename(result.outputvis_for_line))) # Import the new measurement sets. try: to_import = os.path.relpath(result.outputvis) self._import_new_ms(result, to_import, datatype=DataType.REGCAL_CONT_SCIENCE) if os.path.exists(result.outputvis_for_line): to_import_for_line = os.path.relpath(result.outputvis_for_line) self._import_new_ms(result, to_import_for_line, datatype=DataType.REGCAL_CONTLINE_SCIENCE) except Exception: traceback.print_exc() msg = "Failed to import new measurement sets." raise Exception(msg) return result
def _create_targets_ms(self, inputs, mstransform_args) -> bool: """Create _targets.ms for line imaging. This will be created if pre-RFI flags exist and there are spectral lines in any spws. Returns True if targets.ms was created, else False. """ produce_lines_ms = False # Split off non-RFI flagged target data # The main goal is to get an MS with the same shape as the _target.ms to # get the flags for non-RFI flagged data # Identify flags from before RFI flagging was applied pre_rfi_flagversion_name = None flags_list_task = casa_tasks.flagmanager(vis=inputs.vis, mode="list") flags_dict = self._executor.execute(flags_list_task) for value in flags_dict.values(): if 'name' in value: if 'hifv_checkflag_target-vla' in value['name']: pre_rfi_flagversion_name = value['name'] if pre_rfi_flagversion_name is None: msg = "For {}: could not locate the pre-RFI flags to restore, so no targets.ms file will be created".format(inputs.vis) LOG.warning(msg) return False # Restore flags from before RFI flagging was applied task = casa_tasks.flagmanager(vis=inputs.vis, mode='restore', versionname=pre_rfi_flagversion_name) self._executor.execute(task) # Run CASA task to create the output MS for the line data mstransform_args['outputvis'] = inputs.outputvis_for_line if inputs.spw_line: mstransform_args['spw'] = inputs.spw_line produce_lines_ms = True else: # check to see if any spws have been identified as spectral lines for this MS specline_spws = [] for spw in inputs.ms.get_spectral_windows(science_windows_only=True): if spw.specline_window: specline_spws.append(spw) if specline_spws: mstransform_args['spw'] = ','.join([str(spw.id) for spw in specline_spws]) produce_lines_ms = True if produce_lines_ms: mstransform_job = casa_tasks.mstransform(**mstransform_args) try: self._executor.execute(mstransform_job) except OSError as ee: LOG.warning(f"Caught mstransform exception: {ee}") # Save flags from line MS without rfi flagging task = casa_tasks.flagmanager(vis=inputs.outputvis_for_line, mode='save', versionname=pre_rfi_flagversion_name) self._executor.execute(task) # Copy across requisite XML files. mst.SerialMstransform._copy_xml_files(inputs.vis, inputs.outputvis_for_line) # Update ms history mst.SerialMstransform._update_history(inputs.vis, inputs.outputvis_for_line) # Restore RFI flags to main MS task = casa_tasks.flagmanager(vis=inputs.vis, mode='restore', versionname='rfi_flagged_statwt') self._executor.execute(task) return produce_lines_ms def _import_new_ms(self, result, to_import, datatype): observing_run = tablereader.ObservingRunReader.get_observing_run(to_import) # Adopt same session as source measurement set for ms in observing_run.measurement_sets: LOG.debug('Setting session to %s for %s', self.inputs.ms.session, ms.basename) ms.session = self.inputs.ms.session LOG.debug('Setting data_column and origin_ms.') ms.origin_ms = self.inputs.ms.origin_ms ms.set_data_column(datatype, 'DATA') # Propagate spectral line spw designation from source MS for spw in ms.get_all_spectral_windows(): spw.specline_window = self.inputs.ms.get_spectral_window(spw.id).specline_window result.mses.extend(observing_run.measurement_sets)
class VlaMstransformResults(mst.MstransformResults): def __init__(self, vis, outputvis, outputvis_for_line): super().__init__(vis, outputvis) self.outputvis_for_line = outputvis_for_line def merge_with_context(self, context): # Check for an output vis if not self.mses: LOG.error('No hif_mstransform results to merge') return target = context.observing_run # Adding mses to context for ms in self.mses: LOG.info('Adding {} to context'.format(ms.name)) target.add_measurement_set(ms) # Create targets flagging template file if it does not already exist for ms in self.mses: template_flagsfile = os.path.join( self.inputs['output_dir'], os.path.splitext(os.path.basename(self.vis))[0] + '.flagtargetstemplate.txt') self._make_template_flagfile(template_flagsfile, 'User flagging commands file for the imaging pipeline') # Initialize callibrary for ms in self.mses: calto = callibrary.CalTo(vis=ms.name) LOG.info('Registering {} with callibrary'.format(ms.name)) context.callibrary.add(calto, []) def __str__(self): # Format the Mstransform results. s = 'VlaMstransformResults:\n' s += '\tOriginal MS {vis} transformed to {outputvis} and {outputvis_for_line} \n'.format( vis=os.path.basename(self.vis), outputvis=os.path.basename(self.outputvis), outputvis_for_line=os.path.basename(self.outputvis_for_line)) return s def __repr__(self): return 'VlaMstranformResults({}, {} + {})'.format(os.path.basename(self.vis), os.path.basename(self.outputvis), os.path.basename(self.outputvis_for_line)) FLAGGING_TEMPLATE_HEADER = '''# # ___TITLESTR___ # # Examples # Note: Do not put spaces inside the reason string ! # # mode='manual' correlation='YY' antenna='DV01;DV08;DA43;DA48&DV23' spw='21:1920~2880' autocorr=False reason='bad_channels' # mode='manual' spw='25:0~3;122~127' reason='stage8_2' # mode='manual' antenna='DV07' timerange='2013/01/31/08:09:55.248~2013/01/31/08:10:01.296' reason='quack' # '''