# Source code for pipeline.infrastructure.executevlappr

"""
Execute the pipeline processing request.

Code first as a module and convert to a class if appropriate.
Factor and document properly when the details are worked out.

Raises:
    exceptions.PipelineException
"""
from __future__ import annotations

import os
import sys
import traceback
from typing import TYPE_CHECKING

from . import casa_tools
from . import exceptions
from . import project
from . import utils
from .. import cli
from .executeppr import _getCommands, _getIntents, _getPerformanceParameters, _getProcessingIntents, _getPprObject, save_existing_context

if TYPE_CHECKING:
    from pipeline.extern.XmlObjectifier import XmlObject


def executeppr(pprXmlFile: str, importonly: bool = True, loglevel: str = 'info', plotlevel: str = 'summary',
               interactive: bool = True) -> None:
    """
    Runs Pipeline Processing Request (PPR).

    Executes pipeline tasks based on instructions described in pprXmlFile.

    If decoding the request or initialising the context fails, or if the
    request has no relative path, no ASDMs or no commands, the problem is
    logged, an 'errorexit' file is written and the function returns without
    running any task.

    Args:
        pprXmlFile: A path to PPR file.
        importonly: Whether or not to indicate to stop processing after
            importing data. If True, execution of PPR stops after
            h*_importdata stage. The parameter has no effect if there is no
            h*_importdata stage in PPR.
        loglevel: A logging level. Available levels are, 'critical', 'error',
            'warning', 'info', 'debug', 'todo', and 'trace'.
        plotlevel: A plot level. Available levels are, 'all', 'default', and
            'summary'
        interactive: If True, print pipeline log to STDOUT.

    Raises:
        exceptions.PipelineException: If utils.get_tracebacks() reports one or
            more tracebacks for the results of a task. The context is saved
            and an 'errorexit' file is written before raising.
    """
    # save existing context to disk
    save_existing_context()

    # Useful mode parameters
    echo_to_screen = interactive
    # Stays None if decoding fails before the directories are resolved; the
    # error handler below passes it on unchanged.
    # NOTE(review): confirm utils.write_errorexit_file accepts None as path.
    workingDir = None

    try:
        # Decode the processing request
        casa_tools.post_to_log("Analyzing pipeline processing request ...", echo_to_screen=echo_to_screen)
        info, structure, relativePath, intentsDict, asdmList, procedureName, commandsList = \
            _getFirstRequest(pprXmlFile)
        processing_intents = _getProcessingIntents(intentsDict, procedureName)

        # Set the directories
        if 'SCIPIPE_ROOTDIR' in os.environ:
            workingDir = os.path.join(os.path.expandvars('$SCIPIPE_ROOTDIR'), relativePath, 'working')
            rawDir = os.path.join(os.path.expandvars('$SCIPIPE_ROOTDIR'), relativePath, 'rawdata')
        else:
            # PIPE-2093: if $SCIPIPE_ROOTDIR doesn't exist, we likely run in a local dev/test environment.
            # Then we will override the typical production workingDir/rawDIR values that are traditionally
            # constructed from $SCIPIPE_ROOTDIR and the PPR <RelativePath> field. Note that we assume that
            # any executeppr call here happens inside the "working/" directory.
            workingDir = os.path.abspath(os.path.join('..', 'working'))
            rawDir = os.path.abspath(os.path.join('..', 'rawdata'))

        # Get the pipeline context
        context = cli.h_init(loglevel=loglevel, plotlevel=plotlevel, processing_intents=processing_intents)

    except Exception:
        # Any failure while decoding (including _getFirstRequest returning its
        # short early-exit tuple, which cannot be unpacked above) ends here.
        casa_tools.post_to_log("Beginning pipeline run ...", echo_to_screen=echo_to_screen)
        casa_tools.post_to_log("For processing request: " + pprXmlFile, echo_to_screen=echo_to_screen)
        traceback.print_exc(file=sys.stdout)
        errstr = traceback.format_exc()
        casa_tools.post_to_log(errstr, echo_to_screen=echo_to_screen)
        casa_tools.post_to_log("Terminating procedure execution ...", echo_to_screen=echo_to_screen)
        utils.write_errorexit_file(workingDir, 'errorexit', 'txt')
        return

    # Request decoded, starting run.
    casa_tools.post_to_log("Beginning pipeline run ...", echo_to_screen=echo_to_screen)
    casa_tools.post_to_log("For processing request: " + pprXmlFile, echo_to_screen=echo_to_screen)

    # Check for common error conditions. Only the first matching problem is
    # reported, in the same order as before.
    if relativePath == "":
        problem = " Undefined relative data path"
    elif len(asdmList) < 1:
        problem = " Empty ASDM list"
    elif len(commandsList) < 1:
        problem = " Empty commands list"
    else:
        problem = None
    if problem is not None:
        casa_tools.post_to_log(problem, echo_to_screen=echo_to_screen)
        casa_tools.post_to_log("Terminating pipeline execution ...", echo_to_screen=echo_to_screen)
        utils.write_errorexit_file(workingDir, 'errorexit', 'txt')
        return

    # List project summary information
    casa_tools.post_to_log("Project summary", echo_to_screen=echo_to_screen)
    for item in info:
        casa_tools.post_to_log(" " + item[1][0] + item[1][1], echo_to_screen=echo_to_screen)
    # info is a list of (key, (prompt, value)) pairs; index [1] picks the value.
    ds = dict(info)
    context.project_summary = project.ProjectSummary(
        proposal_code=ds['proposal_code'][1],
        proposal_title='unknown',
        piname='unknown',
        observatory=ds['observatory'][1],
        telescope=ds['telescope'][1])

    # List project structure information
    casa_tools.post_to_log("Project structure", echo_to_screen=echo_to_screen)
    for item in structure:
        casa_tools.post_to_log(" " + item[1][0] + item[1][1], echo_to_screen=echo_to_screen)
    context.project_structure = project.ProjectStructure(
        ppr_file=pprXmlFile,
        recipe_name=procedureName)

    # Create performance parameters object
    context.project_performance_parameters = _getPerformanceParameters(intentsDict)

    # Print the relative path
    casa_tools.post_to_log("Directory structure", echo_to_screen=echo_to_screen)
    casa_tools.post_to_log(" Working directory: " + workingDir, echo_to_screen=echo_to_screen)
    casa_tools.post_to_log(" Raw data directory: " + rawDir, echo_to_screen=echo_to_screen)

    # Construct the ASDM list. Every ASDM is assigned to the same default
    # session.
    casa_tools.post_to_log("Number of ASDMs: " + str(len(asdmList)), echo_to_screen=echo_to_screen)
    files = []
    sessions = []
    defsession = 'session_1'
    for asdm in asdmList:
        session = defsession
        sessions.append(session)
        files.append(os.path.join(rawDir, asdm[1]))
        casa_tools.post_to_log(" Session: " + session + " ASDM: " + asdm[1], echo_to_screen=echo_to_screen)

    # Paths for all these ASDM should be the same
    #     Add check for this ?

    # Beginning execution
    casa_tools.post_to_log("\nStarting procedure execution ...\n", echo_to_screen=echo_to_screen)
    casa_tools.post_to_log("Procedure name: " + procedureName + "\n", echo_to_screen=echo_to_screen)

    # Names of import tasks that need special treatment:
    import_tasks = ('h_importdata', 'hifa_importdata', 'hifv_importdata')
    restore_tasks = ('h_restoredata', 'hifv_restoredata')
    breakpoint_task = 'breakpoint'

    # Loop over the commands
    for command in commandsList:

        # Get task name and arguments lists.
        pipeline_task_name = command[0]
        task_args = command[1]
        casa_tools.set_log_origin(fromwhere=pipeline_task_name)

        # PIPE-2388: Skipping breakpoint command
        if pipeline_task_name == breakpoint_task:
            casa_tools.post_to_log("Found {}, not performing any action.".format(pipeline_task_name),
                                   echo_to_screen=echo_to_screen)
            continue

        # Execute the command
        casa_tools.post_to_log("Executing command ..." + pipeline_task_name, echo_to_screen=echo_to_screen)
        try:
            pipeline_task = cli.get_pipeline_task_with_name(pipeline_task_name)

            # List parameters
            for keyword, value in task_args.items():
                casa_tools.post_to_log(" Parameter: " + keyword + " = " + repr(value),
                                       echo_to_screen=echo_to_screen)

            # For import/restore tasks, set vis and session explicitly (not inferred from context).
            if pipeline_task_name in import_tasks or pipeline_task_name in restore_tasks:
                task_args['vis'] = files
                task_args['session'] = sessions

            # If spectral mode is set to True, skip the Hanning task.
            # NOTE(review): this is a plain truthiness test; if intent values
            # arrive as strings, 'False' would also skip Hanning - confirm
            # against _getIntents.
            spectral_mode = intentsDict.get('SPECTRAL_MODE', False)
            if spectral_mode and pipeline_task_name == 'hifv_hanning':
                casa_tools.post_to_log("SPECTRAL_MODE=True. Hanning smoothing will not be executed.",
                                       echo_to_screen=echo_to_screen)
                continue

            results = pipeline_task(**task_args)
            casa_tools.post_to_log('Results ' + str(results), echo_to_screen=echo_to_screen)

            # This break skips the failed-result check below for the import task.
            if importonly and pipeline_task_name in import_tasks:
                casa_tools.post_to_log("Terminating execution after running " + pipeline_task_name,
                                       echo_to_screen=echo_to_screen)
                break

        except Exception:
            # Log message if an exception occurred that was not handled by
            # standardtask template (not turned into failed task result).
            casa_tools.post_to_log("Unhandled error in executevlappr while running pipeline task {}"
                                   "".format(pipeline_task_name), echo_to_screen=echo_to_screen)
            errstr = traceback.format_exc()
            casa_tools.post_to_log(errstr, echo_to_screen=echo_to_screen)
            utils.write_errorexit_file(workingDir, 'errorexit', 'txt')
            break

        # Stop execution if result is a failed task result or a list
        # containing a failed task result.
        tracebacks = utils.get_tracebacks(results)
        if len(tracebacks) > 0:
            # Save the context
            cli.h_save()

            casa_tools.set_log_origin(fromwhere='')

            utils.write_errorexit_file(workingDir, 'errorexit', 'txt')
            raise exceptions.PipelineException("\n".join(tracebacks))

    # Save the context
    cli.h_save()

    casa_tools.post_to_log("Terminating procedure execution ...", echo_to_screen=echo_to_screen)

    casa_tools.set_log_origin(fromwhere='')
