Source code for pipeline.hifa.tasks.flagging.flagtargetsalma

import os
import string

from casatasks.private import flaghelper

import pipeline.infrastructure as infrastructure
#import pipeline.infrastructure.api as api
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.vdp as vdp
from pipeline.domain import DataType
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import task_registry
from pipeline.infrastructure.filenamer import sanitize_for_ms
import pipeline.infrastructure.sessionutils as sessionutils

# the logger for this module
LOG = infrastructure.get_logger(__name__)

__all__ = [
    'FlagTargetsALMA',
    'FlagTargetsALMAInputs',
    'FlagTargetsALMAResults'
]


"""
Flag ALMA target science target data.
"""


class FlagTargetsALMAInputs(vdp.StandardInputs):
    """Manages the inputs for the FlagTargetsALMA task.

    Attributes:
        context: The pipeline context holding all pipeline state.
        vis: String or list of strings containing the MS name(s) on which to operate.
        output_dir: The directory to which pipeline data should be sent.
        flagbackup: Whether existing flags should be backed up before new flagging begins.
        template: Whether flagging templates are to be applied.
        filetemplate: The filename of the ASCII file that contains the flagging template.
    """
    # Search order of input vis
    processing_data_type = [DataType.REGCAL_CONTLINE_SCIENCE, DataType.REGCAL_CONTLINE_ALL, DataType.RAW]

    flagbackup = vdp.VisDependentProperty(default=False)
    template = vdp.VisDependentProperty(default=True)

    @vdp.VisDependentProperty
    def filetemplate(self):
        vis_root = sanitize_for_ms(self.vis)
        return vis_root + '.flagtargetstemplate.txt'

    @vdp.VisDependentProperty
    def inpfile(self):
        vis_root = sanitize_for_ms(self.vis)
        return os.path.join(self.output_dir, vis_root + '.flagtargetscmds.txt')
    
    parallel = sessionutils.parallel_inputs_impl(default=False)

    # docstring and type hints: supplements hifa_flagtargets
    def __init__(self, context, vis=None, output_dir=None, flagbackup=None, template=None, filetemplate=None, parallel=None):
        """Initialize Inputs.

        Args:
            context: Pipeline context object containing state information.

            vis: The list of input MeasurementSets. Defaults to the list
                of MeasurementSets defined in the pipeline context.

            output_dir: Output directory.
                Defaults to None, which corresponds to the current working directory.

            flagbackup: Back up any pre-existing flags; defaults to False.

            template: Apply flagging templates; defaults to True.

            filetemplate: The name of a text file that contains the flagging
                template for issues with the science target data etc.
                If the template flags files is undefined a name of the
                form 'msname_flagtargetstemplate.txt' is assumed.

            parallel: Process multiple MeasurementSets in parallel using the casampi parallelization framework.

                Options: ``'automatic'``, ``'true'``, ``'false'``, ``True``, ``False``

                Default: ``None`` (equivalent to ``False``)
        """
        super().__init__()

        # pipeline inputs
        self.context = context
        # vis must be set first, as other properties may depend on it
        self.vis = vis
        self.output_dir = output_dir

        self.flagbackup = flagbackup
        self.template = template
        self.filetemplate = filetemplate
        self.parallel = parallel

    def to_casa_args(self):
        """
        Translate the input parameters of this class to task parameters
        required by the CASA task flagdata. The returned object is a
        dictionary of flagdata arguments as keyword/value pairs.

        :rtype: dict
        """
        return {'vis': self.vis,
                'mode': 'list',
                'action': 'apply',
                'inpfile': self.inpfile,
                'flagbackup': self.flagbackup}


# tell the infrastructure to prefentially apply the targets
# flags to the split MS(s)
#api.ImagingMeasurementSetsPreferred.register(FlagTargetsALMAInputs)


