import collections
import os
import shutil
from typing import Any
import pipeline.hif.heuristics.findrefant as findrefant
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.utils as utils
import pipeline.infrastructure.vdp as vdp
from pipeline.hifv.heuristics import do_bandpass, getCalFlaggedSoln
from pipeline.hifv.heuristics import standard as standard
from pipeline.hifv.heuristics import uvrange, weakbp
from pipeline.hifv.heuristics.lib_EVLApipeutils import vla_minbaselineforcal
from pipeline.hifv.tasks.setmodel.vlasetjy import standard_sources
from pipeline.infrastructure import casa_tasks, casa_tools, task_registry
# Module-level logger, obtained from the pipeline infrastructure under this module's name.
LOG = infrastructure.get_logger(__name__)
class FinalcalsInputs(vdp.StandardInputs):
    """Inputs class for the hifv_finalcals pipeline task. Used on VLA measurement sets.

    The class inherits from vdp.StandardInputs.
    """
    # Task parameters declared as vdp.VisDependentProperty, with their defaults.
    weakbp = vdp.VisDependentProperty(default=False)
    refantignore = vdp.VisDependentProperty(default='')
    refant = vdp.VisDependentProperty(default='')

    # docstring and type hints: supplements hifv_finalcals
    def __init__(self, context, vis=None, weakbp=None, refantignore=None, refant=None):
        """Initialize Inputs.

        Args:
            context (:obj:): Pipeline context

            vis(str, optional): The list of input MeasurementSets. Defaults to the list of MeasurementSets specified in the hifv_importdata task.

            weakbp(Boolean): Activate weak bandpass heuristics.
                weak bandpass heuristics on/off - currently not used - see PIPE-104.

            refantignore(str): String list of antennas to ignore.
                csv string of reference antennas to ignore - 'ea24,ea15,ea08'.

            refant(str): A csv string of reference antenna(s). When used, disables ``refantignore``.
                Example: refant = 'ea01, ea02'

        """
        super().__init__()
        self.context = context
        self.vis = vis
        # NOTE(review): the value is stored on ``_weakbp`` rather than assigned
        # through the ``weakbp`` property declared above. The docstring marks
        # the option as currently unused (PIPE-104) -- confirm this bypass is
        # intentional.
        self._weakbp = weakbp
        self.refantignore = refantignore
        self.refant = refant
class FinalcalsResults(basetask.Results):
    """Results class for the hifv_finalcals pipeline task. Used on VLA measurement sets.

    The class inherits from basetask.Results.
    """

    def __init__(self, final=None, pool=None, preceding=None, vis=None, bpdgain_touse=None,
                 gtypecaltable=None, ktypecaltable=None, bpcaltable=None,
                 phaseshortgaincaltable=None, finalampgaincaltable=None,
                 finalphasegaincaltable=None, flaggedSolnApplycalbandpass=None,
                 flaggedSolnApplycaldelay=None):
        """Store the products of a finalcals run.

        Args:
            final(List, optional): Calibration applications to merge into the context
            pool(List, optional): Calibration applications that were assessed
            preceding(List, optional): DEPRECATED results from worker tasks executed by this task
            vis(str, optional): Measurement set the results belong to
            bpdgain_touse: Initial bandpass gain table
            gtypecaltable: G-type delay gain table
            ktypecaltable: K-type delay table
            bpcaltable: Bandpass table
            phaseshortgaincaltable: Short-solint phase gain table
            finalampgaincaltable: Amplitude gain table to be passed to applycal
            finalphasegaincaltable: Phase gain table to be passed to applycal
            flaggedSolnApplycalbandpass: Output of getCalFlaggedSoln for bpdgain_touse
            flaggedSolnApplycaldelay: Output of getCalFlaggedSoln for ktypecaltable
        """
        super().__init__()
        self.vis = vis

        # Missing lists default to empty; supplied ones are shallow-copied so
        # later changes to the caller's list do not leak into the result.
        self.pool = [] if pool is None else pool[:]
        self.final = [] if final is None else final[:]
        self.preceding = [] if preceding is None else preceding[:]

        self.bpdgain_touse = bpdgain_touse
        self.gtypecaltable = gtypecaltable
        self.ktypecaltable = ktypecaltable
        self.bpcaltable = bpcaltable
        self.phaseshortgaincaltable = phaseshortgaincaltable
        self.finalampgaincaltable = finalampgaincaltable
        self.finalphasegaincaltable = finalphasegaincaltable
        self.flaggedSolnApplycalbandpass = flaggedSolnApplycalbandpass
        self.flaggedSolnApplycaldelay = flaggedSolnApplycaldelay

    def merge_with_context(self, context):
        """Record the gain tables on the context and add ``final`` to its callibrary.

        Logs an error and leaves the context untouched when ``final`` is empty.

        Args:
            context: Pipeline context to update
        """
        if not self.final:
            LOG.error('No results to merge')
            return

        ms = context.observing_run.get_ms(self.vis)
        msinfo = context.evla['msinfo'][ms.name]
        msinfo.phaseshortgaincaltable = self.phaseshortgaincaltable
        msinfo.finalampgaincaltable = self.finalampgaincaltable

        for calapp in self.final:
            LOG.debug('Adding calibration to callibrary:\n%s\n%s', calapp.calto, calapp.calfrom)
            context.callibrary.add(calapp.calto, calapp.calfrom)
