Source code for pipeline.recipereducer

"""
recipereducer is a utility to reduce data using a standard pipeline procedure.
It parses a XML reduction recipe, converts it to pipeline tasks, and executes
the tasks for the given data. It was written to give pipeline developers
without access to PPRs and/or a PPR generator a way to reduce data using the
latest standard recipe.

Note: multiple input datasets can be specified. Doing so will reduce the data
      as part of the same session.

Example #1: process uid123.tar.gz using the standard recipe.

    import pipeline.recipereducer
    pipeline.recipereducer.reduce(vis=['uid123.tar.gz'])

Example #2: process uid123.tar.gz using a named recipe.

    import pipeline.recipereducer
    pipeline.recipereducer.reduce(vis=['uid123.tar.gz'],
                                  procedure='procedure_hif.xml')

Example #3: process uid123.tar.gz and uid124.tar.gz using the standard recipe.

    import pipeline.recipereducer
    pipeline.recipereducer.reduce(vis=['uid123.tar.gz', 'uid124.tar.gz'])

Example #4: process uid123.tar.gz, naming the context 'testrun', thus
            directing all weblog output to a directory called 'testrun'.

    import pipeline.recipereducer
    pipeline.recipereducer.reduce(vis=['uid123.tar.gz'], name='testrun')

Example #5: process uid123.tar.gz with a log level of TRACE

    import pipeline.recipereducer
    pipeline.recipereducer.reduce(vis=['uid123.tar.gz'], loglevel='trace')

"""
from __future__ import annotations

import collections
import os
import tempfile
import traceback
import xml.etree.ElementTree as ElementTree
from typing import TYPE_CHECKING

import pipeline.h.cli.cli as h_cli
from pipeline import cli, infrastructure
from pipeline.infrastructure import exceptions, launcher, utils

if TYPE_CHECKING:
    from collections.abc import Callable
    from typing import Any

    from pipeline.infrastructure.launcher import Context

LOG = infrastructure.logging.get_logger(__name__)
RECIPES_DIR = os.path.realpath(os.path.join(os.path.dirname(__file__), 'recipes'))


TaskArgs = collections.namedtuple('TaskArgs', 'vis infiles session')


def _create_context(loglevel: str, plotlevel: str, name: str) -> Context:
    """Create Pipeline context.

    Args:
        loglevel: Logging level
        plotlevel: Plot level
        name: Name of the context

    Returns:
        Pipeline context object
    """
    pipeline = launcher.Pipeline(loglevel=loglevel, plotlevel=plotlevel,
                                 name=name)
    return pipeline.context


def _register_context(loglevel: str, plotlevel: str, context: Context) -> None:
    """Register given context to global scope.

    If pipeline context already exists in global scope, it is saved on
    disk to avoid being overwritten.

    Args:
        loglevel: Logging level
        plotlevel: Plot level
        context: Pipeline context object
    """
    # check if global context exists
    pipeline_instance = h_cli.stack.get(h_cli.PIPELINE_NAME, None)
    if pipeline_instance and isinstance(pipeline_instance, launcher.Pipeline):
        # if global context exists, check identity with given context
        global_context = pipeline_instance.context
        if global_context == context:
            # context is already registered
            return
        elif global_context.name != context.name:
            # global context is different from given context
            # save global context with intrinsic name
            global_context.save()
            context_file = f'{global_context.name}.context'
            LOG.info('Global context exists. Saved it %s to disk.', context_file)
        else:
            # global context is different from given context but
            # they accidentally have the same name
            # save global context with different name to avoid
            # name conflict with new one
            for i in range(10):
                context_file = f'{global_context.name}-{i}.context'
                if not os.path.exists(context_file):
                    global_context.save(context_file)
                    LOG.info('Global context exists. Saved it %s to disk.', context_file)
                    break
            else:
                # failed attempt to find appropriate context name
                # it should rarely happen, but overwrite existing context
                # if it happened
                LOG.warning('Existing Pipeline context will be overridden by the current pipeline processing.')

    # register given context to global scope
    with tempfile.TemporaryDirectory(dir='.') as temp_dir:
        context_name = os.path.join(temp_dir, context.name)
        try:
            # to disable some log messages during registration
            temp_loglevel = 'error'
            infrastructure.logging.set_logging_level(level=temp_loglevel)
            context.save(context_name)
            # create pipeline instance using temporary context
            pipeline_instance = launcher.Pipeline(
                loglevel=temp_loglevel, plotlevel=plotlevel,
                context=context_name
            )
            # then, replace context
            pipeline_instance.context = context
            h_cli.stack[h_cli.PIPELINE_NAME] = pipeline_instance
        finally:
            # set user-specified loglevel
            infrastructure.logging.set_logging_level(level=loglevel)


