"""
The exportdata module provides base classes for preparing data products
on disk for upload to the archive.
"""
from __future__ import annotations
import collections
import copy
import json
import fnmatch
import glob
import io
import os
import shutil
import sys
import tarfile
import uuid
from typing import TYPE_CHECKING
import astropy.io.fits as apfits
from pipeline.domain import DataType
from pipeline.h.tasks.common import manifest
from pipeline.h.tasks.exportdata.aqua import export_to_disk as export_aqua_to_disk
from pipeline import environment, infrastructure
from pipeline.infrastructure import (basetask, callibrary, casa_tasks, casa_tools, imagelibrary, task_registry,
utils, vdp)
from pipeline.infrastructure.filenamer import fitsname, PipelineProductNameBuilder
if TYPE_CHECKING:
from pipeline.h.tasks.exportdata.aqua import AquaXmlGenerator
from pipeline.infrastructure.launcher import Context
# Module-level logger; all messages emitted by this module go through it.
LOG = infrastructure.logging.get_logger(__name__)
# Record type bundling the standard per-OUS file products: the processing
# request, the weblog archive, the CASA commands log, the pipeline script and
# the pipeline restore script.
StdFileProducts = collections.namedtuple(
    'StdFileProducts',
    ['ppr_file', 'weblog_file', 'casa_commands_file', 'casa_pipescript', 'casa_restore_script'],
)
class ExportDataInputs(vdp.StandardInputs):
    """Inputs container for the ExportData task.

    Attributes:
        context: The pipeline Context state object holding all pipeline state.
        output_dir: The directory containing the output of the pipeline.
        session: A string or list of strings containing the session(s) associated with each vis.
            Defaults to a single session containing all vis. Vis without a matching session are
            assigned to the last session in the list.
        vis: A string or list of strings containing the MS name(s) on which to operate.
        pprfile: The pipeline processing request.
        calintents: The list of calintents defining the calibrator source images to be saved.
            Defaults to all calibrator intents.
        calimages: The list of calibrator source images to be saved. Defaults to all calibrator
            images matching calintents. If defined overrides calintents and the calibrator images
            in the context.
        targetimages: The list of target source images to be saved. Defaults to all target images.
            If defined overrides the list of target images in the context.
        products_dir: The directory where the data products will be written.
    """

    # Data types this task operates on.
    processing_data_type = [DataType.RAW, DataType.REGCAL_CONTLINE_ALL,
                            DataType.REGCAL_CONTLINE_SCIENCE, DataType.SELFCAL_CONTLINE_SCIENCE,
                            DataType.REGCAL_CONT_SCIENCE, DataType.SELFCAL_CONT_SCIENCE,
                            DataType.REGCAL_LINE_SCIENCE, DataType.SELFCAL_LINE_SCIENCE]

    # Properties with fixed defaults.
    calimages = vdp.VisDependentProperty(default=[])
    calintents = vdp.VisDependentProperty(default='')
    exportmses = vdp.VisDependentProperty(default=False)
    pprfile = vdp.VisDependentProperty(default='')
    session = vdp.VisDependentProperty(default=[])
    targetimages = vdp.VisDependentProperty(default=[])
    imaging_products_only = vdp.VisDependentProperty(default=False)
    tarms = vdp.VisDependentProperty(default=True)

    @vdp.VisDependentProperty
    def products_dir(self):
        """Default to the context's products directory, else the absolute current directory."""
        configured = self.context.products_dir
        return os.path.abspath('./') if configured is None else configured

    @vdp.VisDependentProperty
    def exportcalprods(self):
        """Default to True only when neither imaging-only nor MS export is requested."""
        return not self.imaging_products_only and not self.exportmses

    # docstring and type hints: supplements h_exportdata, hsd_exportdata, hsdn_exportdata
    def __init__(
        self,
        context: Context,
        output_dir: str = None,
        session: list[str] = None,
        vis: list[str] = None,
        exportmses: bool = None,
        tarms: bool = None,
        pprfile: list[str] = None,
        calintents: str = None,
        calimages: list[str] = None,
        targetimages: list[str] = None,
        products_dir: str = None,
        imaging_products_only: bool = None,
    ):
        """Initialise the Inputs, initialising any property values to those given here.

        Args:
            context: the pipeline Context state object

            output_dir: the working directory for pipeline data

            session: List of sessions one per visibility/autocorrelation file.
                Defaults to a single virtual session containing all the
                visibility files in vis.

                Example: ``session=['session1', 'session2']``

            vis: List of visibility/autocorrelation data files for which
                flagging and calibration information will be exported.
                Defaults to the list maintained in the pipeline context.

                Example: ``vis=['X227.ms', 'X228.ms']``

            exportmses: Export MeasurementSets defined in vis instead of flags,
                caltables, and calibration instructions.

                Example: ``exportmses=True``

                Default: ``None`` (equivalent to ``False``)

            tarms: Tar final MeasurementSets

            pprfile: Name of the pipeline processing request to be exported.
                Defaults to a file matching the template 'PPR_*.xml'.

                Example: ``pprfile=['PPR_GRB021004.xml']``

            calintents: List of calibrator image types to be exported.
                Defaults to all standard calibrator intents 'BANDPASS', 'PHASE', 'FLUX'

                Example: ``calintents='PHASE'``

            calimages: List of calibrator images to be exported.
                Defaults to all calibrator images recorded in the pipeline context.

                Example: ``calimages=['3C454.3.bandpass', '3C279.phase']``

            targetimages: List of science target images to be exported.
                Defaults to all science target images recorded in the pipeline context.

                Example: ``targetimages=['NGC3256.band3', 'NGC3256.band6']``,
                ``targetimages=['r_aqr.CM02.spw5.line0.XXYY.sd.im', 'r_aqr.CM02.spw5.XXYY.sd.cont.im']``

            products_dir: Name of the data products subdirectory.
                Defaults to './'.

                Example: ``products_dir='../products'``

            imaging_products_only: Export the science target image products only.
        """
        super().__init__()

        # The assignment order is kept exactly as before: the context is set
        # first, then vis, then the remaining properties.
        self.context = context
        self.vis = vis
        self.output_dir = output_dir

        self.session = session
        self.exportmses = exportmses
        self.tarms = tarms
        self.pprfile = pprfile
        self.calintents = calintents
        self.calimages = calimages
        self.targetimages = targetimages
        self.products_dir = products_dir
        self.imaging_products_only = imaging_products_only
class ExportDataResults(basetask.Results):
    """Results object for the ExportData task.

    It records the names of the products written by the export so that they
    can be reported later (e.g. in the weblog).
    """

    def __init__(self, pprequest='', sessiondict=None, msvisdict=None, calvisdict=None, calimages=None,
                 targetimages=None, weblog='', pipescript='', restorescript='', commandslog='', manifest=''):
        """Initialise the results object.

        Args:
            pprequest: Name of the exported pipeline processing request.
            sessiondict: Mapping of session name to its exported products.
                Defaults to an empty ordered dictionary.
            msvisdict: Mapping of MS name to its exported MS product.
                Defaults to an empty ordered dictionary.
            calvisdict: Mapping of MS name to its exported flagging and
                calibration-apply products. Defaults to an empty ordered
                dictionary.
            calimages: Exported calibrator images. Defaults to an empty list.
            targetimages: Exported target images. Defaults to an empty list.
            weblog: Name of the exported weblog archive.
            pipescript: Name of the exported pipeline script.
            restorescript: Name of the exported pipeline restore script.
            commandslog: Name of the exported CASA commands log.
            manifest: Name of the exported pipeline manifest.
        """
        super().__init__()

        # Fresh containers are created per instance so that no mutable default
        # is ever shared between results objects.
        self.pprequest = pprequest
        self.sessiondict = collections.OrderedDict() if sessiondict is None else sessiondict
        self.msvisdict = collections.OrderedDict() if msvisdict is None else msvisdict
        # Previously this attribute was (incorrectly) bound to msvisdict,
        # discarding the calvisdict argument.
        self.calvisdict = collections.OrderedDict() if calvisdict is None else calvisdict
        self.calimages = [] if calimages is None else calimages
        self.targetimages = [] if targetimages is None else targetimages
        self.weblog = weblog
        self.pipescript = pipescript
        self.restorescript = restorescript
        self.commandslog = commandslog
        self.manifest = manifest

    def __repr__(self):
        """Return a short, fixed textual description of the results."""
        s = 'ExportData results:\n'
        return s
