Source code for pipeline.infrastructure.utils.caltable_tools
import os
import re
import numpy as np
from pipeline.infrastructure import casa_tools
__all__ = [
'get_num_caltable_polarizations',
'nchan_from_caltable',
'chan_freq_from_caltable',
'antenna_names_from_caltable',
'get_ant_ids_from_caltable',
]
[docs]
def get_num_caltable_polarizations(caltable: str) -> int:
"""Obtain number of polarisations from calibration table.
Seemingly the number of QA ID does not map directly to the number of
polarisations for the spw in the MS, but the number of polarisations for
the spw as held in the caltable.
"""
with casa_tools.TableReader(caltable) as tb:
col_shapes = set(tb.getcolshapestring('CPARAM'))
# get the number of pols stored in the caltable, checking that this
# is consistent across all rows
fmt = re.compile(r'\[(?P<num_pols>\d+), (?P<num_rows>\d+)\]')
col_pols = set()
for shape in col_shapes:
m = fmt.match(shape)
if m:
col_pols.add(int(m.group('num_pols')))
else:
raise ValueError('Could not find shape of polarisation from %s' % shape)
if len(col_pols) != 1:
raise ValueError('Got %s polarisations from %s' % (len(col_pols), col_shapes))
return int(col_pols.pop())
# Adapted from analysisUtils.getNChanFromCaltable()
[docs]
def nchan_from_caltable(caltable, spw) -> int:
"""
Returns the number of channels of the specified spw in a caltable.
"""
if not os.path.exists(caltable):
raise FileNotFoundError(f"Caltable {caltable} does not exist")
with casa_tools.TableReader(caltable) as mytb:
spectralWindowTable = mytb.getkeyword('SPECTRAL_WINDOW').split()[1]
with casa_tools.TableReader(spectralWindowTable) as mytb:
nchan = mytb.getcell('NUM_CHAN', spw)
return nchan
# Adapted from analysisUtils.getChanFreqFromCaltable()
[docs]
def chan_freq_from_caltable(caltable, spw) -> np.array:
"""
Returns the frequency (in GHz) of the specified spw channel in a caltable.
Return array of all channel frequencies
"""
if not os.path.exists(caltable):
raise FileNotFoundError(f"Caltable {caltable} does not exist")
with casa_tools.TableReader(caltable) as mytb:
spectralWindowTable = mytb.getkeyword('SPECTRAL_WINDOW').split()[1]
with casa_tools.TableReader(spectralWindowTable) as mytb:
spws = range(len(mytb.getcol('MEAS_FREQ_REF')))
chanFreqGHz = {}
for i in spws:
# The array shapes can vary, so read one at a time.
spectrum = mytb.getcell('CHAN_FREQ', i)
chanFreqGHz[i] = 1e-9 * spectrum
return chanFreqGHz[spw]
[docs]
def antenna_names_from_caltable(caltable) -> list[str]:
"""
Returns the antenna names from the specified caltable's ANTENNA table.
"""
if not os.path.exists(caltable):
raise FileNotFoundError(f"Caltable {caltable} does not exist")
mytable = os.path.join(caltable, 'ANTENNA')
with casa_tools.TableReader(mytable) as mytb:
names = mytb.getcol('NAME') # an array
return list(names)
[docs]
def get_ant_ids_from_caltable(caltable) -> list[int]:
"""
Returns a list of all unique antenna ids in the caltable
"""
if not os.path.exists(caltable):
raise FileNotFoundError(f"Caltable {caltable} does not exist")
with casa_tools.TableReader(caltable) as tb:
table_ants = set(tb.getcol('ANTENNA1'))
caltable_antennas = [int(ant) for ant in table_ants]
return caltable_antennas
def get_spws_from_table(caltable) -> list[int]:
"""
Returns a list of all unique spws in the calibration table
"""
if not os.path.exists(caltable):
raise FileNotFoundError(f"Caltable {caltable} does not exist")
with casa_tools.TableReader(caltable) as tb:
table_spws = set(tb.getcol('SPECTRAL_WINDOW_ID'))
caltable_spws = sorted([int(spw) for spw in table_spws])
return caltable_spws
def field_ids_from_caltable(caltable) -> list[int]:
"""
Returns a list of all unique field ids in the calibration table
"""
if not os.path.exists(caltable):
raise FileNotFoundError(f"Caltable {caltable} does not exist")
with casa_tools.TableReader(caltable) as mytb:
fields = list(set(mytb.getcol('FIELD_ID')))
return fields
def field_names_from_caltable(caltable) -> list[str]:
"""
Returns a list of all unique field names in the calibration table
"""
if not os.path.exists(caltable):
raise FileNotFoundError(f"Caltable {caltable} does not exist")
fields = field_ids_from_caltable(caltable)
with casa_tools.TableReader(caltable + '/FIELD') as mytb:
names = mytb.getcol('NAME')
fields = list(names[fields])
return fields