class FlagTargetsALMAResults(basetask.Results):
    def __init__(self, summaries, flagcmds):
        super().__init__()
        self.summaries = summaries
        self._flagcmds = flagcmds

    def flagcmds(self):
        return self._flagcmds

    def merge_with_context(self, context):
        # nothing to do
        pass

    def __repr__(self):
        # Step through the summary list and print a few things.
        # SUBTRACT flag counts from previous agents, because the counts are
        # cumulative.
        s = 'Target flagging results:\n'

        for idx in range(0, len(self.summaries)):
            flagcount = int(self.summaries[idx]['flagged'])
            totalcount = int(self.summaries[idx]['total'])

            # From the second summary onwards, subtract counts from the previous
            # one
            if idx > 0:
                flagcount = flagcount - int(self.summaries[idx-1]['flagged'])

            s += '\tSummary %s (%s) :  Flagged : %s out of %s (%0.2f%%)\n' % (
                    idx, self.summaries[idx]['name'], flagcount, totalcount,
                    100.0*flagcount/totalcount)

        return s



class SerialFlagTargetsALMA(basetask.StandardTaskTemplate):
    """
    FlagTargetsALMA is a class for target flagging. It can perform

    - Template flags

    """

    # link the accompanying inputs to this task
    Inputs = FlagTargetsALMAInputs

    def prepare(self):
        """
        Prepare and execute a flagdata flagging job appropriate to the
        task inputs.

        """
        # create a local alias for inputs, so we're not saying 'self.inputs'
        # everywhere
        inputs = self.inputs

        # get the flagdata command string required for the results
        flag_cmds = self._get_flag_commands()
        flag_str = '\n'.join(flag_cmds)

        # write the flag commands to the file
        with open(inputs.inpfile, 'w') as stream:
            stream.writelines(flag_str)

        # to save inspecting the file, also log the flag commands
        LOG.debug('Flag commands for %s:\n%s', inputs.vis, flag_str)

        # Map the pipeline inputs to a dictionary of CASA task arguments
        task_args = inputs.to_casa_args()

        # create and execute a flagdata job using these task arguments
        job = casa_tasks.flagdata(**task_args)
        summary_dict = self._executor.execute(job)

        agent_summaries = dict((v['name'], v) for v in summary_dict.values())

        ordered_agents = ['before', 'template']

        summary_reps = [agent_summaries[agent]
                        for agent in ordered_agents
                        if agent in agent_summaries]

        # return the results object, which will be used for the weblog
        return FlagTargetsALMAResults(summary_reps, flag_cmds)

    def analyse(self, results):
        """
        Analyse the results of the flagging operation.

        This method does not perform any analysis, so the results object is
        returned exactly as-is, with no data massaging or results items
        added. If additional statistics needed to be calculated based on the
        post-flagging state, this would be a good place to do it.
        """
        return results

    def _get_flag_commands(self):
        """
        Get the flagging commands as a string suitable for flagdata.
        """
        # create a local variable for the inputs associated with this instance
        inputs = self.inputs

        # create list which will hold the flagging commands
        flag_cmds = ["mode='summary' name='before'"]

        # flag template?
        if inputs.template:
            if not os.path.exists(inputs.filetemplate):
                LOG.warning('Template flag file \'%s\' for \'%s\' not found.'
                            % (inputs.filetemplate, inputs.ms.basename))
            else:
                flag_cmds.extend(self._read_flagfile(inputs.filetemplate))
            flag_cmds.append("mode='summary' name='template'")

        return flag_cmds

    @staticmethod
    def _read_flagfile(filename):
        if not os.path.exists(filename):
            LOG.warning('%s does not exist' % filename)
            return []

        # strip out comments and empty lines to leave the real commands.
        # This is so we can compare the number of valid commands to the number
        # of commands specified in the file and complain if they differ
        return [cmd for cmd in flaghelper.readFile(filename)
                if not cmd.strip().startswith('#')
                and not all(c in string.whitespace for c in cmd)]


[docs] @task_registry.set_equivalent_casa_task('hifa_flagtargets') class FlagTargetsALMA(sessionutils.ParallelTemplate): Inputs = FlagTargetsALMAInputs Task = SerialFlagTargetsALMA