[docs]
@task_registry.set_equivalent_casa_task('h_exportdata')
@task_registry.set_casa_commands_comment('The output data products are computed.')
class ExportData(basetask.StandardTaskTemplate):
"""
ExportData is the base class for exporting data to the products
subdirectory. It performs the following operations:
- Saves the pipeline processing request in an XML file
- Saves the final flags per ASDM in a compressed / tarred CASA flag
versions file
- Saves the final calibration apply list per ASDM in a text file
- Saves the final set of caltables per session in a compressed /
tarred file containing CASA tables
- Saves the final web log in a compressed / tarred file
- Saves the final CASA command log in a text file
- Saves the final pipeline script in a Python file
- Saves the final pipeline restore script in a Python file
- Saves the images in FITS cubes one per target and spectral window
"""
# link the accompanying inputs to this task
Inputs = ExportDataInputs
# Override the default behavior for multi-vis tasks
is_multi_vis_task = True
# name builder
NameBuilder = PipelineProductNameBuilder
[docs]
def prepare(self):
    """Prepare and execute an export data job appropriate to the task inputs.

    The products directory is created if necessary, after which the
    standard per-OUS files, the per-MS products, the per-session products,
    the calibrator and target images and finally the pipeline manifest are
    exported.

    Returns:
        An ExportDataResults object holding the names of the exported
        products; it is used for the weblog.
    """
    # Local alias for the inputs, so we're not saying 'self.inputs' everywhere
    inputs = self.inputs

    # Create products directory if necessary.
    utils.ensure_products_dir_exists(inputs.products_dir)

    # Initialize the standard OUS status ID string.
    oussid = inputs.context.get_oussid()

    # Define the results object
    result = ExportDataResults()

    # Make the standard vislist and the sessions lists.
    # These lists are constructed for the calibration mses only no matter the value of
    # inputs.imaging_products_only
    session_list, session_names, session_vislists, vislist = self._make_lists(
        inputs.context, inputs.session, inputs.vis, imaging_only_mses=False)

    # Export the standard per OUS file products
    #    The pipeline processing request
    #    A compressed tarfile of the weblog
    #    The pipeline processing script
    #    The pipeline restore script (if exporting calibration products)
    #    The CASA commands log
    recipe_name = self.get_recipename(inputs.context)
    prefix = oussid + '.' + recipe_name if recipe_name else oussid

    stdfproducts = self._do_standard_ous_products(
        inputs.context, inputs.exportcalprods, prefix, inputs.pprfile, session_list, vislist, inputs.output_dir,
        inputs.products_dir)
    if stdfproducts.ppr_file:
        result.pprequest = os.path.basename(stdfproducts.ppr_file)
    result.weblog = os.path.basename(stdfproducts.weblog_file)
    result.pipescript = os.path.basename(stdfproducts.casa_pipescript)
    if inputs.exportcalprods:
        result.restorescript = os.path.basename(stdfproducts.casa_restore_script)
    else:
        result.restorescript = 'Undefined'
    result.commandslog = os.path.basename(stdfproducts.casa_commands_file)

    # Make the standard ms dictionary and export per ms products
    #    Currently these are compressed tar files of per MS flagging tables and per MS text files of calibration
    #    apply instructions
    msvisdict = collections.OrderedDict()
    calvisdict = collections.OrderedDict()

    # MS export is not restricted to calibration MSes (imaging_only_mses=None
    # disables the filter) but it must still honour the MSes selected via
    # the 'vis' input rather than silently exporting every MS in the context.
    _, exportmses_session_names, exportmses_session_vislists, exportmses_vislist = self._make_lists(
        inputs.context, inputs.session, inputs.vis, imaging_only_mses=None)

    if not inputs.imaging_products_only:
        if inputs.exportmses:
            msvisdict = self._do_ms_products(inputs.context, exportmses_vislist, inputs.products_dir)
        if inputs.exportcalprods:
            calvisdict = self._do_standard_ms_products(inputs.context, vislist, inputs.products_dir)
    result.msvisdict = msvisdict
    result.calvisdict = calvisdict

    # Make the standard sessions dictionary and export per session products
    #    Currently these are compressed tar files of per session calibration tables
    sessiondict = collections.OrderedDict()
    if not inputs.imaging_products_only:
        if inputs.exportcalprods:
            sessiondict = self._do_standard_session_products(inputs.context, oussid, session_names,
                                                             session_vislists, inputs.products_dir)
        elif inputs.exportmses:
            # No caltables are exported in this mode but the manifest still
            # needs the session -> MS mapping, stored as a 1-tuple.
            for i, session_name in enumerate(exportmses_session_names):
                sessiondict[session_name] = \
                    ([os.path.basename(visfile) for visfile in exportmses_session_vislists[i]], )
    result.sessiondict = sessiondict

    # Export calibrator images to FITS
    calimages_list, calimages_fitslist, calimages_fitskeywords = self._export_images(
        inputs.context, True, inputs.calintents, inputs.calimages, inputs.products_dir, oussid)
    result.calimages = (calimages_list, calimages_fitslist)

    # Export science target images to FITS
    targetimages_list, targetimages_fitslist, targetimages_fitskeywords = self._export_images(
        inputs.context, False, 'TARGET', inputs.targetimages, inputs.products_dir, oussid)
    result.targetimages = (targetimages_list, targetimages_fitslist)

    # Export the pipeline manifest file
    pipemanifest = self._make_pipe_manifest(
        inputs.context, oussid, stdfproducts, sessiondict, msvisdict,
        inputs.exportmses, calvisdict, inputs.exportcalprods,
        [os.path.basename(image) for image in calimages_fitslist], calimages_fitskeywords,
        [os.path.basename(image) for image in targetimages_fitslist], targetimages_fitskeywords)
    casa_pipe_manifest = self._export_pipe_manifest(prefix, 'pipeline_manifest.xml', inputs.products_dir,
                                                    pipemanifest)
    result.manifest = os.path.basename(casa_pipe_manifest)

    # Return the results object, which will be used for the weblog
    return result
[docs]
def analyse(self, results):
    """Pass the export results through untouched.

    No analysis is carried out for the export step: nothing is added to,
    removed from or modified in the results object.

    Args:
        results: The results object produced by prepare.

    Returns:
        The very same results object.
    """
    return results
[docs]
def get_recipename(self, context):
    """Return the recipe name recorded in the context's project structure.

    Args:
        context: The pipeline context; its ``project_structure`` attribute
            is consulted.

    Returns:
        The recipe name, or an empty string when there is no project
        structure or the recipe name is 'Undefined'.
    """
    project = context.project_structure
    if project is not None and project.recipe_name != 'Undefined':
        return project.recipe_name
    return ''
def _has_imaging_data(self, context, vis):
    """Tell whether the MS named ``vis`` has a data column for any imaging data type.

    Args:
        context: The pipeline context; the MS is looked up in its observing run.
        vis: Name of the measurement set to inspect.

    Returns:
        True as soon as one of the imaging data types yields a data column
        for the MS, False otherwise.
    """
    imaging_datatypes = (
        DataType.SELFCAL_CONTLINE_SCIENCE,
        DataType.REGCAL_CONTLINE_SCIENCE,
        DataType.SELFCAL_CONT_SCIENCE,
        DataType.REGCAL_CONT_SCIENCE,
        DataType.SELFCAL_LINE_SCIENCE,
        DataType.REGCAL_LINE_SCIENCE,
    )
    ms_object = context.observing_run.get_ms(name=vis)
    for datatype in imaging_datatypes:
        if ms_object.get_data_column(datatype):
            return True
    return False
