Source code for pipeline.hif.tasks.rawflagchans.rawflagchans

import os

import numpy as np

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.vdp as vdp
from pipeline.h.tasks.common import commonhelpermethods
from pipeline.h.tasks.common import commonresultobjects
from pipeline.h.tasks.common import viewflaggers
from pipeline.h.tasks.flagging.flagdatasetter import FlagdataSetter
from pipeline.infrastructure import casa_tools
from pipeline.infrastructure import task_registry
from .resultobjects import RawflagchansResults, RawflagchansDataResults, RawflagchansViewResults
import pipeline.infrastructure.sessionutils as sessionutils

__all__ = [
    'Rawflagchans',
    'RawflagchansInputs'
]

LOG = infrastructure.get_logger(__name__)


class RawflagchansInputs(vdp.StandardInputs):
    """
    RawflagchansInputs defines the inputs for the Rawflagchans pipeline task.
    """
    fbq_antenna_frac_limit = vdp.VisDependentProperty(default=0.2)
    fbq_baseline_frac_limit = vdp.VisDependentProperty(default=1.0)
    fbq_hilo_limit = vdp.VisDependentProperty(default=8.0)
    flag_bad_quadrant = vdp.VisDependentProperty(default=True)
    flag_hilo = vdp.VisDependentProperty(default=True)
    fhl_limit = vdp.VisDependentProperty(default=20.0)
    fhl_minsample = vdp.VisDependentProperty(default=5)

    @vdp.VisDependentProperty
    def intent(self):
        # if the spw was set, look to see which intents were observed in that
        # spectral window and return the intent based on our order of
        # preference: BANDPASS
        preferred_intents = ('BANDPASS',)
        if self.spw:
            for spw in self.ms.get_spectral_windows(self.spw):
                for intent in preferred_intents:
                    if intent in spw.intents:
                        if intent != preferred_intents[0]:
                            LOG.warning('%s spw %s: %s not present, %s used instead' %
                                        (os.path.basename(self.vis), spw.id,
                                         preferred_intents[0], intent))
                        return intent

        # spw was not set, so look through the spectral windows
        for intent in preferred_intents:
            intentok = True
            for spw in self.ms.spectral_windows:
                if intent not in spw.intents:
                    intentok = False
                    break
                if not intentok:
                    break
            if intentok:
                if intent != preferred_intents[0]:
                    LOG.warning('%s %s: %s not present, %s used instead' %
                                (os.path.basename(self.vis), spw.id, preferred_intents[0],
                                 intent))
                return intent

        # As a fallback, return 'BANDPASS' as the intent.
        return 'BANDPASS'

    niter = vdp.VisDependentProperty(default=1)

    @vdp.VisDependentProperty
    def spw(self):
        science_spws = self.ms.get_spectral_windows(with_channels=True)
        return ','.join([str(spw.id) for spw in science_spws])

    parallel = sessionutils.parallel_inputs_impl(default=False)

    # docstring and type hints: supplements hif_rawflagchans
    def __init__(self, context, output_dir=None, vis=None, spw=None, intent=None, flag_hilo=None,
                 fhl_limit=None, fhl_minsample=None, flag_bad_quadrant=None, fbq_hilo_limit=None,
                 fbq_antenna_frac_limit=None, fbq_baseline_frac_limit=None, niter=None, parallel=None):
        """Initialize Inputs.

        Args:
            context: Pipeline context object containing state information.

            output_dir: Output directory.
                Defaults to None, which corresponds to the current working directory.

            vis: List of input MeasurementSets. default: [] - Use the MeasurementSets currently known to the pipeline
                context.

            spw: The list of spectral windows and channels to which the calibration will be applied. Defaults to all science windows in the pipeline
                context.

                Example: spw='17', spw='11, 15'

            intent: A string containing the list of intents to be checked for antennas with deviant gains. The default is blank, which causes the task to select
                the 'BANDPASS' intent.

                Example: intent='`*BANDPASS*`'

            flag_hilo: True to flag channel/baseline data further from the view median than fhl_limit * MAD.

            fhl_limit: If flag_hilo is True then flag channel/baseline data further from the view median than fhl_limit * MAD.

            fhl_minsample: Do no flagging if the view median and MAD are derived from fewer than fhl_minsample view pixels.

            flag_bad_quadrant: True to search for and flag bad antenna quadrants and baseline quadrants. Here a /'quadrant/' is one
                quarter of the channel axis.

            fbq_hilo_limit: If flag_bad_quadrant is True then channel/baselines further from the view median than fbq_hilo_limit * MAD will be noted as
                'suspect'. If there are enough of them to indicate that an antenna or
                baseline quadrant is bad then all channel/baselines in that quadrant will
                be flagged.

            fbq_antenna_frac_limit: If flag_bad_quadrant is True and the fraction of suspect channel/baselines in a particular antenna/quadrant exceeds
                fbq_antenna_frac_limit then all data for that antenna/quadrant will
                be flagged.

            fbq_baseline_frac_limit: If flag_bad_quadrant is True and the fraction of suspect channel/baselines in a particular baseline/quadrant exceeds
                fbq_baseline_frac_limit then all data for that baseline/quadrant will
                be flagged.

            niter:

            parallel: Process multiple MeasurementSets in parallel using the casampi parallelization framework.

                Options: ``'automatic'``, ``'true'``, ``'false'``, ``True``, ``False``

                Default: ``None`` (equivalent to ``False``)
        """
        super().__init__()

        # pipeline inputs
        self.context = context
        # vis must be set first, as other properties may depend on it
        self.vis = vis
        self.output_dir = output_dir

        # data selection arguments
        self.spw = spw
        self.intent = intent

        # flagging parameters
        self.flag_hilo = flag_hilo
        self.fhl_limit = fhl_limit
        self.fhl_minsample = fhl_minsample
        self.flag_bad_quadrant = flag_bad_quadrant
        self.fbq_hilo_limit = fbq_hilo_limit
        self.fbq_antenna_frac_limit = fbq_antenna_frac_limit
        self.fbq_baseline_frac_limit = fbq_baseline_frac_limit
        self.niter = niter
        self.parallel = parallel


