Source code for pipeline.hif.tasks.makermsimages.makermsimages

import os

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.daskhelpers as daskhelpers
import pipeline.infrastructure.imagelibrary as imagelibrary
import pipeline.infrastructure.mpihelpers as mpihelpers
import pipeline.infrastructure.utils as utils
import pipeline.infrastructure.vdp as vdp
from pipeline.domain import DataType
from pipeline.infrastructure import casa_tasks, task_registry

LOG = infrastructure.get_logger(__name__)


class MakermsimagesResults(basetask.Results):
    def __init__(self, rmsimagelist=None, rmsimagenames=None):
        super().__init__()

        if rmsimagelist is None:
            rmsimagelist = []
        if rmsimagenames is None:
            rmsimagenames = []

        self.rmsimagelist = rmsimagelist[:]
        self.rmsimagenames = rmsimagenames[:]

    def merge_with_context(self, context):
        """
        See :method:`~pipeline.infrastructure.api.Results.merge_with_context`
        """

        # rmsimagelist is a list of dictionaries
        # Use the same format and information from sciimlist, save for the image name and image plot
        for rmsitem in self.rmsimagelist:
            try:
                imageitem = imagelibrary.ImageItem(
                    imagename=rmsitem['imagename'] + '.rms', sourcename=rmsitem['sourcename'],
                    spwlist=rmsitem['spwlist'], specmode=rmsitem['specmode'],
                    sourcetype=rmsitem['sourcetype'],
                    stokes=rmsitem['stokes'],
                    datatype=rmsitem['datatype'],
                    multiterm=rmsitem['multiterm'],
                    metadata=rmsitem['metadata'],
                    imageplot=rmsitem['imageplot'])
                if 'TARGET' in rmsitem['sourcetype']:
                    context.rmsimlist.add_item(imageitem)
            except:
                pass

    def __repr__(self):
        return 'MakermsimagesResults:'


class MakermsimagesInputs(vdp.StandardInputs):
    # Search order of input vis
    processing_data_type = [DataType.REGCAL_CONTLINE_ALL, DataType.RAW]

    # docstring and type hints: supplements hif_makermsimages
    def __init__(self, context, vis=None):
        """Initialize Inputs.

        Args:
            context: Pipeline context object containing state information.

            vis: List of visibility data files. These may be ASDMs, tar files of ASDMs, MSs, or tar files of MSs, If ASDM files are specified, they will be
                converted  to MS format.

                Example: ``vis=['X227.ms', 'asdms.tar.gz']``

        """
        super().__init__()
        # set the properties to the values given as input arguments
        self.context = context
        self.vis = vis


[docs] @task_registry.set_equivalent_casa_task('hif_makermsimages') class Makermsimages(basetask.StandardTaskTemplate): Inputs = MakermsimagesInputs is_multi_vis_task = True
[docs] def prepare(self): imlist = self.inputs.context.sciimlist.get_imlist() imagenames = [] for imageitem in imlist: if imageitem['multiterm']: imagenames.extend(utils.glob_ordered(imageitem['imagename'] + '.pbcor.tt0')) imagenames.extend(utils.glob_ordered(imageitem['imagename'] + '.pbcor.tt1')) else: imagenames.extend(utils.glob_ordered(imageitem['imagename'] + '.pbcor')) tier0_imdev_enabled = True rmsimagenames = [] queued_job_rmsimagename = [] for imagename in imagenames: rmsimagename = imagename + '.rms' if not os.path.exists(rmsimagename) and 'residual' not in imagename: LOG.info(f"Generating RMS image {rmsimagename} from {imagename}") job_to_execute = casa_tasks.imdev(**self._get_imdev_args(imagename)) if tier0_imdev_enabled and daskhelpers.is_dask_ready(): executable = mpihelpers.Tier0JobRequest( casa_tasks.imdev, job_to_execute.kw, executor=self._executor) queued_job = daskhelpers.FutureTask(executable) elif tier0_imdev_enabled and mpihelpers.is_mpi_ready(): executable = mpihelpers.Tier0JobRequest( casa_tasks.imdev, job_to_execute.kw, executor=self._executor) queued_job = mpihelpers.AsyncTask(executable) else: queued_job = mpihelpers.SyncTask(job_to_execute, self._executor) queued_job_rmsimagename.append((queued_job, rmsimagename)) for queue_job, rmsimagename in queued_job_rmsimagename: queue_job.get_result() if os.path.exists(rmsimagename): rmsimagenames.append(rmsimagename) LOG.info("RMS image list: " + ','.join(rmsimagenames)) return MakermsimagesResults(rmsimagelist=imlist, rmsimagenames=rmsimagenames)
[docs] def analyse(self, results): return results
def _get_imdev_args(self, imagename): """Get default CASA/imdev parameters.""" imdevparams = {'imagename': imagename, 'outfile': imagename + '.rms', 'region': "", 'box': "", 'chans': "", 'stokes': "", 'mask': "", 'overwrite': True, 'stretch': False, 'grid': [10, 10], 'anchor': "ref", 'xlength': "60arcsec", 'ylength': "60arcsec", 'interp': "cubic", 'stattype': "xmadm", 'statalg': "chauvenet", 'zscore': -1, 'maxiter': -1 } return imdevparams def _do_imdev(self, imagename): # Quicklook parameters imdevparams = {'imagename': imagename, 'outfile': imagename + '.rms', 'region': "", 'box': "", 'chans': "", 'stokes': "", 'mask': "", 'overwrite': True, 'stretch': False, 'grid': [10, 10], 'anchor': "ref", 'xlength': "60arcsec", 'ylength': "60arcsec", 'interp': "cubic", 'stattype': "xmadm", 'statalg': "chauvenet", 'zscore': -1, 'maxiter': -1 } task = casa_tasks.imdev(**imdevparams) return self._executor.execute(task)