# Source code for pipeline.hsd.tasks.exportdata.exportdata

"""
The exportdata for SD module.

This module provides base classes for preparing data products on disk
for upload to the archive.
"""
from __future__ import annotations

import collections
import glob
import os
import shutil
import string
import tarfile
import traceback
from typing import TYPE_CHECKING

import pipeline.h.tasks.exportdata.exportdata as exportdata
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.project as project
from pipeline.hsd.tasks.importdata.importdata import SDImportDataResults
from pipeline.hsd.tasks.restoredata.restoredata import SDRestoreDataResults
from pipeline.hsd.tasks.common.utils import is_nro
from pipeline.infrastructure import task_registry
from pipeline.infrastructure.utils import absolute_path

from . import almasdaqua

if TYPE_CHECKING:
    from collections.abc import Generator

    from pipeline.infrastructure.launcher import Context

# the logger for this module
LOG = infrastructure.logging.get_logger(__name__)


class SDExportDataInputs(exportdata.ExportDataInputs):
    """Task inputs for SDExportData.

    This subclass adds nothing to exportdata.ExportDataInputs. It exists
    because each task class needs its own Inputs class, even when the two
    are effectively identical.
    """


@task_registry.set_equivalent_casa_task('hsd_exportdata')
@task_registry.set_casa_commands_comment('The output data products are computed.')
class SDExportData(exportdata.ExportData):
    """A class for exporting single dish data to the products subdirectory.

    It performs the following additional operations after ExportData does:

    - Exports auxiliary tar files of the calibration tables one per session
    - Exports auxiliary MS products (calibration apply files) in text files
    - Saves a K2JY reference file and flag files into tarball
    - Saves the AQUA report file
    """

    Inputs = SDExportDataInputs

    def prepare(self) -> exportdata.ExportDataResults:
        """Prepare and execute an export data job appropriate to the task inputs.

        This is almost equivalent to ALMAExportData.prepare().

        Returns:
            ExportDataResults object
        """
        results = super().prepare()

        context = self.inputs.context
        oussid = context.get_oussid()

        # Make the imaging list of names of MeasurementSet and the sessions lists.
        _, session_names, session_vislists, vislist = \
            self._make_lists(context, self.inputs.session, self.inputs.vis,
                             imaging_only_mses=None)

        LOG.info('vislist={}'.format(vislist))

        if vislist:
            # Export the auxiliary caltables if any.
            # These are the final baseline tables, one tarball per session.
            auxcaltables = \
                self._do_aux_session_products(context, oussid, session_names,
                                              session_vislists, self.inputs.products_dir)

            # Export the auxiliary cal apply files if any.
            # These hold the sdbaseline command applying the baseline table.
            auxcalapplys = \
                self._do_aux_ms_products(context, vislist, self.inputs.products_dir)
        else:
            auxcaltables = None
            auxcalapplys = None

        # Create and export the pipeline stats file for ALMA single dish data.
        # This is best-effort: a failure here must not abort the export.
        pipeline_stats_file = None
        if not is_nro(context):
            try:
                pipeline_stats_file = self._export_stats_file(context=context, oussid=oussid)
            except Exception as e:
                LOG.info("Unable to output pipeline statistics file: {}".format(e))
                LOG.debug(traceback.format_exc())

        # The AQUA report is named with the recipe name appended when available.
        recipe_name = self.get_recipename(context)
        if not recipe_name:
            prefix = oussid
        else:
            prefix = oussid + '.' + recipe_name

        # Export the auxiliary file products into a single tar file.
        # These are optional for reprocessing but informative to the user:
        #    The K2Jy reference file
        #    The flagging template files
        #    The pipeline statistics file (if it exists)
        auxfproducts = \
            self._do_auxiliary_products(context, oussid, self.inputs.output_dir,
                                        self.inputs.products_dir, pipeline_stats_file)

        # Export the AQUA report
        pipe_aqua_reportfile = self._export_aqua_report(
            context=context,
            oussid=prefix,
            products_dir=self.inputs.products_dir,
            report_generator=almasdaqua.AlmaAquaXmlGenerator(),
            weblog_filename=results.weblog)

        # Update the manifest
        if auxfproducts is not None or pipe_aqua_reportfile is not None:
            manifest_file = os.path.join(context.products_dir, results.manifest)
            # The manifest records the recipe name held by the project structure.
            recipe_name = context.project_structure.recipe_name
            self._add_to_manifest(manifest_file, auxfproducts, auxcaltables, auxcalapplys,
                                  pipe_aqua_reportfile, oussid, recipe_name)

        return results

    def _do_aux_session_products(self, context: Context, oussid: str,
                                 session_names: list[str],
                                 session_vislists: list[list[str]],
                                 products_dir: str) -> dict[str, tuple[list[str], str]]:
        """Export auxiliary calibration tables to products directory and return session dictionary.

        Args:
            context : pipeline context
            oussid : OUS status ID
            session_names : list of session names
            session_vislists : list of MeasurementSet names associated with
                each session
            products_dir : path of products directory

        Returns:
            ordered dictionary keyed by session name. Each value is a tuple of
            (list of MeasurementSet base names of the session, base name of
            the auxiliary calibration tarball of the session).
        """
        LOG.info('_do_aux_session_products')

        # Export one compressed tar file of baseline tables per session and
        # record it together with the MeasurementSets of that session.
        sessiondict = collections.OrderedDict()
        for session_name, session_vislist in zip(session_names, session_vislists):
            caltable_file = self._export_final_baseline_calfiles(
                context, oussid, session_name, session_vislist, products_dir)
            sessiondict[session_name] = (
                [os.path.basename(visfile) for visfile in session_vislist],
                os.path.basename(caltable_file)
            )

        return sessiondict

    def __get_last_baseline_table(self, vis: str) -> str | None:
        """Sort baseline table names and return the last of them.

        Tables are searched for in the current working directory.

        Args:
            vis : MeasurementSet name

        Returns:
            the last baseline table name, or None if no table matches
        """
        basename = os.path.basename(vis.rstrip('/'))
        bl_tables = glob.glob('{}.*hsd_baseline*.bl.tbl'.format(basename))
        if not bl_tables:
            return None

        # NOTE(review): ordering is lexicographic; presumably the table names
        # sort in stage order -- confirm for stage numbers of differing width.
        name = max(bl_tables)
        LOG.debug('bl cal table for {} is {}'.format(vis, name))
        return name

    def _export_final_baseline_calfiles(self, context: Context, oussid: str,
                                        session: str, vislist: list[str],
                                        products_dir: str) -> str:
        """Save the final baseline tables in a tarfile one file per session.

        This method is a copy of the same method in the superclass except for
        handling baseline caltables.

        Args:
            context : pipeline context
            oussid : OUS status ID
            session : session name
            vislist : list of MeasurementSet names
            products_dir : products directory

        Returns:
            tar file name
        """
        # Define the name of the output tarfile
        tarfilename = self.NameBuilder.caltables(ousstatus_entity_id=oussid,
                                                 session_name=session,
                                                 aux_product=True)
        LOG.info('Saving final caltables for %s in %s', session, tarfilename)

        # Collect the last baseline table of every MeasurementSet; a set
        # avoids adding the same table to the tarball twice.
        bl_caltables = set()
        for visfile in vislist:
            LOG.info('Collecting final caltables for %s in %s',
                     os.path.basename(visfile), tarfilename)
            name = self.__get_last_baseline_table(visfile)
            if name is not None:
                bl_caltables.add(name)

        # Create the tar file (also when no table was found).
        with tarfile.open(os.path.join(products_dir, tarfilename), 'w:gz') as tar:
            for table in bl_caltables:
                tar.add(table, arcname=os.path.basename(table))

        return tarfilename

    def _do_aux_ms_products(self, context: Context, vislist: list[str],
                            products_dir: str) -> dict[str, str]:
        """Export auxiliary MS products.

        This method exports calibration apply files per MeasurementSet which
        store calibration commands to apply baseline tables.

        Args:
            context : pipeline context
            vislist : list of MeasurementSet names
            products_dir : path of products directory

        Returns:
            an ordered dictionary which contains MeasurementSet name (key) and
            calibration apply file name (value)
        """
        # Loop over the MeasurementSets and create the calibration apply
        # file(s) in the products directory. Keys are the base MeasurementSet
        # names, values are the base names of the apply files.
        visdict = collections.OrderedDict()
        for visfile in vislist:
            apply_file = self._export_final_baseline_applylist(context, visfile, products_dir)
            visdict[os.path.basename(visfile)] = os.path.basename(apply_file)

        return visdict

    def _export_final_baseline_applylist(self, context: Context, vis: str,
                                         products_dir: str) -> str:
        """Save commands to apply the final baseline table to a file.

        For now this is a text file. Eventually it will be the CASA callibrary
        file.

        Args:
            context : pipeline context
            vis : MeasurementSet name
            products_dir : path of products directory

        Returns:
            the name of calibration apply file, or 'Undefined' when there is
            no baseline table for the MeasurementSet or the file could not be
            written
        """
        vis_basename = os.path.basename(vis)
        applyfile_name = self.NameBuilder.calapply_list(vis_basename,
                                                        aux_product=True)
        LOG.info('Storing calibration apply list for %s in %s',
                 vis_basename, applyfile_name)

        # Without a baseline table there is nothing to apply. Return before
        # opening the output file so that no header-only file is left behind.
        name = self.__get_last_baseline_table(vis)
        if name is None:
            LOG.info('No calibrations for MS %s' % vis_basename)
            return 'Undefined'

        try:
            # Log the list in human readable form. Better way to do this ?
            cmd = string.Template("sdbaseline(infile='${infile}', "
                                  "datacolumn='corrected', spw='${spw}', "
                                  "blmode='apply', bltable='${bltable}', "
                                  "blfunc='poly', outfile='${outfile}', "
                                  "overwrite=True)")

            ms = context.observing_run.get_ms(vis)
            science_spws = ms.get_spectral_windows(science_windows_only=True)
            spw = ','.join(str(s.id) for s in science_spws)
            applied_calstate = cmd.safe_substitute(infile=vis,
                                                   bltable=name,
                                                   spw=spw,
                                                   outfile=vis.rstrip('/') + '_bl')

            # Write the header and the command only once both are available.
            with open(os.path.join(products_dir, applyfile_name), "w") as applyfile:
                applyfile.write('# Apply file for %s\n' % vis_basename)
                applyfile.write(applied_calstate)
        except Exception:
            # Best-effort export: report the MS as having no calibrations but
            # keep the reason available for debugging.
            applyfile_name = 'Undefined'
            LOG.info('No calibrations for MS %s' % vis_basename)
            LOG.debug(traceback.format_exc())

        return applyfile_name

    def _detect_jyperk(self, context: Context) -> str | None:
        """Detect K2Jy file and return it.

        Args:
            context : pipeline context

        Raises:
            RuntimeError: raise if multiple K2Jy files are detected

        Returns:
            absolute path of K2Jy file, or None if no existing file is found
        """
        reffile_list = set(self.__get_reffile(context.results))

        if len(reffile_list) == 0:
            # if no reffile is found, return None
            LOG.debug('No K2Jy factor file found.')
            return None
        if len(reffile_list) > 1:
            raise RuntimeError("K2Jy conversion file must be only one. %s found." % (len(reffile_list)))

        jyperk = reffile_list.pop()
        if not os.path.exists(jyperk):
            # if reffile doesn't exist, return None
            LOG.debug('K2Jy file \'%s\' not found' % jyperk)
            return None

        LOG.info('Exporting {0} as a product'.format(jyperk))
        return absolute_path(jyperk)

    @staticmethod
    def __get_reffile(results: list) -> Generator[str, None, None]:
        """Find SDK2JyCalResults and yield K2JY reference file.

        Args:
            results : a list of ResultsProxy (contains Results object of
                every tasks)

        Yields:
            K2JY reference file of each matching result whose file exists
        """
        for proxy in results:
            result = proxy.read()
            # Normalize to an iterable of individual results.
            if not isinstance(result, basetask.ResultsList):
                result = [result]
            for r in result:
                # Results are matched by their string representation.
                if str(r).find('SDK2JyCalResults') != -1 and hasattr(r, 'reffile'):
                    reffile = r.reffile
                    if reffile is not None and os.path.exists(reffile):
                        yield reffile

    def _do_auxiliary_products(self, context: Context, oussid: str,
                               output_dir: str, products_dir: str,
                               pipeline_stats_file: str | None) -> str | None:
        """Save a K2JY reference file, flag files and the stats file into tarball.

        Args:
            context : pipeline context
            oussid : OUS Status UID
            output_dir : path of output directory
            products_dir : path of products directory
            pipeline_stats_file: pipeline stats file, or None if unavailable

        Returns:
            tarball file name, or None if there is no auxiliary product to
            export
        """
        # Get the jyperk file, check whether it exists.
        jyperk = self._detect_jyperk(context)
        has_jyperk = bool(jyperk) and os.path.exists(jyperk)

        # Export the general and target source template flagging files
        #    The general template flagging files are not required
        #    for the restore but are informative to the user.
        #    Whether or not the target template files should be exported to
        #    the archive depends on the final place of the target flagging
        #    step in the work flow and how flags will or will not be stored
        #    back into the ASDM.
        # Search in output_dir itself so that the paths found are the paths
        # added to the tarball; escape it so that it is taken literally.
        flags_file_list = glob.glob(
            os.path.join(glob.escape(output_dir), '*.flag*template.txt'))

        # PIPE-2380: the pipeline statistics file is an auxiliary product too.
        has_stats = bool(pipeline_stats_file) and os.path.exists(pipeline_stats_file)

        # If no auxiliary product was found, skip creation of tarfile
        # and return early.
        if not (has_jyperk or flags_file_list or has_stats):
            return None

        # Define the name of the output tarfile.
        tarfilename = self.NameBuilder.auxiliary_products(
            'auxproducts.tgz', ousstatus_entity_id=oussid)
        LOG.info('Saving auxiliary data products in %s', tarfilename)

        # The context manager closes the tarfile; no explicit close is needed.
        with tarfile.open(os.path.join(products_dir, tarfilename), 'w:gz') as tar:
            # Save flux file.
            if has_jyperk:
                tar.add(jyperk, arcname=os.path.basename(jyperk))
                LOG.info('Saving auxiliary data product '
                         '{} in {}'.format(os.path.basename(jyperk), tarfilename))
            elif isinstance(jyperk, str):
                LOG.info('Auxiliary data product '
                         '{} does not exist'.format(os.path.basename(jyperk)))

            # Save flag files.
            for flags_file in flags_file_list:
                if os.path.exists(flags_file):
                    tar.add(flags_file, arcname=os.path.basename(flags_file))
                    LOG.info('Saving auxiliary data product '
                             '{} in {}'.format(os.path.basename(flags_file), tarfilename))
                else:
                    LOG.info('Auxiliary data product '
                             '{} does not exist'.format(os.path.basename(flags_file)))

            # PIPE-2380: Save the pipeline statistics file
            if has_stats:
                tar.add(pipeline_stats_file, arcname=pipeline_stats_file)
                LOG.info('Saving pipeline statistics file %s in %s',
                         pipeline_stats_file, tarfilename)
            else:
                LOG.info("Pipeline statistics file does not exist.")

        return tarfilename

    def _export_casa_restore_script(self, context: Context, script_name: str,
                                    products_dir: str, oussid: str,
                                    vislist: list[str],
                                    session_list: list[str]) -> str:
        """Save the CASA restore script.

        Args:
            context : pipeline context
            script_name : name of the restore script
            products_dir : name of the product directory
            oussid : OUS Status ID
            vislist : list of MeasurementSet names
            session_list : list of session

        Returns:
            base name of the exported CASA script file
        """
        # The restore task takes the names without the '.ms' extension.
        tmpvislist = []
        for vis in vislist:
            filename = os.path.basename(vis)
            if filename.endswith('.ms'):
                filename = os.path.splitext(filename)[0]
            tmpvislist.append(filename)

        restore_task_name = 'hsd_restoredata'
        hm_rasterscan = self._get_hm_rasterscan_value(context)
        # The argument order is the order written into the script.
        args = collections.OrderedDict(vis=tmpvislist,
                                       session=session_list,
                                       ocorr_mode='ao',
                                       hm_rasterscan=hm_rasterscan)
        return self._export_casa_restore_script_template(context, script_name,
                                                         products_dir, oussid,
                                                         restore_task_name, args)

    def _get_hm_rasterscan_value(self, context: Context) -> str:
        """Retrieve hm_rasterscan value from Results object.

        This method checks if either SDImportDataResults or
        SDRestoreDataResults object is registered to Pipeline context.
        If exists, hm_rasterscan value is retrieved from the first such
        results object. If no SDImportDataResults nor SDRestoreDataResults
        object is registered, 'time' will be returned.

        Args:
            context: Pipeline context

        Returns:
            The hm_rasterscan value.
        """
        read_results = (proxy.read() for proxy in context.results)
        # An empty ResultsList cannot match and must not be indexed.
        importdata_results = next(
            (r for r in read_results
             if isinstance(r, basetask.ResultsList) and len(r) > 0
             and isinstance(r[0], (SDImportDataResults, SDRestoreDataResults))),
            None
        )

        if importdata_results:
            return importdata_results.inputs.get('hm_rasterscan', 'time')
        return 'time'

    def _export_casa_restore_script_template(self, context: Context,
                                             script_name: str,
                                             products_dir: str, oussid: str,
                                             restore_task_name: str,
                                             restore_task_args: dict[str, object]) -> str:
        """Generate and export CASA restore script.

        The script is written into the report directory of the context and
        then copied into the products directory.

        Args:
            context : pipeline context
            script_name : Name of the restore script
            products_dir : Name of the product directory
            oussid : OUS Status ID
            restore_task_name : Name of the restoredata task
            restore_task_args : Set of the parameters for the restoredata task.
                If an order of the parameter matters, it can be
                collections.OrderedDict.

        Returns:
            base name of the exported CASA script file
        """
        # Get the output file names
        ps = context.project_structure
        script_file = os.path.join(context.report_dir, script_name)
        out_script_file = self.NameBuilder.casa_script(script_name,
                                                       project_structure=ps,
                                                       ousstatus_entity_id=oussid,
                                                       output_dir=products_dir)

        LOG.info('Creating casa restore script %s' % script_file)

        # The task call is the body of the "try:" clause in the template, so
        # it has to be indented.
        args_string = ', '.join(['{}={!r}'.format(k, v)
                                 for k, v in restore_task_args.items()])
        task_string = "    {}({})".format(restore_task_name, args_string)

        # Commands restoring the project state held by the context.
        state_commands = []
        for o in (context.project_summary, context.project_structure,
                  context.project_performance_parameters):
            state_commands += ['context.set_state({!r}, {!r}, {!r})'.format(cls, name, value)
                               for cls, name, value in project.get_state(o)]

        template = '''context = h_init()
%s
try:
%s
finally:
    h_save()
''' % ('\n'.join(state_commands), task_string)

        with open(script_file, 'w') as casa_restore_file:
            casa_restore_file.write(template)

        LOG.info('Copying casa restore script '
                 '%s to %s' % (script_file, out_script_file))
        shutil.copy(script_file, out_script_file)

        return os.path.basename(out_script_file)