import numpy
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.vdp as vdp
from pipeline.h.heuristics import caltable as caltable_heuristic
from pipeline.infrastructure.tablereader import find_EVLA_band
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import casa_tools
from . import resultobjects
LOG = infrastructure.get_logger(__name__)
def _find_spw(vis, bands, context):
"""Identify spw information"""
with casa_tools.TableReader(vis+'/SPECTRAL_WINDOW') as table:
channels = table.getcol('NUM_CHAN')
# originalBBClist = table.getcol('BBC_NO')
spw_bandwidths = table.getcol('TOTAL_BANDWIDTH')
reference_frequencies = table.getcol('REF_FREQUENCY')
center_frequencies = [rf + spwbw / 2 for rf, spwbw in zip(reference_frequencies, spw_bandwidths)]
if bands == []:
bands = list(map(find_EVLA_band, center_frequencies))
unique_bands = list(numpy.unique(bands))
numSpws = len(channels)
unique_bands_string = ','.join(unique_bands)
all_spws_list = list(range(numSpws))
all_spws = ','.join(map(str, all_spws_list))
return all_spws, center_frequencies
class OpcalInputs(vdp.StandardInputs):
spw = vdp.VisDependentProperty(default='')
@vdp.VisDependentProperty
def caltable(self):
namer = caltable_heuristic.OpCaltable()
casa_args = self._get_task_args(ignore=('caltable',))
return namer.calculate(output_dir=self.output_dir, stage=self.context.stage, **casa_args)
@vdp.VisDependentProperty
def parameter(self):
return []
def __init__(self, context, output_dir=None, vis=None, caltable=None, parameter=None, spw=None):
self.context = context
self.output_dir = output_dir
self.vis = vis
self.spw = spw
self.parameter = parameter
self.caltable = caltable
def to_casa_args(self):
args = super().to_casa_args()
args['caltype'] = 'opac'
return args
[docs]
class Opcal(basetask.StandardTaskTemplate):
Inputs = OpcalInputs
[docs]
def prepare(self):
inputs = self.inputs
context = self.inputs.context
m = self.inputs.context.observing_run.get_ms(self.inputs.vis)
spw2band = m.get_vla_spw2band()
bands = list(spw2band.values())
seasonal_weight = 1.0
try:
with casa_tools.TableReader(self.inputs.vis + '/WEATHER') as table:
numRows = table.nrows()
if numRows == 0:
LOG.warning("Weather station broken during this period, using 100% seasonal model for calculating the"
" zenith opacity")
seasonal_weight = 1.0
else:
LOG.info("Using seasonal_weight of 0.5") # Standard value to use
seasonal_weight = 0.5
except:
LOG.warning("Unable to open MS weather table. Using 100% seasonal model for calculating the zenith opacity")
'''
if (((startdate >= 55918.80) and (startdate <= 55938.98)) or ((startdate >= 56253.6) and (startdate <= 56271.6))):
LOG.warning("Weather station broken during this period, using 100% seasonal model for calculating the zenith opacity")
seasonal_weight=1.0
else:
LOG.info("Using seasonal_weight of 0.5")
seasonal_weight=0.5
'''
plotweather_args = {'vis': inputs.vis, 'seasonal_weight': seasonal_weight, 'doPlot': True}
plotweather_job = casa_tasks.plotweather(**plotweather_args)
opacities = self._executor.execute(plotweather_job)
# PIPE-2370: plotweather returns tau as list[np.float64] instead of list[float].
# Convert it to list[float] for better weblog presentation and formatting in casa_commands.log.
opacities = [float(x) for x in opacities]
inputs.parameter = opacities
inputs.spw, center_frequencies = _find_spw(inputs.vis, bands, context)
gencal_args = inputs.to_casa_args()
gencal_job = casa_tasks.gencal(**gencal_args)
self._executor.execute(gencal_job)
callist = []
calto = callibrary.CalTo(vis=inputs.vis)
calfrom = callibrary.CalFrom(gencal_args['caltable'], caltype='opac', interp='', calwt=False)
calapp = callibrary.CalApplication(calto, calfrom)
callist.append(calapp)
return resultobjects.OpcalResults(pool=callist, opacities=opacities, spw=inputs.spw,
center_frequencies=center_frequencies, seasonal_weight=seasonal_weight)
[docs]
def analyse(self, result):
# With no best caltable to find, our task is simply to set the one
# caltable as the best result
# double-check that the caltable was actually generated
on_disk = [ca for ca in result.pool if ca.exists()]
result.final[:] = on_disk
missing = [ca for ca in result.pool if ca not in on_disk]
result.error.clear()
result.error.update(missing)
return result