@task_registry.set_equivalent_casa_task('hifv_finalcals')
class Finalcals(basetask.StandardTaskTemplate):
"""Class for the finalcals pipeline task. Used on VLA measurement sets.
The class inherits from basetask.StandardTaskTemplate
"""
Inputs = FinalcalsInputs
def prepare(self):
    """Group science spws by band, run the final calibrations and wrap the outputs.

    Returns:
        FinalcalsResults holding the caltable names and flagged-solution
        summaries returned by ``_do_finalscals``.
    """
    ms = self.inputs.context.observing_run.get_ms(self.inputs.vis)
    spw2band = ms.get_vla_spw2band()
    science_spw_ids = [spw.id for spw in ms.get_spectral_windows(science_windows_only=True)]

    # band -> list of spw ids (as strings), science windows only
    band2spw = collections.defaultdict(list)
    for spw_id, band in spw2band.items():
        if spw_id not in science_spw_ids:
            continue
        band2spw[band].append(str(spw_id))

    # _do_finalscals appends the calibration applications to these lists.
    self.pool = []
    self.final = []

    (bpdgain_touse, gtypecaltable, ktypecaltable, bpcaltable, phaseshortgaincaltable,
     finalampgaincaltable, finalphasegaincaltable,
     flaggedSolnApplycalbandpass, flaggedSolnApplycaldelay) = self._do_finalscals(band2spw)

    return FinalcalsResults(
        vis=self.inputs.vis,
        pool=self.pool,
        final=self.final,
        bpdgain_touse=bpdgain_touse,
        gtypecaltable=gtypecaltable,
        ktypecaltable=ktypecaltable,
        bpcaltable=bpcaltable,
        phaseshortgaincaltable=phaseshortgaincaltable,
        finalampgaincaltable=finalampgaincaltable,
        finalphasegaincaltable=finalphasegaincaltable,
        flaggedSolnApplycalbandpass=flaggedSolnApplycalbandpass,
        flaggedSolnApplycaldelay=flaggedSolnApplycaldelay,
    )
