Source code for pipeline.hifv.tasks.setmodel.vlasetjy

import csv
import math
import os

import numpy as np

import pipeline.domain as domain
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.utils as utils
import pipeline.infrastructure.vdp as vdp
from pipeline.h.tasks.common import commonfluxresults
from pipeline.hifv.heuristics import standard
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import casa_tools
from pipeline.infrastructure import task_registry

# Paper
# http://iopscience.iop.org/article/10.1088/0067-0049/204/2/19/pdf

LOG = infrastructure.get_logger(__name__)

ORIGIN = 'hifa_vlasetjy'


def find_standards(positions):
    """Function for finding standards from the original scripted EVLA pipeline
    """
    # set the max separation as ~1'
    MAX_SEPARATION = 60*2.0e-5
    position_3C48 = casa_tools.measures.direction('j2000', '1h37m41.299', '33d9m35.133')
    fields_3C48 = []
    position_3C138 = casa_tools.measures.direction('j2000', '5h21m9.886', '16d38m22.051')
    fields_3C138 = []
    position_3C147 = casa_tools.measures.direction('j2000', '5h42m36.138', '49d51m7.234')
    fields_3C147 = []
    position_3C286 = casa_tools.measures.direction('j2000', '13h31m8.288', '30d30m32.959')
    fields_3C286 = []

    for ii in range(0, len(positions)):
        position = casa_tools.measures.direction('j2000', str(positions[ii][0]) + 'rad', str(positions[ii][1]) + 'rad')
        separation = casa_tools.measures.separation(position, position_3C48)['value'] * math.pi / 180.0
        if (separation < MAX_SEPARATION):
            fields_3C48.append(ii)
        else:
            separation = casa_tools.measures.separation(position, position_3C138)['value'] * math.pi / 180.0
            if (separation < MAX_SEPARATION):
                fields_3C138.append(ii)
            else:
                separation = casa_tools.measures.separation(position, position_3C147)['value'] * math.pi / 180.0
                if (separation < MAX_SEPARATION):
                    fields_3C147.append(ii)
                else:
                    separation = casa_tools.measures.separation(position, position_3C286)['value'] * math.pi / 180.0
                    if (separation < MAX_SEPARATION):
                        fields_3C286.append(ii)

    fields = [fields_3C48, fields_3C138, fields_3C147, fields_3C286]

    return fields


def standard_sources(vis):
    """Get standard source names, fields, and positions - convenience function from
       the original EVLA scripted pipeline
    """

    with casa_tools.TableReader(vis + '/FIELD') as table:
        field_positions = table.getcol('PHASE_DIR')

    positions = []

    for ii in range(0, len(field_positions[0][0])):
        positions.append([field_positions[0][0][ii], field_positions[1][0][ii]])

    standard_source_names = ['3C48', '3C138', '3C147', '3C286']
    standard_source_fields = find_standards(positions)

    standard_source_found = False
    for standard_source_field in standard_source_fields:
        if standard_source_field:
            standard_source_found = True
    if not standard_source_found:
        standard_source_found = False
        LOG.error("ERROR: No standard flux density calibrator observed, flux density scale will be arbitrary")
        QA_calprep='Fail'

    return standard_source_names, standard_source_fields


