Source code for pipeline.hifv.tasks.analyzestokescubes.analyzestokescubes

import collections
import copy

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.utils as utils
import pipeline.infrastructure.vdp as vdp
from pipeline.infrastructure import casa_tools, task_registry

LOG = infrastructure.get_logger(__name__)


class AnalyzestokescubesResults(basetask.Results):
    def __init__(self, stats=None):
        super().__init__()
        self.pipeline_casa_task = 'Analyzestokescubes'
        self.stats = stats

    def merge_with_context(self, context):
        """
        See :method:`~pipeline.infrastructure.api.Results.merge_with_context`
        """
        return

    def __repr__(self):
        #return 'AnalyzestokescubesResults:\n\t{0}'.format(
        #    '\n\t'.join([ms.name for ms in self.mses]))
        return 'AnalyzestokescubesResults:'


class AnalyzestokescubesInputs(vdp.StandardInputs):
    # docstring and type hints: supplements hifv_analysestokescubes
    def __init__(self, context, vis=None):
        """Initialize Inputs.

        Args:
            context: Pipeline context object containing state information.

            vis: The list of input MeasurementSets. Defaults to the list of MeasurementSets specified in the hifv_importdata task.

        """
        self.context = context
        self.vis = vis


[docs] @task_registry.set_equivalent_casa_task('hifv_analyzestokescubes') @task_registry.set_casa_commands_comment('Add your task description for inclusion in casa_commands.log') class Analyzestokescubes(basetask.StandardTaskTemplate): Inputs = AnalyzestokescubesInputs is_multi_vis_task = True
[docs] def prepare(self): LOG.info("Analyzestokescubes is running.") imlist = self.inputs.context.subimlist.get_imlist() # initialize the stats container stats = collections.OrderedDict() roi_stats_default = {'stokesi': [], 'stokesq': [], 'stokesu': [], 'stokesv': [], 'stokesi_rms': [], 'spw': [], 'keep': [], 'rms': [], 'reffreq': [], 'beamarea': [], 'xy': None, 'world': None} roi_list = ['peak_stokesi', 'peak_linpolint'] # add the roi location img_stats = self._get_imstat(imlist) for idx, roi_name in enumerate(roi_list): stats.setdefault(roi_name, copy.deepcopy(roi_stats_default)) stats[roi_name]['xy'] = (img_stats[idx]['maxpos'][0], img_stats[idx]['maxpos'][1]) stats[roi_name]['world'] = img_stats[idx]['maxposf'] # get the roi property and append them into the roi properties for imageitem in imlist: img_name = utils.glob_ordered(imageitem['imagename'].replace('.subim', '.pbcor.tt0.subim'))[0] rms_name = utils.glob_ordered(imageitem['imagename'].replace('.subim', '.pbcor.tt0.rms.subim'))[0] LOG.info(f'Getting properties from {img_name} and {rms_name}') with casa_tools.ImagepolReader(img_name) as imagepol: with casa_tools.ImageReader(rms_name) as image: rms_stats = image.statistics(robust=True, axes=[0, 1])['median'] img_stokesi = imagepol.stokesi() img_stokesq = imagepol.stokesq() img_stokesu = imagepol.stokesu() img_stokesv = imagepol.stokesv() cs = img_stokesi.coordsys() bm = img_stokesi.restoringbeam(polarization=0) beamarea = bm['major']['value']*bm['minor']['value'] for idx, roi_name in enumerate(roi_list): try: npix_halfwidth = 1 blc = [stats[roi_name]['xy'][0]-npix_halfwidth, stats[roi_name]['xy'][1]-npix_halfwidth] trc = [stats[roi_name]['xy'][0]+npix_halfwidth, stats[roi_name]['xy'][1]+npix_halfwidth] rg = casa_tools.regionmanager.box(blc=blc, trc=trc) stokesi_mean = img_stokesi.statistics(robust=False, region=rg)['mean'][0] stokesq_mean = img_stokesq.statistics(robust=False, region=rg)['mean'][0] stokesu_mean = img_stokesu.statistics(robust=False, region=rg)['mean'][0] stokesv_mean = img_stokesv.statistics(robust=False, region=rg)['mean'][0] stokesi_rms = image.statistics(robust=False, region=rg, axes=[0, 1])['mean'][0] stats[roi_name]['spw'].append(imageitem['spwlist']) stats[roi_name]['keep'].append(imageitem['metadata'].get('keep', True)) stats[roi_name]['stokesi'].append(stokesi_mean) stats[roi_name]['stokesq'].append(stokesq_mean) stats[roi_name]['stokesu'].append(stokesu_mean) stats[roi_name]['stokesv'].append(stokesv_mean) stats[roi_name]['stokesi_rms'].append(stokesi_rms) stats[roi_name]['rms'].append(rms_stats) stats[roi_name]['beamarea'].append(beamarea) stats[roi_name]['reffreq'].append(cs.referencevalue(format='n')['numeric'][3]) except Exception as e: LOG.warning( 'Failed to derive the Stokes brightness at the region-of-interest (ROI): {} / spw = {!r}.'.format(roi_name, imageitem['spwlist'])) return AnalyzestokescubesResults(stats=stats)
[docs] def analyse(self, results): return results
def _get_imstat(self, imlist): """Identify the image with lowest ref. frequency and measure its 'maxpos'. See the requirement in PIPE-1356. """ frequency_list = [] imagename_list = [] for imageitem in imlist: img_name = utils.glob_ordered(imageitem['imagename'].replace('.subim', '.pbcor.tt0.subim'))[0] imagename_list.append(img_name) with casa_tools.ImageReader(img_name) as image: frequency_list.append(image.coordsys().referencevalue( format='q', type='spectral')['quantity']['*1']['value']) idx_freqlow = frequency_list.index(min(frequency_list)) idx_freqhigh = frequency_list.index(max(frequency_list)) imagename_freqlow = imagename_list[idx_freqlow] imagename_freqhigh = imagename_list[idx_freqhigh] LOG.info( f'{imagename_freqlow} has the lowest reference frequency at {frequency_list[idx_freqlow]} Hz.') LOG.info( f'{imagename_freqhigh} has the highest reference frequency at {frequency_list[idx_freqhigh]} Hz.') mask_lel = self._get_mask(imagename_freqhigh) # do the peak search in the pbcor subimage at the lowest frequency LOG.info(f'Searching the intensity peaks using mask={mask_lel!r} in {imagename_freqlow}') with casa_tools.ImagepolReader(imagename_freqlow) as imagepol: img_stokesi = imagepol.stokesi() stokesi_stats = img_stokesi.statistics(robust=False, mask=mask_lel, stretch=True) LOG.info('Found the Stokes-I intensity peak at {maxpos} / {maxposf}'.format(**stokesi_stats)) # stokesi_stats['maxradec_str'] = img_stokesi.coordsys().toworld([maxposx, maxposy], format='s')['string'] img_linpolint = imagepol.linpolint() linpolint_stats = img_linpolint.statistics(robust=False, mask=mask_lel, stretch=True) LOG.info('Found the LinearPol intensity peak at {maxpos} / {maxposf}'.format(**linpolint_stats)) return (stokesi_stats, linpolint_stats) def _get_mask(self, imagename_pbcor_subim): """Generate a mask LEL based on the specified pbcor subimage name.""" with casa_tools.ImageReader(imagename_pbcor_subim) as image: subim_cs = image.coordsys() subim_shape = image.shape() # Here we borrow .mask images from tclean() to restrict the search region. # for vlass-se-cube or vlass-se-cont: # iter2.mask/iter2.cleanmask is generated from vlassmasking (tier2-combined) # iter3.mask is from tclean(pbmask=0.4,mask='pb',...) # As below, we use the name pattern to retreive the .mask images in their original dimension # and make their subimages on-demand for the LEL expression. mask_lel = [] for mask_pat in ['.iter2.cleanmask', '.iter3.mask']: tclean_mask = imagename_pbcor_subim.replace('.iter3.image.pbcor.tt0.subim', mask_pat) tclean_mask_flatten = tclean_mask+'.subim' try: with casa_tools.ImageReader(tclean_mask) as image: rgTool = casa_tools.regionmanager region = rgTool.frombcs(csys=subim_cs.torecord(), shape=subim_shape, stokes='I', stokescontrol='a') image.subimage(outfile=tclean_mask_flatten, region=region, overwrite=True) mask_lel.append(f'"{tclean_mask_flatten}">0.5') LOG.info(f'Generated a tclean mask subimage: {tclean_mask_flatten}') except Exception as e: LOG.warning(f'Failed to obtain the tclean mask subimage: {tclean_mask_flatten}') mask_lel = '&&'.join(mask_lel) return mask_lel