def _do_finalscals(self, band2spw):
    """Execute the finalcals heuristics.

    Runs, per band: G-type and K-type delay calibration, the initial bandpass
    gain calibration, the bandpass calibration and an averaged phase solution.
    It then applies those tables, splits the calibrator scans into
    ``finalcalibrators.ms``, sets the calibrator models, and derives the
    short-solint phase, final amplitude and final phase gain tables. Tables
    that exist on disk are wrapped in CalApplications and appended to
    ``self.pool`` and ``self.final``.

    Args:
        band2spw(Dict): Band dictionary with key/value as band/list of spw id strings

    Returns:
        Tuple of:
            bpdgain_touse(str): initial bandpass gain table name
            gtypecaltable(str): G-type table from gaincal
            ktypecaltable(str): K-type table from gaincal
            bpcaltable(str): BP cal table
            phaseshortgaincaltable(str): short-solint phase gain table name
            finalampgaincaltable(str): amp gain table to be passed to applycal
            finalphasegaincaltable(str): phase gain table to be passed to applycal
            flaggedSolnApplycalbandpass: getCalFlaggedSoln output for bpdgain_touse,
                or None if that table does not exist
            flaggedSolnApplycaldelay: getCalFlaggedSoln output for ktypecaltable,
                or None if that table does not exist
    """
    self.parang = True
    context = self.inputs.context
    m = context.observing_run.get_ms(self.inputs.vis)
    msinfo = context.evla['msinfo'][m.name]

    # PIPE-2164: getting setjy result stored in context
    self.setjy_results = msinfo.setjy_results

    # Try the last result as an indexable collection first; fall back to
    # treating it as a single result object.
    try:
        stage_number = context.results[-1].read()[0].stage_number + 1
    except Exception:
        stage_number = context.results[-1].read().stage_number + 1

    tableprefix = os.path.basename(self.inputs.vis) + '.' + 'hifv_finalcals.s'
    stage_prefix = tableprefix + str(stage_number)

    gtypecaltable = stage_prefix + '_1.' + 'finaldelayinitialgain.tbl'
    ktypecaltable = stage_prefix + '_2.' + 'finaldelay.tbl'
    bpcaltable = stage_prefix + '_4.' + 'finalBPcal.tbl'
    # Bound up front (not inside the per-band loop) so it is always defined,
    # even when band2spw is empty.
    bpdgain_touse = stage_prefix + '_3.' + 'finalBPinitialgain' + '.tbl'

    self.ignorerefant = msinfo.ignorerefant
    refantignore = utils.build_refantignore(refantignore=self.inputs.refantignore,
                                            ignorerefant=self.ignorerefant)
    refantfield = msinfo.calibrator_field_select_string

    # PIPE-595: if refant list is not provided, compute refants else use provided refant list.
    if not self.inputs.refant:
        refantobj = findrefant.RefAntHeuristics(vis=self.inputs.vis, field=refantfield,
                                                geometry=True, flagging=True, intent='',
                                                spw='', refantignore=refantignore)
        RefAntOutput = refantobj.calculate()
    else:
        RefAntOutput = self.inputs.refant.split(",")

    refAnt = ','.join(RefAntOutput)
    LOG.info("The pipeline will use antenna(s) " + refAnt + " as the reference")

    for band, spwlist in band2spw.items():
        LOG.info("Executing G-type delaycal for band {!s} spws: {!s}".format(band, ','.join(spwlist)))
        self._do_gtype_delaycal(caltable=gtypecaltable, refAnt=refAnt, spwlist=spwlist)

    for band, spwlist in band2spw.items():
        LOG.info("Executing K-type delaycal for band {!s} spws: {!s}".format(band, ','.join(spwlist)))
        self._do_ktype_delaycal(caltable=ktypecaltable, addcaltable=gtypecaltable, refAnt=refAnt,
                                spw=','.join(spwlist))

    LOG.info("Delay calibration complete")

    # Do initial gaincal on BP calibrator then semi-final BP calibration
    for band, spwlist in band2spw.items():
        gain_solint1 = msinfo.gain_solint1[band]
        self._do_gtype_bpdgains(bpdgain_touse, addcaltable=ktypecaltable,
                                solint=gain_solint1, refAnt=refAnt, spwlist=spwlist)

    LOG.info("Initial BP gain calibration complete")

    # Default interpolation passed to applycal; only the weak-bandpass
    # heuristic overrides it. Initialised here so it is bound even when no
    # band is processed.
    interp = ''
    for spwlist in band2spw.values():
        append = os.path.isdir(bpcaltable)
        if append:
            LOG.info("Appending to existing table: {!s}".format(bpcaltable))

        if self.inputs.weakbp:
            LOG.debug("USING WEAKBP HEURISTICS")
            interp = weakbp(self.inputs.vis, bpcaltable, context=context, RefAntOutput=RefAntOutput,
                            ktypecaltable=ktypecaltable, bpdgain_touse=bpdgain_touse, solint='inf',
                            append=append, executor=self._executor, spw=','.join(spwlist))
        else:
            LOG.debug("Using REGULAR heuristics")
            interp = ''
            do_bandpass(self.inputs.vis, bpcaltable, context=context, RefAntOutput=RefAntOutput,
                        spw=','.join(spwlist), ktypecaltable=ktypecaltable,
                        bpdgain_touse=bpdgain_touse, solint='inf', append=append,
                        executor=self._executor)

    LOG.info("Bandpass calibration complete")

    # Derive an average phase solution for the bandpass calibrator to apply
    # to all data to make QA plots easier to interpret.
    # Use strict reference-antenna mode as soon as any intent contains 'POL'.
    refantmode = 'strict' if any('POL' in intent for intent in m.intents) else 'flex'

    avgpgain = stage_prefix + '_5.' + 'averagephasegain.tbl'

    for spwlist in band2spw.values():
        self._do_avgphasegaincal(avgpgain, refAnt,
                                 ktypecaltable=ktypecaltable, bpcaltable=bpcaltable,
                                 refantmode=refantmode, spw=','.join(spwlist))

    # In case any antenna is flagged by this process, unflag all solutions
    # in this gain table (if an antenna does exist or has bad solutions from
    # other steps, it will be flagged by those gain tables).
    self._do_unflag(avgpgain)

    self._do_applycal(ktypecaltable=ktypecaltable,
                      bpcaltable=bpcaltable, avgphasegaincaltable=avgpgain, interp=interp)

    # ---------------------------------------------------

    # Start from a clean calibrator MS: remove any left-over copy first.
    calMs = 'finalcalibrators.ms'
    if os.path.isdir(calMs):
        shutil.rmtree(calMs)

    self._do_split(calMs, '')

    # Run setjy execution per band
    self._doall_setjy(calMs)

    LOG.info("Using power-law fits results from fluxscale.")
    for fs_result in msinfo.fluxscale_result:
        self._do_powerfitsetjy(calMs, fs_result)

    phaseshortgaincaltable = stage_prefix + '_6.' + 'phaseshortgaincal.tbl'
    finalampgaincaltable = stage_prefix + '_7.' + 'finalampgaincal.tbl'
    finalphasegaincaltable = stage_prefix + '_8.' + 'finalphasegaincal.tbl'

    for band, spwlist in band2spw.items():
        spw = ','.join(spwlist)
        # Best effort per band: a failure is logged and the next band is tried.
        try:
            new_gain_solint1 = msinfo.new_gain_solint1[band]
            self._do_calibratorgaincal(calMs, phaseshortgaincaltable,
                                       new_gain_solint1, 3.0, 'p', [''], refAnt,
                                       refantmode=refantmode, spw=spw)

            gain_solint2 = msinfo.gain_solint2[band]
            self._do_calibratorgaincal(calMs, finalampgaincaltable, gain_solint2, 5.0,
                                       'ap', [phaseshortgaincaltable], refAnt,
                                       refantmode=refantmode, spw=spw)

            self._do_calibratorgaincal(calMs, finalphasegaincaltable, gain_solint2,
                                       3.0, 'p', [finalampgaincaltable], refAnt,
                                       refantmode=refantmode, spw=spw)
        except KeyError as ex:
            LOG.warning("No data found for {!s} band".format(ex))
        except Exception as ex:
            LOG.warning(str(ex))

    # (table, interp, gainfield) triples to register in the callibrary.
    tablesToAdd = [(ktypecaltable, '', ''), (bpcaltable, 'linear,linearflag', ''),
                   (avgpgain, '', ''), (finalampgaincaltable, '', ''),
                   (finalphasegaincaltable, '', '')]

    for addcaltable, table_interp, gainfield in tablesToAdd:
        # Only tables that were actually written are registered.
        if not os.path.exists(addcaltable):
            continue
        LOG.info("Finalcals stage: Adding " + addcaltable + " to callibrary.")
        calto = callibrary.CalTo(self.inputs.vis)
        calfrom = callibrary.CalFrom(gaintable=addcaltable, interp=table_interp, calwt=False,
                                     caltype='finalcal', gainfield=gainfield)
        calapp = callibrary.CalApplication(calto, calfrom)
        self.pool.append(calapp)
        self.final.append(calapp)

    flaggedSolnApplycalbandpass = None
    flaggedSolnApplycaldelay = None
    if os.path.exists(bpdgain_touse):
        flaggedSolnApplycalbandpass = getCalFlaggedSoln(bpdgain_touse)
    if os.path.exists(ktypecaltable):
        flaggedSolnApplycaldelay = getCalFlaggedSoln(ktypecaltable)

    return (bpdgain_touse, gtypecaltable, ktypecaltable, bpcaltable, phaseshortgaincaltable,
            finalampgaincaltable, finalphasegaincaltable,
            flaggedSolnApplycalbandpass, flaggedSolnApplycaldelay)