class SerialRawflagchans(basetask.StandardTaskTemplate):
    Inputs = RawflagchansInputs

    def prepare(self):
        inputs = self.inputs

        # Initialize result and store vis in result
        result = RawflagchansResults()
        result.vis = inputs.vis

        # Construct the task that will read the data.
        datainputs = RawflagchansDataInputs(
            context=inputs.context, vis=inputs.vis, spw=inputs.spw,
            intent=inputs.intent)
        datatask = RawflagchansData(datainputs)

        # Construct the generator that will create the view of the data
        # that is the basis for flagging.
        viewtask = RawflagchansView()

        # Construct the task that will set any flags raised in the
        # underlying data.
        flagsetterinputs = FlagdataSetter.Inputs(
            context=inputs.context,
            vis=inputs.vis, table=inputs.vis, inpfile=[])
        flagsettertask = FlagdataSetter(flagsetterinputs)

        # Define which type of flagger to use.
        flagger = viewflaggers.MatrixFlagger

        # Translate the input flagging parameters to a more compact
        # list of rules.
        rules = flagger.make_flag_rules(
            flag_hilo=inputs.flag_hilo, fhl_limit=inputs.fhl_limit,
            fhl_minsample=inputs.fhl_minsample,
            flag_bad_quadrant=inputs.flag_bad_quadrant,
            fbq_hilo_limit=inputs.fbq_hilo_limit,
            fbq_antenna_frac_limit=inputs.fbq_antenna_frac_limit,
            fbq_baseline_frac_limit=inputs.fbq_baseline_frac_limit)

        # Construct the flagger task around the data view task and the
        # flagger task.
        flaggerinputs = flagger.Inputs(
            context=inputs.context, output_dir=inputs.output_dir,
            vis=inputs.vis, datatask=datatask, viewtask=viewtask,
            flagsettertask=flagsettertask, rules=rules, niter=inputs.niter,
            iter_datatask=False)
        flaggertask = flagger(flaggerinputs)

        # Execute the flagger task.
        flaggerresult = self._executor.execute(flaggertask)

        # Import views, flags, and "measurement set or caltable to flag"
        # into final result.
        result.importfrom(flaggerresult)

        # Copy flagging summaries to final result, and update
        # names to match expectations by renderer.
        result.summaries = flaggerresult.summaries
        result.summaries[0]['name'] = 'before'
        result.summaries[-1]['name'] = 'after'

        return result

    def analyse(self, result):
        return result


