Source code for pipeline.hif.tasks.transformimagedata.transformimagedata

import os
import shutil

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.tablereader as tablereader
import pipeline.infrastructure.vdp as vdp
from pipeline.domain import DataType
from pipeline.h.tasks.mstransform import mssplit
from pipeline.infrastructure import casa_tasks, casa_tools, task_registry, utils

LOG = infrastructure.get_logger(__name__)


class TransformimagedataResults(basetask.Results):
    def __init__(self, vis, outputvis):
        super().__init__()
        self.vis = vis
        self.outputvis = outputvis
        self.ms = None

    def merge_with_context(self, context):
        # Check for an output vis
        if not self.ms:
            LOG.error('No hif_transformimagedata results to merge')
            return

        obs_run = context.observing_run
        parentms_idx = None
        for index, ms in enumerate(obs_run.get_measurement_sets()):
            if ms.name == self.vis:
                parentms_idx = index
                break

        # "parentms_idx = None" means the input MS is not present in the context
        if parentms_idx is not None:
            LOG.info('Replace %s in context with %s', obs_run.measurement_sets[parentms_idx].name, self.ms.name)
            del obs_run.measurement_sets[parentms_idx]
        else:
            LOG.info('Adding %s to context', self.ms.name)
        obs_run.add_measurement_set(self.ms)

        # Update clean_list_pending with new MS paths
        outvisname = os.path.join(context.output_dir, os.path.basename(self.outputvis))
        for clean_item in context.clean_list_pending:
            for idx, vis in enumerate(clean_item['heuristics'].vislist):
                if vis == self.vis:
                    clean_item['heuristics'].vislist[idx] = self.outputvis
            for ms in clean_item['heuristics'].observing_run.measurement_sets:
                if ms.name == self.vis:
                    ms.name = outvisname
            for idx, vis in enumerate(clean_item['vis']):
                if vis == self.vis:
                    clean_item['vis'][idx] = self.outputvis

    def __str__(self):
        # Format the MsSplit results.
        s = 'Transformimagedata:\n'
        s += '\tOriginal MS {vis} transformed to {outputvis}\n'.format(
            vis=os.path.basename(self.vis),
            outputvis=os.path.basename(self.outputvis))

        return s

    def __repr__(self):
        return 'Transformimagedata({}, {})'.format(os.path.basename(self.vis), os.path.basename(self.outputvis))


class TransformimagedataInputs(mssplit.MsSplitInputs):
    # Search order of input vis
    processing_data_type = [DataType.REGCAL_CONTLINE_ALL, DataType.RAW]

    clear_pointing = vdp.VisDependentProperty(default=True)
    modify_weights = vdp.VisDependentProperty(default=False)
    wtmode = vdp.VisDependentProperty(default='')
    replace = vdp.VisDependentProperty(default=False)
    datacolumn = vdp.VisDependentProperty(default='corrected')

    @vdp.VisDependentProperty
    def outputvis(self):

        output_dir = self.context.output_dir
        if isinstance(self._outputvis, vdp.NullMarker):
            # Need this to be in the working directory
            # vis_root = os.path.splitext(self.vis)[0]
            vis_root = os.path.splitext(os.path.basename(self.vis))[0]
            return os.path.join(output_dir, vis_root + '_split.ms')
        else:
            return os.path.join(output_dir, os.path.basename(self.outputvis))

    @outputvis.convert
    def outputvis(self, value=''):
        return value

    # docstring and type hints: supplements hif_transformimagedata
    def __init__(self, context, vis=None, output_dir=None,
                 outputvis=None, field=None, intent=None, spw=None,
                 datacolumn=None, chanbin=None, timebin=None, replace=None,
                 clear_pointing=None, modify_weights=None, wtmode=None):
        """Initialize Inputs.

        Args:
            context: Pipeline context object containing state information.

            vis: List of visibility data files. These may be ASDMs, tar files of ASDMs, MSs, or tar files of MSs, If ASDM files are specified, they will be
                converted  to MS format.

                Example: ``vis=['X227.ms', 'asdms.tar.gz']``

            output_dir: Output directory.
                Defaults to None, which corresponds to the current working directory.

            outputvis: The output MeasurementSet.

            field: Set of data selection field names or ids, ``''`` for all.

            intent: Set of data selection intents, ``''`` for all.

            spw: Set of data selection spectral window ids ``''`` for all.

            datacolumn: Select spectral windows to split. The standard CASA options are supported

                Example: ``'data'``, ``'model'``

            chanbin: Bin width for channel averaging.

            timebin: Bin width for time averaging.

            replace: If a split was performed delete the parent MS and remove it from the context. example: True or False

            clear_pointing: Clear the pointing table.

            modify_weights: Re-initialize the weights.

            wtmode: optional weight initialization mode when modify_weights=True

        """
        # set the properties to the values given as input arguments
        self.context = context
        self.vis = vis
        self.output_dir = output_dir

        self.outputvis = outputvis
        self.field = field
        self.intent = intent
        self.spw = spw
        self.datacolumn = datacolumn
        self.chanbin = chanbin
        self.timebin = timebin
        self.replace = replace

        if clear_pointing is not False:
            clear_pointing = True
        self.clear_pointing = clear_pointing

        if modify_weights is not True:
            modify_weights = False
        self.modify_weights = modify_weights

        self.wtmode = wtmode