def _make_lists(
self,
context: Context,
session: list[str],
vis: list[str] | str | None = None,
imaging_only_mses: bool | None = False,
) -> tuple[list[str], list[str], list[list[str]], list[str]]:
"""
Create the vis and sessions lists.
This method processes input parameters to generate lists of sessions, session names,
visibility files, and measurement sets based on the provided context and flags.
Args:
context: The Pipeline context object.
session: Session names
vis: A single visibility file name, a list of such names,
or None. If None, the method uses all measurement sets registered in the context.
imaging_only_mses: A flag to determine how to filter measurement
sets based on imaging data:
True: Includes only measurement sets with imaging data.
False: Includes only those without imaging data.
None: Disables filtering, including all measurement sets.
Returns:
session_list: A list of session identifiers.
session_names: A list of names corresponding to each session in session_list.
session_vislists: For each session, a list of visibility files associated
with that session.
vislist: The final filtered list of visibility files.
"""
# process the 'vis' parameter and ensure vislist is a list
vislist = vis
if vislist is None:
vislist = [ms.name for ms in context.observing_run.measurement_sets]
if isinstance(vislist, str):
vislist = [vislist]
# filter based on imaging_only_mses condition
if isinstance(imaging_only_mses, bool):
if imaging_only_mses:
vislist = [vis for vis in vislist if self._has_imaging_data(context, vis)]
else:
vislist = [vis for vis in vislist if not self._has_imaging_data(context, vis)]
# Get the session list and the visibility files associated with each session.
session_list, session_names, session_vislists = self._get_sessions(context, session, vislist)
return session_list, session_names, session_vislists, vislist
def _do_standard_ous_products(self, context, exportcalprods, oussid, pprfile, session_list, vislist, output_dir,
                              products_dir):
    """Export the standard per-OUS products and return their names.

    The products are exported in this order: processing request, weblog,
    CASA commands log, pipeline script and, only when ``exportcalprods`` is
    true, the pipeline restore script.

    Returns:
        A StdFileProducts record. ``ppr_file`` is None when no processing
        request was exported and ``casa_restore_script`` is 'Undefined'
        when calibration products are not exported.
    """
    # Processing request: normally at most one; none in interactive mode.
    exported_pprs = self._export_pprfile(context, output_dir, products_dir, oussid, pprfile)
    ppr_file = os.path.basename(exported_pprs[0]) if exported_pprs else None

    # Tar file of the web log.
    weblog_file = self._export_weblog(context, products_dir)

    # Processing log, exported independently of the web log.
    casa_commands_file = self._export_casa_commands_log(
        context, context.logs['casa_commands'], products_dir, oussid)

    # Processing script, exported independently of the web log.
    casa_pipescript = self._export_casa_script(
        context, context.logs['pipeline_script'], products_dir, oussid)

    # Restore script, exported independently of the web log.
    if exportcalprods:
        casa_restore_script = self._export_casa_restore_script(
            context, context.logs['pipeline_restore_script'], products_dir, oussid, vislist, session_list)
    else:
        casa_restore_script = 'Undefined'

    return StdFileProducts(ppr_file, weblog_file, casa_commands_file, casa_pipescript, casa_restore_script)
def _do_ms_products(self, context, vislist, products_dir):
"""
Tar up the final calibrated mses and put them in the products
directory.
Used for reprocessing applications
"""
# Loop over the measurements sets in the working directory and tar
# them up.
mslist = []
for visfile in vislist:
ms_file = self._export_final_ms( context, visfile, products_dir)
mslist.append(ms_file)
# Create the ordered vis dictionary
# The keys are the base vis names
# The values are the ms files
visdict = collections.OrderedDict()
for i in range(len(vislist)):
visdict[os.path.basename(vislist[i])] = \
os.path.basename(mslist[i])
return visdict
def _do_standard_ms_products(self, context, vislist, products_dir):
"""
Generate the per ms standard products
"""
# Loop over the measurements sets in the working directory and
# save the final flags using the flag manager.
flag_version_name = 'Pipeline_Final'
for visfile in vislist:
self._save_final_flagversion(visfile, flag_version_name)
# Copy the final flag versions to the data products directory
# and tar them up.
flag_version_list = []
for visfile in vislist:
flag_version_file = self._export_final_flagversion(context, visfile, flag_version_name, products_dir)
flag_version_list.append(flag_version_file)
# Loop over the measurements sets in the working directory, and
# create the calibration apply file(s) in the products directory.
apply_file_list = []
for visfile in vislist:
apply_file = self._export_final_applylist(context, \
visfile, products_dir)
apply_file_list.append(apply_file)
# Create the ordered vis dictionary
# The keys are the base vis names
# The values are a tuple containing the flags and applycal files
visdict = collections.OrderedDict()
for i in range(len(vislist)):
visdict[os.path.basename(vislist[i])] = \
(os.path.basename(flag_version_list[i]), \
os.path.basename(apply_file_list[i]))
return visdict
def _do_standard_session_products(self, context, oussid, session_names, session_vislists, products_dir,
imaging=False):
"""
Generate the per ms standard products
"""
# Export tar files of the calibration tables one per session
caltable_file_list = []
for i in range(len(session_names)):
caltable_file = self._export_final_calfiles(context, oussid,
session_names[i], session_vislists[i], products_dir, imaging=imaging)
caltable_file_list.append(caltable_file)
# Create the ordered session dictionary
# The keys are the session names
# The values are a tuple containing the vislist and the caltables
sessiondict = collections.OrderedDict()
for i in range(len(session_names)):
sessiondict[session_names[i]] = \
([os.path.basename(visfile) for visfile in session_vislists[i]], \
os.path.basename(caltable_file_list[i]))
return sessiondict
def _do_if_auxiliary_products(
    self,
    oussid: str,
    output_dir: str,
    products_dir: str,
    vislist: list[str],
    imaging_products_only: bool,
    pipeline_stats_file: str = None,
) -> str | None:
    """Bundle the auxiliary products into ``<oussid>.auxproducts.tgz``.

    The candidates are the flux file, the antenna position file(s), the
    continuum regions file, flagging template files, timetracker JSON
    reports, the selfcal resources listed in the context and the pipeline
    statistics file.

    Args:
        oussid: Prefix of the tar file name.
        output_dir: Directory searched for the auxiliary files.
        products_dir: Directory in which the tar file is created.
        vislist: Visibility list; when empty only the general and Tsys
            flagging templates are considered.
        imaging_products_only: When true, flux and antenna position files
            are not looked for.
        pipeline_stats_file: Optional path of the pipeline statistics file.

    Returns:
        The tar file name, or None when no auxiliary product was found.
    """
    # Candidate file names; 'Undefined' stands for "not applicable".
    contfile_name = 'cont.dat'
    if imaging_products_only:
        fluxfile_name = antposfile_name = 'Undefined'
    else:
        fluxfile_name, antposfile_name = 'flux.csv', 'antennapos.csv'

    # PIPE-51: Look for per-MS antennapos.json files
    per_ms_antpos = glob.glob(os.path.join(output_dir, "*.antennapos.json"))

    flux_file = os.path.join(output_dir, fluxfile_name)
    antpos_file = os.path.join(output_dir, antposfile_name)
    cont_file = os.path.join(output_dir, contfile_name)

    found = bool(per_ms_antpos) or any(os.path.exists(path) for path in (flux_file, antpos_file, cont_file))

    # Flagging templates. The general templates are not required for the
    # restore but are informative to the user.
    # NOTE(review): this branch reads self.inputs.imaging_products_only rather
    # than the imaging_products_only argument -- confirm that is intended.
    if self.inputs.imaging_products_only:
        template_names = utils.glob_ordered('*.flagtargetstemplate.txt')
    elif vislist:
        template_names = utils.glob_ordered('*.flag*template.txt')
    else:
        template_names = utils.glob_ordered('*.flagtemplate.txt')
        template_names.extend(utils.glob_ordered('*.flagtsystemplate.txt'))

    targetflags = []
    for template_name in template_names:
        candidate = os.path.join(output_dir, template_name)
        if os.path.exists(candidate):
            found = True
            targetflags.append(candidate)
        else:
            targetflags.append('Undefined')

    # PIPE-1834: look for timetracker json report files
    timetracker_files = utils.glob_ordered('*.timetracker.json')
    if timetracker_files:
        found = True

    # PIPE-1802: look for the selfcal/restore resources
    context_resources = getattr(self.inputs.context, 'selfcal_resources', None)
    selfcal_resources = context_resources if isinstance(context_resources, list) else []
    if selfcal_resources:
        found = True

    # PIPE-2094: check for the pipeline stats file
    if pipeline_stats_file and os.path.exists(pipeline_stats_file):
        found = True

    if not found:
        return None

    tarfilename = f'{oussid}.auxproducts.tgz'
    LOG.info('Saving auxiliary data products in %s', tarfilename)

    with tarfile.open(os.path.join(products_dir, tarfilename), 'w:gz') as tar:

        def store(path, arcname, message='Saving auxiliary data product %s in %s'):
            # Add one file to the archive and report it.
            tar.add(path, arcname=arcname)
            LOG.info(message, arcname, tarfilename)

        # Flux file
        if os.path.exists(flux_file):
            store(flux_file, os.path.basename(flux_file))
        else:
            LOG.info('Auxiliary data product flux.csv does not exist')

        # Antenna positions: per-MS files take precedence over the default file
        if per_ms_antpos:
            for per_ms_file in per_ms_antpos:
                store(per_ms_file, os.path.basename(per_ms_file))
        elif os.path.exists(antpos_file):
            store(antpos_file, os.path.basename(antpos_file))
        else:
            LOG.info('Auxiliary data product antennapos file(s) does not exist')

        # Continuum regions file
        if os.path.exists(cont_file):
            store(cont_file, os.path.basename(cont_file))
        else:
            LOG.info('Auxiliary data product cont.dat does not exist')

        # Flagging template files
        for flags_file in targetflags:
            if os.path.exists(flags_file):
                store(flags_file, os.path.basename(flags_file))
            else:
                LOG.info('Auxiliary data product flagging target templates file does not exist')

        # PIPE-1834: timetracker json report files
        for timetracker_file in timetracker_files:
            if os.path.exists(timetracker_file):
                store(timetracker_file, os.path.basename(timetracker_file))
            else:
                LOG.info('Auxiliary data product timetracker json report file does not exist')

        # PIPE-1802: selfcal restore resources, stored under their given path
        for selfcal_resource in selfcal_resources:
            if os.path.exists(selfcal_resource):
                store(selfcal_resource, selfcal_resource)

        # PIPE-2094: pipeline statistics file, stored under its given path
        if pipeline_stats_file and os.path.exists(pipeline_stats_file):
            store(pipeline_stats_file, pipeline_stats_file, 'Saving pipeline statistics file %s in %s')
        else:
            LOG.info("Pipeline statistics file does not exist.")

    return tarfilename