# Return the intents list, the ASDM list, and the processing commands
# for the first processing request. There should in general be only
# one but the schema permits more. Generalize later if necessary.
def _getFirstRequest(pprXmlFile: str) -> tuple[list, str, dict, list, list] | tuple[list, list, str, dict, list, str, list]:
    """Decode the first processing request found in a PPR file.

    Args:
        pprXmlFile: Path to the PPR XML file; handed to _getPprObject.

    Returns:
        On success the 7-tuple
        (info, structure, relativePath, intentsDict, asdmList, procedureName,
        commandsList), where structure is always an empty list.

        If no processing request or no scheduling block set is found, the
        shorter 5-tuple
        (info, relativePath, intentsDict, asdmList, commandsList) is returned.
        A caller that unpacks seven values will fail on this tuple.
    """
    # Defaults used for the early-exit return values.
    info = []
    relativePath = ""
    intentsDict = {}
    commandsList = []
    asdmList = []

    # Turn the XML file into an object
    pprObject = _getPprObject(pprXmlFile=pprXmlFile)

    # Count the processing requests.
    numRequests = _getNumRequests(pprObject=pprObject)
    if numRequests <= 0:
        casa_tools.post_to_log("Terminating execution: No valid processing requests")
        return info, relativePath, intentsDict, asdmList, commandsList
    elif numRequests > 1:
        casa_tools.post_to_log("Warning: More than one processing request")
    # Format the count into the message, as the other log lines below do.
    casa_tools.post_to_log('Number of processing requests: {}'.format(numRequests))

    # Get brief project summary
    info = _getProjectSummary(pprObject)

    # Get project structure. Set to empty list for VLA.
    structure = []

    # Get the intents dictionary
    numIntents, intentsDict = _getIntents(pprObject=pprObject, requestId=0, numRequests=numRequests)
    casa_tools.post_to_log('Number of intents: {}'.format(numIntents))
    casa_tools.post_to_log('Intents dictionary: {}'.format(intentsDict))

    # Get the commands list
    procedureName, numCommands, commandsList = _getCommands(pprObject=pprObject, requestId=0,
                                                            numRequests=numRequests)
    casa_tools.post_to_log('Number of commands: {}'.format(numCommands))
    casa_tools.post_to_log('Commands list: {}'.format(commandsList))

    # Count the scheduling block sets. Normally there should be only
    # one although the schema allows multiple sets. Check for this
    # condition and process only the first.
    numSbSets = _getNumSchedBlockSets(pprObject=pprObject, requestId=0, numRequests=numRequests)
    if numSbSets <= 0:
        casa_tools.post_to_log("Terminating execution: No valid scheduling block sets")
        # relativePath and asdmList still hold their empty defaults here.
        return info, relativePath, intentsDict, asdmList, commandsList
    elif numSbSets > 1:
        casa_tools.post_to_log("Warning: More than one scheduling block set")
    casa_tools.post_to_log('Number of scheduling block sets: {}'.format(numSbSets))

    # Get the ASDM list
    relativePath, numAsdms, asdmList = _getAsdmList(pprObject=pprObject, numSbSets=numSbSets,
                                                    numRequests=numRequests)
    casa_tools.post_to_log('Relative path: {}'.format(relativePath))
    casa_tools.post_to_log('Number of Asdms: {}'.format(numAsdms))
    casa_tools.post_to_log('ASDM list: {}'.format(asdmList))

    return info, structure, relativePath, intentsDict, asdmList, procedureName, commandsList