[docs] @task_registry.set_equivalent_casa_task('hif_transformimagedata') class Transformimagedata(mssplit.MsSplit): Inputs = TransformimagedataInputs
[docs] def prepare(self): inputs = self.inputs # Test whether or not a split has been requested """ if inputs.field == '' and inputs.spw == '' and inputs.intent == '' and \ inputs.chanbin == 1 and inputs.timebin == '0s': result = TransformimagedataResults(vis=inputs.vis, outputvis=inputs.outputvis) LOG.warning('Output MS equals input MS %s' % (os.path.basename(inputs.vis))) return """ # Split is required so create the results structure result = TransformimagedataResults(vis=inputs.vis, outputvis=inputs.outputvis) visfields = [] visspws = [] for imageparam in inputs.context.clean_list_pending: vis_idx = imageparam['vis'].index(inputs.vis) visfields.extend(imageparam['field'][vis_idx].split(',')) visspws.extend(imageparam['spw'].split(',')) visfields = ','.join(utils.deduplicate(visfields)) visspws = ','.join(utils.deduplicate(visspws)) mstransform_args = inputs.to_casa_args() mstransform_args['field'] = visfields mstransform_args['reindex'] = False mstransform_args['spw'] = visspws for dictkey in ('clear_pointing', 'modify_weights', 'wtmode'): try: del mstransform_args[dictkey] except KeyError: pass mstransform_job = casa_tasks.mstransform(**mstransform_args) self._executor.execute(mstransform_job) return result
[docs] def analyse(self, result): # Check for existence of the output vis. if not os.path.exists(result.outputvis): return result inputs = self.inputs # There seems to be a rerendering issue with replace. For now just # remove the old file. if inputs.replace: shutil.rmtree(result.vis) #shutil.move (result.outputvis, result.vis) #result.outputvis = result.vis # Import the new MS rel_to_import = result.outputvis observing_run = tablereader.ObservingRunReader.get_observing_run(rel_to_import) # Adopt same session as source measurement set for ms in observing_run.measurement_sets: LOG.debug('Setting session to %s for %s', self.inputs.ms.session, ms.basename) ms.session = self.inputs.ms.session ms.origin_ms = self.inputs.ms.origin_ms self._set_data_column_to_ms(ms) # Note there will be only 1 MS in the temporary observing run structure result.ms = observing_run.measurement_sets[0] if inputs.clear_pointing: LOG.info('Removing POINTING table from ' + ms.name) with casa_tools.TableReader(ms.name + '/POINTING', nomodify=False) as table: rows = table.rownumbers() table.removerows(rows) if inputs.modify_weights: LOG.info('Re-initializing the weights in ' + ms.name) if inputs.wtmode: task = casa_tasks.initweights(vis=ms.name, wtmode=inputs.wtmode) else: task = casa_tasks.initweights(vis=ms.name) self._executor.execute(task) return result