[docs] @task_registry.set_equivalent_casa_task('hif_rawflagchans') class Rawflagchans(sessionutils.ParallelTemplate): Inputs = RawflagchansInputs Task = SerialRawflagchans
class RawflagchansDataInputs(vdp.StandardInputs): def __init__(self, context, vis=None, spw=None, intent=None): """ RawflagchansDataInputs defines the inputs for the RawflagchansData task. Keyword arguments: spw -- views are created for these spws. intent -- views are created for this intent. """ super().__init__() # pipeline inputs self.context = context # vis must be set first, as other properties may depend on it self.vis = vis # data selection arguments self.intent = intent self.spw = spw class RawflagchansData(basetask.StandardTaskTemplate): Inputs = RawflagchansDataInputs def __init__(self, inputs): super().__init__(inputs) def prepare(self): # Initialize result structure result = RawflagchansDataResults() result.data = {} result.intent = self.inputs.intent result.new = True # Take vis from inputs, and add to result. result.table = self.inputs.vis LOG.info('Reading flagging view data for vis {0}'.format( self.inputs.vis)) # Get the MS object. ms = self.inputs.context.observing_run.get_ms(name=self.inputs.vis) # Get antenna names, ids, store in result. _, antenna_ids = commonhelpermethods.get_antenna_names(ms) ants = np.array(antenna_ids) # Create a list of antenna baselines. baselines = [] for ant1 in ants: for ant2 in ants: baselines.append('%s&%s' % (ant1, ant2)) nbaselines = len(baselines) # Create a separate flagging view for each spw. for spwid in map(int, self.inputs.spw.split(',')): LOG.info('Reading flagging view data for spw {0}'.format(spwid)) # Get the correlation products. corrs = commonhelpermethods.get_corr_products(ms, spwid) # Get the spw object from the MS and the number of channels # within the spw. spw = ms.get_spectral_window(spwid) nchans = spw.num_channels # Initialize the data, and number of data points # used, for the flagging view. data = np.zeros([len(corrs), nchans, nbaselines], complex) ndata = np.zeros([len(corrs), nchans, nbaselines], int) # Open MS and read in required data. with casa_tools.MSReader(ms.name) as openms: try: openms.msselect({'scanintent': '*%s*' % self.inputs.intent, 'spw': str(spwid)}) except: LOG.warning('Unable to compute flagging view for spw %s' % spwid) openms.close() # Continue to next spw. continue # Read in the MS data in chunks of limited number of rows at a time. openms.iterinit(maxrows=500) openms.iterorigin() iterating = True while iterating: rec = openms.getdata( ['data', 'flag', 'antenna1', 'antenna2']) if 'data' not in rec: break for row in range(np.shape(rec['data'])[2]): ant1 = rec['antenna1'][row] ant2 = rec['antenna2'][row] if ant1 == ant2: continue baseline1 = ant1 * (ants[-1] + 1) + ant2 baseline2 = ant2 * (ants[-1] + 1) + ant1 for icorr in range(len(corrs)): data[icorr, :, baseline1][ rec['flag'][icorr, :, row] == False]\ += rec['data'][icorr, :, row][ rec['flag'][icorr, :, row] == False] ndata[icorr, :, baseline1][ rec['flag'][icorr, :, row] == False] += 1 data[icorr, :, baseline2][ rec['flag'][icorr, :, row] == False]\ += rec['data'][icorr, :, row][ rec['flag'][icorr, :, row] == False] ndata[icorr, :, baseline2][ rec['flag'][icorr, :, row] == False] += 1 iterating = openms.iternext() # Calculate the average data value, suppress any divide by 0 # error messages (RuntimeWarning). with np.errstate(divide='ignore', invalid='ignore'): data /= ndata # Take the absolute value of the data points. data = np.abs(data) # Set flagging state to "True" wherever no data points were available. flag = ndata == 0 # Store data and related information for this spwid. result.data[spwid] = { 'data': data, 'flag': flag, 'baselines': baselines, 'nchans': nchans, 'corrs': corrs } return result def analyse(self, result): return result class RawflagchansView: def __init__(self): """ Creates an RawflagchansView instance. """ # Initialize result structure. By initializing here, # it is ensured that repeated calls to this instance will # have access to all previously generated results, # which is used to "refine" the data view in case the # incoming data task result is not new. self.result = RawflagchansViewResults() def __call__(self, dataresult): """ When called, the RawflagchansView object calculates flagging views for the vis / table provided by RawflagchansDataResults. dataresult -- RawflagchansDataResults object. Returns: RawflagchansViewResults object containing the flagging view. """ # Take vis from data task results, and add to result. self.result.vis = dataresult.table LOG.info('Computing flagging metrics for vis {0}'.format( self.result.vis)) # Check if dataresult is new for current iteration, or created during # a previous iteration. if dataresult.new: # If the dataresult is from a newly run datatask, then # create the flagging view. # Get intent from dataresult intent = dataresult.intent # The dataresult should have stored separate results # for each spwid it could find data for in the MS. # Create a separate flagging view for each spw. for spwid, spwdata in dataresult.data.items(): LOG.info('Calculating flagging view for spw %s' % spwid) # Create axes for flagging view. axes = [ commonresultobjects.ResultAxis( name='channels', units='', data=np.arange(spwdata['nchans'])), commonresultobjects.ResultAxis( name='Baseline', units='', data=np.array(spwdata['baselines']), channel_width=1) ] # From the data array, create a view for each polarisation. for icorr, corrlist in enumerate(spwdata['corrs']): corr = corrlist[0] self.refine_view(spwdata['data'][icorr], spwdata['flag'][icorr]) viewresult = commonresultobjects.ImageResult( filename=self.result.vis, data=spwdata['data'][icorr], flag=spwdata['flag'][icorr], axes=axes, datatype='Mean amplitude', spw=spwid, pol=corr, intent=intent) # add the view results to the result structure self.result.addview(viewresult.description, viewresult) else: # If the dataresult is from a previously run datatask, then # the datatask is not being iterated, and instead the # flagging view will be created as a refinement from an # earlier flagging view. # Check in the result structure stored in this instance of RawflagchansView # for the presence of previous result descriptions. prev_descriptions = self.result.descriptions() if prev_descriptions: # If result descriptions are present, then this instance was called # before, and views were already calculated previously. # Since the current view will be very similar to the last, it is # approximated to be identical, which saves having to recalculate. for description in prev_descriptions: # Get a deep copy of the last result belonging to the description. copy_of_prev_result = self.result.last(description) # re-refine the data to take into account possible # new flagging self.refine_view(copy_of_prev_result.data, copy_of_prev_result.flag) # Store the refined view as the new view in the result structure. self.result.addview(description, copy_of_prev_result) else: LOG.error('Not iterating datatask, but no previous flagging ' ' views available to create a refined view from. ' 'Cannot create flagging views.') return self.result @staticmethod def refine_view(data, flag): """Refine the data view by subtracting the median from each baseline row, then subtracting the median from each channel column. """ for baseline in range(np.shape(data)[1]): valid = data[:, baseline][flag[:, baseline] == False] if len(valid): data[:, baseline] -= np.median(valid) for chan in range(np.shape(data)[0]): valid = data[chan, :][flag[chan, :] == False] if len(valid): data[chan, :] -= np.median(valid)