def analyse(self, results):
    """Return the results from ``prepare`` unchanged.

    Overrides the analyse step of basetask.StandardTaskTemplate; this task
    performs no post-processing of its results.

    Args:
        results: the object produced by :func:`prepare`

    Returns:
        The same ``results`` object that was passed in.
    """
    return results
def _do_gtype_delaycal(
    self, caltable: str | None = None, refAnt: str | None = None, spwlist: list[str] | None = None
) -> bool:
    """Perform the G-type (phase-only) gaincal that precedes the delay calibration.

    One CASA gaincal job is executed per field listed in the MS's
    ``delay_field_select_string``. Solutions are appended to ``caltable`` once
    it exists on disk, otherwise the first job creates it.

    Args:
        caltable: Name of the calibration table to create or append to.
            The value is passed straight to gaincal; no default name is
            generated here.
        refAnt: Reference antenna specification in format 'ea01,ea24,...'.
            It is lower-cased before use; None is passed to gaincal as an
            empty string.
        spwlist: Spectral window identifiers for the target band, e.g.,
            ['0', '1', '2']. None is treated as an empty list when asking the
            MS for the test spw selection.

    Returns:
        Always True; failures surface as exceptions raised by the executed jobs.
    """
    if spwlist is None:
        spwlist = []

    # Append when the table already exists as a directory on disk.
    append = bool(caltable) and os.path.isdir(caltable)
    if append:
        LOG.info('Appending to existing calibration table: %s', caltable)

    m = self.inputs.context.observing_run.get_ms(self.inputs.vis)
    msinfo = self.inputs.context.evla['msinfo'][m.name]
    delay_field_select_string = msinfo.delay_field_select_string
    tst_delay_spw = m.get_vla_tst_bpass_spw(spwlist=spwlist)
    delay_scan_select_string = msinfo.delay_scan_select_string
    minBL_for_cal = vla_minbaselineforcal()

    delaycal_task_args = {'vis': self.inputs.vis,
                          'caltable': caltable,
                          'field': '',
                          'spw': tst_delay_spw,
                          'intent': '',
                          'selectdata': True,
                          'uvrange': '',
                          'scan': delay_scan_select_string,
                          'solint': 'int',
                          'combine': 'scan',
                          'preavg': -1.0,
                          # Tolerate refAnt=None instead of failing on .lower().
                          'refant': (refAnt or '').lower(),
                          'minblperant': minBL_for_cal,
                          'minsnr': 3.0,
                          'solnorm': False,
                          'gaintype': 'G',
                          'smodel': [],
                          'calmode': 'p',
                          'append': append,
                          'docallib': False,
                          'gaintable': sorted(self.inputs.context.callibrary.active.get_caltable()),
                          'gainfield': [''],
                          'interp': [''],
                          'spwmap': [],
                          'parang': self.parang}

    for fieldidstring in delay_field_select_string.split(','):
        fieldid = int(fieldidstring)
        delaycal_task_args['field'] = fieldidstring
        delaycal_task_args['uvrange'] = uvrange(self.setjy_results, fieldid)
        # A table written by an earlier field in this loop must be appended to.
        if caltable and os.path.exists(caltable):
            delaycal_task_args['append'] = True

        job = casa_tasks.gaincal(**delaycal_task_args)
        self._executor.execute(job)

    return True
def _do_ktype_delaycal(self, caltable: str = None, addcaltable: str = None,
                       refAnt: str = None, spw: str = '') -> bool:
    """Solve for K-type delays with CASA gaincal, one job per delay field.

    The active callibrary tables plus ``addcaltable`` are pre-applied; tables
    that are missing on disk are dropped with a warning.

    Args:
        caltable(str): Name of the caltable to be created or appended to
        addcaltable(str): Table temporarily added to the gaincal gaintable parameter
        refAnt(str): Antenna values to use as reference antennas - 'ea01,ea24,...'
        spw(str): csv string values for spws pertaining to the particular band - '0,1,2,3,4,5,6'

    Returns:
        Always True
    """
    append = os.path.isdir(caltable)
    if append:
        LOG.info("Appending to existing table: {!s}".format(caltable))

    ms = self.inputs.context.observing_run.get_ms(self.inputs.vis)
    msinfo = self.inputs.context.evla['msinfo'][ms.name]
    delay_fields = msinfo.delay_field_select_string
    delay_scans = msinfo.delay_scan_select_string
    min_baselines = vla_minbaselineforcal()

    candidate_tables = sorted(self.inputs.context.callibrary.active.get_caltable())
    candidate_tables.append(addcaltable)

    # Keep only the tables that are present on disk.
    gaintables = []
    for table in candidate_tables:
        if not os.path.exists(table):
            LOG.warning(f"{table} not found, removing it from list of gain tables")
            continue
        gaintables.append(table)

    gaincal_args = dict(
        vis=self.inputs.vis,
        caltable=caltable,
        field='',
        spw=spw,
        intent='',
        selectdata=True,
        uvrange='',
        scan=delay_scans,
        solint='inf',
        combine='scan',
        preavg=-1.0,
        refant=refAnt.lower(),
        minblperant=min_baselines,
        minsnr=3.0,
        solnorm=False,
        gaintype='K',
        smodel=[],
        calmode='p',
        append=append,
        docallib=False,
        gaintable=gaintables,
        gainfield=[''],
        interp=[''],
        spwmap=[],
        parang=self.parang,
    )

    for field_str in delay_fields.split(','):
        field_uvrange = uvrange(self.setjy_results, int(field_str))
        gaincal_args.update(field=field_str, uvrange=field_uvrange)
        # Once an earlier field has written the table, switch to append mode.
        if os.path.exists(caltable):
            gaincal_args['append'] = True
        self._executor.execute(casa_tasks.gaincal(**gaincal_args))

    return True
