from __future__ import annotations

import ssl
import traceback
import urllib
import urllib.request
from typing import TYPE_CHECKING

import certifi

from pipeline import infrastructure
from pipeline.h.tasks.importdata import fluxes, importdata
from pipeline.hifa.tasks.importdata import dbfluxes
from pipeline.infrastructure import sessionutils, task_registry, vdp

if TYPE_CHECKING:
    from pipeline.domain import MeasurementSet, ObservingRun
    from pipeline.h.tasks.common.commonfluxresults import FluxCalibrationResults
    from pipeline.infrastructure.launcher import Context
# Names exported by ``from <this module> import *``.
__all__ = [
    'ALMAImportData',
    'SerialALMAImportData',
    'ALMAImportDataInputs',
    'ALMAImportDataResults',
]

# Module-level logger, named after this module.
LOG = infrastructure.logging.get_logger(__name__)
class ALMAImportDataInputs(importdata.ImportDataInputs):
    """Inputs for the ALMA import-data task.

    Declares defaults for ``asis`` and ``createmms`` and adds the
    ``dbservice``, ``minparang`` and ``parallel`` parameters, which are stored
    on this class rather than forwarded to the parent inputs class.
    """

    # ASDM tables copied verbatim into the MS.
    # PIPE-2067: Added Pointing to retrieve ASDM_POINTING table for offset information
    asis = vdp.VisDependentProperty(
        default='Annotation Antenna CalAtmosphere CalPointing CalWVR ExecBlock Receiver Pointing SBSummary Source Station'
    )

    # Whether to use the online flux catalog; off unless requested.
    dbservice = vdp.VisDependentProperty(default=False)

    # Note that the default is the string 'false', not the boolean False.
    createmms = vdp.VisDependentProperty(default='false')

    # sets threshold for polcal parallactic angle coverage. See PIPE-597
    minparang = vdp.VisDependentProperty(default=0.0)

    parallel = sessionutils.parallel_inputs_impl(default=False)

    # docstring and type hints: supplements hifa_importdata
    # NOTE(review): ``session`` is annotated ``str`` but documented as a list,
    # and ``parallel`` is annotated ``bool`` but documented as also accepting
    # strings; ``save_flagonline`` has no description. Left untouched here
    # because this docstring and these hints feed the task documentation --
    # confirm the intended types before changing them.
    def __init__(
        self,
        context: Context,
        vis: list[str] | None = None,
        output_dir: str | None = None,
        asis: str | None = None,
        process_caldevice: bool | None = None,
        session: str | None = None,
        overwrite: bool | None = None,
        nocopy: bool | None = None,
        bdfflags: bool | None = None,
        lazy: bool | None = None,
        save_flagonline: bool | None = None,
        dbservice: bool | None = None,
        createmms: str | None = None,
        ocorr_mode: str | None = None,
        datacolumns: dict[str, str] | None = None,
        minparang: float | None = None,
        parallel: bool | None = None,
    ):
        """Initialize Inputs.

        Args:
            context: Pipeline context object containing state information.

            vis: List of visibility data files. These may be ASDMs, tar
                files of ASDMs, MSes, or tar files of MSes. If ASDM files
                are specified, they will be converted to MS format.

                Example: ``vis=['X227.ms', 'asdms.tar.gz']``

            output_dir: Output directory.
                Defaults to None, which corresponds to the current working directory.

            asis: Creates verbatim copies of the ASDM tables in the output MS.
                The value given to this option must be a list of table names
                separated by space characters.

            process_caldevice: Import the caldevice table from the ASDM.

            session: List of session names, one for each visibility dataset,
                used to group the MSes into sessions.

                Example: ``session=['session_1', 'session_2']``

            overwrite: Overwrite existing files on import; defaults to False.
                When converting ASDM to MS, if overwrite=False and the MS
                already exists in the output directory, then this existing
                MS dataset will be used instead.

                Example: ``overwrite=True``

            nocopy: Disable copying of MS to working directory; defaults to
                False.

                Example: ``nocopy=True``

            bdfflags: Apply BDF flags on import.

            lazy: Use the lazy filler import.

            save_flagonline:

            dbservice: Use the online flux catalog.

            createmms: Create an MMS.

            ocorr_mode: ALMA default set to ca.

            datacolumns: Dictionary defining the data types of existing columns.
                The format is:

                ``{'data': 'data type 1'}``

                or

                ``{'data': 'data type 1', 'corrected': 'data type 2'}``.

                For ASDMs the data type can only be RAW and one
                can only specify it for the data column.

                For MSes one can define two different data types
                for the DATA and CORRECTED_DATA columns and they
                can be any of the known data types (RAW,
                REGCAL_CONTLINE_ALL, REGCAL_CONTLINE_SCIENCE,
                SELFCAL_CONTLINE_SCIENCE, REGCAL_LINE_SCIENCE,
                SELFCAL_LINE_SCIENCE, BASELINED, ATMCORR). The
                intent selection strings _ALL or _SCIENCE can be
                skipped. In that case the task determines this
                automatically by inspecting the existing intents
                in the dataset.

                Usually, a single datacolumns dictionary is used
                for all datasets. If necessary, one can define a
                list of dictionaries, one for each EB, with
                different setups per EB.

                If no types are specified,
                {'data':'raw','corrected':'regcal_contline'}
                or {'data':'raw'} will be assumed, depending on
                whether the corrected column exists or not.

            minparang: Minimum required parallactic angle range for polarisation
                calibrator, in degrees. The default of 0.0 is used for
                non-polarisation processing.

            parallel: Process multiple MeasurementSets in parallel using the casampi parallelization framework.

                Options: ``'automatic'``, ``'true'``, ``'false'``, ``True``, ``False``

                Default: ``None`` (equivalent to ``False``)
        """
        # Parameters understood by the parent inputs class are forwarded as-is.
        super().__init__(
            context,
            vis=vis,
            output_dir=output_dir,
            asis=asis,
            process_caldevice=process_caldevice,
            session=session,
            overwrite=overwrite,
            nocopy=nocopy,
            bdfflags=bdfflags,
            lazy=lazy,
            save_flagonline=save_flagonline,
            createmms=createmms,
            ocorr_mode=ocorr_mode,
            datacolumns=datacolumns,
        )

        # The remaining parameters are handled by this class.
        self.dbservice = dbservice
        self.minparang = minparang
        self.parallel = parallel