# Given the pipeline processing request object print some project summary
# information. Returns a list of tuples to preserve order (key, (prompt, value))
def _getProjectSummary(pprObject: 'XmlObject') -> list:
    """Collect the project summary fields of a PPR.

    Args:
        pprObject: Parsed PPR object exposing SciPipeRequest.ProjectSummary.

    Returns:
        A list of (key, (prompt, value)) tuples in a fixed order:
        proposal_code, observatory, telescope, processing_site, operator, mode.
    """
    ppr_summary = pprObject.SciPipeRequest.ProjectSummary
    return [
        ('proposal_code', ('Proposal code: ', ppr_summary.ProposalCode.getValue())),
        ('observatory', ('Observatory: ', ppr_summary.Observatory.getValue())),
        ('telescope', ('Telescope: ', ppr_summary.Telescope.getValue())),
        ('processing_site', ('Processsing site: ', ppr_summary.ProcessingSite.getValue())),
        ('operator', ('Operator: ', ppr_summary.Operator.getValue())),
        ('mode', ('Mode: ', ppr_summary.Mode.getValue())),
    ]


# Given the pipeline processing request object return the number of processing
# requests. For EVLA this should always be 1 but check. Assume a single scheduling block
# per processing request.
def _getNumRequests(pprObject: 'XmlObject') -> int:
    """Count the processing requests in a PPR.

    Args:
        pprObject: Parsed PPR object exposing
            SciPipeRequest.ProcessingRequests.

    Returns:
        1 if ProcessingRequest can be read as a single element; otherwise the
        number of consecutive indexed ProcessingRequest entries whose
        DataSet.RelativePath can be read (0 if none).
    """
    ppr_prequests = pprObject.SciPipeRequest.ProcessingRequests

    # Try single element / single scheduling block first.
    try:
        ppr_prequests.ProcessingRequest.DataSet.RelativePath.getValue()
    except Exception:
        pass
    else:
        return 1

    # Next try multiple processing requests / single scheduling block:
    # probe increasing indices until one cannot be read.
    numRequests = 0
    while True:
        try:
            ppr_prequests.ProcessingRequest[numRequests].DataSet.RelativePath.getValue()
        except Exception:
            break
        numRequests += 1

    # Return the number of requests.
    return numRequests