def _make_pipe_manifest(self, context, oussid, stdfproducts, sessiondict, msvisdict, exportmses, calvisdict,
                        exportcalprods, calimages, calimages_fitskeywords, targetimages, targetimages_fitskeywords):
    """Build the pipeline manifest describing the exported products.

    Calibrator images are first split into per-OUS and per-MS images
    according to their name prefix; the manifest is then filled with the
    versions, the per-session products, the standard file products and the
    images.

    Returns:
        The populated manifest object created by ``_init_pipemanifest``.
    """
    # An image is per-OUS when its name starts with the OUS id, with one of
    # the session names, or with 'oussid', 'unknown' or 'session'.
    per_ous_calimages, per_ous_calimages_keywords = [], []
    per_ms_calimages, per_ms_calimages_keywords = [], []
    for idx, image in enumerate(calimages):
        is_per_ous = (
            image.startswith(oussid)
            or any(image.startswith(session_name) for session_name in sessiondict)
            or image.startswith(('oussid', 'unknown', 'session'))
        )
        if is_per_ous:
            images, keywords = per_ous_calimages, per_ous_calimages_keywords
        else:
            images, keywords = per_ms_calimages, per_ms_calimages_keywords
        images.append(image)
        keywords.append(calimages_fitskeywords[idx])

    # Initialize the manifest document and the top level ous status.
    pipemanifest = self._init_pipemanifest(oussid)
    ouss = pipemanifest.set_ous(oussid)
    pipemanifest.add_casa_version(ouss, environment.casa_version_string)
    pipemanifest.add_pipeline_version(ouss, environment.pipeline_revision)
    recipe_name = context.project_structure.recipe_name
    pipemanifest.add_procedure_name(ouss, recipe_name)
    pipemanifest.add_environment_info(ouss)

    if stdfproducts.ppr_file:
        pipemanifest.add_pprfile(ouss, os.path.basename(stdfproducts.ppr_file), oussid,
                                 level='member', package=recipe_name)

    # Flagging and calibration products, session by session
    for session_name, session_products in sessiondict.items():
        session = pipemanifest.set_session(ouss, session_name)
        if exportcalprods:
            pipemanifest.add_caltables(session, session_products[1], session_name,
                                       level='member', package=recipe_name)
        for vis_name in session_products[0]:
            immatchlist = [imname for imname in per_ms_calimages if imname.startswith(vis_name)]
            ms_file = msvisdict[vis_name] if exportmses else None
            if exportcalprods:
                flags_file, calapply_file = calvisdict[vis_name]
            else:
                flags_file, calapply_file = None, None
            pipemanifest.add_asdm_imlist(session, vis_name, ms_file, flags_file, calapply_file, immatchlist,
                                         'calibrator')

    # Tar file of the web log
    pipemanifest.add_weblog(ouss, os.path.basename(stdfproducts.weblog_file), oussid,
                            level='member', package=recipe_name)

    # Processing log, independent of the web log
    pipemanifest.add_casa_cmdlog(ouss, os.path.basename(stdfproducts.casa_commands_file), oussid,
                                 level='member', package=recipe_name)

    # Processing script, independent of the web log
    pipemanifest.add_pipescript(ouss, os.path.basename(stdfproducts.casa_pipescript), oussid,
                                level='member', package=recipe_name)

    # Restore script, independent of the web log; skipped when not exported
    if stdfproducts.casa_restore_script != 'Undefined':
        pipemanifest.add_restorescript(ouss, os.path.basename(stdfproducts.casa_restore_script), oussid,
                                       level='member', package=recipe_name)

    # Calibrator images, then target images
    pipemanifest.add_images(ouss, per_ous_calimages, 'calibrator', per_ous_calimages_keywords)
    pipemanifest.add_images(ouss, per_ms_calimages, 'calibrator', per_ms_calimages_keywords)
    pipemanifest.add_images(ouss, targetimages, 'target', targetimages_fitskeywords)

    return pipemanifest
def _init_pipemanifest(self, oussid):
    """Create a new, empty pipeline manifest for the given OUS status id."""
    pipemanifest = manifest.PipelineManifest(oussid)
    return pipemanifest
def _export_pprfile(self, context, output_dir, products_dir, oussid, pprfile):
    """Copy the pipeline processing request(s) to the products directory.

    Args:
        context: The pipeline context; its project structure supplies the
            processing request name when ``pprfile`` is empty.
        output_dir: Directory searched for the processing request.
        products_dir: Directory receiving the copies.
        oussid: Prefix of the exported file name. When empty the original
            file name is kept.
        pprfile: Name (or fnmatch template) of the processing request;
            empty to use the one recorded in the project structure.

    Returns:
        The list of exported file paths; empty when no processing request
        was found (e.g. in interactive mode).
    """
    # Prepare the search template for the pipeline processing request file.
    # Forced to one file now but the template structure is kept for the moment.
    if not pprfile:
        ps = context.project_structure
        if ps is None or not ps.ppr_file:
            pprtemplate = None
        else:
            pprtemplate = os.path.basename(ps.ppr_file)
    else:
        pprtemplate = os.path.basename(pprfile)

    # Locate the pipeline processing request(s). Normally there should be
    # only one match but if there are more copy them all.
    pprmatches = []
    if pprtemplate is not None:
        # os.listdir returns names without path
        for name in os.listdir(os.path.abspath(output_dir)):
            if fnmatch.fnmatch(name, pprtemplate):
                LOG.debug('Located pipeline processing request %s', name)
                pprmatches.append(os.path.join(output_dir, name))

    # Copy the pipeline processing request files.
    pprmatchesout = []
    for infile in pprmatches:
        if oussid:
            outfile = os.path.join(products_dir, oussid + '.pprequest.xml')
        else:
            # Without a prefix keep the original name, but still place the
            # copy in the products directory. Using the source path as the
            # destination made shutil.copy raise SameFileError.
            outfile = os.path.join(products_dir, os.path.basename(infile))
        pprmatchesout.append(outfile)
        LOG.info('Copying pipeline processing file %s to %s', os.path.basename(infile), os.path.basename(outfile))
        try:
            shutil.copy(infile, outfile)
        except shutil.SameFileError:
            # Source and destination are the same file (e.g. the products
            # directory is the output directory): nothing to copy.
            LOG.debug('Pipeline processing file %s is already in place', outfile)

    return pprmatchesout
