Source code for pipeline.infrastructure.executeppr

"""
Execute the pipeline processing request.

Code first as module and convert to class if appropriate
Factor and document properly  when details worked out

Raises:
    exceptions.PipelineException
"""
from __future__ import annotations

import os
import shutil
import sys
import traceback
from typing import TYPE_CHECKING

from ..extern import XmlObjectifier

from . import casa_tools
from . import exceptions
from . import filenamer
from . import project
from . import utils
from .. import cli

if TYPE_CHECKING:
    from . launcher import Context

# Dictionary with known processing intents and their possible recipe
# names. The latter is fragile, but initially necessary until the PPR
# contains the actual intents.
KNOWN_PROCESSING_INTENTS = {
    'INTERFEROMETRY_FULL_POL_CUBE_IMAGING': ['hifa_polcalimage', 'hifa_polimage'],
    'INTERFEROMETRY_HETEROGENEOUS_IMAGING': []
}


[docs] def executeppr(pprXmlFile: str, importonly: bool = True, breakpoint: str = 'breakpoint', bpaction: str = 'ignore', loglevel: str = 'info', plotlevel: str = 'default', interactive: bool = True) -> None: """ Runs Pipeline Processing Request (PPR). Executes pipeline tasks based on instructions described in pprXmlFile. Args: pprXmlFile: A path to PPR file. importonly: Whether or not to indicate to stop processing after importing data. If True, execution of PPR stops after h*_importdata stage. The parameter has no effect if there is no h*_importdata stage in PPR. breakpoint: A name of command that should be considered as a break point. bpaction: An action to be taken at the breakpoint. Available actions are, 'ignore': ignores breakpoint in pprXmlFile. 'break': stop execution at the breakpoint in pprXmlFile. 'resume': resume the last context and restart processing after the breakpoint in pprXmlFile. loglevel: A logging level. Available levels are, 'critical', 'error', 'warning', 'info', 'debug', 'todo', and 'trace'. plotlevel: A plot level. Available levels are, 'all', 'default', and 'summary' interactive: If True, print pipeline log to STDOUT. Examples: Only import EBs. >>> executeppr('PPR_uid___A001_X14c3_X1dd.xml') Full execution of PPR. >>> executeppr('PPR_uid___A001_X14c3_X1dd.xml', importonly=False) Run pipeline tasks up to the 'breakpoint' in PPR and save context. >>> executeppr('PPR_uid___A001_X14c3_X1dd.xml', importonly=False, bpaction='break') Resume execution from the 'breakpoint' in PPR. >>> executeppr('PPR_uid___A001_X14c3_X1dd.xml', importonly=False, bpaction='resume') """ # save existing context to disk save_existing_context() # Useful mode parameters echo_to_screen = interactive workingDir = None try: # Decode the processing request casa_tools.post_to_log("Analyzing pipeline processing request ...", echo_to_screen=echo_to_screen) info, structure, relativePath, intentsDict, asdmList, procedureName, commandsList = \ _getFirstRequest(pprXmlFile) processing_intents = _getProcessingIntents(intentsDict, procedureName) # Set the directories if 'SCIPIPE_ROOTDIR' in os.environ: workingDir = os.path.join(os.path.expandvars('$SCIPIPE_ROOTDIR'), relativePath, 'working') rawDir = os.path.join(os.path.expandvars('$SCIPIPE_ROOTDIR'), relativePath, 'rawdata') else: # PIPE-2093: if $SCIPIPE_ROOTDIR doesn't exist, we likely run in a local dev/test environment. # Then we will override the typical production workingDir/rawDIR values that are traditionally # constructed from $SCIPIPE_ROOTDIR and the PPR <RelativePath> field. Note that we assume that # any executeppr call here happens inside the "working/" directory. workingDir = os.path.abspath(os.path.join('..', 'working')) rawDir = os.path.abspath(os.path.join('..', 'rawdata')) # Check for the breakpoint bpset = False if breakpoint != '': for command in commandsList: if command[0] == breakpoint: casa_tools.post_to_log(" Found break point: " + breakpoint, echo_to_screen=echo_to_screen) casa_tools.post_to_log(" Break point action is: " + bpaction, echo_to_screen=echo_to_screen) bpset = True break # Get the pipeline context # Resumes from the last context. Consider adding name if bpset and bpaction == 'resume': context = cli.h_resume(filename='last') context.processing_intents.update(processing_intents) casa_tools.post_to_log(" Resuming from last context", echo_to_screen=echo_to_screen) else: context = cli.h_init(loglevel=loglevel, plotlevel=plotlevel, processing_intents=processing_intents) casa_tools.post_to_log(" Creating new pipeline context", echo_to_screen=echo_to_screen) except Exception: casa_tools.post_to_log("Beginning pipeline run ...", echo_to_screen=echo_to_screen) casa_tools.post_to_log("For processing request: " + pprXmlFile, echo_to_screen=echo_to_screen) traceback.print_exc(file=sys.stdout) errstr = traceback.format_exc() casa_tools.post_to_log(errstr, echo_to_screen=echo_to_screen) casa_tools.post_to_log("Terminating procedure execution ...", echo_to_screen=echo_to_screen) errorfile = utils.write_errorexit_file(workingDir, 'errorexit', 'txt') return # Request decoded, starting run. casa_tools.post_to_log("Beginning pipeline run ...", echo_to_screen=echo_to_screen) casa_tools.post_to_log("For processing request: " + pprXmlFile, echo_to_screen=echo_to_screen) # Check for common error conditions. if relativePath == "": casa_tools.post_to_log(" Undefined relative data path", echo_to_screen=echo_to_screen) casa_tools.post_to_log("Terminating pipeline execution ...", echo_to_screen=echo_to_screen) errorfile = utils.write_errorexit_file(workingDir, 'errorexit', 'txt') return elif len(asdmList) < 1: casa_tools.post_to_log(" Empty ASDM list", echo_to_screen=echo_to_screen) casa_tools.post_to_log("Terminating pipeline execution ...", echo_to_screen=echo_to_screen) errorfile = utils.write_errorexit_file(workingDir, 'errorexit', 'txt') return elif len(commandsList) < 1: casa_tools.post_to_log(" Empty commands list", echo_to_screen=echo_to_screen) casa_tools.post_to_log("Terminating pipeline execution ...", echo_to_screen=echo_to_screen) errorfile = utils.write_errorexit_file(workingDir, 'errorexit', 'txt') return # List project summary information casa_tools.post_to_log("Project summary", echo_to_screen=echo_to_screen) for item in info: casa_tools.post_to_log(" " + item[1][0] + item[1][1], echo_to_screen=echo_to_screen) ds = dict(info) context.project_summary = project.ProjectSummary( proposal_code=ds['proposal_code'][1], proposal_title='unknown', piname='unknown', observatory=ds['observatory'][1], telescope=ds['telescope'][1]) # List project structure information casa_tools.post_to_log("Project structure", echo_to_screen=echo_to_screen) for item in structure: casa_tools.post_to_log(" " + item[1][0] + item[1][1], echo_to_screen=echo_to_screen) ds = dict(structure) context.project_structure = project.ProjectStructure( ous_entity_id=ds['ous_entity_id'][1], ous_part_id=ds['ous_part_id'][1], ous_title=ds['ous_title'][1], ous_type=ds['ous_type'][1], ps_entity_id=ds['ps_entity_id'][1], ousstatus_entity_id=ds['ousstatus_entity_id'][1], ppr_file=pprXmlFile, recipe_name=procedureName) # Create performance parameters object context.project_performance_parameters = _getPerformanceParameters(intentsDict) # Get the session info from the intents dictionary if len(intentsDict) > 0: sessionsDict = _getSessions(intentsDict) else: sessionsDict = {} # Print the relative path casa_tools.post_to_log("Directory structure", echo_to_screen=echo_to_screen) casa_tools.post_to_log(" Working directory: " + workingDir, echo_to_screen=echo_to_screen) casa_tools.post_to_log(" Raw data directory: " + rawDir, echo_to_screen=echo_to_screen) # Construct the ASDM list casa_tools.post_to_log("Number of ASDMs: " + str(len(asdmList)), echo_to_screen=echo_to_screen) files = [] sessions = [] defsession = 'session_1' for asdm in asdmList: session = defsession for key, value in sessionsDict.items(): if filenamer.sanitize_for_ms(asdm[1]) in value: session = key.lower() break sessions.append(session) files.append(os.path.join(rawDir, asdm[1])) casa_tools.post_to_log(" Session: " + session + " ASDM: " + asdm[1], echo_to_screen=echo_to_screen) # Paths for all these ASDM should be the same # Add check for this ? # Beginning execution casa_tools.post_to_log("\nStarting procedure execution ...\n", echo_to_screen=echo_to_screen) casa_tools.post_to_log("Procedure name: " + procedureName + "\n", echo_to_screen=echo_to_screen) # Names of import tasks that need special treatment: import_tasks = ('h_importdata', 'hifa_importdata', 'hifv_importdata', 'hsd_importdata', 'hsdn_importdata') restore_tasks = ('h_restoredata', 'hifa_restoredata', 'hifv_restoredata', 'hsd_restoredata') restore_tasks_no_session = ('hsdn_restoredata',) # Loop over the commands errstr = '' foundbp = False tracebacks = [] results = None for command in commandsList: # Get task name and arguments lists. pipeline_task_name = command[0] task_args = command[1] casa_tools.set_log_origin(fromwhere=pipeline_task_name) # Handle break point if one is set if bpset: # Found the break point # Set the found flag # Ignore it or # Break the loop or # Resume execution if pipeline_task_name == breakpoint: foundbp = True if bpaction == 'ignore': casa_tools.post_to_log("Ignoring breakpoint " + pipeline_task_name, echo_to_screen=echo_to_screen) continue elif bpaction == 'break': casa_tools.post_to_log("Terminating execution at breakpoint " + pipeline_task_name, echo_to_screen=echo_to_screen) break elif bpaction == 'resume': casa_tools.post_to_log("Resuming execution after breakpoint " + pipeline_task_name, echo_to_screen=echo_to_screen) continue # Not the break point so check the resume case elif not foundbp and bpaction == 'resume': casa_tools.post_to_log("Skipping task " + pipeline_task_name, echo_to_screen=echo_to_screen) continue # Execute the command casa_tools.post_to_log("Executing command ..." + pipeline_task_name, echo_to_screen=echo_to_screen) try: pipeline_task = cli.get_pipeline_task_with_name(pipeline_task_name) # List parameters for keyword, value in task_args.items(): casa_tools.post_to_log(" Parameter: " + keyword + " = " + repr(value), echo_to_screen=echo_to_screen) # For import/restore tasks, set vis and session explicitly (not inferred from context). if pipeline_task_name in import_tasks or pipeline_task_name in restore_tasks: task_args['vis'] = files task_args['session'] = sessions elif pipeline_task_name in restore_tasks_no_session: task_args['vis'] = files results = pipeline_task(**task_args) casa_tools.post_to_log('Results ' + str(results), echo_to_screen=echo_to_screen) if importonly and pipeline_task_name in import_tasks: casa_tools.post_to_log("Terminating execution after running " + pipeline_task_name, echo_to_screen=echo_to_screen) break except Exception: # Log message if an exception occurred that was not handled by # standardtask template (not turned into failed task result). casa_tools.post_to_log("Unhandled error in executeppr while running pipeline task {}" "".format(pipeline_task_name), echo_to_screen=echo_to_screen) errstr = traceback.format_exc() casa_tools.post_to_log(errstr, echo_to_screen=echo_to_screen) # Check whether any exceptions were raised either during the task # (shown as traceback in task results) or after the task during results # acceptance. if results: tracebacks.extend(utils.get_tracebacks(results)) if errstr: tracebacks.append(errstr) # If we have a traceback from an exception, then create local error # exit file, and export both error file and weblog to the products # directory. if len(tracebacks) > 0: errorfile = utils.write_errorexit_file(workingDir, 'errorexit', 'txt') export_on_exception(context, errorfile) # Save the context cli.h_save() casa_tools.post_to_log("Terminating procedure execution ...", echo_to_screen=echo_to_screen) casa_tools.set_log_origin(fromwhere='') previous_tracebacks_as_string = "{}".format("\n".join([tb for tb in tracebacks])) raise exceptions.PipelineException(previous_tracebacks_as_string) # Save the context cli.h_save() casa_tools.post_to_log("Terminating procedure execution ...", echo_to_screen=echo_to_screen) casa_tools.set_log_origin(fromwhere='') return
[docs] def save_existing_context() -> None: """Save existing context to disk. Save existing global pipeline context to avoid being overwritten by newly created one. """ try: cli.h_save() except Exception: # Since h_save raises exception if no context is registered, # just continue processing in this case pass else: # Last-saved context in the current working directory should be # the one saved by the h_save task above. context_files = sorted( (f for f in os.listdir() if f.endswith('.context')), key=lambda f: os.stat(f).st_mtime ) assert len(context_files) > 0 context_file = context_files[-1] casa_tools.post_to_log(f'Saved existing context {context_file} to disk.')
[docs] def export_on_exception(context: Context, errorfile: str) -> None: # Define path for exporting output products, and ensure path exists. products_dir = utils.get_products_dir(context) utils.ensure_products_dir_exists(products_dir) # Copy error file to products. if os.path.exists(errorfile): shutil.copyfile(errorfile, os.path.join(products_dir, os.path.basename(errorfile))) # Attempt to export weblog. try: utils.export_weblog_as_tar(context, products_dir, filenamer.PipelineProductNameBuilder) except: casa_tools.post_to_log("Unable to export weblog to products.")
# Return the intents list, the ASDM list, and the processing commands # for the first processing request. There should in general be only # one but the schema permits more. Generalize later if necessary. def _getFirstRequest(pprXmlFile: str) -> tuple[list, str, dict, list, list] | tuple[list, list, str, dict, list, str, list]: # Initialize info = [] relativePath = "" intentsDict = {} commandsList = [] asdmList = [] # Turn the XML file into an object pprObject = _getPprObject(pprXmlFile=pprXmlFile) # Count the processing requests. numRequests = _getNumRequests(pprObject=pprObject) if numRequests <= 0: casa_tools.post_to_log("Terminating execution: No valid processing requests") return info, relativePath, intentsDict, asdmList, commandsList elif numRequests > 1: casa_tools.post_to_log("Warning: More than one processing request") casa_tools.post_to_log('Number of processing requests: ', numRequests) # Get brief project summary info = _getProjectSummary(pprObject) # Get project structure. structure = _getProjectStructure(pprObject) # Get the intents dictionary numIntents, intentsDict = _getIntents(pprObject=pprObject, requestId=0, numRequests=numRequests) casa_tools.post_to_log('Number of intents: {}'.format(numIntents)) casa_tools.post_to_log('Intents dictionary: {}'.format(intentsDict)) # Get the commands list procedureName, numCommands, commandsList = _getCommands(pprObject=pprObject, requestId=0, numRequests=numRequests) casa_tools.post_to_log('Number of commands: {}'.format(numCommands)) casa_tools.post_to_log('Commands list: {}'.format(commandsList)) # Count the scheduling block sets. Normally there should be only # one although the schema allows multiple sets. Check for this # condition and process only the first. numSbSets = _getNumSchedBlockSets(pprObject=pprObject, requestId=0, numRequests=numRequests) if numSbSets <= 0: casa_tools.post_to_log("Terminating execution: No valid scheduling block sets") return info, relativePath, intentsDict, asdmList, commandsList elif numSbSets > 1: casa_tools.post_to_log("Warning: More than one scheduling block set") # Get the ASDM list relativePath, numAsdms, asdmList = _getAsdmList(pprObject=pprObject, sbsetId=0, numSbSets=numSbSets, requestId=0, numRequests=numRequests) casa_tools.post_to_log('Relative path: {}'.format(relativePath)) casa_tools.post_to_log('Number of Asdms: {}'.format(numAsdms)) casa_tools.post_to_log('ASDM list: {}'.format(asdmList)) return info, structure, relativePath, intentsDict, asdmList, procedureName, commandsList # Give the path to the pipeline processing request XML file return the pipeline # processing request object. def _getPprObject(pprXmlFile: str) -> XmlObjectifier.XmlObject: pprObject = XmlObjectifier.XmlObject(fileName=pprXmlFile) return pprObject # Given the pipeline processing request object print some project summary # information. Returns a list of tuples to preserve order (key, (prompt, value)) def _getProjectSummary(pprObject: XmlObjectifier.XmlObject) -> list: ppr_summary = pprObject.SciPipeRequest.ProjectSummary summaryList = [] summaryList.append(('proposal_code', ('Proposal code: ', ppr_summary.ProposalCode.getValue()))) summaryList.append(('observatory', ('Observatory: ', ppr_summary.Observatory.getValue()))) summaryList.append(('telescope', ('Telescope: ', ppr_summary.Telescope.getValue()))) return summaryList # Given the pipeline processing request object print some project structure # information. def _getProjectStructure(pprObject: XmlObjectifier.XmlObject) -> list: # backwards compatibility test ppr_project = pprObject.SciPipeRequest.ProjectStructure try: entityid = ppr_project.OUSStatusRef.getAttribute('entityId') except Exception: ppr_project = ppr_project.AlmaStructure structureList = [] structureList.append( ('ous_entity_type', ('ObsUnitSet Entity Type: ', ppr_project.ObsUnitSetRef.getAttribute('entityTypeName')))) structureList.append( ('ous_entity_id', ('ObsUnitSet Entity Id: ', ppr_project.ObsUnitSetRef.getAttribute('entityId')))) structureList.append( ('ous_part_id', ('ObsUnitSet Part Id: ', ppr_project.ObsUnitSetRef.getAttribute('partId')))) structureList.append( ('ous_title', ('ObsUnitSet Title: ', ppr_project.ObsUnitSetTitle.getValue()))) structureList.append( ('ous_type', ('ObsUnitSet Type: ', ppr_project.ObsUnitSetType.getValue()))) structureList.append( ('ps_entity_type', ('ProjectStatus Entity Type: ', ppr_project.ProjectStatusRef.getAttribute('entityTypeName')))) structureList.append( ('ps_entity_id', ('ProjectStatus Entity Id: ', ppr_project.ProjectStatusRef.getAttribute('entityId')))) structureList.append( ('ousstatus_entity_type', ('OUSStatus Entity Type: ', ppr_project.OUSStatusRef.getAttribute('entityTypeName')))) structureList.append( ('ousstatus_entity_id', ('OUSStatus Entity Id: ', ppr_project.OUSStatusRef.getAttribute('entityId')))) return structureList # Given the pipeline processing request object return the number of processing # requests. This should normally be 1. def _getNumRequests(pprObject: XmlObjectifier.XmlObject) -> int: ppr_prequests = pprObject.SciPipeRequest.ProcessingRequests numRequests = 0 # Try single element / single scheduling block first. try: relative_path = ppr_prequests.ProcessingRequest.DataSet.SchedBlockSet.SchedBlockIdentifier.RelativePath.getValue() numRequests = 1 return numRequests except Exception: pass # Try single element / multiple scheduling block next. try: relative_path = ppr_prequests.ProcessingRequest.DataSet.SchedBlockSet.SchedBlockIdentifier[0].RelativePath.getValue() numRequests = 1 return numRequests except Exception: pass # Next try multiple elements / single scheduling block search = 1 while search: try: relative_path = ppr_prequests.ProcessingRequest[numRequests].DataSet.SchedBlockSet.SchedBlockIdentifier.RelativePath.getValue() numRequests = numRequests + 1 except Exception: search = 0 if numRequests > 0: return numRequests else: pass # Next try multiple elements / multiple scheduling block search = 1 while search: try: relative_path = ppr_prequests.ProcessingRequest[numRequests].DataSet.SchedBlockSet.SchedBlockIdentifier[0].RelativePath.getValue() numRequests = numRequests + 1 except Exception: search = 0 if numRequests > 0: return numRequests else: pass # Return the number of requests. return numRequests # Given the pipeline processing request object return a list of processing # intents in the form of a keyword and value dictionary def _getIntents(pprObject: XmlObjectifier.XmlObject, requestId: int, numRequests: int) -> tuple[int, dict]: if numRequests == 1: ppr_intents = pprObject.SciPipeRequest.ProcessingRequests.ProcessingRequest.ProcessingIntents else: ppr_intents = pprObject.SciPipeRequest.ProcessingRequests.ProcessingRequest[requestId].ProcessingIntents intentsDict = {} numIntents = 0 try: intentName = ppr_intents.Intents.Keyword.getValue() try: intentValue = ppr_intents.Intents.Value.getValue() except Exception: intentValue = "" numIntents = 1 intentsDict[intentName] = intentValue except Exception: search = 1 while search: try: intentName = ppr_intents.Intents[numIntents].Keyword.getValue() try: intentValue = ppr_intents.Intents[numIntents].Value.getValue() except Exception: intentValue = "" numIntents = numIntents + 1 intentsDict[intentName] = intentValue except Exception: search = 0 return numIntents, intentsDict def _getPerformanceParameters(intentsDict: dict) -> project.PerformanceParameters: performanceParams = project.PerformanceParameters() # No performance parameters if len(intentsDict) <= 0: return performanceParams # Set supported attributes for key in intentsDict: # Parameter not defined in __init__ method if not hasattr(performanceParams, key): continue setattr(performanceParams, key, intentsDict[key]) return performanceParams def _getProcessingIntents(intentsDict: dict, procedureName: str) -> dict: """ Filter the real processing intents from the PPR ProcessingIntents section. As a fallback also inspect the recipe name to derive some processing information. Args: intentsDict: The dictionary with the PPR ProcessingIntents content. procedureName: The PPR recipe name. Returns: processingIntents: The dictionary with the real processing intents. """ # Initial filtering of PPR intents processingIntents = dict((k, v) for (k, v) in intentsDict.items() if k in KNOWN_PROCESSING_INTENTS) # Fallback via recipe names for k, v in KNOWN_PROCESSING_INTENTS.items(): if (k not in processingIntents) and (procedureName in v): processingIntents[k] = True return processingIntents def _getSessions(intentsDict: dict) -> dict: sessionsDict = {} searching = True ptr = 1 while searching: key = 'SESSION_' + str(ptr) if key in intentsDict: asdmList = intentsDict[key].split(' | ') asdmList = [asdm.translate(str.maketrans(':/', '__')) for asdm in asdmList] sessionsDict[key] = asdmList ptr = ptr + 1 else: searching = False break return sessionsDict # Given the pipeline processing request object return a list of processing # commands where each element in the list is a tuple consisting of the # processing command name and the parameter set dictionary. def _getCommands(pprObject: XmlObjectifier.XmlObject, requestId: int, numRequests: int) -> tuple[str, int, list]: if numRequests == 1: ppr_cmds = pprObject.SciPipeRequest.ProcessingRequests.ProcessingRequest.ProcessingProcedure else: ppr_cmds = pprObject.SciPipeRequest.ProcessingRequests.ProcessingRequest[requestId].ProcessingProcedure try: procedureName = ppr_cmds.ProcedureTitle.getValue() except Exception: procedureName = "Undefined" commandsList = [] numCommands = 0 try: cmdName = ppr_cmds.ProcessingCommand.Command.getValue() ppr_params = ppr_cmds.ProcessingCommand.ParameterSet numParams, paramsDict = _getParameters(ppr_params) numCommands = 1 commandsList.append((cmdName, paramsDict)) except Exception: search = 1 while search: try: cmdName = ppr_cmds.ProcessingCommand[numCommands].Command.getValue() ppr_params = ppr_cmds.ProcessingCommand[numCommands].ParameterSet numParams, paramsDict = _getParameters(ppr_params) numCommands = numCommands + 1 commandsList.append((cmdName, paramsDict)) except Exception: search = 0 return procedureName, numCommands, commandsList # Given the pipeline processing request object return the number of scheduling # block sets. def _getNumSchedBlockSets(pprObject: XmlObjectifier.XmlObject, requestId: int, numRequests: int) -> int: if numRequests == 1: ppr_dset = pprObject.SciPipeRequest.ProcessingRequests.ProcessingRequest.DataSet else: ppr_dset = pprObject.SciPipeRequest.ProcessingRequests.ProcessingRequest[requestId].DataSet try: path = ppr_dset.SchedBlockSet.SchedBlockIdentifier.RelativePath.getValue() numSchedBlockSets = 1 except Exception: numSchedBlockSets = 0 return numSchedBlockSets # Given the pipeline processing request object return a list of ASDMs # where each element in the list is a tuple consisting of the path # to the ASDM, the name of the ASDM, and the UID of the ASDM. def _getAsdmList(pprObject: XmlObjectifier.XmlObject, sbsetId: int, numSbSets: int, requestId: int, numRequests: int) -> tuple[str, int, list]: if numRequests == 1: ppr_dset = pprObject.SciPipeRequest.ProcessingRequests.ProcessingRequest.DataSet if numSbSets == 1: ppr_dset = ppr_dset.SchedBlockSet.SchedBlockIdentifier else: ppr_dset = ppr_dset.SchedBlockSet[sbsetId].SchedBlockIdentifier relativePath = ppr_dset.RelativePath.getValue() else: ppr_dset = pprObject.SciPipeRequest.ProcessingRequests.ProcessingRequest[requestId].DataSet if numSbSets == 1: ppr_dset = ppr_dset.SchedBlockSet.SchedBlockIdentifier else: ppr_dset = ppr_dset.SchedBlockSet[sbsetId].SchedBlockIdentifier relativePath = ppr_dset.RelativePath.getValue() numAsdms = 0 asdmList = [] try: asdmName = ppr_dset.AsdmIdentifier.AsdmDiskName.getValue() asdmUid = ppr_dset.AsdmIdentifier.AsdmRef.ExecBlockId.getValue() asdmList.append((relativePath, asdmName, asdmUid)) numAsdms = 1 except Exception: search = 1 while search: try: asdmName = ppr_dset.AsdmIdentifier[numAsdms].AsdmDiskName.getValue() asdmUid = ppr_dset.AsdmIdentifier[numAsdms].AsdmRef.ExecBlockId.getValue() numAsdms = numAsdms + 1 asdmList.append((relativePath, asdmName, asdmUid)) except Exception: search = 0 return relativePath, numAsdms, asdmList # Given a parameter set object retrieve the parameter set dictionary for # each command. def _getParameters(ppsetObject: XmlObjectifier.XmlObject) -> tuple[int, dict]: numParams = 0 paramsDict = {} try: paramName = ppsetObject.Parameter.Keyword.getValue() try: paramValue = ppsetObject.Parameter.Value.getValue() except Exception: paramValue = "" numParams = 1 paramsDict[paramName] = paramValue except Exception: search = 1 while search: try: paramName = ppsetObject.Parameter[numParams].Keyword.getValue() try: paramValue = ppsetObject.Parameter[numParams].Value.getValue() except Exception: paramValue = "" numParams = numParams + 1 paramsDict[paramName] = paramValue except Exception: search = 0 return numParams, paramsDict