def _get_context_name(procedure: str) -> str:
    root, _ = os.path.splitext(os.path.basename(procedure))
    return f'pipeline-{root}'


def _get_processing_procedure(procedure: str) -> ElementTree:
    # find the procedure file on disk, then fall back to the standard recipes
    if os.path.exists(procedure):
        procedure_file = os.path.abspath(procedure)
    else:
        procedure_file = os.path.join(RECIPES_DIR, procedure)
    if os.path.exists(procedure_file):
        LOG.info('Using procedure file: %s', procedure_file)
    else:
        msg = f'Procedure not found:: {procedure}'
        LOG.error(msg)
        raise IOError(msg)

    procedure_xml = ElementTree.parse(procedure_file)
    if not procedure_xml:
        msg = f'Could not parse procedure file at {procedure_file}'
        LOG.error(msg)
        raise IOError(msg)

    return procedure_xml


def _get_procedure_title(procedure: str) -> str:
    procedure_xml = _get_processing_procedure(procedure)
    procedure_title = procedure_xml.findtext('ProcedureTitle', default='Undefined')
    return procedure_title


def _get_tasks(context: Context, args: TaskArgs, procedure: str):
    procedure_xml = _get_processing_procedure(procedure)

    commands_seen = []
    for processingcommand in procedure_xml.findall('ProcessingCommand'):
        cli_command = processingcommand.findtext('Command')
        commands_seen.append(cli_command)

        # ignore breakpoints
        if cli_command == 'breakpoint':
            continue

        # skip exportdata when preceded by a breakpoint
        if len(commands_seen) > 1 and commands_seen[-2] == ['breakpoint'] \
                and cli_command == 'hif_exportdata':
            continue

        task_args = {}

        if cli_command in ['h_importdata',
                           'hifa_importdata',
                           'hifv_importdata',
                           'hsd_importdata',
                           'hsdn_importdata',
                           'h_restoredata',
                           'hif_restoredata',
                           'hifa_restoredata',
                           'hifv_restoredata',
                           'hsd_restoredata']:
            task_args['vis'] = args.vis
            # we might override this later with the procedure definition
            task_args['session'] = args.session

        elif cli_command in [ 'hsdn_restoredata' ]:
            task_args['vis'] = args.vis

        for parameterset in processingcommand.findall('ParameterSet'):
            for parameter in parameterset.findall('Parameter'):
                argname = parameter.findtext('Keyword')
                argval = parameter.findtext('Value')
                task_args[argname] = utils.string_to_val(argval)

        # we yield rather than return so that the context can be updated
        # between task executions
        task = cli.get_pipeline_task_with_name(cli_command)
        yield task, task_args


def _format_arg_value(arg_val: tuple[Any, Any]) -> str:
    arg, val = arg_val
    return f'{arg}={val!r}'


def _as_task_call(task_func: Callable, task_args: dict) -> str:
    kw_args = list(map(_format_arg_value, task_args.items()))
    return f"{task_func.__name__}({', '.join(kw_args)})"