def _export_final_ms(self, context: object, vis: str, products_dir: str) -> str | None:
    """Export a CASA measurement set (MS) to the products directory.

    If ``self.inputs.tarms`` is true the MS is saved as a compressed tarball
    (``.tgz``) in the products directory, otherwise the MS directory tree is
    copied there.

    The MS is read from the path given by ``vis`` and stored under its base
    name, so the export does not depend on the MS being located in the
    current working directory.

    Args:
        context: The current pipeline context object (not used in this method).
        vis: The path to the measurement set to be exported.
        products_dir: The directory where the final MS will be stored.

    Returns:
        The name of the exported tarball or directory, or None if the export
        failed (the failure is logged as an error).
    """
    visname = os.path.basename(vis)

    if self.inputs.tarms:
        # Export as tarball
        tarfilename = visname + '.tgz'
        LOG.info('Storing final MS %s in %s', visname, tarfilename)
        tar_path = os.path.join(products_dir, tarfilename)
        try:
            with tarfile.open(tar_path, "w:gz") as tar:
                # arcname keeps the archive layout independent of where the MS lives
                tar.add(vis, arcname=visname)
        except Exception as e:
            # Best effort: report the failure and let the caller carry on.
            LOG.error('Failed to create tarball %s: %s', tar_path, str(e))
            return None
        return tarfilename

    # Copy MS directory directly
    target_path = os.path.join(products_dir, visname)
    LOG.info('Copying final MS %s to %s', visname, target_path)
    try:
        shutil.copytree(vis, target_path)
    except Exception as e:
        # Best effort: report the failure and let the caller carry on.
        LOG.error('Failed to copy MS %s: %s', target_path, str(e))
        return None
    return visname
def _save_final_flagversion(self, vis, flag_version_name):
    """Store the current flags of ``vis`` under the flag version ``flag_version_name``.

    A flagmanager job in 'save' mode is built and run through the task executor.
    """
    LOG.info('Saving final flags for %s in flag version %s', os.path.basename(vis), flag_version_name)

    save_job = casa_tasks.flagmanager(
        vis=vis,
        mode='save',
        versionname=flag_version_name,
        comment="Final pipeline flags",
    )
    self._executor.execute(save_job)
def _export_final_flagversion(self, context, vis, flag_version_name, products_dir):
    """Save the final flag version(s) to a compressed tarfile in products.

    The archive ``<visname>.flagversions.tgz`` receives one ``flags.<name>``
    entry per exported flag version plus a generated ``FLAG_VERSION_LIST``
    file holding one ``name : comment`` line per version.

    Args:
        context: The pipeline context (not used in this method).
        vis: Path of the measurement set whose flag versions are exported.
        flag_version_name: A flag version name or a list of names. The
            version 'after_deterministic_flagging' is added automatically
            when flagmanager lists it (PIPE-933). A list passed by the
            caller is not modified.
        products_dir: Directory receiving the tar file.

    Returns:
        The name of the tar file.

    Raises:
        KeyError: If a requested flag version is not listed by flagmanager.
    """
    # Define the name of the output tarfile
    visname = os.path.basename(vis)
    tarfilename = visname + '.flagversions.tgz'
    LOG.info('Storing final flags for %s in %s', visname, tarfilename)

    # Define the versions list file to be saved
    flag_version_list = os.path.join(visname + '.flagversions', 'FLAG_VERSION_LIST')
    tar_info = tarfile.TarInfo(flag_version_list)
    LOG.info('Saving flag version list')

    # Retrieve all flagversions saved.
    # NOTE(review): the listing uses the MS base name while the flag files
    # below are read relative to 'vis' -- confirm both refer to the same MS.
    task = casa_tasks.flagmanager(vis=visname, mode='list')
    flag_dict = self._executor.execute(task)

    # remove MS key entry if it exists; MS key does not conform with other entries
    # more information about flagmanager return dictionary here:
    # https://casadocs.readthedocs.io/en/stable/api/tt/casatasks.flagging.flagmanager.html#mode
    flag_dict.pop('MS', None)
    comments = {entry['name']: entry['comment'] for entry in flag_dict.values()}

    # Work on a private list so that a list passed by the caller is never mutated.
    if isinstance(flag_version_name, list):
        versions = list(flag_version_name)
    else:
        versions = [flag_version_name]

    # PIPE-933: Add flag version 'after_deterministic_flagging' to tarfile
    if "after_deterministic_flagging" in comments and "after_deterministic_flagging" not in versions:
        versions.append("after_deterministic_flagging")

    # stdout may have no encoding (e.g. when it has been replaced); fall back to UTF-8.
    encoding = getattr(sys.stdout, 'encoding', None) or 'utf-8'

    # Create the tar file and populate it; the context manager closes the
    # file handle even if adding an entry fails.
    listing = []
    with tarfile.open(os.path.join(products_dir, tarfilename), "w:gz") as tar:
        for version in versions:
            LOG.info('Saving flag version %s', version)
            flagsname = os.path.join(vis + '.flagversions', 'flags.' + version)
            flagsarcname = os.path.join(visname + '.flagversions', 'flags.' + version)
            listing.append("{} : {}\n".format(version, comments[version]))
            tar.add(flagsname, arcname=flagsarcname)

        # The version list is generated in memory and added as a regular member.
        payload = "".join(listing).encode(encoding)
        tar_info.size = len(payload)
        tar.addfile(tar_info, io.BytesIO(payload))

    return tarfilename
def _export_final_applylist(self, context, vis, products_dir, imaging=False):
    """
    Save the final calibration list to a file. For now this is
    a text file. Eventually it will be the CASA callibrary file.

    Args:
        context: The pipeline context; its callibrary provides the applied
            calibration state.
        vis: The measurement set whose applied calibrations are exported.
        products_dir: Directory in which the apply file is written.
        imaging: Passed to the name builder as ``aux_product``.

    Returns:
        The name of the apply file, or 'Undefined' when there is nothing to
        export or the export failed.
    """
    applyfile_name = self.NameBuilder.calapply_list(vis=vis, aux_product=imaging)
    LOG.info('Storing calibration apply list for %s in %s', os.path.basename(vis), applyfile_name)

    try:
        calto = callibrary.CalTo(vis=vis)
        applied_calstate = context.callibrary.applied.trimmed(context, calto)

        # Log the list in human readable form. Better way to do this ?
        nitems = 0
        for target, calfroms in applied_calstate.merged().items():
            LOG.info('Apply to: Field: %s Spw: %s Antenna: %s',
                     target.field, target.spw, target.antenna)
            nitems += 1
            for item in calfroms:
                LOG.info(' Gaintable: %s Caltype: %s Gainfield: %s Spwmap: %s Interp: %s',
                         os.path.basename(item.gaintable),
                         item.caltype,
                         item.gainfield,
                         item.spwmap,
                         item.interp)

        # Only write the file when at least one calibration was applied
        if nitems > 0:
            with open(os.path.join(products_dir, applyfile_name), "w") as applyfile:
                applyfile.write('# Apply file for %s\n' % (os.path.basename(vis)))
                applyfile.write(applied_calstate.as_applycal())
        else:
            applyfile_name = 'Undefined'
            LOG.info('No calibrations for MS %s', os.path.basename(vis))
    except Exception as e:
        # Best effort: a failure here must not abort the export, but it must
        # not swallow KeyboardInterrupt/SystemExit either, and the underlying
        # cause is kept in the log for diagnosis.
        applyfile_name = 'Undefined'
        LOG.info('No calibrations for MS %s', os.path.basename(vis))
        LOG.debug('Exporting the calibration apply list for %s failed: %s', os.path.basename(vis), e)

    return applyfile_name
