Source code for pipeline.h.tasks.mstransform.mssplit

from __future__ import annotations

import os
import shutil
from typing import TYPE_CHECKING

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.tablereader as tablereader
import pipeline.infrastructure.vdp as vdp
from pipeline.domain import DataType
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import task_registry

if TYPE_CHECKING:
    from pipeline.domain import MeasurementSet

LOG = infrastructure.get_logger(__name__)

__all__ = [
    'MsSplitInputs',
    'MsSplit',
    'MsSplitResults'
]


# Define the minimum set of parameters required to split out
# the requested data (defined by field, spw, intent) from the
# original MS and optionally average in time and channel.
#
# If replace is True replace the parameter MS with the transformed
# one on disk and in the context

class MsSplitInputs(vdp.StandardInputs):
    chanbin = vdp.VisDependentProperty(default=1)
    datacolumn = vdp.VisDependentProperty(default='data')
    field = vdp.VisDependentProperty(default='')
    intent = vdp.VisDependentProperty(default='')
    replace = vdp.VisDependentProperty(default=True)
    spw = vdp.VisDependentProperty(default='')
    timebin = vdp.VisDependentProperty(default='0s')

    @vdp.VisDependentProperty
    def outputvis(self):
        vis_root = os.path.splitext(self.vis)[0]
        return vis_root + '_split.ms'

    # docstring and type hints: supplements h_mssplit
    def __init__(self, context, vis=None, output_dir=None, outputvis=None, field=None, intent=None, spw=None,
                 datacolumn=None, chanbin=None, timebin=None, replace=None):
        """Initialize Inputs.

        Args:
            context: Pipeline context object containing state information.

            vis: The list of input MeasurementSets to be transformed. Defaults to the list of MeasurementSets specified in the pipeline import data task.
                default '': Split all MeasurementSets in the context.
                example: 'ngc5921.ms', ['ngc5921a.ms', ngc5921b.ms', 'ngc5921c.ms']

            output_dir: Output directory.
                Defaults to None, which corresponds to the current working directory.

            outputvis: The list of output split MeasurementSets. The output list must be the same length as the input list and the output names must be different
                from the input names.
                default '', The output name defaults to <msrootname>_split.ms
                example: 'ngc5921.ms', ['ngc5921a.ms', ngc5921b.ms', 'ngc5921c.ms']

            field: Set of data selection field names or ids, '' for all

            intent: Select intents to split default: '', All data is selected.
                example: 'TARGET'

            spw: Select spectral windows to split. default: '', All spws are selected
                example: '9', '9,13,15'

            datacolumn: Select spectral windows to split. The standard CASA options are supported.
                example: 'corrected', 'model'

            chanbin: The channel binning factor. 1 for no binning, otherwise 2, 4, 8, or 16.
                example: 2, 4

            timebin: The time binning factor. '0s' for no binning.
                example: '10s' for 10 second binning

            replace: If a split was performed delete the parent MS and remove it from the context.
        """
        super().__init__()

        self.context = context
        self.vis = vis
        self.output_dir = output_dir

        self.outputvis = outputvis
        self.field = field
        self.intent = intent
        self.spw = spw
        self.datacolumn = datacolumn
        self.chanbin = chanbin
        self.timebin = timebin
        self.replace = replace

    def to_casa_args(self):
        d = super().to_casa_args()

        if d['chanbin'] > 1:
            d['chanaverage'] = True
        if d['timebin'] != '0s':
            d['timeaverage'] = True

        # Filter out unwanted parameters
        del d['replace']

        return d


[docs] @task_registry.set_equivalent_casa_task('h_mssplit') @task_registry.set_casa_commands_comment( 'The parent MS is split by field, intent, or spw and/or averaged by channel and time.' ) class MsSplit(basetask.StandardTaskTemplate): Inputs = MsSplitInputs
[docs] def prepare(self): inputs = self.inputs # Test whether or not a split has been requested if inputs.field == '' and inputs.spw == '' and inputs.intent == '' and \ inputs.chanbin == 1 and inputs.timebin == '0s': result = MsSplitResults(vis=inputs.vis, outputvis=inputs.outputvis) LOG.warning('Output MS equals input MS %s' % (os.path.basename(inputs.vis))) return # Split is required so create the results structure result = MsSplitResults(vis=inputs.vis, outputvis=inputs.outputvis) # Run CASA task # Does this need a try / except block mstransform_args = inputs.to_casa_args() mstransform_job = casa_tasks.mstransform(**mstransform_args) self._executor.execute(mstransform_job) return result
[docs] def analyse(self, result): # Check for existence of the output vis. if not os.path.exists(result.outputvis): return result inputs = self.inputs # There seems to be a rerendering issue with replace. Fir now just # remove the old file. if inputs.replace: shutil.rmtree(result.vis) # Import the new MS to_import = os.path.abspath(result.outputvis) observing_run = tablereader.ObservingRunReader.get_observing_run(to_import) # Adopt same session as source measurement set for ms in observing_run.measurement_sets: LOG.debug('Setting session to %s for %s', self.inputs.ms.session, ms.basename) ms.session = self.inputs.ms.session ms.origin_ms = self.inputs.ms.origin_ms self._set_data_column_to_ms(ms) # Note there will be only 1 MS in the temporary observing run structure result.ms = observing_run.measurement_sets[0] return result
def _set_data_column_to_ms(self, msobj: MeasurementSet): """ Set data_column to input MeasurementSet domain object. This method sets data_column information of output MS depending on intent and datacolumn selection of the Inputs class. Args: msobj: MS domain object to set data_column information. """ datacolumn = self.inputs.datacolumn in_column = datacolumn.upper() if datacolumn != 'corrected' else 'CORRECTED_DATA' LOG.debug('in_column = %s' % in_column) #if self.inputs.replace and not os.path.exists(msobj.origin_ms): # # Replace RAW column if original MS was replaced. # data_type = DataType.RAW #el data_type = None if self.inputs.intent == 'TARGET': data_type = DataType.REGCAL_CONTLINE_SCIENCE else: for t, c in self.inputs.ms.data_column.items(): if c == in_column: data_type = t LOG.debug(f'Identified data type {data_type}') break if data_type is None: data_type = DataType.RAW LOG.warning( f'The datatype of the requested datacolumn is unknown, and a fallback value of {data_type} is used.') out_column = in_column if datacolumn != 'corrected' else 'DATA' LOG.info(f'Setting {data_type} to {out_column}') msobj.set_data_column(data_type, out_column)
class MsSplitResults(basetask.Results): def __init__(self, vis, outputvis): super().__init__() self.vis = vis self.outputvis = outputvis self.ms = None def merge_with_context(self, context): # Check for an output vis if not self.ms: LOG.error('No h_mssplit results to merge') return target = context.observing_run parentms = None # The parent MS has been removed. if not os.path.exists(self.vis): for index, ms in enumerate(target.get_measurement_sets()): if ms.name == self.vis: parentms = index break if self.ms: if parentms is not None: LOG.info('Replace {} in context'.format(self.ms.name)) del target.measurement_sets[parentms] target.add_measurement_set(self.ms) else: LOG.info('Adding {} to context'.format(self.ms.name)) target.add_measurement_set(self.ms) def __str__(self): # Format the MsSplit results. s = 'MsSplitResults:\n' s += '\tOriginal MS {vis} transformed to {outputvis}\n'.format( vis=os.path.basename(self.vis), outputvis=os.path.basename(self.outputvis)) return s def __repr__(self): return 'MsSplitResults({}, {})'.format(os.path.basename(self.vis), os.path.basename(self.outputvis))