Source code for pipeline.h.tasks.restoredata.restoredata

"""
The restore data module provides a class for reimporting, reflagging, and
recalibrating a subset of the ASDMs belonging to a member OUS, using pipeline
flagging and calibration data products.

The basic restore data module assumes that the ASDMs, flagging, and calibration
data products are on disk in the rawdata directory in the format produced by
the ExportData class.

This class assumes that the required data products have been
    - downloaded from the archive along with the ASDMs (not yet possible)
    - are sitting on disk in a form which is compatible with what is
      produced by ExportData
"""
import os
import re
import sys
import shutil
import tarfile
import tempfile

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.vdp as vdp
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import task_registry
from pipeline.infrastructure import utils
from .. import applycal
from .. import importdata
from ..common import manifest

from pipeline.extern.almarenorm_pl2023 import alma_renorm

LOG = infrastructure.get_logger(__name__)


class RestoreDataInputs(vdp.StandardInputs):
    """Manages the inputs for the RestoreData task.

    Attributes:
        context: The pipeline Context state object holding all pipeline state.
        products_dir: The directory containing the archived pipeline flagging and calibration
            data products. Data products will be unpacked from this directory into rawdata_dir.
            Support for this parameter is not yet implemented.
        rawdata_dir: The directory containing the raw data ASDM(s) and the pipeline
            flagging and calibration data products.
        output_dir: The working directory where the restored data will be written.
        session: A string or list of strings containing the sessions(s) one for each vis.
        vis: A string or list of strings containing the ASDM(s) to be restored.
    """

    asis = vdp.VisDependentProperty(default='')
    bdfflags = vdp.VisDependentProperty(default=True)
    copytoraw = vdp.VisDependentProperty(default=True)
    lazy = vdp.VisDependentProperty(default=False)
    ocorr_mode = vdp.VisDependentProperty(default='ca')

    @vdp.VisDependentProperty
    def products_dir(self):
        if self.context.products_dir is None:
            return os.path.abspath('../products')
        return self.context.products_dir

    @vdp.VisDependentProperty
    def rawdata_dir(self):
        return os.path.abspath('../rawdata')

    @vdp.VisDependentProperty
    def session(self):
        return []

    # docstring and type hints: supplements h_restoredata
    def __init__(self, context, copytoraw=None, products_dir=None, rawdata_dir=None, output_dir=None, session=None,
                 vis=None, bdfflags=None, lazy=None, asis=None, ocorr_mode=None):
        """
        Initialise the Inputs, initialising any property values to those given here.

        Args:
            context: The pipeline Context state object.
            copytoraw: Copy calibration and flagging tables from products_dir to rawdata_dir.
                Defaults to True. Example: ``copytoraw=False``
            products_dir: Path to the data products directory for copying calibration products.
                Only effective when copytoraw=True. When copytoraw=False, calibration products in
                rawdata_dir will be used. Example: ``products_dir='/path/to/my/products'``
            rawdata_dir: Path to the rawdata subdirectory. Example: ``rawdata_dir='/path/to/my/rawdata'``
            output_dir: The working directory for the restored data.
            session: List of sessions, one per visibility file. Example: ``session=['session_3']``
            vis: List of raw visibility data files to be restored. Assumed to be in the directory
                specified by rawdata_dir. Example: ``vis=['uid___A002_X30a93d_X43e']``
            bdfflags: Set the BDF flags. Defaults to True. Example: ``bdfflags=False``
            lazy: Use the lazy filler option. Defaults to False. Example: ``lazy=True``
            asis: Creates verbatim copies of the ASDM tables in the output MS. Value must be a
                space-separated list of table names. Example: ``asis='Source Receiver'``
            ocorr_mode: Set correlation import mode. Defaults to 'ca'. Example: ``ocorr_mode='ca'``
        """
        super().__init__()

        self.context = context
        self.output_dir = output_dir
        self.vis = vis
        self.session = session

        self.products_dir = products_dir
        self.rawdata_dir = rawdata_dir
        self.copytoraw = copytoraw

        self.bdfflags = bdfflags
        self.lazy = lazy
        self.asis = asis
        self.ocorr_mode = ocorr_mode