def _get_sessions(self, context, sessions, vis):
"""
Return a list of sessions where each element of the list contains
the vis files associated with that session. In future this routine
will be driven by the context but for now use the user defined sessions
"""
# If the input session list is empty put all the visibility files
# in the same session.
if len(sessions) == 0:
wksessions = []
for visname in vis:
session = context.observing_run.get_ms(name=visname).session
wksessions.append(session)
else:
wksessions = sessions
# Determine the number of unique sessions.
session_seqno = 0; session_dict = {}
for i in range(len(wksessions)):
if wksessions[i] not in session_dict:
session_dict[wksessions[i]] = session_seqno
session_seqno = session_seqno + 1
# Initialize the output session names and visibility file lists
session_names = []
session_vis_list = []
for key, value in sorted(session_dict.items(), key=lambda k_v: (k_v[1], k_v[0])):
session_names.append(key)
session_vis_list.append([])
# Assign the visibility files to the correct session
for j in range(len(vis)):
# Match the session names if possible
if j < len(wksessions):
for i in range(len(session_names)):
if wksessions[j] == session_names[i]:
session_vis_list[i].append(vis[j])
# Assign to the last session
else:
session_vis_list[len(session_names)-1].append(vis[j])
# Log the sessions
for i in range(len(session_vis_list)):
LOG.info('Visibility list for session %s is %s', session_names[i], session_vis_list[i])
return wksessions, session_names, session_vis_list
def _export_final_calfiles(self, context, oussid, session, vislist, products_dir, imaging=False):
    """
    Save the final calibration tables in a tarfile one file
    per session.

    Args:
        context: The pipeline context; its callibrary provides the applied
            calibration state.
        oussid: Passed to the name builder as ``ousstatus_entity_id``.
        session: Name of the session the tarfile is created for.
        vislist: The measurement sets belonging to the session.
        products_dir: Directory in which the tarfile is created.
        imaging: Passed to the name builder as ``aux_product``.

    Returns:
        The name of the tarfile, or 'Undefined' when no caltables were found.
    """
    # Define the name of the output tarfile
    tarfilename = self.NameBuilder.caltables(ousstatus_entity_id=oussid,
                                             session_name=session,
                                             aux_product=imaging)
    LOG.info('Saving final caltables for %s in %s', session, tarfilename)

    # Collect the applied caltables of all vis in the session
    caltables = set()
    for visfile in vislist:
        LOG.info('Collecting final caltables for %s in %s', os.path.basename(visfile), tarfilename)

        # Create the list of applied caltables for that vis
        try:
            calto = callibrary.CalTo(vis=visfile)
            calstate = context.callibrary.applied.trimmed(context, calto)
            caltables.update(calstate.get_caltable())
        except Exception as e:
            # Best effort per vis; keep the cause in the log instead of
            # silently discarding it (and let KeyboardInterrupt through).
            LOG.info('No caltables for MS %s', os.path.basename(visfile))
            LOG.debug('Collecting caltables for %s failed: %s', os.path.basename(visfile), e)

    if not caltables:
        return 'Undefined'

    # Create the tar file. Members are added in sorted order so that the
    # archive layout does not depend on set iteration order.
    with tarfile.open(os.path.join(products_dir, tarfilename), 'w:gz') as tar:
        for table in sorted(caltables):
            tar.add(table, arcname=os.path.basename(table))

    return tarfilename
def _export_weblog(self, context, products_dir):
    """
    Archive the processing weblog as a tarfile.

    Delegates to utils.export_weblog_as_tar, handing over the context, the
    products directory and this task's name builder, and returns whatever
    that helper returns.
    """
    name_builder = self.NameBuilder
    weblog_tarfile = utils.export_weblog_as_tar(context, products_dir, name_builder)
    return weblog_tarfile
def _export_casa_commands_log(self, context, casalog_name, products_dir, oussid):
    """
    Copy the CASA commands log from the report directory to the products
    directory.

    The destination path is produced by the name builder; the basename of
    that destination is returned.
    """
    source_path = os.path.join(context.report_dir, casalog_name)
    destination_path = self.NameBuilder.casa_script(
        casalog_name,
        project_structure=context.project_structure,
        ousstatus_entity_id=oussid,
        output_dir=products_dir,
    )

    LOG.info('Copying casa commands log %s to %s', source_path, destination_path)
    shutil.copy(source_path, destination_path)

    return os.path.basename(destination_path)
def _export_casa_restore_script(self, context, script_name, products_dir, oussid, vislist, session_list):
    """
    Save the CASA restore script.

    The script is written to the report directory and then copied to the
    products directory under the name produced by the name builder. It calls
    ``h_init()``, runs ``hif_restoredata`` inside a ``try`` block and always
    finishes with ``h_save()``.

    Args:
        context: The pipeline context (report directory, project structure).
        script_name: File name of the script inside the report directory.
        products_dir: Directory the script is copied to.
        oussid: Passed to the name builder as ``ousstatus_entity_id``.
        vislist: The measurement sets to restore; a trailing '.ms' is removed
            from each basename.
        session_list: The sessions written into the ``hif_restoredata`` call.

    Returns:
        The basename of the script copied to the products directory.
    """
    script_file = os.path.join(context.report_dir, script_name)

    # Get the output file name
    ps = context.project_structure
    out_script_file = self.NameBuilder.casa_script(script_name,
                                                   project_structure=ps,
                                                   ousstatus_entity_id=oussid,
                                                   output_dir=products_dir)

    LOG.info('Creating casa restore script %s', script_file)

    # This is hardcoded: ALMA default
    ocorr_mode = 'ca'

    # The restore task is given the vis names without the '.ms' extension
    tmpvislist = []
    for vis in vislist:
        filename = os.path.basename(vis)
        if filename.endswith('.ms'):
            filename = os.path.splitext(filename)[0]
        tmpvislist.append(filename)

    # Build the script line by line with explicit indentation so that the
    # bodies of 'try' and 'finally' are always valid Python blocks.
    body_indent = ' ' * 4
    task_string = "hif_restoredata(vis=%s, session=%s, ocorr_mode='%s')" % (tmpvislist, session_list, ocorr_mode)
    script_lines = [
        'h_init()',
        'try:',
        body_indent + task_string,
        'finally:',
        body_indent + 'h_save()',
    ]
    template = '\n'.join(script_lines) + '\n'

    with open(script_file, 'w') as casa_restore_file:
        casa_restore_file.write(template)

    LOG.info('Copying casa restore script %s to %s', script_file, out_script_file)
    shutil.copy(script_file, out_script_file)

    return os.path.basename(out_script_file)
def _export_casa_script(self, context, casascript_name, products_dir, oussid):
    """
    Copy the CASA script from the report directory to the products directory.

    The destination path is produced by the name builder; the basename of
    that destination is returned.
    """
    project_structure = context.project_structure
    source_path = os.path.join(context.report_dir, casascript_name)
    destination_path = self.NameBuilder.casa_script(
        casascript_name,
        project_structure=project_structure,
        ousstatus_entity_id=oussid,
        output_dir=products_dir,
    )

    LOG.info('Copying casa script file %s to %s', source_path, destination_path)
    shutil.copy(source_path, destination_path)

    return os.path.basename(destination_path)
def _export_pipe_manifest(self, oussid, manifest_name, products_dir, pipemanifest):
    """
    Write the pipeline manifest to the products directory.

    The manifest registers itself (by basename) on its own OUS element before
    it is written. The full path of the written manifest is returned.
    """
    manifest_path = self.NameBuilder.manifest(
        manifest_name,
        ousstatus_entity_id=oussid,
        output_dir=products_dir,
    )
    LOG.info('Creating manifest file %s', manifest_path)

    # The manifest element can only be added now that the manifest name is known
    ouss = pipemanifest.get_ous()
    recipe_name = self.inputs.context.project_structure.recipe_name
    pipemanifest.add_manifest(
        ouss,
        os.path.basename(manifest_path),
        ous_name=oussid,
        level='member',
        package=recipe_name,
    )
    pipemanifest.write(manifest_path)

    return manifest_path
