import pipeline.h.tasks.restoredata.restoredata as restoredata
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.vdp as vdp
from pipeline.h.tasks.applycal import applycal
from pipeline.infrastructure import task_registry
from ..importdata import almaimportdata
# Module-level logger, obtained from the pipeline infrastructure using this module's name.
LOG = infrastructure.get_logger(__name__)
class ALMARestoreDataInputs(restoredata.RestoreDataInputs):
    """Inputs for the ALMA-specific restore-data task.

    Extends ``restoredata.RestoreDataInputs`` by overriding the default value
    of ``asis``; every constructor argument is forwarded unchanged to the
    parent class.
    """

    # PIPE-2794/PIPE-2799: Added Pointing to retrieve ASDM_POINTING table for offset information (PIPE-2067)
    asis = vdp.VisDependentProperty(
        default='SBSummary ExecBlock Antenna Annotation Station Receiver Source CalAtmosphere CalWVR CalPointing Pointing')

    # docstring and type hints: supplements hifa_restoredata
    def __init__(self, context, copytoraw=None, products_dir=None, rawdata_dir=None, output_dir=None, session=None,
                 vis=None, bdfflags=None, lazy=None, asis=None, ocorr_mode=None):
        """Initialize Inputs.

        Args:
            context: Pipeline context object containing state information.

            copytoraw: Copy calibration and flagging tables from ``products_dir`` to
                ``rawdata_dir`` directory.

                Default: ``True``.

                Example: ``copytoraw=False``

            products_dir: Name of the data products directory to copy calibration
                products from.

                Default: '../products'

                The parameter is effective only when ``copytoraw`` = True.
                When ``copytoraw`` = False, calibration products in
                ``rawdata_dir`` will be used.

                Example: ``products_dir='myproductspath'``

            rawdata_dir: Name of the raw data directory.

                Default: ``'../rawdata'``.

                Example: ``rawdata_dir='myrawdatapath'``

            output_dir: Output directory.
                Defaults to None, which corresponds to the current working directory.

            session: List of sessions one per visibility file.

                Example: ``session=['session_3']``

            vis: List of raw visibility data files to be restored.
                Assumed to be in the directory specified by rawdata_dir.

                Example: ``vis=['uid___A002_X30a93d_X43e']``

            bdfflags: Set the BDF flags.

                Default: True.

                Example: ``bdfflags=False``

            lazy: Use the lazy filler option.

                Default: ``False``.

                Example: ``lazy=True``

            asis: Creates verbatim copies of the ASDM tables in the output MS.
                The value given to this option must be a string containing a
                list of table names separated by whitespace characters.

                Default: ``'SBSummary ExecBlock Antenna Annotation Station Receiver Source CalAtmosphere CalWVR CalPointing Pointing'``.

                Example: ``asis='Source Receiver'``

            ocorr_mode: Set ocorr_mode.

                Default: ``'ca'``.

                Example: ``ocorr_mode='ca'``
        """
        # No ALMA-specific handling here: all arguments go straight to the
        # generic inputs class; only the class-level ``asis`` default differs.
        super().__init__(context, copytoraw=copytoraw, products_dir=products_dir,
                         rawdata_dir=rawdata_dir, output_dir=output_dir, session=session,
                         vis=vis, bdfflags=bdfflags, lazy=lazy, asis=asis,
                         ocorr_mode=ocorr_mode)
# [docs] -- Sphinx "view source" link residue from HTML extraction; not part of the module.
@task_registry.set_equivalent_casa_task('hifa_restoredata')
class ALMARestoreData(restoredata.RestoreData):
    """ALMA-specific restore-data task.

    Registered with ``hifa_restoredata`` as its equivalent CASA task. Overrides
    the import and applycal steps of the generic ``restoredata.RestoreData``
    task with ALMA-specific variants.
    """

    Inputs = ALMARestoreDataInputs

    # Override generic method and use an ALMA specific one. Not much difference
    # now but should simplify parameters in future
    def _do_importasdm(self, sessionlist, vislist):
        """Import the given visibility files using the ALMA import-data task.

        Args:
            sessionlist: Sessions, passed as ``session`` to ALMAImportData.
            vislist: Visibility files, passed as ``vis`` to ALMAImportData.

        Returns:
            The result of executing the ALMAImportData task through this
            task's executor, with ``merge=True``.
        """
        inputs = self.inputs
        container = vdp.InputsContainer(almaimportdata.ALMAImportData, inputs.context, vis=vislist, session=sessionlist,
                                        save_flagonline=False, lazy=inputs.lazy, bdfflags=inputs.bdfflags,
                                        dbservice=False, asis=inputs.asis, ocorr_mode=inputs.ocorr_mode)
        importdata_task = almaimportdata.ALMAImportData(container)
        return self._executor.execute(importdata_task, merge=True)

    # Override generic method for an ALMA specific one.
    def _do_applycal(self):
        """Apply calibrations with SerialApplycal, enabling parang when needed.

        Returns:
            The result of executing the SerialApplycal task through this
            task's executor, with ``merge=True``.
        """
        # PIPE-1165: for hifa_applycal, include the polarization intents.
        container = vdp.InputsContainer(applycal.SerialApplycal, self.inputs.context,
                                        intent='TARGET,PHASE,BANDPASS,AMPLITUDE,CHECK,POLARIZATION,POLANGLE,POLLEAKAGE')

        # PIPE-1973: check whether any of the caltables-to-be-applied were
        # produced by hifa_polcal, and if so, ensure that applycal is called
        # with parang=True.
        try:
            hifa_polcal_found = self._check_for_hifa_polcal_tables(container)
            if hifa_polcal_found:
                LOG.info("Found hifa_polcal produced caltable(s) to be applied: will call applycal with parang=True.")
                container.parang = True
        except Exception:
            # Best-effort check: any ordinary failure leaves 'parang' untouched.
            # Catching Exception (rather than a bare except) lets
            # KeyboardInterrupt and SystemExit propagate.
            LOG.info("Unable to determine if any hifa_polcal produced caltable(s) are to be applied; will not modify"
                     " value for 'parang'.")

        # Create task, and return result from executing task.
        applycal_task = applycal.SerialApplycal(container)
        return self._executor.execute(applycal_task, merge=True)

    def _check_for_hifa_polcal_tables(self, inputs_container):
        """
        Utility method to determine whether a hifa_polcal produced caltable
        is present in the context callibrary.

        Args:
            inputs_container: Iterable of task inputs (one set per MS) whose
                data selections are looked up in the context callibrary.

        Returns:
            True if, for any set of inputs, the gaintable name of at least one
            caltable-to-be-applied contains '.hifa_polcal.'; False otherwise.
        """
        # Perform check for each set of inputs (i.e. per MS).
        for inputs in inputs_container:
            # Get the target data selection for this task as a CalTo object and
            # retrieve corresponding CalState.
            calto = callibrary.get_calto_from_inputs(inputs)
            calstate = self.inputs.context.callibrary.get_calstate(calto)

            # Determine whether any caltable-to-be-applied was created by
            # hifa_polcal; stop at the first match.
            if any('.hifa_polcal.' in calfrom.gaintable
                   for _, calfroms in calstate.merged().items()
                   for calfrom in calfroms):
                return True

        return False