def _execute_task(task: Callable, task_args: dict) -> Any:
    """Execute a single pipeline task with logging and error handling.

    This function handles task execution with consistent logging and error
    handling. It will:
    - Log the task call before execution
    - Catch and log any unhandled exceptions (returns None if exception occurs)
    - Check for tracebacks in the result and raise PipelineException if found

    Args:
        task: The task callable to execute.
        task_args: Dictionary of arguments to pass to the task.

    Returns:
        The task result object, or None if an unhandled exception occurred.

    Raises:
        exceptions.PipelineException: If the task result contains tracebacks.
    """
    LOG.info('Executing pipeline task %s', _as_task_call(task, task_args))

    try:
        result = task(**task_args)
    except Exception:
        # Log message if an exception occurred that was not handled by
        # standardtask template (not turned into failed task result).
        call_str = _as_task_call(task, task_args)
        LOG.error('Unhandled error while running pipeline task %s.', call_str)
        traceback.print_exc()
        return None

    # Check for tracebacks in successful task execution
    tracebacks = utils.get_tracebacks(result)
    if tracebacks:
        previous_tracebacks_as_string = '\n'.join(tracebacks)
        raise exceptions.PipelineException(previous_tracebacks_as_string)

    return result


[docs] def run_named_tasks(tasks: list[tuple[str, dict[str, Any]]]) -> Context: """Run a sequence of pipeline tasks specified by name and argument dicts. This utility centralizes the logic previously duplicated in the test framework so that component tests and other callers can invoke a consistent execution path with uniform logging and error handling. The function will: 1. Initialize a fresh pipeline context via `cli.h_init()`. 2. Resolve each task name to a callable using `cli.get_pipeline_task_with_name`. 3. Execute tasks in order, logging the call signature. 4. Raise `exceptions.PipelineException` immediately if any task result reports tracebacks (handled inside `_execute_task`). 5. Save the context via `cli.h_save()`. Args: tasks: Ordered list of (task_name, task_args_dict) tuples. Returns: Context: The final pipeline context (the most recent / 'last'). Raises: exceptions.PipelineException: If any executed task reports tracebacks. """ LOG.info('Initializing context for explicit task list...') cli.h_init() for task_name, task_args in tasks: task = cli.get_pipeline_task_with_name(task_name=task_name) result = _execute_task(task, task_args) # If an unhandled exception occurred, return context early if result is None: return launcher.Pipeline(context='last').context LOG.info('Saving context after explicit task list execution...') cli.h_save() # Return the current ("last") context for caller convenience. return launcher.Pipeline(context='last').context
[docs] def reduce( vis: list[str] | None = None, infiles: list[str] | None = None, procedure: str = 'procedure_hifa_calimage.xml', context: Context | None = None, name: str | None = None, loglevel: str = 'info', plotlevel: str = 'default', session: list[str] | None = None, exitstage: int | None = None, startstage: int | None = None, ) -> Context: """Executes a CASA Pipeline data reduction procedure. This function triggers the execution of pipeline tasks defined in a given XML recipe procedure, managing context creation and task sequencing. Args: vis: List of measurement sets to process. infiles: Supplementary input files for the pipeline execution. procedure: XML procedure file defining the pipeline workflow. context: Existing context object. A new context is created if None. name: Context name, used only if `context` is None. loglevel: Logging level (e.g., 'info', 'debug'). plotlevel: Plotting verbosity level used during task execution. session: List of session names corresponding to each vis. Defaults to "default". exitstage: If provided, stops execution after this stage number. startstage: If provided, begins execution from this stage number. Returns: The pipeline context object with updated state. Raises: exceptions.PipelineException: Raised if a pipeline task fails with tracebacks. """ if vis is None: vis = [] if infiles is None: infiles = [] if context is None: name = name if name else _get_context_name(procedure) context = _create_context(loglevel, plotlevel, name) procedure_title = _get_procedure_title(procedure) context.set_state('ProjectStructure', 'recipe_name', procedure_title) _register_context(loglevel, plotlevel, context) if session is None: session = ['default'] * len(vis) if startstage is None: startstage = 0 task_args = TaskArgs(vis, infiles, session) task_generator = _get_tasks(context, task_args, procedure) try: procedure_stage_nr = 0 while True: task, task_args = next(task_generator) procedure_stage_nr += 1 if procedure_stage_nr < startstage: continue result = _execute_task(task, task_args) # If an unhandled exception occurred, return context early if result is None: return context # Check if we should exit after this stage if result.stage_number is exitstage: break except StopIteration: pass finally: LOG.info('Saving context...') cli.h_save() return context