import copy
import os
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.sessionutils as sessionutils
import pipeline.infrastructure.utils as utils
import pipeline.infrastructure.vdp as vdp
from pipeline.h.heuristics import fieldnames as fieldnames
from pipeline.h.tasks.common import commonfluxresults
from pipeline.infrastructure import task_registry
from . import setjy
LOG = infrastructure.get_logger(__name__)
class SetModelsInputs(vdp.StandardInputs):
normfluxes = vdp.VisDependentProperty(default=True)
refintent = vdp.VisDependentProperty(default = 'AMPLITUDE')
scalebycan = vdp.VisDependentProperty(default=True)
transintent = vdp.VisDependentProperty(default = 'BANDPASS')
@vdp.VisDependentProperty
def reference(self):
# this will give something like '0542+3243,0343+242'
field_fn = fieldnames.IntentFieldnames()
reference_fields = field_fn.calculate(self.ms, self.refintent)
# run the answer through a set, just in case there are duplicates
fields = utils.deduplicate(s for s in utils.safe_split(reference_fields))
return ','.join(fields)
@vdp.VisDependentProperty
def reffile(self):
value = os.path.join(self.context.output_dir, 'flux.csv')
return value
@vdp.VisDependentProperty
def transfer(self):
transfer_fn = fieldnames.IntentFieldnames()
# call the heuristic to get the transfer fields as a string
transfer_fields = transfer_fn.calculate(self.ms, self.transintent)
# remove the reference field should it also have been observed with
# the transfer intent
transfers = set(self.ms.get_fields(task_arg=transfer_fields))
references = set(self.ms.get_fields(task_arg=self.reference))
diff = transfers.difference(references)
transfer_names = sorted(f.name for f in diff)
fields_with_name = self.ms.get_fields(name=transfer_names)
if len(fields_with_name) != len(diff) or len(diff) != len(transfer_names):
return ','.join(str(f.id) for f in sorted(diff, key=lambda f: f.id))
else:
return ','.join(transfer_names)
parallel = sessionutils.parallel_inputs_impl(default=False)
# docstring and type hints: supplements hif_setmodels
def __init__(self, context, output_dir=None, vis=None, reference=None,
refintent=None, transfer=None, transintent=None,
reffile=None, normfluxes=None, scalebychan=None, parallel=None):
"""Initialize Inputs.
Args:
context: Pipeline context object containing state information.
output_dir: Output directory.
Defaults to ``None``, which corresponds to the current working directory.
vis: The list of input MeasurementSets. Defaults to the list of MeasurementSets specified in the pipeline context.
Example: ``['M32A.ms', 'M32B.ms']``
reference: A string containing a comma delimited list of field names defining the reference calibrators. Defaults to field names with
intent 'AMPLITUDE'.
Example: ``'M82,3C273'``
refintent: A string containing a comma delimited list of intents used to select the reference calibrators. Defaults to 'AMPLITUDE'.
Example: ``'BANDPASS'``
transfer: A string containing a comma delimited list of field names defining the transfer calibrators. Defaults to field names with
intent ''.
Example: ``'J1328+041,J1206+30'``
transintent: A string containing a comma delimited list of intents defining the transfer calibrators. Defaults to ``'BANDPASS,PHASE,CHECK'``.
``''`` stands for no transfer sources.
Example: ``'PHASE'``
reffile: The reference file containing a lookup table of point source models This file currently defaults to 'flux.csv' in the working directory. This
file must conform to the standard pipeline 'flux.csv' format
Example: ``'myfluxes.csv'``
normfluxes: Normalize the transfer source flux densities.
scalebychan: Scale the flux density on a per channel basis or else on a per spw basis
parallel: Process multiple MeasurementSets in parallel using the casampi parallelization framework.
Options: ``'automatic'``, ``'true'``, ``'false'``, ``True``, ``False``
Default: ``None`` (equivalent to ``False``)
"""
super().__init__()
self.context = context
self.vis = vis
self.output_dir = output_dir
self.reference = reference
self.refintent = refintent
self.transfer = transfer
self.transintent = transintent
self.reffile = reffile
self.normfluxes = normfluxes
self.scalebychan = scalebychan
self.parallel = parallel
class SerialSetModels(basetask.StandardTaskTemplate):
Inputs = SetModelsInputs
def prepare(self, **parameters):
# Initialize the result.
result = commonfluxresults.FluxCalibrationResults(vis=self.inputs.vis)
# Set reference calibrator models.
# These models will be assigned the lookup reference frequency,
# Stokes parameters, and spix if they are available. If they are not the
# Setjy defaults (spw center frequency, [1.0, 0.0, 0.0, 0.0], 0.0)
# will be used
reference_fields = self.inputs.reference
reference_intents = self.inputs.refintent
if reference_fields not in (None, ''):
refresults = self._do_setjy(
reference_fields, reference_intents, reffile=self.inputs.reffile,
normfluxes=False, scalebychan=self.inputs.scalebychan)
# Add measurements to the results object
result.measurements.update(copy.deepcopy(refresults.measurements))
# Set transfer calibrator models.
# These models will be assigned the lookup reference frequency,
# Stokes parameters, and spix if they are available. If they are not the
# Setjy defaults (spw center frequency, [1.0, 0.0, 0.0, 0.0], 0.0)
# will be used . If normfluxes is True then the stokes parameters
# will be normalized to a value of 1
transfer_fields = self.inputs.transfer
transfer_intents = self.inputs.transintent
if transfer_fields not in (None, ''):
transresults = self._do_setjy(
transfer_fields, transfer_intents, reffile=self.inputs.reffile,
normfluxes=self.inputs.normfluxes, scalebychan=self.inputs.scalebychan)
# Add measurements to the results object
result.measurements.update(copy.deepcopy(transresults.measurements))
return result
def analyse(self, result):
return result
# Call the Setjy task
def _do_setjy(self, field, intent, reffile=None, normfluxes=None, scalebychan=None):
task_args = {
'output_dir': self.inputs.output_dir,
'vis': self.inputs.vis,
'field': field,
'intent': intent,
'fluxdensity': -1,
'reffile': reffile,
'normfluxes': normfluxes,
'scalebychan': scalebychan
}
task_inputs = vdp.InputsContainer(setjy.Setjy, self.inputs.context, **task_args)
task = setjy.Setjy(task_inputs)
results_list = self._executor.execute(task, merge=False)
return results_list[0]
[docs]
@task_registry.set_equivalent_casa_task('hif_setmodels')
class SetModels(sessionutils.ParallelTemplate):
Inputs = SetModelsInputs
Task = SerialSetModels