def _do_gtype_bpdgains(
    self,
    caltable: str,
    addcaltable: str | None = None,
    solint: str = 'int',
    refAnt: str | None = None,
    spwlist: list[str] | None = None,
) -> bool:
    """Run the phase-only G-type gaincal on the fields of the bandpass scans.

    The active callibrary tables plus ``addcaltable`` are pre-applied (missing
    tables are dropped with a warning), and one gaincal job is executed per
    distinct field found in the bandpass scans.

    Args:
        caltable: Name of the calibration table to be created or appended to.
        addcaltable: Name of table to temporarily add to the gaincal gaintable parameter.
        solint: Solution interval value for CASA task gaincal.
        refAnt: Reference antenna values in format 'ea01,ea24,...'.
        spwlist: List of spectral window IDs for the specific band, e.g., ['0', '1', '2'].

    Returns:
        Always True.
    """
    spwlist = [] if spwlist is None else spwlist

    append = os.path.isdir(caltable)
    if append:
        LOG.info('Appending to existing table: %s', caltable)

    ms = self.inputs.context.observing_run.get_ms(self.inputs.vis)
    test_spw = ms.get_vla_tst_bpass_spw(spwlist=spwlist)
    bandpass_scans = self.inputs.context.evla['msinfo'][ms.name].bandpass_scan_select_string
    min_baselines = vla_minbaselineforcal()

    candidate_tables = sorted(self.inputs.context.callibrary.active.get_caltable())
    candidate_tables.append(addcaltable)

    # Keep only the tables that are present on disk.
    gaintables = []
    for table in candidate_tables:
        if not os.path.exists(table):
            LOG.warning(f"{table} not found, removing it from list of gain tables")
            continue
        gaintables.append(table)

    gaincal_args = dict(
        vis=self.inputs.vis,
        caltable=caltable,
        field='',
        spw=test_spw,
        intent='',
        selectdata=True,
        uvrange='',
        scan='',
        solint=solint,
        combine='scan',
        preavg=-1.0,
        refant=refAnt.lower(),
        minblperant=min_baselines,
        minsnr=3.0,
        solnorm=False,
        gaintype='G',
        smodel=[],
        calmode='p',
        append=append,
        docallib=False,
        gaintable=gaintables,
        gainfield=[''],
        interp=[''],
        spwmap=[],
        parang=self.parang,
    )

    scan_ids = [int(scan_id) for scan_id in bandpass_scans.split(',')]

    # Distinct field ids of the bandpass scans, in first-seen order. The
    # single-element unpacking requires each scan to hold exactly one field.
    field_ids = []
    for scan in ms.get_scans(scan_id=scan_ids):
        (field,) = scan.fields
        field_str = str(field.id)
        if field_str not in field_ids:
            field_ids.append(field_str)

    for field_str in field_ids:
        field_uvrange = uvrange(self.setjy_results, int(field_str))
        gaincal_args.update(field=field_str, uvrange=field_uvrange)
        # Once an earlier field has written the table, switch to append mode.
        if os.path.exists(caltable):
            gaincal_args['append'] = True
        self._executor.execute(casa_tasks.gaincal(**gaincal_args))

    return True
