"""The k2jycal task to perform the calibration of Jy/K conversion."""
from __future__ import annotations
import copy
import os
import numpy as np
from typing import TYPE_CHECKING, Any
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.vdp as vdp
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import casa_tools
from pipeline.infrastructure import task_registry
from pipeline.infrastructure.utils import relative_path
from pipeline.h.heuristics import caltable as caltable_heuristic
from pipeline.hsd.tasks.common import observatory_policy
from . import jyperkreader
if TYPE_CHECKING:
from pipeline.infrastructure.launcher import Context
from pipeline.infrastructure.callibrary import CalApplication
LOG = infrastructure.logging.get_logger(__name__)
QUERIED_FACTOR_FILE = 'jyperk_query.csv' # filename of the queried factor file
class SDK2JyCalInputs(vdp.StandardInputs):
"""Inputs class for SDK2JyCal task."""
reffile = vdp.VisDependentProperty(default='jyperk.csv')
dbservice = vdp.VisDependentProperty(default=True)
endpoint = vdp.VisDependentProperty(default='asdm')
caltype = vdp.VisDependentProperty(default='amp', readonly=True)
@vdp.VisDependentProperty
def infiles(self) -> str:
"""Return name of MS. Alias for "vis" attribute."""
return self.vis
@infiles.convert
def infiles(self, value: str | list[str]) -> str | list[str]:
"""Convert value into expected type.
Currently, no conversion is performed.
Args:
value: Name of MS, or the list of names
Returns:
Converted value. Currently return input value as is.
"""
self.vis = value
return value
@vdp.VisDependentProperty
def caltable(self):
"""Get the caltable argument for these inputs.
If set to a table-naming heuristic, this should give a sensible name
considering the current CASA task arguments.
"""
namer = caltable_heuristic.AmpCaltable()
# ignore caltable to avoid circular reference
casa_args = self._get_task_args(ignore=('caltable',))
return relative_path(namer.calculate(output_dir=self.output_dir,
stage=self.context.stage,
**casa_args))
def to_casa_args(self) -> dict[str, str]:
"""Convert Inputs instance into dictionary.
Returns:
kwargs for CASA task
"""
return {'vis': self.vis,
'caltable': self.caltable,
'caltype': self.caltype,
'endpoint': self.endpoint}
# docstring and type hints: supplements hsd_k2jycal
def __init__(
self,
context: Context,
output_dir: str | None = None,
infiles: str | list[str] | None = None,
caltable: str | list[str] | None = None,
reffile: str | None = None,
dbservice: bool | None = None,
endpoint: str | None = None
) -> None:
"""Initialize SDK2JyCalInputs instance.
Args:
context: Pipeline context
output_dir: Output directory.
Defaults to None, which corresponds to the current working directory.
infiles: List of input MeasurementSets.
Example: vis='ngc5921.ms'
caltable: Name of output gain calibration tables.
Name is automatically created from infiles if None is given.
Example: caltable='ngc5921.gcal'
reffile: Path to a file containing Jy/K factors for science data,
which must be provided by associating calibrator reduction or the observatory
measurements. Jy/K factor must take into account all efficiencies, i.e.,
it must be a direct conversion factor from Ta* to Jy. The file must be
in either MS-based or session-based format. The MS-based format must
be in an CSV format with five fields: MS name, antenna name, spectral
window id, polarization string, and Jy/K conversion factor. Example for
the file is as follows::
MS,Antenna,Spwid,Polarization,Factor
uid___A002_X316307_X6f.ms,CM03,5,XX,10.0
uid___A002_X316307_X6f.ms,CM03,5,YY,12.0
uid___A002_X316307_X6f.ms,PM04,5,XX,2.0
uid___A002_X316307_X6f.ms,PM04,5,YY,5.0
The first line in the above example is a header which may or may not
exist. Example for the session-based format is as follows::
#OUSID=XXXXXX
#OBJECT=Uranus
#FLUXJY=yy,zz,aa
#FLUXFREQ=YY,ZZ,AA
#sessionID,ObservationStartDate(UTC),ObservationEndDate(UTC),
Antenna,BandCenter(MHz),BandWidth(MHz),POL,Factor
1,2011-11-11 01:00:00,2011-11-11 01:30:00,CM02,86243.0,500.0,I,10.0
1,2011-11-11 01:00:00,2011-11-11 01:30:00,CM02,86243.0,1000.0,I,30.0
1,2011-11-11 01:00:00,2011-11-11 01:30:00,CM03,86243.0,500.0,I,50.0
1,2011-11-11 01:00:00,2011-11-11 01:30:00,CM03,86243.0,1000.0,I,70.0
1,2011-11-11 01:00:00,2011-11-11 01:30:00,ANONYMOUS,86243.0,500.0,I,30.0
1,2011-11-11 01:00:00,2011-11-11 01:30:00,ANONYMOUS,86243.0,1000.0,I,50.0
2,2011-11-13 01:45:00,2011-11-13 02:15:00,PM04,86243.0,500.0,I,90.0
2,2011-11-13 01:45:00,2011-11-13 02:15:00,PM04,86243.0,1000.0,I,110.0
2,2011-11-13 01:45:00,2011-11-13 02:15:00,ANONYMOUS,86243.0,500.0,I,90.0
2,2011-11-13 01:45:00,2011-11-13 02:15:00,ANONYMOUS,86243.0,1000.0,I,110.0
Lines starting with '#' are meta data and header.
The header must exist. The factor to apply is identified by matching the
session ID, antenna name, frequency and polarization of data in each line of
the file. Note the observation date is supplementary information and not used
for the matching so far. The lines whose antenna name is 'ANONYMOUS' are used
when there is no measurement for specific antenna in the session. In the above
example, if science observation of session 1 contains the antenna PM04, Jy/K
factor for ANONYMOUS antenna will be applied since there is no measurement for
PM04 in session 1.
If no file name is specified or specified file doesn't exist, all Jy/K factors
are set to 1.0.
Example: reffile='', reffile='working/jyperk.csv'
dbservice: Whether or not accessing Jy/K DB to retrieve conversion factors.
Default: None (equivalent to True)
endpoint: Which endpoints to use for query.
Options: 'asdm', 'model-fit', 'interpolation'
Default: None (equivalent to 'asdm')
"""
super().__init__()
# context and vis/infiles must be set first so that properties that require
# domain objects can be function
self.context = context
self.infiles = infiles
self.output_dir = output_dir
# set the properties to the values given as input arguments
self.caltable = caltable
self.reffile = reffile
self.dbservice = dbservice
self.endpoint = endpoint
class SDK2JyCalResults(basetask.Results):
"""Class to hold processing result of SDK2JyCal task."""
def __init__(
self,
vis: str | None = None,
final: list[CalApplication] = [],
pool: Any = [],
reffile: str | None = None,
factors: dict[str, dict[int, dict[str, dict[str, float]]]] = {},
all_ok: bool = False,
dbstatus: bool | None = None
) -> None:
"""Initialize SDK2JyCalResults instance.
Args:
vis: Name of MS. Defaults to None.
final: List of final CalApplication instances. Defaults to [].
pool: List of all CalApplication instances. Defaults to [].
reffile: Name of Jy/K factor file. Defaults to None.
factors: Dictionary of Jy/K factors. Defaults to {}.
all_ok: Boolean flag for availability of factors. Defaults to False.
dbstatus: Status of DB access. Defaults to None.
"""
super().__init__()
self.vis = vis
self.pool = pool[:]
self.final = final[:]
self.error = set()
self.reffile = reffile
self.factors = factors
self.all_ok = all_ok
self.dbstatus = dbstatus
def merge_with_context(self, context: Context) -> None:
"""Merge result instance into context.
Merge of the result instance of Jy/K calibration task includes
the following updates to Pipeline context,
- register CalApplication instances to callibrary
- register Jy/K conversion factors to MS domain object
Args:
context: Pipeline context
"""
if not self.final:
LOG.error('No results to merge')
return
for calapp in self.final:
LOG.debug('Adding calibration to callibrary:\n'
'%s\n%s' % (calapp.calto, calapp.calfrom))
context.callibrary.add(calapp.calto, calapp.calfrom)
# merge k2jy factor to context assing the value as an attribute of MS
for vis, valid_k2jy in self.factors.items():
msobj = context.observing_run.get_ms(name=vis)
msobj.k2jy_factor = {}
for spwid, spw_k2jy in valid_k2jy.items():
for ant, ant_k2jy in spw_k2jy.items():
for pol, pol_k2jy in ant_k2jy.items():
msobj.k2jy_factor[(spwid, ant, pol)] = pol_k2jy
def __repr__(self) -> str:
"""Return string representation of the instance."""
# Format the Tsyscal results.
s = 'SDK2JyCalResults:\n'
for calapplication in self.final:
s += '\tBest caltable for spw #{spw} in {vis} is {name}\n'.format(
spw=calapplication.spw, vis=os.path.basename(calapplication.vis),
name=calapplication.gaintable)
return s
[docs]
@task_registry.set_equivalent_casa_task('hsd_k2jycal')
@task_registry.set_casa_commands_comment('The Kelvin to Jy calibration tables are generated.')
class SDK2JyCal(basetask.StandardTaskTemplate):
"""Generate calibration table of Jy/K factors."""
Inputs = SDK2JyCalInputs
[docs]
def execute( self, **parameters) -> SDK2JyCalResults:
"""
remove existing QUERIED_FACTOR_FILE before the first run
Args:
parameters: parameters
Returns
SDK2JyCalResults
"""
filename = QUERIED_FACTOR_FILE
if self.inputs.context.subtask_counter == 0:
if os.path.isfile(filename):
LOG.info( "File {} exists, will rename to {}_orig".format(filename, filename) )
if os.path.isfile(filename+"_orig"):
LOG.info( "Existing {}_orig will be overwritten".format(filename) )
os.rename( filename, "{}_orig".format(filename) )
results = super().execute( **parameters )
return results
[docs]
def prepare(self) -> SDK2JyCalResults:
"""
Retrieve Jy/K facors from the DB and save them in QUERIED_FACTOR_FILE if dbaccess is True
Returns:
SDK2JyCalResults
"""
inputs = self.inputs
vis = inputs.vis
reffile = inputs.reffile
caltable_status = None
if not os.path.exists(vis):
LOG.error( "Could not find MS '{}'".format(vis) )
return SDK2JyCalResults(os.path.basename(vis))
vis = os.path.basename(vis)
# make a note of the current inputs state before we start fiddling
# with it. This origin will be attached to the final CalApplication.
origin = callibrary.CalAppOrigin(task=SDK2JyCal,
inputs=inputs.to_casa_args())
common_params = inputs.to_casa_args()
# create caltable and extract data for pipeline
caltable_status = self._create_caltable(common_params)
if caltable_status is False:
LOG.error("No Jy/K scaling factors available")
return SDK2JyCalResults(os.path.basename(vis))
factors_used = self._extract_factors( inputs.context, vis, common_params['caltable'], caltable_status )
if factors_used is None:
LOG.error("MS and caltable are inconsistent")
return SDK2JyCalResults(os.path.basename(vis))
# check whether the factors are within the valid range
calibration_policy = observatory_policy.get_calibration_policy( inputs.context )
ms = inputs.context.observing_run.get_ms( vis )
for spw, factors_spw in factors_used.items():
for ant_name, factors_ant in factors_spw.items():
for pol_name, factor in factors_ant.items():
valid_range = calibration_policy.get_jyperk_valid_range( ms, spw, ant_name, pol_name )
if factor < valid_range[0] or factor > valid_range[1]:
LOG.warning(
"Invalid Jy/K factor %f for %s, %s, spw%s, pol %s. (should be within %f and %f).",
factor, vis, ant_name, spw, pol_name, valid_range[0], valid_range[1] )
# write jyperk data to file if fetched from DB
if caltable_status is True:
reffile = QUERIED_FACTOR_FILE
export_jyperk( reffile, vis, factors_used )
# generate callibrary for the caltable
callist = []
valid_factors = {}
calto = callibrary.CalTo(vis=common_params['vis'])
calfrom = callibrary.CalFrom(common_params['caltable'],
caltype=inputs.caltype,
gainfield='', spwmap=None,
interp='nearest,nearest')
calapp = callibrary.CalApplication(calto, calfrom, origin)
if calapp is not None:
callist = [calapp]
factors_ok = True
else:
callist = []
factors_ok = False
valid_factors[vis] = factors_used
return SDK2JyCalResults(vis=vis, pool=callist, reffile=reffile,
factors=valid_factors, all_ok=factors_ok,
dbstatus=caltable_status)
def _extract_factors( self, context: Context, vis: str, caltable: str, dbstatus: bool ) -> dict[str, dict[str, dict[str, float] | None]]:
"""
extract Jy/K factors
Args:
context : Pipeline context
vis : Name of MS
caltable : Name of caltable
dbstatus : status of DB service
Returns:
Jy/K factors / None if MS and caltable are inconsistent
"""
# get list of antennas and science_windows from ms
ms = context.observing_run.get_ms(vis)
antennas = [ x.name for x in ms.get_antenna() ]
science_windows = [ x.id for x in ms.get_spectral_windows(science_windows_only=True) ]
# get antenna list from caltable
with casa_tools.TableReader(caltable+"/ANTENNA") as tb:
caltable_antlist = tb.getcol("NAME")
antidx = {} # index of antenna in caltable
for ant in antennas:
if ant not in caltable_antlist:
LOG.error( "{}: antenna {} does not exist in caltable".format(vis, ant) )
return None
antidx[ant] = np.where( caltable_antlist == ant )[0][0]
# fetch Jy/K factors from caltable file
factors_table = {}
with casa_tools.TableReader(caltable) as tb:
for spw in science_windows:
factors_table[spw] = {}
ddid = ms.get_data_description(spw=spw)
pol_list = list(map(ddid.get_polarization_label, range(ddid.num_polarizations)))
for ant in antennas:
subtb = tb.query( "ANTENNA1={} && SPECTRAL_WINDOW_ID={}".format(antidx[ant], spw) )
factors = subtb.getcol("CPARAM")[:,0,0].real
subtb.close()
if len(factors) < len(pol_list):
LOG.error( "{}: insufficient pols in caltable (MS:{} caltable:{})".format(vis, len(pol_list), len(factors)) )
return None
else:
factors_table[spw][ant] = {}
for polid, pol in enumerate(pol_list):
factor = factors[polid]
factors_table[spw][ant][pol] = 1.0/(factor*factor)
# remove parameters not found in reffile
# doing this because gencal() gives 1.0 for factors not found in reffile
factors_used = copy.deepcopy(factors_table)
if dbstatus is None:
factors_list = jyperkreader.read(context, self.inputs.reffile)
# scan through data
for spw in factors_table.keys():
for ant in factors_table[spw].keys():
for pol in factors_table[spw][ant].keys():
found = False
allowed_pol = [ pol, 'I' ]
for factor in factors_list:
if factor[1:3] == [ant, str(spw)] and factor[3] in allowed_pol:
found = True
break
if not found:
del factors_used[spw][ant][pol]
if factors_used[spw][ant] == {}:
del factors_used[spw][ant]
return factors_used
def _create_caltable( self, common_params: dict[str, str] ) -> bool | None:
"""
Invoke gencal and ceate the calibration table file
if inputs.dbservie is True, then try to obtain Jy/K factors from DB,
but falls back to read jyper.csv if DB access fails.
Args:
common_params : common parameters for calibration
Returns:
status of creating caltable file.
status = True : Caltable is created from DB
status = None : Caltable is created from Jy/K factor CSV file
status = False : Failed to create caltable
"""
inputs = self.inputs
status = None
gencal_args = common_params.copy()
gencal_args['caltype'] = 'jyperk' # override to invoke gencal in jyperk mode
# retrieve factors from DB
if inputs.dbservice:
gencal_job = casa_tasks.gencal(**gencal_args)
try:
self._executor.execute(gencal_job)
status = True
except Exception as e:
if len(str(e)) == 0:
LOG.warning( "Failed to get Jy/K factors from DB." )
else:
LOG.warning( e )
LOG.warning( "{}: Query to Jy/K DB failed. Will fallback to read CSV file '{}'".format(inputs.vis, inputs.reffile) )
status = False
# retrieve factors from file
if not status:
gencal_args['infile'] = inputs.reffile
if not os.path.exists(inputs.reffile):
LOG.error( "Jy/K scaling factor file '{}' does not exist.".format(inputs.reffile) )
status = False
else:
gencal_job = casa_tasks.gencal(**gencal_args)
try:
self._executor.execute(gencal_job)
status = None
except Exception as e:
LOG.error( "{}: Failed to create caltable from CSV file: {}".format(inputs.vis, e) )
status = False
return status
[docs]
def analyse(self, result: SDK2JyCalResults) -> SDK2JyCalResults:
"""Analyse SDK2JyCalResults instance produced by prepare.
1. Define factors actually used and analyze if the factors are provided to all relevant data in MS.
2. Check if caltables in the pool exist to validate the CalApplication, and register valid CalApplication's to final
attribute.
Args:
result: SDK2JyCalResults instance
Returns:
Updated SDK2JyCalResults instance
"""
vis = result.vis
if vis not in result.factors.keys() or len(result.factors[vis]) == 0:
result.all_ok = False
LOG.warning( "No Jy/K factor is given for MS '{}'".format(vis) )
return result
# check if factors are provided to all relevant data in MS
ms = self.inputs.context.observing_run.get_ms(vis)
pol_to_map_i = ('XX', 'YY', 'RR', 'LL', 'I')
for spw in ms.get_spectral_windows(science_windows_only=True):
spwid = spw.id
if spwid not in result.factors[vis]:
result.all_ok = False
LOG.warning( "No Jy/K factor is given for Spw={} of {}".format( spwid, vis ) )
continue
ddid = ms.get_data_description(spw=spwid)
pol_list = list(map(ddid.get_polarization_label, range(ddid.num_polarizations)))
# mapping for anonymous antenna if necessary
all_ant_factor = result.factors[vis][spwid].pop('ANONYMOUS', {})
for ant in ms.get_antenna():
ant_name = ant.name
if all_ant_factor:
result.factors[vis][spwid][ant_name] = all_ant_factor
elif ant_name not in result.factors[vis][spwid]:
result.all_ok = False
LOG.warning("No Jy/K factor is given for Spw={}, Ant={} of {}".format(spwid, ant_name, vis))
continue
all_pol_factor = result.factors[vis][spwid][ant_name].pop('I', {})
for pol in pol_list:
# mapping for stokes I if necessary
if all_pol_factor and pol in pol_to_map_i:
result.factors[vis][spwid][ant_name][pol] = all_pol_factor
# check factors provided for all spw, antenna, and pol
ok = self.__check_factor(result.factors[vis], spwid, ant_name, pol)
result.all_ok &= ok
if not ok:
LOG.warning("No Jy/K factor is given for Spw={}, Ant={}, Pol={} of {}".format(spwid, ant_name, pol, vis))
# double-check that the caltable was actually generated and prepare 'final'.
on_disk = [ca for ca in result.pool if ca.exists()]
result.final[:] = on_disk
missing = [ca for ca in result.pool if ca not in on_disk]
result.error.clear()
result.error.update(missing)
return result
@staticmethod
def __check_factor(
factors: dict[int, dict[str, dict[str, float]]],
spw: int,
ant: str,
pol: str
) -> bool:
"""Check if factor for given meta data is available
Args:
factors: List of Jy/K factors with meta data
spw: Spectral window id
ant: Antenna name
pol: Polarization type
Returns:
Availability of the factor for given meta data
"""
if factors.get( spw, None ) is None:
return False
if factors[spw].get( ant, None ) is None:
return False
if factors[spw][ant].get( pol, None ) is None:
return False
return True
def export_jyperk( outfile: str, vis: str, factors_used: dict ) -> None:
"""Export conversion factors to file.
Format of the output file is CSV.
Args:
outfile : Name of the output file
vis : Name of MS
factors : List of conversion factors with meta data
"""
if not os.path.exists(outfile):
# create file with header information
with open(outfile, 'w') as f:
f.write('MS,Antenna,Spwid,Polarization,Factor\n')
with open(outfile, 'a') as f:
for spw, v1 in factors_used.items():
for ant, v2 in v1.items():
for pol, factor in v2.items():
f.write( "{},{},{},{},{}\n".format( vis, ant, spw, pol, factor ) )