class ALMAImportDataResults(importdata.ImportDataResults):
    """Results object for the ALMA import-data task.

    Construction is delegated entirely to the parent class; this class adds a
    ``repr`` that lists the names of the imported measurement sets.
    """

    def __init__(
        self,
        mses: list[MeasurementSet] | None = None,
        setjy_results: list[FluxCalibrationResults] | None = None,
    ):
        """Store the imported measurement sets and their flux results.

        Args:
            mses: Imported measurement sets, passed through to the parent.
            setjy_results: Flux calibration results, passed through to the
                parent.
        """
        super().__init__(mses=mses, setjy_results=setjy_results)

    def __repr__(self) -> str:
        """Return the class name followed by one tab-indented MS name per line."""
        ms_names = '\n\t'.join(ms.name for ms in self.mses)
        return f'ALMAImportDataResults:\n\t{ms_names}'
class SerialALMAImportData(importdata.ImportData):
    """Serial (non-parallel) ALMA import-data task.

    Extends the parent import task with an optional lookup of calibrator
    fluxes in the online flux catalog service. When that service cannot be
    reached, the fluxes obtained via ``fluxes.get_setjy_results`` are used
    instead and the failure is recorded on the results.
    """

    Inputs = ALMAImportDataInputs
    Results = ALMAImportDataResults

    def prepare(self, **parameters) -> Results:
        """Run the parent import and flag an unavailable online flux service.

        Args:
            **parameters: Accepted for interface compatibility; not forwarded
                to the parent ``prepare`` (unchanged from the original
                behaviour).

        Returns:
            The results produced by the parent ``prepare``. If the online flux
            service was requested but ``results.fluxservice`` is ``'FAIL'``,
            ``results.tb`` is set to an explanatory message.
        """
        results = super().prepare()

        # If online flux services were requested but unavailable, signal an error
        # after weblog generation to stop the pipeline before the next task.
        dbservice_requested = getattr(self.inputs, 'dbservice', False)
        if dbservice_requested and results.fluxservice == 'FAIL':
            results.tb = (
                'Online flux catalog unavailable; fell back to local Source.xml fluxes. '
                'Stopping after weblog export.'
            )

        return results

    @staticmethod
    def _probe_flux_service(url: str, log_template: str) -> None:
        """Send a test query to a flux service and raise if it does not respond.

        Args:
            url: Full URL of the test query.
            log_template: ``%s``-style message logged at INFO level with
                ``url`` as its argument before the request is made.

        Raises:
            Exception: Whatever ``ssl`` or ``urllib`` raises when the request
                cannot be completed within 60 seconds.
        """
        ssl_context = ssl.create_default_context(cafile=certifi.where())
        LOG.info(log_template, url)
        # Only reachability matters here, so the response body is not read.
        # The context manager closes the connection instead of leaking it.
        with urllib.request.urlopen(url, context=ssl_context, timeout=60.0):
            pass

    def _get_fluxes(
        self,
        context: Context,
        observing_run: ObservingRun,
    ) -> tuple[str | None, list[FluxCalibrationResults], list[dict[str, str | None]] | None]:
        """Collect flux measurements for the measurement sets of an observing run.

        When ``inputs.dbservice`` is set, the primary flux service is probed
        with a fixed test query and, if that or the subsequent lookup fails,
        the backup service is tried in the same way. If both fail, or if the
        service was not requested, fluxes come from
        ``fluxes.get_setjy_results``. The fluxes are then exported to flux.csv
        and re-imported so that user-edited values are included.

        Args:
            context: Pipeline context; its ``observing_run`` is swapped
                temporarily during the export and always restored.
            observing_run: Observing run holding the measurement sets to
                process.

        Returns:
            A tuple ``(fluxservice, combined_results, qastatus)``:

            * ``fluxservice`` is ``'FIRSTURL'``, ``'BACKUPURL'`` or ``'FAIL'``
              when the online service was requested, otherwise ``None``.
            * ``combined_results`` is the value returned by
              ``fluxes.import_flux``.
            * ``qastatus`` is the second value returned by
              ``dbfluxes.get_setjy_results``, or ``None`` when the online
              service was not used.
        """
        if self.inputs.dbservice:
            testquery = '?DATE=27-March-2013&FREQUENCY=86837309056.169219970703125&WEIGHTED=true&RESULT=1&NAME=J1427-4206&VERBOSE=1'

            # Test for service response
            flux_url, backup_flux_url = dbfluxes.get_flux_urls()
            url = flux_url + testquery
            try:
                self._probe_flux_service(url, 'Attempting test query at: %s')
                xml_results, qastatus = dbfluxes.get_setjy_results(observing_run.measurement_sets)
                fluxservice = 'FIRSTURL'
            except Exception:
                # Deliberately broad: any failure of the primary service,
                # whether in the probe or the lookup, triggers the backup.
                LOG.warning('Unable to execute initial test query with primary flux service.')
                LOG.debug(traceback.format_exc())
                try:
                    url = backup_flux_url + testquery
                    self._probe_flux_service(url, 'Attempting test query at backup: %s')
                    xml_results, qastatus = dbfluxes.get_setjy_results(observing_run.measurement_sets)
                    fluxservice = 'BACKUPURL'
                except Exception:
                    # Best effort: carry on without the online service and
                    # record the failure so that prepare() can report it.
                    LOG.warning(('Unable to execute backup test query with flux service.\n'
                                 'Proceeding without using the online flux catalog service.'))
                    LOG.debug(traceback.format_exc())
                    xml_results = fluxes.get_setjy_results(observing_run.measurement_sets)
                    fluxservice = 'FAIL'
                    qastatus = None
        else:
            # Online service not requested: use the non-database flux lookup.
            xml_results = fluxes.get_setjy_results(observing_run.measurement_sets)
            fluxservice = None
            qastatus = None

        # write/append them to flux.csv

        # Cycle 1 hack for exporting the field intents to the CSV file:
        # export_flux_from_result queries the context, so we pseudo-register
        # the mses with the context by replacing the original observing run
        orig_observing_run = context.observing_run
        context.observing_run = observing_run
        try:
            fluxes.export_flux_from_result(xml_results, context)
        finally:
            context.observing_run = orig_observing_run

        # re-read from flux.csv, which will include any user-coded values
        combined_results = fluxes.import_flux(context.output_dir, observing_run)

        return fluxservice, combined_results, qastatus
@task_registry.set_equivalent_casa_task('hifa_importdata')
@task_registry.set_casa_commands_comment('If required, ASDMs are converted to MeasurementSets.')
class ALMAImportData(sessionutils.ParallelTemplate):
    """ALMAImportData class for parallelization.

    Registered as the equivalent of the ``hifa_importdata`` CASA task. It
    pairs ``ALMAImportDataInputs`` with ``SerialALMAImportData`` as the task
    to run; how the work is distributed is left to
    ``sessionutils.ParallelTemplate``.
    """

    Inputs = ALMAImportDataInputs
    Task = SerialALMAImportData