def _do_avgphasegaincal(self, caltable: str, refAnt: str, ktypecaltable: str = None,
                        bpcaltable: str = None, refantmode: str = 'flex',
                        spw: str = '') -> bool:
    """Perform a G-Type cal with CASA task gaincal for average phase.

    The active callibrary tables plus the K-type and bandpass tables are
    pre-applied (tables missing on disk are dropped with a warning). One
    gaincal job is executed for each field of the bandpass scans whose id is
    listed in the MS's ``bandpass_field_select_string``.

    Args:
        caltable(str): Name of the caltable to be created or appended to
        refAnt(str): Antenna values to use as reference antennas - 'ea01,ea24,...'
        ktypecaltable(str): K cal table
        bpcaltable(str): BP caltable
        refantmode(str): Default of flex
        spw(str): spw string

    Returns:
        Always True
    """
    append = os.path.isdir(caltable)
    if append:
        LOG.info("Appending to existing table: {!s}".format(caltable))

    m = self.inputs.context.observing_run.get_ms(self.inputs.vis)
    msinfo = self.inputs.context.evla['msinfo'][m.name]
    bandpass_field_select_string = msinfo.bandpass_field_select_string
    bandpass_scan_select_string = msinfo.bandpass_scan_select_string
    minBL_for_cal = vla_minbaselineforcal()

    candidate_tables = sorted(self.inputs.context.callibrary.active.get_caltable())
    candidate_tables.append(ktypecaltable)
    candidate_tables.append(bpcaltable)

    # Keep only the tables that are present on disk.
    AllCalTables = []
    for gt in candidate_tables:
        if os.path.exists(gt):
            AllCalTables.append(gt)
        else:
            LOG.warning(f"{gt} not found, removing it from list of gain tables")

    avgphasegaincal_task_args = {'vis': self.inputs.vis,
                                 'caltable': caltable,
                                 'field': '',
                                 'spw': spw,
                                 'selectdata': True,
                                 'uvrange': '',
                                 'scan': '',
                                 'solint': 'inf',
                                 'combine': 'scan',
                                 'preavg': -1.0,
                                 'refant': refAnt.lower(),
                                 'minblperant': minBL_for_cal,
                                 'minsnr': 1.0,
                                 'solnorm': False,
                                 'gaintype': 'G',
                                 'smodel': [],
                                 'calmode': 'p',
                                 'append': append,
                                 'docallib': False,
                                 'gaintable': AllCalTables,
                                 'gainfield': [''],
                                 'interp': [''],
                                 'spwmap': [],
                                 'parang': self.parang,
                                 'refantmode': refantmode}

    bpscanslist = [int(scan_id) for scan_id in bandpass_scan_select_string.split(',')]

    # Distinct field ids of the bandpass scans, in first-seen order. The
    # single-element unpacking requires each scan to hold exactly one field.
    allfieldidlist = []
    for scanobj in m.get_scans(scan_id=bpscanslist):
        fieldobj, = scanobj.fields
        if str(fieldobj.id) not in allfieldidlist:
            allfieldidlist.append(str(fieldobj.id))

    # See vlascanheuristics - only use the first bandpass calibrator.
    # Compare against the individual ids of the comma-separated selection: a
    # plain substring test on the string would let field '1' match '10,12'.
    selected_fields = {field.strip() for field in bandpass_field_select_string.split(',')}
    fieldidlist = [fieldid for fieldid in allfieldidlist if fieldid in selected_fields]

    for fieldidstring in fieldidlist:
        fieldid = int(fieldidstring)
        avgphasegaincal_task_args['field'] = fieldidstring
        avgphasegaincal_task_args['uvrange'] = uvrange(self.setjy_results, fieldid)
        # Once an earlier field has written the table, switch to append mode.
        if os.path.exists(caltable):
            avgphasegaincal_task_args['append'] = True

        job = casa_tasks.gaincal(**avgphasegaincal_task_args)
        self._executor.execute(job)

    return True
def _do_unflag(self, gaintable: str):
    """Run CASA flagdata in unflag mode on a table.

    Args:
        gaintable(str): Name of the table passed as the flagdata ``vis`` parameter

    Returns:
        Whatever the executor returns for the flagdata job
    """
    unflag_job = casa_tasks.flagdata(
        vis=gaintable,
        mode='unflag',
        action='apply',
        display='',
        flagbackup=False,
        savepars=False,
    )
    return self._executor.execute(unflag_job)
def _do_applycal(self, ktypecaltable: str = None, bpcaltable: str = None,
                 avgphasegaincaltable: str = None, interp: str = None, spw: str = ''):
    """Apply the active callibrary tables plus the given tables to the calibrator scans.

    Tables that are missing on disk are dropped with a warning before the
    CASA applycal job is built.

    Args:
        ktypecaltable(str): output from K-type gaincal
        bpcaltable(str): BP caltable to use
        avgphasegaincaltable(str): averaged phase gain table
        interp(str): applycal CASA task keyword
        spw(str): csv string values for spws pertaining to the particular band - '0,1,2,3,4,5,6'

    Returns:
        Whatever the executor returns for the applycal job
    """
    ms = self.inputs.context.observing_run.get_ms(self.inputs.vis)
    calibrator_scans = self.inputs.context.evla['msinfo'][ms.name].calibrator_scan_select_string

    candidate_tables = sorted(self.inputs.context.callibrary.active.get_caltable())
    candidate_tables += [ktypecaltable, bpcaltable, avgphasegaincaltable]

    # Keep only the tables that are present on disk.
    gaintables = []
    for table in candidate_tables:
        if not os.path.exists(table):
            LOG.warning(f"{table} not found, removing it from list of gain tables")
            continue
        gaintables.append(table)

    applycal_job = casa_tasks.applycal(
        vis=self.inputs.vis,
        field='',
        spw=spw,
        intent='',
        selectdata=True,
        scan=calibrator_scans,
        docallib=False,
        gaintable=gaintables,
        gainfield=[''],
        interp=[interp],
        spwmap=[],
        # one calwt entry per applied table
        calwt=[False] * len(gaintables),
        parang=self.parang,
        applymode='calflagstrict',
        flagbackup=True,
    )
    return self._executor.execute(applycal_job)
def _do_split(self, calMs: str, spw: str = ''):
    """Split the corrected data of the calibrator scans into a new MS.

    Args:
        calMs(str): name of output MS
        spw(str): spw selection

    Returns:
        Whatever the executor returns for the split job
    """
    ms = self.inputs.context.observing_run.get_ms(self.inputs.vis)
    calibrator_scans = self.inputs.context.evla['msinfo'][ms.name].calibrator_scan_select_string

    split_job = casa_tasks.split(
        vis=ms.name,
        outputvis=calMs,
        datacolumn='corrected',
        keepmms=True,
        field='',
        spw=spw,
        width=1,
        antenna='',
        timebin='0s',
        timerange='',
        scan=calibrator_scans,
        intent='',
        array='',
        uvrange='',
        correlation='',
        observation='',
        keepflags=False,
    )
    return self._executor.execute(split_job)