class VLASetjyInputs(vdp.StandardInputs):

    @vdp.VisDependentProperty
    def field(self):

        # Get field ids in the current ms that have been observed
        # with the desired intent
        fields = self.ms.get_fields(intent=self.intent)
        unique_field_names = {f.name for f in fields}
        field_ids = {f.id for f in fields}

        # Fields with different intents may have the same name. Check for this
        # and return the field ids rather than the names to resolve any
        # ambiguities.
        if len(unique_field_names) is len(field_ids):
            return ','.join(unique_field_names)
        else:
            return ','.join([str(i) for i in field_ids])

    intent = vdp.VisDependentProperty(default='AMPLITUDE')

    @vdp.VisDependentProperty
    def refspectra(self):

        # If flux density was explicitly set and is not equal to -1, which is
        # hard-coded as the default value in the task interface return the
        # default tuple which is composed of the reference frequency, the
        # Stokes fluxdensity and the spectral index
        if self.fluxdensity != -1:
            return (self.reffreq, self.fluxdensity, self.spix)

        # There is no ms object.
        if not self.ms:
            return

        # The fluxdensity parameter is set to -1 which indicates that we must do a
        # flux lookup. The # lookup order is:
        #     1) from file, unless it's a solar system object
        #     2) from CASA

        # TODO: Replace with reading directly from the context

        # Read the reference flux values from a file
        ref_flux = []
        if os.path.exists(self.reffile):
            with open(self.reffile, 'rt') as f:

                # First row is header row
                reader = csv.reader(f)
                next(reader)

                # Loop over rows trying the new CSV format first
                for row in reader:
                    try:
                        try:
                            (ms_name, field_id, spw_id, I, Q, U, V, spix, uvmin, uvmax, comment) = row
                        except:
                            (ms_name, field_id, spw_id, I, Q, U, V, comment) = row
                            spix = str(self.spix)
                    except ValueError:
                        LOG.warning('Invalid flux statement in %s: \'%s'
                                    '\'' % (self.reffile, row))
                        continue

                    # Check that the entry is for the correct MS
                    if os.path.basename(ms_name) != self.ms.basename:
                        continue

                    # Add the value
                    spw_id = int(spw_id)
                    ref_flux.append((field_id, spw_id, float(I), float(Q),
                                     float(U), float(V), float(spix), float(uvmin), float(uvmax)))

                    # TODO sort the flux values in spw order?

        # Issue warning if the reference file was specified but not found.
        if not os.path.exists(self.reffile) and self.reffile not in ('', None):
            LOG.warning('Flux reference file not found: {!s}'.format(self.reffile))

        # Get the spectral window ids for the spws specified by the inputs
        spws = self.ms.get_spectral_windows(self.spw)
        spw_ids = sorted(spw.id for spw in spws)

        # In order to print flux densities in the same order as the fields, we
        # must retrieve the flux density for each field in turn
        field_flux = []
        for field_arg in utils.safe_split(self.field):

            # Field names may resolve to multiple field IDs
            fields = self.ms.get_fields(task_arg=field_arg, intent=self.intent)
            field_ids = {str(field.id) for field in fields}
            field_names = {field.name for field in fields}

            # Find fluxes
            flux_by_spw = []
            for spw_id in spw_ids:
                reffreq = str(self.ms.get_spectral_window(spw_id).centre_frequency)
                if self.normfluxes:
                    flux = [(reffreq, [I / I, Q / I, U / I, V / I], spix)
                            for (ref_field_id, ref_spw_id, I, Q, U, V, spix, uvmin, uvmax) in ref_flux
                            if (ref_field_id in field_ids
                                or ref_field_id in field_names)
                            and ref_spw_id == spw_id]
                else:
                    flux = [(reffreq, [I, Q, U, V], spix)
                            for (ref_field_id, ref_spw_id, I, Q, U, V, spix, uvmin, uvmax) in ref_flux
                            if (ref_field_id in field_ids
                                or ref_field_id in field_names)
                            and ref_spw_id == spw_id]

                # No flux measurements found for the requested field/spws, so do
                # either a CASA model look-up (-1) or reset the flux to 1.
                if not flux:
                    if any('AMPLITUDE' in f.intents for f in fields):
                        flux = (reffreq, -1, self.spix)
                    else:
                        flux = (reffreq, [-1], self.spix)

                # If this is a solar system calibrator, ignore any catalogue
                # flux and request a CASA model lookup
                if not field_names.isdisjoint(standard.Standard.ephemeris_fields):
                    LOG.debug('Ignoring records from file for solar system '
                              'calibrator')
                    flux = (reffreq, -1, 0.0)

                flux_by_spw.append(flux[0] if len(flux) == 1 else flux)

            field_flux.append(flux_by_spw[0] if len(flux_by_spw) == 1 else flux_by_spw)

        return field_flux[0] if len(field_flux) == 1 else field_flux

    @vdp.VisDependentProperty
    def reffile(self):
        value = os.path.join(self.context.output_dir, 'flux.csv')
        return value

    normfluxes = vdp.VisDependentProperty(default=False)
    reffreq = vdp.VisDependentProperty(default='1GHz')
    fluxdensity = vdp.VisDependentProperty(default=-1)
    spix = vdp.VisDependentProperty(default=0.0)
    scalebychan = vdp.VisDependentProperty(default=True)
    model = vdp.VisDependentProperty(default='')

    @vdp.VisDependentProperty
    def standard(self):

        # Get the standard heuristics function.
        heu_standard = standard.Standard()

        # The field may be an integer, but the standard heuristic operates on
        # strings so determine the corresponding name of the fields
        field_names = []
        for field in utils.safe_split(self.field):
            if str(field).isdigit():
                matching_fields = self.ms.get_fields(field)
                assert len(matching_fields) == 1
                field_names.append(matching_fields[0].name)
            else:
                field_names.append(field)

        standards = [heu_standard(field) for field in field_names]
        return standards[0] if len(standards) == 1 else standards

    # docstring and type hints: supplements hifv_vlasetjy
    def __init__(self, context, output_dir=None, vis=None, field=None, intent=None, spw=None, model=None,
                 scalebychan=None, fluxdensity=None, spix=None, reffreq=None, standard=None,
                 refspectra=None, reffile=None, normfluxes=None):
        """Initialize Inputs.

        Args:
            context: Pipeline context object containing state information.

            output_dir: Output directory.
                Defaults to None, which corresponds to the current working directory.

            vis: The list of input MeasurementSets. Defaults to the list of MeasurementSets specified in the hifv_importdata task.

            field: List of field names or ids.

            intent: Observing intent of flux calibrators.

            spw: List of spectral window ids.

            model: File location for field model.

            scalebychan: Scale the flux density on a per channel basis or else on a per spw basis.

            fluxdensity: Specified flux density [I,Q,U,V]; -1 will lookup values.

            spix: Spectral index of fluxdensity.  Can be set when fluxdensity is not -1.

            reffreq: Reference frequency for spix.  Can be set when fluxdensity is not -1.

            standard: Flux density standard.

            refspectra:

            reffile: Path to file with fluxes for non-solar system calibrators.

            normfluxes:

        """

        super().__init__()

        self.context = context
        self.vis = vis
        self.output_dir = output_dir
        self.field = field
        self.intent = intent
        self.spw = spw
        self.model = model
        self.scalebychan = scalebychan
        self.fluxdensity = fluxdensity
        self.spix = spix
        self.reffreq = reffreq
        self.standard = standard
        self.refspectra = refspectra
        self.reffile = reffile
        self.normfluxes = normfluxes

    def to_casa_args(self):
        d = super().to_casa_args()

        d['fluxdensity'] = d['refspectra'][1]
        try:
            np.testing.assert_almost_equal(d['refspectra'][2], 0.0)
        except:
            d['reffreq'] = 'TOPO ' + d['refspectra'][0].replace(" ", "")
            d['spix'] = d['refspectra'][2]

        # Filter out reffile. Note that the , is required
        for ignore in ('reffile', 'refspectra', 'normfluxes',):
            if ignore in d:
                del d[ignore]

        # Enable intent selection in CASA.
        d['selectdata'] = True

        # Force usescratch to True for now
        d['usescratch'] = True

        return d