class RestoreDataResults(basetask.Results):
    def __init__(self, importdata_results=None, applycal_results=None, flagging_summaries=None,
                 casa_version_orig=None, pipeline_version_orig=None, orig_mpi_servers=0, renorm_applied=False):
        """
        Initialise the results objects.
        """
        super().__init__()
        self.importdata_results = importdata_results
        self.applycal_results = applycal_results
        self.mses = []
        self.flagging_summaries = flagging_summaries
        self.casa_version_orig = casa_version_orig
        self.pipeline_version_orig = pipeline_version_orig
        self.orig_mpi_servers = orig_mpi_servers
        self.renorm_applied = renorm_applied

    def merge_with_context(self, context):
        if self.importdata_results:
            for result in self.importdata_results:
                result.merge_with_context(context)
        for ms in context.observing_run.measurement_sets:
            self.mses.append(ms)
        if self.applycal_results:
            if isinstance(self.applycal_results, list):
                for result in self.applycal_results:
                    result.merge_with_context(context)
            else:
                self.applycal_results.merge_with_context(context)

    def __repr__(self):
        return 'RestoreDataResults:\n\t{0}'.format(
                '\n\t'.join([ms.name for ms in self.mses]))


[docs] @task_registry.set_equivalent_casa_task('h_restoredata') class RestoreData(basetask.StandardTaskTemplate): """ RestoreData is the base class for restoring flagged and calibrated data produced during a previous pipeline run and archived on disk. - Imports the selected ASDMs from rawdata - Imports the flagversions for the selected ASDMs from ../rawdata - Imports the calibration data for the selected ASDMs from ../rawdata - Restores the final set of pipeline flags - Restores the final calibration state - Applies the calibrations """ # link the accompanying inputs to this task Inputs = RestoreDataInputs # Override the default behavior for multi-vis tasks # Does this interfere with multi-vis behavior of # called tasks. is_multi_vis_task = True
[docs] def prepare(self): # Create a local alias for inputs, so we're not saying # 'self.inputs' everywhere inputs = self.inputs # Force inputs.vis and inputs.session to be a list. sessionlist = inputs.session if isinstance(sessionlist, str): sessionlist = [sessionlist, ] tmpvislist = inputs.vis if isinstance(tmpvislist, str): tmpvislist = [tmpvislist, ] vislist = [] for vis in tmpvislist: if os.path.dirname(vis) == '': vislist.append(os.path.join(inputs.rawdata_dir, vis)) else: vislist.append(vis) # Download ASDMs from the archive or products_dir to rawdata_dir. # TBD: Currently assumed done somehow # Copy the required calibration products from "someplace" on disk # (default ../products) to ../rawdata. The pipeline manifest file # if present is used to determine which files to copy. Otherwise # a file naming scheme is used. The latter is deprecated as it # requires the exportdata / restoredata tasks to be synchronized # but it is maintained for testing purposes. if inputs.copytoraw: self._do_copy_manifest_toraw('*pipeline_manifest.xml') pipemanifest = self._do_get_manifest('*pipeline_manifest.xml', '*cal*pipeline_manifest.xml') self._do_copytoraw(pipemanifest) else: pipemanifest = self._do_get_manifest('*pipeline_manifest.xml', '*cal*pipeline_manifest.xml') # Convert ASDMS assumed to be on disk in rawdata_dir. After this step # has been completed the MS and MS.flagversions directories will exist # and MS,flagversions will contain a copy of the original MS flags, # Flags.Original. # TBD: Add error handling import_results = self._do_importasdm(sessionlist=sessionlist, vislist=vislist) # Restore final MS.flagversions and flags flag_version_name = 'Pipeline_Final' self._do_restore_flags(pipemanifest, flag_version_name=flag_version_name) # Get the session list and the visibility files associated with # each session. session_names, session_vislists = self._get_sessions() # Restore calibration tables self._do_restore_caltables(pipemanifest, session_names=session_names, session_vislists=session_vislists) # Import calibration apply lists self._do_restore_calstate(pipemanifest) # Apply the calibrations. apply_results = self._do_applycal() # Get a summary of the flagging state. flagging_summaries = self._get_flagging_summaries(session_names, session_vislists) # Extract CASA version and pipeline version for previous run from # pipeline manifest. casa_version, pipeline_version, num_mpi = self._extract_casa_pipeline_version(pipemanifest) # If necessary, renormalize ALMA IF data; returns bool declaring # whether the renormalization was run. renorm_applied = self._do_renormalize(pipemanifest, session_names=session_names, session_vislists=session_vislists) # Return the results object, which will be used for the weblog return RestoreDataResults(import_results, apply_results, flagging_summaries, casa_version, pipeline_version, num_mpi, renorm_applied)
[docs] def analyse(self, results): return results
def _do_renormalize(self, pipemanifest, session_names=None, session_vislists=None): # read the manifest and look for a renorm element(s). there should be one per EB # if it's there, do the renorm steps using the stored params, but do not # create the plots applied = False # Loop over sessions for index, session in enumerate(session_names): # Get the visibility list for that session. vislist = session_vislists[index] for vis in vislist: # for each non-target asdm/vis in the manifest structure params = pipemanifest.get_renorm(vis) if params: # Convert input parameters with ast (string to bool, dict, etc). kwargs = {key: utils.string_to_val(val) if val else val for key, val in params.items()} # PIPE-1687: restrict input arguments to those needed by # renorm interface function. # PIPE-2049: if some parameters are absent from the manifest, # use default values adopted from extern.heuristics.ACreNorm.renormalize, # except for atm_auto_exclude, which is defaulted to False. kwargs = { 'vis': vis, 'spw': [int(x) for x in kwargs.get('spw', '').split(',') if x], 'apply': kwargs.get('apply', False), 'threshold': kwargs.get('threshold', None), 'excludechan': kwargs.get('excludechan', {}), 'correct_atm': kwargs.get('correctATM', False), 'atm_auto_exclude': kwargs.get('atm_auto_exclude', False), 'bwthreshspw': kwargs.get('bwthreshspw', {}), } try: LOG.info(f'Renormalizing {vis} with hifa_renorm {kwargs}') _, _, _, _, _, renorm_applied, _, _ = alma_renorm(**kwargs) if 'apply' in kwargs and kwargs['apply'] and renorm_applied: applied = True else: LOG.error(f'Failed application of renormalization for {vis} {params}') except Exception as e: LOG.error(f'Failure in running renormalization heuristic: {e}') else: LOG.info(f'Not calling hifa_renorm for {vis} - no renorm call in manifest.') return applied def _do_copy_manifest_toraw(self, template): """ Get the pipeline manifest """ inputs = self.inputs # Download the pipeline manifest file from the archive or # products_dir to rawdata_dir manifest_files = utils.glob_ordered(os.path.join(inputs.products_dir, template)) for manifestfile in manifest_files: LOG.info('Copying %s to %s' % (manifestfile, inputs.rawdata_dir)) shutil.copy(manifestfile, os.path.join(inputs.rawdata_dir, os.path.basename(manifestfile))) def _do_get_manifest(self, template1, template2): """ Get the pipeline manifest object """ inputs = self.inputs # Get the list of files in the rawdata directory # First find all the manifest files of any kind # If there is more than one file in that list then try the more restrictive template manifestfiles = utils.glob_ordered(os.path.join(inputs.rawdata_dir, template1)) if len(manifestfiles) > 1: manifestfiles2 = utils.glob_ordered(os.path.join(inputs.rawdata_dir, template2)) if len(manifestfiles2) > 0 and len(manifestfiles2) < len(manifestfiles): manifestfiles = manifestfiles2 # Parse manifest file if it exists. if len(manifestfiles) > 0: # Parse manifest file # There should be only one of these so pick one pipemanifest = manifest.PipelineManifest('') pipemanifest.import_xml(manifestfiles[0]) else: pipemanifest = None return pipemanifest def _do_copytoraw(self, pipemanifest): inputs = self.inputs ouss = pipemanifest.get_ous() # Download flag versions # Download from the archive or products_dir to rawdata_dir. if pipemanifest is not None: inflagfiles = [os.path.join(inputs.products_dir, flagfile) for flagfile in pipemanifest.get_final_flagversions(ouss).values()] else: inflagfiles = utils.glob_ordered(os.path.join(inputs.products_dir, '*.flagversions.tgz')) for flagfile in inflagfiles: LOG.info('Copying %s to %s' % (flagfile, inputs.rawdata_dir)) shutil.copy(flagfile, os.path.join(inputs.rawdata_dir, os.path.basename(flagfile))) # Download calibration tables # Download calibration files from the archive or products_dir to if pipemanifest is not None: incaltables = [os.path.join(inputs.products_dir, caltable) for caltable in pipemanifest.get_caltables(ouss).values()] else: incaltables = utils.glob_ordered(os.path.join(inputs.products_dir, '*.caltables.tgz')) for caltable in incaltables: LOG.info('Copying %s to %s' % (caltable, inputs.rawdata_dir)) shutil.copy(caltable, os.path.join(inputs.rawdata_dir, os.path.basename(caltable))) # Download calibration apply lists # Download from the archive or products_dir to rawdata_dir. # TBD: Currently assumed done somehow if pipemanifest is not None: inapplycals = [os.path.join(inputs.products_dir, applycals) for applycals in pipemanifest.get_applycals(ouss).values()] else: inapplycals = utils.glob_ordered(os.path.join(inputs.products_dir, '*.calapply.txt')) for calapply_list in inapplycals: LOG.info('Copying %s to %s' % (calapply_list, inputs.rawdata_dir)) shutil.copy(calapply_list, os.path.join(inputs.rawdata_dir, os.path.basename(calapply_list))) def _do_importasdm(self, sessionlist, vislist): inputs = self.inputs # The asis is temporary until we get the EVLA / ALMA factoring # figured out. container = vdp.InputsContainer(importdata.ImportData, inputs.context, vis=vislist, session=sessionlist, save_flagonline=False, lazy=inputs.lazy, bdfflags=inputs.bdfflags, asis=inputs.asis, ocorr_mode=inputs.ocorr_mode, createmms='false') importdata_task = importdata.ImportData(container) return self._executor.execute(importdata_task, merge=True) def _do_restore_flags(self, pipemanifest, flag_version_name='Pipeline_Final'): inputs = self.inputs flagversionlist = [] if pipemanifest is not None: ouss = pipemanifest.get_ous() else: ouss = None # Loop over MS list in working directory for ms in inputs.context.observing_run.measurement_sets: # Remove imported MS.flagversions from working directory flagversion = ms.basename + '.flagversions' flagversionpath = os.path.join(inputs.output_dir, flagversion) if os.path.exists(flagversionpath): LOG.info('Removing default flagversion for %s' % ms.basename) shutil.rmtree(flagversionpath) # Untar MS.flagversions file in rawdata_dir to output_dir if ouss is not None: tarfilename = os.path.join(inputs.rawdata_dir, pipemanifest.get_final_flagversions(ouss)[ms.basename]) else: tarfilename = os.path.join(inputs.rawdata_dir, ms.basename + '.flagversions.tgz') LOG.info('Extracting %s' % flagversion) LOG.info(' From %s' % tarfilename) LOG.info(' Into %s' % inputs.output_dir) with tarfile.open(tarfilename, 'r:gz') as tar: tar.extractall(path=inputs.output_dir, filter='fully_trusted') # Restore final flags version using flagmanager LOG.info('Restoring final flags for %s from flag version %s' % (ms.basename, flag_version_name)) task = casa_tasks.flagmanager(vis=ms.name, mode='restore', versionname=flag_version_name) try: self._executor.execute(task) except Exception: LOG.error("Application of final flags failed for %s" % ms.basename) raise flagversionlist.append(flagversionpath) return flagversionlist def _do_restore_calstate(self, pipemanifest): inputs = self.inputs if pipemanifest is not None: ouss = pipemanifest.get_ous() else: ouss = None # Loop over MS list in working directory append = False for ms in inputs.context.observing_run.measurement_sets: if ouss is not None: applyfile_name = os.path.join(inputs.rawdata_dir, pipemanifest.get_applycals(ouss)[ms.basename]) else: applyfile_name = os.path.join(inputs.rawdata_dir, ms.basename + '.calapply.txt') LOG.info('Restoring calibration state for %s from %s' '' % (ms.basename, applyfile_name)) # Write converted calstate to a temporary file and use this # for the import. the temporary file will automatically be # deleted once out of scope with tempfile.NamedTemporaryFile() as tmpfile: LOG.trace('Writing converted calstate to %s' '' % tmpfile.name) converted = self._convert_calstate_paths(applyfile_name) tmpfile.write(converted.encode(sys.stdout.encoding)) tmpfile.flush() inputs.context.callibrary.import_state(tmpfile.name, append=append) append = True def _convert_calstate_paths(self, applyfile): """ Convert paths in the exported calstate to point to the new output directory. Returns the converted commands as a list of strings """ # regex to match unix paths unix_path = re.compile(r'((?:(\.{1,2})?\/[\w\.\-]+)+)', re.IGNORECASE | re.DOTALL) # define a function that replaces directory names with our new output # directory def repfn(matchobj): basename = os.path.basename(matchobj.group(0)) return os.path.join(self.inputs.output_dir, basename) # search-and-replace directory names in the exported calstate file with open(applyfile, 'r') as f: return unix_path.sub(repfn, f.read()) def _do_restore_caltables(self, pipemanifest, session_names=None, session_vislists=None): inputs = self.inputs if pipemanifest is not None: ouss = pipemanifest.get_ous() else: ouss = None # Determine the OUS uid ps = inputs.context.project_structure if ps is None or ps.ousstatus_entity_id == 'unknown': ousid = '' else: ousid = ps.ousstatus_entity_id.translate(str.maketrans(':/', '__')) + '.' # Loop over sessions for index, session in enumerate(session_names): # Get the visibility list for that session. vislist = session_vislists[index] # Open the tarfile and get the names if ouss is not None: try: tarfilename = os.path.join(inputs.rawdata_dir, pipemanifest.get_caltables(ouss)[session]) except Exception: tarfilename = os.path.join(inputs.rawdata_dir, pipemanifest.get_caltables(ouss)['default']) elif ousid == '': tarfilename = utils.glob_ordered(os.path.join(inputs.rawdata_dir, '*' + session + '.caltables.tgz'))[0] else: tarfilename = os.path.join(inputs.rawdata_dir, ousid + session + '.caltables.tgz') with tarfile.open(tarfilename, 'r:gz') as tar: tarmembers = tar.getmembers() # Retrieve any caltable associated with either session name or # the measurement sets associated with this session. for vis in vislist + [session]: LOG.info('Restoring caltables for %s from %s' '' % (os.path.basename(vis), tarfilename)) extractlist = [] for member in tarmembers: if member.name.startswith(os.path.basename(vis)): extractlist.append(member) # it is uncertain whether or not slash (/) exists at the end if member.name.endswith('.tbl/') or member.name.endswith('.tbl'): LOG.info(' Extracting caltable %s' % member.name) if len(extractlist) == len(tarmembers): tar.extractall(path=inputs.output_dir, filter='fully_trusted') else: tar.extractall(path=inputs.output_dir, members=extractlist, filter='fully_trusted') def _do_applycal(self): container = vdp.InputsContainer(applycal.SerialApplycal, self.inputs.context) applycal_task = applycal.SerialApplycal(container) return self._executor.execute(applycal_task, merge=True) def _get_sessions(self, sessions=None, vis=None): """ Return a list of sessions where each element of the list contains the vis files associated with that session. If sessions is undefined the context is searched for session information """ if sessions is None: sessions = [] if vis is None: vis = [] inputs = self.inputs all_mses = inputs.context.observing_run.measurement_sets # Get the MS list from the context by default. if len(vis) == 0: wkvis = [ms.name for ms in all_mses] else: wkvis = vis # If the input session list is empty determine the sessions from # the context. if len(sessions) == 0: wksessions = [ms.session for ms in all_mses] else: wksessions = sessions # Determine the number of unique sessions. session_seqno = 0 session_dict = {} for i in range(len(wksessions)): if wksessions[i] not in session_dict: session_dict[wksessions[i]] = session_seqno session_seqno = session_seqno + 1 # Initialize the output session names and visibility file lists session_names = [] session_vis_list = [] for key, _ in sorted(session_dict.items(), key=lambda k_v: (k_v[1], k_v[0])): session_names.append(key) session_vis_list.append([]) # Assign the visibility files to the correct session for j in range(len(wkvis)): # Match the session names if possible if j < len(wksessions): for i in range(len(session_names)): if wksessions[j] == session_names[i]: session_vis_list[i].append(wkvis[j]) # Assign to the last session else: session_vis_list[len(session_names) - 1].append(wkvis[j]) # Log the sessions for i in range(len(session_vis_list)): LOG.info('Visibility list for session %s is %s' % (session_names[i], session_vis_list[i])) return session_names, session_vis_list def _get_flagging_summaries(self, session_names, session_vislists): # Initialize summaries dictionary. summaries = {} # Extract summaries for each session. for ind, session_name in enumerate(session_names): summaries[session_name] = {} # Extract summary for each vis. for vis in session_vislists[ind]: vis_name = os.path.basename(vis) LOG.info("Creating flagging summary for session: {}, vis: {}".format(session_name, vis_name)) # Get CASA intent corresponding to 'TARGET'. ms = self.inputs.context.observing_run.get_ms(name=vis) casa_intent = utils.to_CASA_intent(ms, 'TARGET') # Create and execute flagdata summary job, store result in dictionary. job = casa_tasks.flagdata(vis=vis_name, mode='summary', fieldcnt=True, intent=casa_intent) summarydict = self._executor.execute(job) summaries[session_name][vis_name] = summarydict return summaries @staticmethod def _extract_casa_pipeline_version(pipemanifest): if pipemanifest is not None: ouss = pipemanifest.get_ous() else: ouss = None if ouss is not None: casa_version = pipemanifest.get_casa_version(ouss) pipeline_version = pipemanifest.get_pipeline_version(ouss) total_mpi_servers = sum([int(node.get('num_mpi_servers')) for node in pipemanifest.get_execution_nodes(ouss)]) else: casa_version = None pipeline_version = None total_mpi_servers = None return casa_version, pipeline_version, total_mpi_servers