from casatasks.private import tec_maps
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.vdp as vdp
from pipeline.h.heuristics import caltable as caltable_heuristic
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import task_registry
LOG = infrastructure.get_logger(__name__)
class TecMapsInputs(vdp.StandardInputs):
show_tec_maps = vdp.VisDependentProperty(default=True)
apply_tec_correction = vdp.VisDependentProperty(default=False)
@vdp.VisDependentProperty
def caltable(self):
namer = caltable_heuristic.TecMapstable()
casa_args = self._get_task_args(ignore=('caltable',))
return namer.calculate(output_dir=self.output_dir, stage=self.context.stage, **casa_args)
@vdp.VisDependentProperty
def parameter(self):
return []
# docstring and type hints: supplements hifv_tecmaps
def __init__(self, context, output_dir=None, vis=None, show_tec_maps=None, apply_tec_correction=None,
caltable=None, caltype=None, parameter=None):
"""Initialize Inputs.
Args:
context: Pipeline context object containing state information.
output_dir: Output directory.
Defaults to None, which corresponds to the current working directory.
vis: The list of input MeasurementSets. Defaults to the list of MeasurementSets specified in the hifv_importdata task.
show_tec_maps:
apply_tec_correction:
caltable:
caltype:
parameter:
"""
super().__init__()
self.context = context
self.output_dir = output_dir
self.vis = vis
self.show_tec_maps = show_tec_maps
self.apply_tec_correction = apply_tec_correction
self.parameter = parameter
self.caltable = caltable
self.caltype = caltype
def to_casa_args(self):
args = super().to_casa_args()
args['caltype'] = 'tecim'
return args
class TecMapsResults(basetask.Results):
def __init__(self, final=None, pool=None, preceding=None, tec_image=None, tec_rms_image=None,
tec_plotfile=None):
if final is None:
final = []
if pool is None:
pool = []
if preceding is None:
preceding = []
super().__init__()
self.vis = None
self.pool = pool[:]
self.final = final[:]
self.preceding = preceding[:]
self.error = set()
self.tec_image = tec_image
self.tec_rms_image = tec_rms_image
self.tec_plotfile = tec_plotfile
def merge_with_context(self, context):
if not self.final:
LOG.error('No results to merge')
return
for calapp in self.final:
LOG.debug('Adding calibration to callibrary:\n'
'%s\n%s' % (calapp.calto, calapp.calfrom))
context.callibrary.add(calapp.calto, calapp.calfrom)
def __repr__(self):
# Format the GainCurve results.
s = 'TecMapsResults:\n'
for calapplication in self.final:
s += '\tTecMaps caltable written to {name}\n'.format(
name=calapplication.gaintable)
return s
[docs]
@task_registry.set_equivalent_casa_task('hifv_tecmaps')
class TecMaps(basetask.StandardTaskTemplate):
Inputs = TecMapsInputs
[docs]
def prepare(self):
inputs = self.inputs
tec_image, tec_rms_image, tec_plotfile = None, None, None
callist = []
if self.inputs.show_tec_maps or self.inputs.apply_tec_correction:
try:
tec_image, tec_rms_image, tec_plotfile = tec_maps.create(vis=inputs.vis, doplot=True, imname='iono')
except UnboundLocalError as ex:
LOG.warning(' TEC map query error: %s', ex)
# IPE-2795: the return values from tec_maps.create()
# could be 'none' (string) instead of None.
if tec_image in ('none', 'None', ''):
tec_image = None
if tec_rms_image in ('none', 'None', ''):
tec_rms_image = None
if tec_plotfile in ('none', 'None', ''):
tec_plotfile = None
if not tec_image:
LOG.warning('TEC information or retrieval service is unavailable')
return TecMapsResults(
pool=callist, final=callist, tec_image=None, tec_rms_image=None, tec_plotfile=None
)
if self.inputs.apply_tec_correction:
gencal_args = inputs.to_casa_args()
gencal_args.pop('show_tec_maps')
gencal_args.pop('apply_tec_correction')
gencal_args['infile'] = tec_image
gencal_job = casa_tasks.gencal(**gencal_args)
self._executor.execute(gencal_job)
calto = callibrary.CalTo(vis=inputs.vis)
calfrom = callibrary.CalFrom(gencal_args['caltable'], caltype='tecim', interp='', calwt=False)
calapp = callibrary.CalApplication(calto, calfrom)
callist.append(calapp)
return TecMapsResults(
pool=callist, final=callist, tec_image=tec_image, tec_rms_image=tec_rms_image, tec_plotfile=tec_plotfile
)
else:
return TecMapsResults(pool=callist, final=callist, tec_image=None, tec_rms_image=None, tec_plotfile=None)
[docs]
def analyse(self, result):
# double-check that the caltable was actually generated
on_disk = [ca for ca in result.pool if ca.exists()]
result.final[:] = on_disk
missing = [ca for ca in result.pool if ca not in on_disk]
result.error.clear()
result.error.update(missing)
return result
def _do_tecmaps(self):
# Private class method for reference only
# tec_image, tec_rms_image = tec_maps.create('vlass3C48.ms')
try:
tec_maps.create(vis=self.vis, doplot=True, imname='iono')
except UnboundLocalError as e:
LOG.warning("TEC Maps error returned. CASA {!s}".format(e))
return None
# gencal_job = casa_tasks.gencal(**gencal_args)
gencal_job = casa_tasks.gencal(vis=self.vis, caltable='file.tec', caltype='tecim', infile='iono.IGS_TEC.im')
self._executor.execute(gencal_job)