def _export_images(self, context, calimages, calintents, images, products_dir, ous_name):
    """
    Export the images to FITS files.

    Args:
        context: The pipeline context. Its ``calimlist`` / ``sciimlist`` image
            libraries are read when ``images`` is empty.
        calimages: If true, calibrator images are exported, else target images.
        calintents: Comma separated calibrator intents; '' selects the
            built-in default list. Only used when ``calimages`` is true.
        images: Explicit list of image names. When non-empty it is used
            instead of the image libraries in the context.
        products_dir: Directory handed to ``fitsname`` to build FITS paths.
        ous_name: Stored under the 'ous' key of every keyword dictionary.

    Returns:
        A tuple of (deep copy of the image list with 'fitsfiles' and
        'auxfitsfiles' filled in, list of FITS files written, list of
        keyword dictionaries, one per FITS file written).
    """

    def register(image, imagename, version, aux=False):
        # Remember the CASA image to convert and the FITS file it maps to,
        # either as a main product or as an auxiliary one.
        image['imagename_version'].append((imagename, version))
        image['auxfitsfiles' if aux else 'fitsfiles'].append(fitsname(products_dir, imagename, version))

    # Create the image list
    images_list = []
    if len(images) == 0:
        # Get the image library
        if calimages:
            LOG.info('Exporting calibrator source images')
            if calintents == '':
                intents = ['PHASE', 'BANDPASS', 'CHECK', 'AMPLITUDE', 'POLARIZATION']
            else:
                intents = calintents.split(',')
            original_cleanlist = context.calimlist.get_imlist()
        else:
            LOG.info('Exporting target source images')
            intents = ['TARGET']
            original_cleanlist = context.sciimlist.get_imlist()

        # Walk through image list in reverse order to keep only the latest products
        # based on the key tuple (field, intent, spw, specmode, stokes, datatype, version).
        # If the key already exists, skip any previous products. In addition, skip
        # any target Stokes I products if the corresponding Stokes IQUV product also
        # exists (PIPE-2465).
        cleanlist_dict = {}
        deleted_keys = set()
        for image in reversed(original_cleanlist):
            # SD saves the spwlist as list of ints while IF uses a comma separated string.
            # Since lists are not hashable, we need to convert them.
            if isinstance(image['spwlist'], str):
                spwlist_key = image['spwlist']
            else:
                spwlist_key = ','.join(str(spwid) for spwid in image['spwlist'])

            product_key = (image['sourcename'], image['sourcetype'], spwlist_key, image['specmode'],
                           image['stokes'], image['datatype'], image['version'])

            # Store only Stokes IQUV if both I and IQUV exist
            if image['sourcetype'] == 'TARGET' and image['stokes'] == 'IQUV':
                # Make corresponding Stokes I key
                product_key_stokes_i = (image['sourcename'], image['sourcetype'], spwlist_key, image['specmode'],
                                        'I', image['datatype'], image['version'])
                # Keep key to catch arbitrary sequences of I and IQUV
                # images of the same data selection.
                deleted_keys.add(product_key_stokes_i)
                # Remove any previously added Stokes I image
                cleanlist_dict.pop(product_key_stokes_i, None)

            if product_key in cleanlist_dict or product_key in deleted_keys:
                # A later product with the same key was already stored, or the
                # product is superseded by a Stokes IQUV image.
                continue

            # We need to store the image
            image['fitsfiles'] = []
            image['auxfitsfiles'] = []
            image['imagename_version'] = []
            version = image.get('version', 1)

            # Image name probably includes path
            if image['sourcetype'] in intents:
                name = image['imagename']
                # Suffix that gets swapped for the derived product suffixes
                image_suffix = '.image.pbcor' if name.find('.pbcor') != -1 else '.image'
                is_interferometric = name.find('.image') != -1 and name.find('.image.sd') == -1

                if image['multiterm']:
                    for nt in range(image['multiterm']):
                        register(image, name.replace('.image', '.image.tt%d' % (nt)), version)
                    alpha_name = name.replace(image_suffix, '.alpha')
                    register(image, alpha_name, version)
                    register(image, '%s.error' % (alpha_name), version)
                elif name.find('image.sd') != -1:  # single dish
                    register(image, name, version)
                    register(image, name.replace('image.sd', 'image.sd.weight'), version)
                else:
                    register(image, name, version)

                if is_interferometric:
                    # Add PBs for interferometry
                    pb_suffix = '.pb.tt0' if image['multiterm'] else '.pb'
                    register(image, name.replace(image_suffix, pb_suffix), version, aux=True)

                    # Add auto-boxing masks for interferometry
                    maskname = name.replace(image_suffix, '.mask')
                    cleanmaskname = name.replace(image_suffix, '.cleanmask')
                    # Special case of IQUV target imaging (PIPE-2464) can use a '.cleanmask' from a previous
                    # Stokes I imaging stage in which case '.mask' and '.cleanmask' both exist.
                    if os.path.exists(maskname) and (
                            not os.path.exists(cleanmaskname)
                            or ('IQUV' in maskname and 'IQUV' in cleanmaskname)):
                        register(image, maskname, version, aux=True)

                # Add POLI/POLA images for polarization calibrators
                if image['sourcetype'] == 'POLARIZATION':
                    for polcal_imtype in ('POLI', 'POLA'):
                        # Removing '.pbcor' is a no-op for names without it
                        polname = name.replace('.pbcor', '').replace('IQUV', polcal_imtype)
                        if os.path.exists(polname):
                            register(image, polname, version)

            cleanlist_dict[product_key] = image

        # Store remaining images in original order
        cleanlist = []
        for k in reversed(cleanlist_dict):
            cleanlist.append(cleanlist_dict[k])
            images_list.extend(cleanlist_dict[k]['imagename_version'])
    else:
        # Assume only the root image name was given.
        cleanlib = imagelibrary.ImageLibrary()
        sourcetype = 'CALIBRATOR' if calimages else 'TARGET'
        for image in images:
            imageitem = imagelibrary.ImageItem(imagename=image,
                                               sourcename='UNKNOWN',
                                               spwlist='UNKNOWN',
                                               sourcetype=sourcetype)
            cleanlib.add_item(imageitem)
            # NOTE(review): basename is only empty for names ending in a path
            # separator; confirm this is the intended test.
            if os.path.basename(image) == '':
                images_list.append((os.path.join(context.output_dir, image), 1))
            else:
                images_list.append((image, 1))
        cleanlist = cleanlib.get_imlist()
        # Need to add the FITS names
        for i, item in enumerate(cleanlist):
            item['fitsfiles'] = [fitsname(products_dir, images_list[i][0])]
            item['auxfitsfiles'] = []

    # Convert to FITS.
    fits_list = []
    fits_keywords_list = []
    for image, version in images_list:
        fitsfile = fitsname(products_dir, image, version)
        # skip if image doesn't exist
        if not os.path.exists(image):
            LOG.info('Skipping unexisting image %s', os.path.basename(image))
            continue
        LOG.info('Saving final image %s to FITS file %s', os.path.basename(image), os.path.basename(fitsfile))

        # PIPE-325: abbreviate 'spw' and/or 'virtspw' for FITS header when spw string is "too long"
        with casa_tools.ImageReader(image) as img:
            info = img.miscinfo()
            for spw_key in ('spw', 'virtspw'):
                if spw_key in info and len(info[spw_key]) >= 68:
                    spw_sorted = sorted(int(x) for x in info[spw_key].split(','))
                    info[spw_key] = '{},...,{}'.format(spw_sorted[0], spw_sorted[-1])
                    img.setmiscinfo(info)

        task = casa_tasks.exportfits(imagename=image, fitsimage=fitsfile, velocity=False, optical=False,
                                     bitpix=-32, minpix=0, maxpix=-1, overwrite=True, dropstokes=False,
                                     stokeslast=True)
        self._executor.execute(task)
        fits_list.append(fitsfile)

        # Fetch header keywords for manifest
        try:
            # The context manager closes the FITS file even when one of the
            # header lookups below raises.
            with apfits.open(fitsfile) as ff:
                header = ff[0].header
                fits_keywords = dict()
                # Loop through FITS keywords.
                for key in ['object', 'obsra', 'obsdec', 'intent', 'specmode',
                            'spw', 'virtspw', 'spwisvrt',
                            'naxis1', 'ctype1', 'cunit1', 'crpix1', 'crval1', 'cdelt1',
                            'naxis2', 'ctype2', 'cunit2', 'crpix2', 'crval2', 'cdelt2',
                            'naxis3', 'ctype3', 'cunit3', 'crpix3', 'crval3', 'cdelt3',
                            'naxis4', 'ctype4', 'cunit4', 'crpix4', 'crval4', 'cdelt4',
                            'bmaj', 'bmin', 'bpa', 'robust', 'weight',
                            'effbw', 'level', 'ctrfrq', 'obspatt', 'arrays', 'modifier', 'session']:
                    fits_keywords[key] = str(header.get(key, 'N/A'))
                if 'nspwnam' in header:
                    nspwnam = header['nspwnam']
                    fits_keywords['nspwnam'] = str(nspwnam)
                    for i in range(1, nspwnam + 1):
                        key = 'spwnam{:02d}'.format(i)
                        fits_keywords[key] = str(header.get(key, 'N/A'))
                if 'nsessio' in header:
                    # The session value is reassembled from the numbered keywords
                    nsession = header['nsessio']
                    fits_keywords['session'] = ''.join(
                        str(header.get('sessio{:02d}'.format(i), 'N/A')) for i in range(1, nsession + 1))

                # Some names and/or values need to be mapped
                fits_keywords['imagemin'] = str(header.get('datamin', 'N/A'))
                fits_keywords['imagemax'] = str(header.get('datamax', 'N/A'))
                fits_keywords['rms'] = str(header.get('datarms', 'N/A'))
                fits_keywords['producttype'] = str(header.get('specmode', 'N/A'))
                fits_keywords['pl_datatype'] = str(header.get('datatype', 'N/A'))
                fits_keywords['pol'] = str(header.get('stokes', 'N/A'))
                imagetype = str(header['type'])
                fits_keywords['datatype'] = {
                    'flux': 'pb',
                    'pbcorimage': 'pbcor',
                    'cleanmask': 'mask',
                    'singledish': 'sd',
                }.get(imagetype, 'N/A')
                fits_keywords['format'] = 'fits'
                fits_keywords['ous'] = ous_name
        except Exception as e:
            LOG.info('Fetching FITS keywords for {} failed: {}'.format(fitsfile, e))
            fits_keywords = dict()
        fits_keywords_list.append(fits_keywords)

    new_cleanlist = copy.deepcopy(cleanlist)

    return new_cleanlist, fits_list, fits_keywords_list