def _doall_setjy(self, calMs: str) -> bool:
    """Execute setjy with the appropriate model image.

    For every standard-source field that carries the AMPLITUDE intent, one
    setjy job is built per spectral window of that field (model image named
    ``<source>_<band>.im``). The jobs of a field are merged over spw and
    executed. Problems with individual jobs are logged as warnings and do not
    stop the remaining work.

    Args:
        calMs: calibrator MS

    Returns:
        Always True
    """
    m = self.inputs.context.observing_run.get_ms(self.inputs.vis)
    field_spws = m.get_vla_field_spws()
    spw2band = m.get_vla_spw2band()
    standard_source_names, standard_source_fields = standard_sources(calMs)

    # Look in spectral window domain object as this information already exists!
    with casa_tools.TableReader(self.inputs.vis + '/SPECTRAL_WINDOW') as table:
        spw_bandwidths = table.getcol('TOTAL_BANDWIDTH')
        reference_frequencies = table.getcol('REF_FREQUENCY')

    center_frequencies = [rf + spwbw / 2 for rf, spwbw in zip(reference_frequencies, spw_bandwidths)]

    for i, fields in enumerate(standard_source_fields):
        for myfield in fields:
            domainfield = m.get_fields(myfield)[0]
            if 'AMPLITUDE' not in domainfield.intents:
                continue

            jobs = []
            strlistVLAspws = ','.join(str(spw) for spw in field_spws[myfield])

            for spw in m.get_spectral_windows(strlistVLAspws):
                reference_frequency = center_frequencies[spw.id]
                EVLA_band = spw2band[spw.id]
                LOG.info("Center freq for spw " + str(spw.id) + " = " + str(
                    reference_frequency) + ", observing band = " + EVLA_band)

                model_image = standard_source_names[i] + '_' + EVLA_band + '.im'
                LOG.info(
                    "Setting model for field " + str(myfield) + " spw " + str(spw.id) + " using " + model_image)

                # Double check, but the fluxdensity=-1 should not matter since
                # the model image take precedence
                try:
                    job = self._do_setjy(calMs, str(myfield), str(spw.id), model_image)
                except Exception:
                    job = None

                # _do_setjy returns None when it cannot build the job. Report
                # with the loop's own field/spw: no job object exists to read
                # them from.
                if job is None:
                    LOG.warning(
                        "SetJy issue with field id=" + str(myfield) + " and spw=" + str(spw.id))
                    continue
                jobs.append(job)

            LOG.info("Merging flux scaling operation for setjy jobs for " + self.inputs.vis)
            jobs_and_components = utils.merge_jobs(jobs, casa_tasks.setjy, merge=('spw',))
            for job, _ in jobs_and_components:
                try:
                    self._executor.execute(job)
                except Exception:
                    LOG.warning(
                        "SetJy issue with field id=" + str(job.kw['field']) + " and spw=" + str(job.kw['spw']))

    return True
def _do_setjy(self, calMs: str, field: str, spw: str, model_image: str) -> Any:
    """Build (but do not execute) a CASA setjy job for one field and spw.

    When a ``fluxgaincalFcal_<field>.g`` directory exists or the field is one
    of the MS's flux calibrator fields, the job uses fluxdensity -1 and the
    standard returned by the ``standard.Standard`` heuristic. Otherwise it
    uses a 1 Jy 'manual' model and a warning is logged.

    Args:
        calMs(str): calibrator MS the job operates on
        field(str): field id
        spw(str): spw id
        model_image(str): Model name on disk

    Returns:
        The setjy job to execute, or None if the job could not be created.
    """
    m = self.inputs.context.observing_run.get_ms(self.inputs.vis)

    # PIPE-2904: To work with multiple sources with Flux calibration
    # intent, doing a safe split.
    fluxcalfieldlist = utils.safe_split(self.inputs.context.evla['msinfo'][m.name].flux_field_select_string)

    # PIPE-1729, setting fluxdensity to 1 for calibrators failed in hifv_fluxboot.
    if os.path.isdir('fluxgaincalFcal_{!s}.g'.format(field)) or field in fluxcalfieldlist:
        fluxdensity = -1
        setjy_standard = standard.Standard()(field)
    else:
        fluxdensity = 1
        setjy_standard = 'manual'
        LOG.warning(f'Running setjy for field {field} with 1.0 Jy fluxdensity.')

    try:
        return casa_tasks.setjy(
            vis=calMs,
            field=field,
            spw=spw,
            selectdata=False,
            model=model_image,
            listmodels=False,
            scalebychan=True,
            fluxdensity=fluxdensity,
            standard=setjy_standard,
            usescratch=True,
        )
    except Exception as e:
        # Best effort: report which job failed and let the caller carry on.
        LOG.warning('Unable to create setjy job for field %s spw %s: %s', field, spw, e)
        return None