[docs] @task_registry.set_equivalent_casa_task('hifv_vlasetjy') class VLASetjy(basetask.StandardTaskTemplate): Inputs = VLASetjyInputs
[docs] def prepare(self): inputs = self.inputs result = commonfluxresults.FluxCalibrationResults(vis=inputs.vis) # Return early if the field has no data of the required intent. This # could be the case when given multiple MSes, one of which could be # without an amplitude calibrator for instance. if not inputs.ms.get_fields(inputs.field, intent=inputs.intent): LOG.warning('Field(s) \'%s\' in %s have no data with intent %s' % (inputs.field, inputs.ms.basename, inputs.intent)) return result # get the spectral windows for the spw inputs argument # This is done later in the loop for the VLA, just keeping here for history memory # print "Type of inputs.spw: ", inputs.spw, type(inputs.spw) # spws = [spw for spw in inputs.ms.get_spectral_windows(inputs.spw)] # Multiple spectral windows spawn multiple setjy jobs. Set a unique # marker in the CASA log so that we can identify log entries from this # particular task # start_marker = self._add_marker_to_casa_log() # loop over fields so that we can use Setjy for sources with different # standards setjy_dicts = [] standard_source_names, standard_source_fields = standard_sources(self.inputs.vis) m = self.inputs.context.observing_run.get_ms(self.inputs.vis) field_spws = m.get_vla_field_spws() spw2band = m.get_vla_spw2band() # bands = spw2band.values() # Look in spectral window domain object as this information already exists! with casa_tools.TableReader(self.inputs.vis + '/SPECTRAL_WINDOW') as table: channels = table.getcol('NUM_CHAN') originalBBClist = table.getcol('BBC_NO') spw_bandwidths = table.getcol('TOTAL_BANDWIDTH') reference_frequencies = table.getcol('REF_FREQUENCY') center_frequencies = [rf + spwbw / 2 for rf, spwbw in zip(reference_frequencies, spw_bandwidths)] # LOG.info("STANDARD SOURCE FIELDS:") # print standard_source_fields for i, fields in enumerate(standard_source_fields): for myfield in fields: domainfield = m.get_fields(myfield)[0] if 'AMPLITUDE' in domainfield.intents: inputs.field = myfield # Use the domain object to get the actual field id, ***NOT*** the index # setjyfield = m.get_fields()[myfield].id jobs = [] VLAspws = field_spws[myfield] strlistVLAspws = ','.join(str(spw) for spw in VLAspws) spws = [spw for spw in inputs.ms.get_spectral_windows(strlistVLAspws)] for spw in spws: inputs.spw = spw.id reference_frequency = center_frequencies[spw.id] EVLA_band = spw2band[spw.id] LOG.info("Center freq for spw " + str(spw.id) + " = " + str(reference_frequency) + ", observing band = " + EVLA_band) model_image = standard_source_names[i] + '_' + EVLA_band + '.im' LOG.info("Setting model for field " + str(m.get_fields()[myfield].id) + " spw "+str(spw.id) + " using " + model_image) if self.inputs.model: LOG.warning("Model override input. Using {!s}".format(self.inputs.model)) model_image = self.inputs.model task_args = {'vis' : inputs.vis, 'field' : str(myfield), 'spw' : str(spw.id), 'selectdata' : False, 'model' : model_image, 'intent' : '', 'listmodels' : False, 'scalebychan' : inputs.scalebychan, 'fluxdensity' : inputs.fluxdensity, 'standard' : inputs.standard, 'usescratch' : True} jobs.append(casa_tasks.setjy(**task_args)) # Flux densities coming from a non-lookup are added to the # results so that user-provided calibrator fluxes are # committed back to the domain objects if inputs.refspectra[1] != -1: try: (I, Q, U, V) = inputs.refspectra[1] flux = domain.FluxMeasurement(spw_id=spw.id, I=I, Q=Q, U=U, V=V, origin=ORIGIN) except: I = inputs.refspectra[1][0] flux = domain.FluxMeasurement(spw_id=spw.id, I=I, origin=ORIGIN) result.measurements[str(myfield)].append(flux) # merge identical jobs into one job with a multi-spw argument jobs_and_components = utils.merge_jobs(jobs, casa_tasks.setjy, merge=('spw',)) for job, _ in jobs_and_components: try: setjy_dicts.append(self._executor.execute(job)) except: LOG.warning("SetJy issue with field id=" + str(job.kw['field']) + " and spw=" + str(job.kw['spw'])) spw_seen = set() for setjy_dict in setjy_dicts: try: setjy_dict.pop('format') except: LOG.warning("'format' key not found") for field_id in setjy_dict: setjy_dict[field_id].pop('fieldName') field = self.inputs.ms.get_fields(field_id)[0] if inputs.refspectra[1] == -1: for spw_id in setjy_dict[field_id]: I = setjy_dict[field_id][spw_id]['fluxd'][0] Q = setjy_dict[field_id][spw_id]['fluxd'][1] U = setjy_dict[field_id][spw_id]['fluxd'][2] V = setjy_dict[field_id][spw_id]['fluxd'][3] flux = domain.FluxMeasurement(spw_id, I, Q=Q, U=U, V=V, origin=ORIGIN) if spw_id not in spw_seen: result.measurements[field.identifier].append(flux) spw_seen.add(spw_id) return result
[docs] def analyse(self, result): return result