@staticmethod
def _add_to_manifest(manifest_file, aux_fproducts, aux_caltablesdict, aux_calapplysdict, aqua_report, ous_name, package="N/A"):
    """
    Add auxiliary products to an existing manifest file and rewrite it.

    The manifest is read from ``manifest_file``, extended with the AQUA
    report, the auxiliary products file and the per-session auxiliary
    caltables (each only when the corresponding argument is truthy), and
    written back to the same path.
    """
    pipemanifest = manifest.PipelineManifest('')
    pipemanifest.import_xml(manifest_file)
    ouss = pipemanifest.get_ous()
    member_options = {'level': 'member', 'package': package}

    if aqua_report:
        pipemanifest.add_aqua_report(ouss, os.path.basename(aqua_report), ous_name, **member_options)

    # Add auxiliary data products file
    if aux_fproducts:
        pipemanifest.add_aux_products_file(ouss, os.path.basename(aux_fproducts), ous_name, **member_options)

    # Add the auxiliary caltables
    if aux_caltablesdict:
        for session_name in aux_caltablesdict:
            # Reuse the session element if the manifest already has it
            session = pipemanifest.get_session(ouss, session_name)
            if session is None:
                session = pipemanifest.set_session(ouss, session_name)

            # Each entry holds the vis names at index 0 and the caltables at index 1
            session_entry = aux_caltablesdict[session_name]
            pipemanifest.add_auxcaltables(session, session_entry[1], session_name)
            for vis_name in session_entry[0]:
                pipemanifest.add_auxasdm(session, vis_name, aux_calapplysdict[vis_name])

    pipemanifest.write(manifest_file)
def _export_aqua_report(self, context: Context, oussid: str, products_dir: str, report_generator: AquaXmlGenerator,
                        weblog_filename: str):
    """Save the AQUA report.

    Note the method is mostly a duplicate of the counterpart
    in hifa/tasks/exportdata/almaexportdata

    The report is generated in the output directory, copied to the products
    directory and to the report (html) directory, and finally added to, or
    updated in, the weblog tarball in the context's products directory if
    that tarball exists.

    Args:
        context : pipeline context
        oussid : OUS status ID
        products_dir: path of product directory
        report_generator: AQUA XML Generator
        weblog_filename: weblog tarball filename

    Returns:
        Basename of the AQUA report copied to the products directory, or
        'Undefined' if generating the report failed.
    """
    aqua_file = os.path.join(context.output_dir, context.logs['aqua_report'])
    aqua_basename = os.path.basename(aqua_file)

    LOG.info('Generating pipeline AQUA report')
    try:
        report_xml = report_generator.get_report_xml(context)
        export_aqua_to_disk(report_xml, aqua_file)
    except Exception as e:
        LOG.exception('Error generating the pipeline AQUA report', exc_info=e)
        return 'Undefined'

    ps = context.project_structure
    out_aqua_file = self.NameBuilder.aqua_report(context.logs['aqua_report'],
                                                 project_structure=ps,
                                                 ousstatus_entity_id=oussid,
                                                 output_dir=products_dir)

    LOG.info(f'Copying AQUA report {aqua_file} to {out_aqua_file}')
    shutil.copy(aqua_file, out_aqua_file)

    # put aqua report into html directory, so it can be linked to the weblog.
    # shutil.copy() into a directory keeps the basename, so that is the path
    # of the copy (aqua_file itself already carries the output directory).
    aqua_html_path = os.path.join(context.report_dir, aqua_basename)
    LOG.info(f'Copying AQUA report {aqua_file} to {aqua_html_path}')
    shutil.copy(aqua_file, context.report_dir)

    products_weblog_tarball = os.path.join(context.products_dir, weblog_filename)
    if os.path.isfile(products_weblog_tarball) and tarfile.is_tarfile(products_weblog_tarball):
        # PIPE-1942: we create a temp tarball file under a random UUID name, add/update the latest AQUA report XML
        # and all other existing weblog content into it, and overwrite the old weblog tarball with this new one.
        # We do this because tarfile does not support updating files in place. We also avoid using Python/tempfile
        # because 1) it might create a large temp file in /tmp on the computing node; 2) it will create a file
        # without the group read/write permission.
        temp_weblog_tarball = str(uuid.uuid4())
        try:
            with tarfile.open(products_weblog_tarball, "r:gz") as tar:
                files_to_keep = []
                aqua_html_path_in_tarball = None
                for member in tar.getmembers():
                    # Member names are archive-relative, so compare file names
                    # rather than looking for the on-disk path of the report.
                    if os.path.basename(member.name) == aqua_basename:
                        if aqua_html_path_in_tarball is None:
                            aqua_html_path_in_tarball = member
                    else:
                        files_to_keep.append(member)

                # Create a new tarball with the updated aqua file, keeping all others the same
                with tarfile.open(temp_weblog_tarball, "w:gz") as new_tar:
                    LOG.debug(f'Created a temp tarball file: {temp_weblog_tarball}')
                    # Add all the existing files (excluding the aqua report XML) from the old tarball to the new tarball
                    for member in files_to_keep:
                        if member.isreg():
                            new_tar.addfile(member, tar.extractfile(member))
                        else:
                            new_tar.addfile(member)

                    # If an aqua file was already in the old weblog tarball, then add it under the same relative
                    # path/name. Else, add it into the weblog tarball.
                    if aqua_html_path_in_tarball is not None:
                        arcname = aqua_html_path_in_tarball.name
                        LOG.debug(f'Updating {arcname} in contents of {temp_weblog_tarball}')
                    else:
                        # NOTE(review): assumes the weblog tarball stores its members relative to
                        # context.output_dir - confirm against utils.export_weblog_as_tar.
                        arcname = os.path.relpath(aqua_html_path, context.output_dir)
                        LOG.debug(f'Adding {aqua_html_path} to contents of {temp_weblog_tarball}')
                    new_tar.add(aqua_file, arcname=arcname)

            LOG.info(f'Adding/updating the AQUA report in {products_weblog_tarball}')
            LOG.debug(f'Moving {temp_weblog_tarball} to {products_weblog_tarball}')
            shutil.move(temp_weblog_tarball, products_weblog_tarball)
        finally:
            # After a successful move the temp file is gone; after a failure
            # make sure no half-written tarball is left behind.
            if os.path.exists(temp_weblog_tarball):
                os.remove(temp_weblog_tarball)

    return os.path.basename(out_aqua_file)
def _export_stats_file(self, context, oussid='') -> str:
    """Generate the pipeline statistics and write them to a JSON file.

    The file is named ``pipeline_stats_<oussid>.json`` and is created in the
    context's output directory.

    Args:
        context: the pipeline context
        oussid: the ous id

    Returns:
        The path of the statistics file that was written.
    """
    from pipeline.infrastructure import pipeline_statistics as pstats

    stats_file = os.path.join(context.output_dir, f"pipeline_stats_{oussid}.json")

    LOG.info('Generating pipeline statistics file')
    stats_dict = pstats.generate_stats(context)

    # Write the stats file to disk
    with open(stats_file, 'w', encoding='utf-8') as stats_stream:
        json.dump(stats_dict, stats_stream, ensure_ascii=False, indent=4, sort_keys=True)

    return stats_file