def _do_powerfitsetjy(self, calMs: str, fluxscale_result: dict[str, Any]) -> bool:
    """Execute setjy with results from fluxscale.

    Every key of ``fluxscale_result`` other than 'freq', 'spwName' and 'spwID'
    is treated as a field entry. For each one a 'manual' setjy job is built
    from its fitted flux density, spectral index terms and reference
    frequency, restricted to the science spws that fluxscale reported.

    Args:
        calMs: Input name of calibrator MS
        fluxscale_result: Output result from CASA task fluxscale

    Returns:
        Always True; a failure to build a job is logged and skipped.
    """
    LOG.info('Setting power-law fit in the model column')

    m = self.inputs.context.observing_run.get_ms(self.inputs.vis)

    non_field_keys = ('freq', 'spwName', 'spwID')
    field_keys = [key for key in fluxscale_result if key not in non_field_keys]
    if not field_keys:
        return True

    # The spw selection is the same for every field, so compute it once.
    # Sorting makes the selection string independent of set iteration order.
    scispws = {spw.id for spw in m.get_spectral_windows(science_windows_only=True)}
    common_spws = sorted(scispws & set(fluxscale_result['spwID']))
    spw_selection = ','.join(str(spwint) for spwint in common_spws)

    for fieldid in field_keys:
        jobs_calMs = []
        try:
            field_fit = fluxscale_result[fieldid]
            LOG.info('Running setjy for field ' + str(fieldid) + ': ' + str(field_fit['fieldName']))
            task_args = {'vis': calMs,
                         'field': field_fit['fieldName'],
                         'spw': spw_selection,
                         'selectdata': False,
                         'model': '',
                         'listmodels': False,
                         'scalebychan': True,
                         'fluxdensity': [field_fit['fitFluxd'], 0, 0, 0],
                         # the first spidx entry is skipped; the rest go to spix
                         'spix': list(field_fit['spidx'][1:]),
                         'reffreq': str(field_fit['fitRefFreq']) + 'Hz',
                         'standard': 'manual',
                         'usescratch': True}
            jobs_calMs.append(casa_tasks.setjy(**task_args))
        except Exception as e:
            LOG.info(e)

        # merge identical jobs into one job with a multi-spw argument
        LOG.info("Merging setjy jobs for finalcalibrators.ms")
        jobs_and_components_calMs = utils.merge_jobs(jobs_calMs, casa_tasks.setjy, merge=('spw',))
        for job, _ in jobs_and_components_calMs:
            self._executor.execute(job)

    return True
def _do_calibratorgaincal(
    self,
    calMs: str,
    caltable: str,
    solint: str,
    minsnr: float,
    calmode: str,
    gaintablelist: list[str],
    refAnt: str,
    refantmode: str = 'flex',
    spw: str = '',
) -> bool:
    """Run CASA gaincal on the calibrator MS, one job per calibrator field with data.

    The fields are taken from the calibrator scans that match ``spw`` and
    carry one of the calibration intents listed below. A field is skipped
    when the calibrator MS holds no rows for it.

    Args:
        calMs: Input name of calibrator MS
        caltable: Output caltable (appended to once it exists)
        solint: Gaincal solution interval
        minsnr: Minimum SNR threshold
        calmode: Specified calibration mode
        gaintablelist: List of gaintable names to apply
        refAnt: CSV string of reference antennas
        refantmode: Reference antenna mode, defaults to 'flex'
        spw: Spectral window selection string

    Returns:
        Always True.
    """
    append = os.path.isdir(caltable)
    if append:
        LOG.info('Appending to existing table: %s', caltable)

    ms = self.inputs.context.observing_run.get_ms(self.inputs.vis)
    calibrator_scans = self.inputs.context.evla['msinfo'][ms.name].calibrator_scan_select_string
    requested_scans = [int(scan_id) for scan_id in calibrator_scans.split(',')]

    # Restrict the calibrator scans to those matching the spw selection.
    scanids_perband = ','.join(str(scan.id) for scan in ms.get_scans(scan_id=requested_scans, spw=spw))

    min_baselines = vla_minbaselineforcal()

    gaincal_args = dict(
        vis=calMs,
        caltable=caltable,
        field='',
        spw=spw,
        intent='',
        selectdata=False,
        solint=solint,
        combine='scan',
        preavg=-1.0,
        refant=refAnt.lower(),
        minblperant=min_baselines,
        minsnr=minsnr,
        solnorm=False,
        gaintype='G',
        smodel=[],
        calmode=calmode,
        append=append,
        gaintable=gaintablelist,
        gainfield=[''],
        interp=[''],
        spwmap=[],
        parang=self.parang,
        uvrange='',
        refantmode=refantmode,
    )

    # Parsed back from the joined string on purpose: an empty selection makes
    # int('') raise ValueError here instead of querying with an empty list.
    band_scans = [int(scan_id) for scan_id in scanids_perband.split(',')]
    calibration_intents = ['AMPLITUDE', 'BANDPASS', 'POLLEAKAGE', 'POLANGLE',
                           'PHASE', 'POLARIZATION', 'CHECK']

    # Distinct field ids, in first-seen order. The single-element unpacking
    # requires each scan to hold exactly one field.
    field_ids = []
    for scan in ms.get_scans(scan_id=band_scans, scan_intent=calibration_intents):
        (field,) = scan.fields
        field_str = str(field.id)
        if field_str not in field_ids:
            field_ids.append(field_str)

    for field_str in field_ids:
        taql = (f"FIELD_ID == {field_str}")
        if utils.get_row_count(calMs, taql) == 0:
            # no rows for this field in the calibrator MS
            continue

        field_uvrange = uvrange(self.setjy_results, int(field_str))
        gaincal_args.update(field=field_str, uvrange=field_uvrange, selectdata=True)
        # Once an earlier field has written the table, switch to append mode.
        if os.path.exists(caltable):
            gaincal_args['append'] = True

        self._executor.execute(casa_tasks.gaincal(**gaincal_args))

    return True