# Given the pipeline processing request object return the number of scheduling
# block sets. For the EVLA there can be only one.
def _getNumSchedBlockSets(pprObject: 'XmlObject', requestId: int, numRequests: int) -> int:
    """Report whether the selected processing request has a readable data set.

    Args:
        pprObject: Parsed PPR object.
        requestId: Index of the processing request; only used when
            numRequests is not 1.
        numRequests: Number of processing requests in the PPR.

    Returns:
        1 if the request's DataSet.RelativePath can be read, otherwise 0.
    """
    ppr_requests = pprObject.SciPipeRequest.ProcessingRequests
    if numRequests == 1:
        ppr_dset = ppr_requests.ProcessingRequest.DataSet
    else:
        ppr_dset = ppr_requests.ProcessingRequest[requestId].DataSet

    try:
        ppr_dset.RelativePath.getValue()
    except Exception:
        return 0
    return 1


# Given the pipeline processing request object return a list of ASDMs
# where each element in the list is a tuple consisting of the path
# to the ASDM, the name of the ASDM, and the UID of the ASDM.
def _getAsdmList(pprObject: 'XmlObject', numSbSets: int, numRequests: int) -> tuple[str, int, list]:
    """Build the ASDM list for a single-request PPR.

    Args:
        pprObject: Parsed PPR object.
        numSbSets: Number of scheduling block sets; the relative path is only
            read when this is 1.
        numRequests: Number of processing requests; anything other than 1
            yields an empty result.

    Returns:
        (relativePath, numAsdms, asdmList). asdmList holds at most one
        (relativePath, asdmName, asdmUid) tuple, where asdmUid is the same
        value as asdmName. numAsdms is 0 when the SDM identifier cannot be
        read.
    """
    relativePath = ""
    asdmList = []

    # Only the single-request layout is handled; report no ASDMs otherwise.
    if numRequests != 1:
        return relativePath, 0, asdmList

    ppr_dset = pprObject.SciPipeRequest.ProcessingRequests.ProcessingRequest.DataSet
    if numSbSets == 1:
        relativePath = ppr_dset.RelativePath.getValue()

    try:
        asdmName = ppr_dset.SdmIdentifier.getValue()
    except Exception:
        return relativePath, 0, asdmList

    # The SDM identifier doubles as the UID.
    asdmList.append((relativePath, asdmName, asdmName))
    return relativePath, 1, asdmList