import ast
import math
import os
import traceback
import pipeline.hif.heuristics.findrefant as findrefant
import pipeline.hif.tasks.gaincal as gaincal
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.vdp as vdp
from pipeline.hif.tasks.polarization import polarization
from pipeline.hifv.tasks.setmodel.vlasetjy import standard_sources
from pipeline.hifv.heuristics import uvrange
from pipeline.hifv.heuristics.lib_EVLApipeutils import vla_minbaselineforcal
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import task_registry
from pipeline.infrastructure import utils
LOG = infrastructure.get_logger(__name__)
class CircfeedpolcalResults(polarization.PolarizationResults):
def __init__(self, final=None, pool=None, preceding=None, vis=None,
refant=None, calstrategy=None, caldictionary=None):
if final is None:
final = []
if pool is None:
pool = []
if preceding is None:
preceding = []
if refant is None:
refant = ''
if calstrategy is None:
calstrategy = ''
if caldictionary is None:
caldictionary = {}
super().__init__()
self.vis = vis
self.pool = pool[:]
self.final = final[:]
self.preceding = preceding[:]
self.refant = refant
self.calstrategy = calstrategy
self.caldictionary = caldictionary
def merge_with_context(self, context):
"""
See :method:`~pipeline.infrastructure.api.Results.merge_with_context`
"""
if not self.final:
LOG.warning('No circfeedpolcal results to add to the callibrary')
return
for calapp in self.final:
LOG.debug('Adding pol calibration to callibrary:\n'
'%s\n%s' % (calapp.calto, calapp.calfrom))
context.callibrary.add(calapp.calto, calapp.calfrom)
def __repr__(self):
# return 'CircfeedpolcalResults:\n\t{0}'.format(
# '\n\t'.join([ms.name for ms in self.mses]))
return 'CircfeedpolcalResults:'
class CircfeedpolcalInputs(vdp.StandardInputs):
Dterm_solint = vdp.VisDependentProperty(default='2MHz')
refantignore = vdp.VisDependentProperty(default='')
leakage_poltype = vdp.VisDependentProperty(default='')
mbdkcross = vdp.VisDependentProperty(default=True)
refant = vdp.VisDependentProperty(default='')
run_setjy = vdp.VisDependentProperty(default=True)
@vdp.VisDependentProperty
def clipminmax(self):
return [0.0, 0.25]
# docstring and type hints: supplements hifv_circfeedpolcal
def __init__(self, context, vis=None, Dterm_solint=None, refantignore=None, leakage_poltype=None,
mbdkcross=None, clipminmax=None, refant=None, run_setjy=None):
"""Initialize Inputs.
Args:
context: Pipeline context object containing state information.
vis: List of input visibility data.
Dterm_solint: D-terms spectral averaging.
Example: refantignore='ea02,ea03'.
refantignore: String list of antennas to ignore.
leakage_poltype: poltype to use in first polcal execution - blank string means use default heuristics.
mbdkcross: Run gaincal KCROSS grouped by baseband.
clipminmax: Acceptable range for leakage amplitudes, values outside will be flagged.
refant: A csv string of reference antenna(s). When used, disables ``refantignore``. Example: refant = 'ea01, ea02'
run_setjy: Run setjy for amplitude/flux calibrator, default set to True.
"""
super().__init__()
self.context = context
self.vis = vis
self.Dterm_solint = Dterm_solint
self.refantignore = refantignore
self.leakage_poltype = leakage_poltype
self.mbdkcross = mbdkcross
self.clipminmax = clipminmax
self.refant = refant
self.run_setjy = run_setjy
[docs]
@task_registry.set_equivalent_casa_task('hifv_circfeedpolcal')
class Circfeedpolcal(polarization.Polarization):
Inputs = CircfeedpolcalInputs
[docs]
def prepare(self):
self.callist = []
m = self.inputs.context.observing_run.get_ms(self.inputs.vis)
# PIPE-2164: getting setjy result stored in context
self.setjy_results = self.inputs.context.evla['msinfo'][m.name].setjy_results
intents = list(m.intents)
self.RefAntOutput = ['']
self.calstrategy = ''
self.caldictionary = {}
if [intent for intent in intents if 'POL' in intent]:
try:
self.do_prepare()
except Exception as ex:
LOG.warning(ex)
LOG.debug(traceback.format_exc())
return CircfeedpolcalResults(vis=self.inputs.vis, pool=self.callist, final=self.callist,
refant=self.RefAntOutput[0].lower(), calstrategy=self.calstrategy,
caldictionary=self.caldictionary)
[docs]
def analyse(self, results):
return results
[docs]
def do_prepare(self):
m = self.inputs.context.observing_run.get_ms(self.inputs.vis)
self.ignorerefant = self.inputs.context.evla['msinfo'][m.name].ignorerefant
refantignore = utils.build_refantignore(refantignore=self.inputs.refantignore, ignorerefant=self.ignorerefant)
refantfield = self.inputs.context.evla['msinfo'][m.name].calibrator_field_select_string
# PIPE-595: if refant list is not provided, compute refants else use provided refant list.
if len(self.inputs.refant) == 0:
refantobj = findrefant.RefAntHeuristics(vis=self.inputs.vis, field=refantfield,
geometry=True, flagging=True, intent='', spw='',
refantignore=refantignore)
self.RefAntOutput = refantobj.calculate()
else:
self.RefAntOutput = self.inputs.refant.split(",")
# setjy for amplitude/flux calibrator (VLASS 3C286 or 3C48)
fluxcalfieldname, fluxcalfieldid, fluxcal = self._do_setjy()
try:
stage_number = self.inputs.context.results[-1].read()[0].stage_number + 1
except Exception as e:
stage_number = self.inputs.context.results[-1].read().stage_number + 1
tableprefix = os.path.basename(self.inputs.vis) + '.' + 'hifv_circfeedpolcal.s'
tablesToAdd = [[tableprefix + str(stage_number) + '_1.' + 'kcross.tbl', 'kcross', []],
[tableprefix + str(stage_number) + '_2.' + 'D2.tbl', 'polarization', []],
[tableprefix + str(stage_number) + '_3.' + 'X1.tbl', 'polarization', []]]
# D-terms - do we need this?
# self.do_polcal(self.inputs.vis+'.D1', 'D+QU',field='',
# intent='CALIBRATE_POL_LEAKAGE#UNSPECIFIED',
# gainfield=[''], spwmap=[],
# solint='inf')
# First pass R-L delay
tablesToAdd[0][2] = [] # Default for KCROSS table
if self.inputs.mbdkcross:
baseband_spws_list = m.get_vla_baseband_spws(science_windows_only=True, return_select_list=True)
baseband_spwstr = [','.join(map(str, spws_list)) for spws_list in baseband_spws_list]
addcallib = False
if len(baseband_spwstr) == 1:
addcallib = True
for spws in baseband_spwstr:
LOG.info("Executing gaincal on baseband with spws={!s}".format(spws))
self.do_gaincal(tablesToAdd[0][0], field=fluxcalfieldname, spw=spws,
combine='scan,spw')
tablesToAdd[0][2] = self.do_spwmap()
else:
spwsobj = m.get_spectral_windows(science_windows_only=True)
spwslist = [str(spw.id) for spw in spwsobj]
spws = ','.join(spwslist)
addcallib = True
self.do_gaincal(tablesToAdd[0][0], field=fluxcalfieldname, spw=spws)
if os.path.exists(tablesToAdd[0][0]):
addcallib = True
if addcallib:
LOG.info("Adding " + str(tablesToAdd[0][0]) + " to callibrary.")
calfrom = callibrary.CalFrom(gaintable=tablesToAdd[0][0], interp='', calwt=False, caltype='kcross')
calto = callibrary.CalTo(self.inputs.vis)
calapp = callibrary.CalApplication(calto, calfrom)
self.inputs.context.callibrary.add(calapp.calto, calapp.calfrom)
# Determine number of scans with POLLEAKGE intent and use the first POLLEAKAGE FIELD
polleakagefield = ''
polleakagefields = m.get_fields(intent='POLLEAKAGE')
try:
polleakagefield = polleakagefields[0].name
except Exception as ex:
# If no POLLEAKAGE intent is associated with a field, then use the flux calibrator
polleakagefield = fluxcalfieldname
LOG.warning("Exception: No POLLEAKGE intents found. {!s}".format(str(ex)))
if len(polleakagefields) > 1:
# Use the first pol leakage field
polleakagefield = polleakagefields[0].name
LOG.info("More than one field with intent of POLLEAKGE. Using field {!s}".format(polleakagefield))
polleakagescans = []
poltype = 'Df+QU' # Default
for scan in m.get_scans(field=polleakagefield):
if 'POLLEAKAGE' in scan.intents:
polleakagescans.append((scan.id, scan.intents))
# Calibration Strategies
LOG.info("Number of POL_LEAKAGE scans: {!s}".format(len(polleakagescans)))
self.calstrategy = ''
if len(polleakagescans) >= 3:
poltype = 'Df+QU' # C4
self.calstrategy = "Using Calibration Strategy C4: 3 or more slices CALIBRATE_POL_LEAKAGE, KCROSS, Df+QU, Xf."
if len(polleakagescans) < 3:
poltype = 'Df' # C1
self.calstrategy = "Using Calibration Strategy C1: Less than 3 slices CALIBRATE_POL_LEAKAGE, KCROSS, Df, Xf."
LOG.info(self.calstrategy)
if self.inputs.leakage_poltype:
poltype = self.inputs.leakage_poltype
self.calstrategy = "Calibration Strategy OVERRIDE: User-defined leakage_poltype of " + str(poltype)
LOG.warning(self.calstrategy)
# Determine the first POLANGLE FIELD
polanglefield = ''
polanglefields = m.get_fields(intent='POLANGLE')
try:
polanglefield = polanglefields[0].name
except Exception as ex:
# If no POLANGLE intent is associated with a field, then use the flux calibrator
polanglefield = fluxcalfieldname
LOG.warning("Exception: No POLANGLE intents found. {!s}".format(str(ex)))
if len(polanglefields) > 1:
# Use the first pol angle field
polanglefield = polanglefields[0].name
LOG.info("More than one field with intent of POLANGLE. Using field {!s}".format(polanglefield))
# D-terms in 2MHz pieces, minsnr of 5.0
LOG.info("Polcal D-terms using solint=\'inf,{!s}\'".format(self.inputs.Dterm_solint))
self.do_polcal(tablesToAdd[1][0], kcrosstable=tablesToAdd[0][0], poltype=poltype, field=polleakagefield,
intent='CALIBRATE_POL_LEAKAGE#UNSPECIFIED',
gainfield=[''], kcrossspwmap=tablesToAdd[0][2],
solint='inf,{!s}'.format(self.inputs.Dterm_solint),
minsnr=5.0)
# Clip flagging
self._do_clipflag(tablesToAdd[1][0])
# 2MHz pieces, minsnr of 3.0
self.do_polcal(tablesToAdd[2][0], kcrosstable=tablesToAdd[0][0], poltype='Xf', field=polanglefield,
intent='CALIBRATE_POL_ANGLE#UNSPECIFIED',
gainfield=[''], kcrossspwmap=tablesToAdd[0][2], solint='inf,2MHz',
minsnr=3.0)
for (addcaltable, caltype, spwmap) in tablesToAdd:
calto = callibrary.CalTo(self.inputs.vis)
calfrom = callibrary.CalFrom(gaintable=addcaltable, interp='', calwt=False,
caltype=caltype, spwmap=spwmap)
calapp = callibrary.CalApplication(calto, calfrom)
self.callist.append(calapp)
self.caldictionary = {'fluxcalfieldname': fluxcalfieldname,
'fluxcalfieldid': fluxcalfieldid,
'fluxcal': fluxcal,
'polanglefield': polanglefield,
'polleakagefield': polleakagefield}
def _modifyGainTables(self, GainTables):
'''
Args:
GainTables: Python List of tables from the calibrary
Returns: replaces the finalphasegaincal name with the phaseshortgaincal table from hifv_finalcals
'''
m = self.inputs.context.observing_run.get_ms(self.inputs.vis)
idx = -1 # Should be last element
newtable = ''
for i, table in enumerate(GainTables):
if 'finalphasegaincal' in table:
idx = i
try:
newtable = self.inputs.context.evla['msinfo'][m.name].phaseshortgaincaltable
except AttributeError:
LOG.warning("Exception: 'phaseshortgaincaltable' is not present.")
GainTables[idx] = newtable
return GainTables
[docs]
def do_gaincal(self, caltable, field='', spw='', combine='scan'):
m = self.inputs.context.observing_run.get_ms(self.inputs.vis)
minBL_for_cal = vla_minbaselineforcal()
append = False
if os.path.exists(caltable):
append = True
LOG.info("{!s} exists. Appending to caltable.".format(caltable))
GainTables = []
interp = []
calto = callibrary.CalTo(self.inputs.vis)
calstate = self.inputs.context.callibrary.get_calstate(calto)
merged = calstate.merged()
for calto, calfroms in merged.items():
calapp = callibrary.CalApplication(calto, calfroms)
GainTables.append(calapp.gaintable)
interp.append(calapp.interp)
GainTables = GainTables[0]
interp = interp[0]
GainTables = self._modifyGainTables(GainTables)
task_inputs = gaincal.GTypeGaincal.Inputs(self.inputs.context,
output_dir='',
vis=self.inputs.vis,
caltable=caltable,
field=field,
intent='',
scan='',
spw=spw,
solint='inf',
gaintype='KCROSS',
combine=combine,
refant=self.RefAntOutput,
minblperant=minBL_for_cal,
parang=True,
append=append)
# gaincal_task = gaincal.GTypeGaincal(task_inputs)
# result = self._executor.execute(gaincal_task, merge=True)
casa_task_args = {'vis': self.inputs.vis,
'caltable': caltable,
'field': field,
'intent': 'CALIBRATE_FLUX#UNSPECIFIED,CALIBRATE_AMPLI#UNSPECIFIED,CALIBRATE_PHASE#UNSPECIFIED,CALIBRATE_BANDPASS#UNSPECIFIED,CALIBRATE_POL_ANGLE#UNSPECIFIED',
'scan': '',
'spw': spw,
'solint': 'inf',
'gaintype': 'KCROSS',
'combine': combine,
'refant': ','.join(self.RefAntOutput),
'gaintable': GainTables,
'interp': interp,
'minblperant': minBL_for_cal,
'parang': True,
'uvrange': '',
'append': append}
fieldobj = m.get_fields(name=field)[0]
fieldid = fieldobj.id
casa_task_args['uvrange'] = uvrange(self.setjy_results, fieldid)
job = casa_tasks.gaincal(**casa_task_args)
try:
self._executor.execute(job)
except Exception as ex:
LOG.warning(ex)
# return result
return True
[docs]
def do_polcal(self, caltable, kcrosstable='', poltype='', field='', intent='',
gainfield=[''], kcrossspwmap=[], solint='inf', minsnr=5.0):
GainTables = []
calto = callibrary.CalTo(self.inputs.vis)
calstate = self.inputs.context.callibrary.get_calstate(calto)
merged = calstate.merged()
for calto, calfroms in merged.items():
calapp = callibrary.CalApplication(calto, calfroms)
GainTables.append(calapp.gaintable)
GainTables = GainTables[0]
m = self.inputs.context.observing_run.get_ms(self.inputs.vis)
minBL_for_cal = vla_minbaselineforcal()
spwmap = []
for gaintable in GainTables:
if gaintable in kcrosstable:
spwmap.append(kcrossspwmap)
else:
spwmap.append([])
GainTables = self._modifyGainTables(GainTables)
task_args = {'vis': self.inputs.vis,
'caltable': caltable,
'field': field,
'intent': intent,
'refant': ','.join(self.RefAntOutput),
'gaintable': GainTables,
'poltype': poltype,
'gainfield': gainfield,
'minsnr': minsnr,
'minblperant': minBL_for_cal,
'combine': 'scan',
'spwmap': spwmap,
'solint': solint}
task = casa_tasks.polcal(**task_args)
result = self._executor.execute(task)
calfrom = callibrary.CalFrom(gaintable=caltable, interp='', calwt=False, caltype='polarization')
calto = callibrary.CalTo(self.inputs.vis)
calapp = callibrary.CalApplication(calto, calfrom)
self.inputs.context.callibrary.add(calapp.calto, calapp.calfrom)
return result
def _do_setjy(self):
"""
The code in this private class method are (for now) specific to VLASS
requirements and heuristics.
Returns: string name of the amplitude flux calibrator
"""
m = self.inputs.context.observing_run.get_ms(self.inputs.vis)
# fluxfields = m.get_fields(intent='AMPLITUDE')
# fluxfieldnames = [amp.name for amp in fluxfields]
standard_source_names, standard_source_fields = standard_sources(self.inputs.vis)
fluxcal = ''
fluxcalfieldid = None
fluxcalfieldname = ''
for i, fields in enumerate(standard_source_fields):
for myfield in fields:
if standard_source_names[i] in ('3C48', '3C286', '3C138') \
and 'POLANGLE' in m.get_fields(field_id=myfield)[0].intents:
fluxcalfieldid = myfield
fluxcalfieldname = m.get_fields(field_id=myfield)[0].name
fluxcal = standard_source_names[i]
elif standard_source_names[i] in ('3C48', '3C286', '3C138') \
and 'AMPLITUDE' in m.get_fields(field_id=myfield)[0].intents:
fluxcalfieldid = myfield
fluxcalfieldname = m.get_fields(field_id=myfield)[0].name
fluxcal = standard_source_names[i]
else: # J1800+7828
fluxcalfieldid = m.get_fields(intent='POLANGLE')[0].id
fluxcalfieldname = m.get_fields(intent='POLANGLE')[0].name
fluxcal = fluxcalfieldname
try:
task_args = {}
if fluxcal in ('3C286', '1331+3030', '"1331+305=3C286"', 'J1331+3030'):
d0 = 33.0 * math.pi / 180.0
task_args = {'vis': self.inputs.vis,
'field': fluxcalfieldname,
'standard': 'manual',
'spw': '',
'fluxdensity': [9.97326, 0, 0, 0],
'spix': [-0.582142, -0.154655],
'reffreq': '3000.0MHz',
'polindex': [0.107943, 0.01184, -0.0055, 0.0224, -0.0312],
'polangle': [d0, 0],
'rotmeas': 0,
'scalebychan': True,
'usescratch': True}
elif fluxcal in ('3C48', 'J0137+3309', '0137+3309', '"0137+331=3C48"'):
task_args = {'vis': self.inputs.vis,
'field': fluxcalfieldname,
'spw': '',
'selectdata': False,
'timerange': '',
'scan': '',
'intent': '',
'observation': '',
'scalebychan': True,
'standard': 'manual',
'model': '',
'listmodels': False,
'fluxdensity': [6.4861, -0.132, 0.0417, 0],
'spix': [-0.934677, -0.125579],
'reffreq': '3000.0MHz',
'polindex': [0.02143, 0.0392, 0.002349, -0.0230],
'polangle': [-1.7233, 1.569, -2.282, 1.49],
'rotmeas': 0, # inside polangle
'fluxdict': {},
'useephemdir': False,
'interpolation': 'nearest',
'usescratch': True}
elif fluxcal in ('3C138', '0521+1638', 'J0521+1638'):
task_args = {'vis': self.inputs.vis,
'field': fluxcalfieldname,
'spw': '',
'selectdata': False,
'timerange': '',
'scan': '',
'intent': '',
'observation': '',
'scalebychan': True,
'standard': 'manual',
'model': '',
'listmodels': False,
'fluxdensity': [5.471, 0, 0, 0],
'spix': [-0.6432, -0.082],
'reffreq': '3.0GHz',
'polindex': [0.10122, 0.01389, -0.03738, 0.0471, -0.0200],
'polangle': [-0.17557, -0.0163, 0.013, -0.0057],
'rotmeas': 0, # inside polangle
'fluxdict': {},
'useephemdir': False,
'interpolation': 'nearest',
'usescratch': True}
elif fluxcal in ('J1800+7828', '1800+7828'):
task_args = {'vis': self.inputs.vis,
'field': fluxcalfieldname,
'standard': 'manual',
'spw': '',
'fluxdensity': [2.3511, 0, 0, 0],
'spix': [0.1567, -0.104],
'reffreq': '3.0GHz',
'polindex': [0.04709, -0.00860, 0.0096, -0.00285],
'polangle': [0.900, 1.28, -3.10, 5.26, -2.7],
'rotmeas': 0,
'scalebychan': True,
'usescratch': True}
else:
LOG.error("No known flux calibrator found - please check the data.")
# PIPE-1750, added an option to disable setjy call
if self.inputs.run_setjy:
job = casa_tasks.setjy(**task_args)
self._executor.execute(job)
else:
LOG.warning("Setting the polarized flux densities for the polarization angle calibrator within the task was disabled."
"Polarization angle will not be properly calibrated unless the polarized flux densities were set prior to invoking this task."
"Check the RL phase offset vs. Freq and RL delay vs. freq plots to ensure behavior is as expected.")
except Exception as ex:
LOG.warning("Exception: Problem with circfeedpolcal setjy. {!s}".format(str(ex)))
return None
return fluxcalfieldname, fluxcalfieldid, fluxcal
[docs]
def do_spwmap(self):
"""
Returns: spwmap for use with gaintable in callibrary (polcal and applycal)
"""
m = self.inputs.context.observing_run.get_ms(self.inputs.vis)
baseband_spws_list = m.get_vla_baseband_spws(science_windows_only=False, return_select_list=True)
baseband_spwstr = [','.join(map(str, spws_list)) for spws_list in baseband_spws_list]
spwmap = []
for spwstr in baseband_spwstr:
spwlist = [int(spw) for spw in spwstr.split(',')]
basebandmap = [spwlist[0]] * len(spwlist)
spwmap.extend(basebandmap)
return spwmap
def _do_clipflag(self, dcaltable):
clipminmax = self.inputs.clipminmax
if isinstance(self.inputs.clipminmax, str):
clipminmax = ast.literal_eval(self.inputs.clipminmax)
task_args = {'vis': dcaltable,
'mode': 'clip',
'datacolumn': 'CPARAM',
'clipminmax': clipminmax,
'correlation': 'ABS_ALL',
'clipoutside': True,
'flagbackup': False,
'savepars': False,
'action': 'apply'}
job = casa_tasks.flagdata(**task_args)
return self._executor.execute(job)