Running Pipeline

PPR: pipeline processing request

At the highest level of abstraction, we can execute a pipeline processing request (PPR), which is how Pipeline is run in operations at the observatory processing centers (ALMA, VLA).

This can be done at the command line, or at a CASA command prompt.

Execute PPR from the command line:

casa --nologger --nogui --log2term --agg -c $SCIPIPE_HEURISTICS/pipeline/runvlapipeline.py PPRnew_VLAT003.xml

At a CASA command prompt:

# execute a pipeline processing request (PPR)
CASA <1>: import pipeline.infrastructure.executevlappr as eppr
CASA <2>: eppr.executeppr('PPR_VLAT003.xml', importonly=False)

Series of steps invoking CASA Pipeline tasks

A pipeline run generates an executable script summarizing the steps performed. An older reference for VLA is the VLA CASA Pipeline casaguide (CASA 4.5.3 era).

A pipeline run will generate a file like the following:

pipeline_test_data/VLAT003/working/pipeline-20161014T172229/html/casa_pipescript.py

It contains a series of pipeline task calls. We can execute this script from CASA:

CASA <1>: exec(open('casa_pipescript.py').read())

Or we can run it from the command line:

casa --nogui --log2term -c casa_pipescript.py

We can edit the script and turn on memory usage for each task:

CASA <1>: h_init()
CASA <2>: import pipeline
CASA <3>: pipeline.infrastructure.utils.enable_memstats()

We can also turn the weblog off:

CASA <1>: h_init(loglevel='info', plotlevel='summary', weblog=False)

Or turn debug logging on with the weblog enabled:

CASA <1>: h_init(loglevel='debug', plotlevel='summary', weblog=True)

Full example of running Pipeline importdata task on CASA prompt:

CASA <1>: h_init()
CASA <2>: h_save()
CASA <3>: import pipeline
CASA <4>: hifv_importdata(vis=['../rawdata/13A-537.sb24066356.eb24324502.56514.05971091435'], session=['session_1'], overwrite=False)
CASA <5>: h_save()
CASA <6>: exit

Resuming from a previous Pipeline run on CASA prompt:

casa
CASA <1>: context = h_resume(filename='last')

Creating and running Pipeline tasks in Python, bypassing the task interface

At the lowest level of abstraction, we can bypass the CASA Pipeline Task interface, and work directly within CASA / Python, by instantiating a Pipeline InputsContainer object for the Pipeline Task, using it to instantiate a Pipeline Task object, and then running its execute method to get the task result, as shown in the example below. Here, the example should be run in a directory where the Pipeline has been partly run, i.e. a context already exists.

from pipeline.infrastructure import launcher

context = launcher.Pipeline(context='last').context
inputs = pipeline.infrastructure.vdp.InputsContainer(pipeline.hifv.tasks.hanning.Hanning, context)
task = pipeline.hifv.tasks.hanning.Hanning(inputs)
result = task.execute()
result.accept(context)
context.save()

If we don’t have a PPR or an executable script available.

casa
import pipeline.recipes.hifv as hifv
from pipeline.infrastructure import launcher

# the next line will only importevla and save a context, b/c importonly=True
hifv.hifv(['../rawdata/13A-537.foofoof.eb.barbar.2378.2934723984397'], importonly=True)

context = launcher.Pipeline(context='last').context
vis = '13A-537.foofoof.eb.barbar.2378.2934723984397.ms'
# get the domain object
m = context.observing_run.get_ms(vis)
type(m)
# study this m object for INTENTS
# <class 'pipeline.domain.measurementset.MeasurementSet'>
m.intents  # shows a python set of the MS intents
m.polarization  # show a list of polarization objects

Running Pipeline with the “recipereducer”

The recipereducer module is a tool for Pipeline developers to run the Pipeline without using the Pipeline Processing Request (PPR) system used in observatory operations.

All that you need to pass along to the recipereducer is the recipe name and the path to a directory containing the raw ASDM datasets.

Simple “recipereducer” example

Run pipeline for your ASDM, with default pipeline recipe (hifa_calimage):

import pipeline.recipereducer
pipeline.recipereducer.reduce(
  vis=['../rawdata/yourasdm'],
  # procedure='procedure_hifa_calimage.xml',
)

About the procedure parameter

For the procedure parameter, you can specify either:

  • the filename of one of the standard recipes (procedure files) located in pipeline/recipes

  • a relative/absolute path to a file containing your own (customized) recipe file, e.g. procedure='/path/to/local/procedure.xml', or procedure='../test_procedure_for_PIPE-1234.xml'

If the procedure parameter is not specified, recipereducer will by default run the full ALMA calibration + imaging recipe, procedure_hifa_calimage.xml.

Examples with a different procedure file

Run Single-Dish calibration + imaging on 2 ASDMs:

import pipeline.recipereducer
pipeline.recipereducer.reduce(
  vis=['../rawdata/yourasdm', '../rawdata/yourasdm_2'],
  procedure='procedure_hsd_calimage.xml',
)

Run VLA standard recipe:

import pipeline.recipereducer
pipeline.recipereducer.reduce(
  vis=['../rawdata/yourasdm'],
  procedure='procedure_hifv.xml',
)

Run with a local copy of a recipe that you may have modified for development:

import pipeline.recipereducer
pipeline.recipereducer.reduce(
  vis=['../rawdata/yourasdm'],
  procedure='../test_procedure_for_PIPE-1234.xml',
)

Examples of stopping / starting at specific stages

Stop after stage 3 has completed. It will depend on the recipe which task this stage number corresponds to.

import pipeline.recipereducer
pipeline.recipereducer.reduce(
    vis=['../rawdata/yourasdm'],
    procedure='procedure_hifa_calimage.xml',
    exitstage=3,
)

Assuming the developer developed / debugged stage 4, once that is done, it is possible to use recipereducer to resume with the remainder of the recipe, with:

import pipeline.recipereducer
from pipeline.infrastructure import launcher

context = launcher.Pipeline(context='last').context
pipeline.recipereducer.reduce(
    vis=['../rawdata/yourasdm'],
    context=context,
    procedure='procedure_hifa_calimage.xml',
    startstage=5,
)

Here, since Pipeline is resuming from an earlier run, the existing context must be passed into recipereducer.

Example with sessions

Some datasets will have measurement sets grouped together in separate sessions, which can be specified with:

import pipeline.recipereducer
pipeline.recipereducer.reduce(
    vis=['../rawdata/yourasdm_1', '../rawdata/yourasdm_2', '../rawdata/yourasdm_3'],
    session=['session1', 'session3', 'session3'],
    procedure='procedure_hifa_calimage.xml',
    exitstage=3,
)

Here, the 1st ASDM belongs to “session1” while the latter 2 ASDMs are both in “session3”. Note, session names are assigned during operations, will normally be populated in the PPR files, and may not always appear to have sequential numbers.

Example changing log level

If not specified, the loglevel is info by default, but this can be change to e.g. debug or trace to get (a lot) more diagnostic information during the run:

import pipeline.recipereducer
pipeline.recipereducer.reduce(
    vis=['../rawdata/yourasdm_1'],
    loglevel='debug',
    plotlevel='summary',
)

Example workflows for a Pipeline developer

For developers, a common workflow is to iteratively run a specific task to reproduce an issue, debug a specific task, and/or implement a new stage.

Pipeline does not currently support snapshotting of the state after each run, and various pipeline tasks do permanently change the stage of the measurement set (flagging, corrected, and/or model column) or calibration state in the context. As a result, it would normally not be representative to keep re-running the same stage multiple times in a row, as each subsequent execution starts from a different state.

Since a full Pipeline run can take a significant amount of time (hours to days depending on the size and complexity of the dataset), a recommended workflow for developers would be to:

  • run with recipereducer and the exitstage option up to the start of the stage you want to debug / implement

  • create a tarball of the working directory, or restore the working directory from a previously created tarball

  • run the stage-to-develop/debug individually (regular or in interactive debugger mode)

To this end, it may help to create a couple of custom Python scripts:

A template.script that runs pipeline up to just before the stage where you wish to implement / debug a task.

import pipeline.recipereducer
pipeline.recipereducer.reduce(
    vis=['../rawdata/yourasdm'],
    loglevel='info',
    # exitstage=1,  # uncomment this line and set correct stage to stop after.
)

Place it in the parent of the working directory, and run with:

cd working
casa -c ../template.script

A debug.script that executes an individual task:

task_to_run = 'hifa_tsysflag'

import pipeline
from pipeline.infrastructure import task_registry, launcher

context = launcher.Pipeline(context='last', loglevel='info', plotlevel='default').context

taskclass = task_registry.get_pipeline_class_for_task(task_to_run)
inputs = pipeline.infrastructure.vdp.InputsContainer(taskclass, context)

# Optionally override input parameter(s) for debugging, e.g.:
# inputs.normalize_tsys = True

task = taskclass(inputs)
result = task.execute()
result.accept(context)
context.save()

Place it in the parent of the working directory, and run with:

casa -c ../debug.script

Note: as an alternative to re-running an entire task for investigating and debugging issues, see the infrastructure.utils.utils.function_io_dumper decorator functionality that is documented in FunctionIODumper.md, introduced in PIPE-2354 (2025 release).

Note: a Pipeline run is relocatable, so one can resume from a working directory (and context) that is located at a different path from where the run was originally started. Depending on the ticket at hand, this can allow a developer a couple of short-cuts:

  • Create a run up through stage N, tarball working to snapshot, and optionally unpack to multiple copies (working1, working2, working3) to run (in parallel) a given stage with different checkouts of the Pipeline.

  • When working on a shared cluster such as at NRAO Charlottesville, one can copy the run from a different developer / PWG member into their own workspace, and resume the context (optionally even one of the pickled results from the stage, located under pipeline-[...]/saved_state) to inspect the context, re-run a stage with the interactive debugger, etc. This can be particularly useful for times where one has to urgently debug an issue that occurred for a large dataset.

Create custom Pipeline processing recipes from SRDP Mustache templates

The SRDP templates are mustache templates that can be used to generate custom SRDP processing recipe XML files. All templates are located in the pipeline/recipes directory (template_*.xml). A mustache template contains both the XML and the mustache tags, the latter of which allows the insertion of values from the JSON file into the XML during rendering.

To generate a custom SRDP recipe, you will need to create a JSON file that contains the custom mustache tag values. Once the JSON file is prepared, the SRDP recipes can be rendered for testing. Two recommended rendering options are below:

  • Use the demo page on the mustache site. You can copy and paste the text from the JSON file and template into the boxes and click the “Render Template” button.

  • Use a Python Library, e.g. pystache or chevron, you could do something like this:

import json, pystache
with open('pipeline/recipes/tests/test_hifa_cubeimage.json') as f_json, open('template_hifa_cubeimage.xml') as f_template:
    d = json.load(f_json)
    t = f_template.read()
    with open('recipe.xml', 'w') as out:
        out.write(pystache.render(t,d))

Workflow Break/Resume

Context-by-Stage

The context content at individual stages can be pickled after the completion of each PL task. The implementation is in infrastructure.basetask and will only be switched on if the pipeline is in the DEBUG (or lower) logging level. This feature works for both PPR and recipereducer runs.

The path of pickled context files is: output_dir/context_name/saved_state/context-stage*.pickle, saved along with result-stage*.pickle which is always present in the same directory.

Break/Resume

  • executeppr offers a “break/resume” feature at the workflow level (see the optional keywords bpaction and breakpoint). One practical example is below, which is based on the test data available in the pipeline-testdata repository:

    • pl-unittest/uid___A002_Xc46ab2_X15ae_repSPW_spw16_17_small.ms

    • pl-regressiontest/uid___A002_Xc46ab2_X15ae_repSPW_spw16_17_small/PPR.xml

    We first put the above MeasurementSet and PPR in the rawdata directory of your workspace. This specific PPR file already has a breakpoint set after the first hifa_exportdata call:

    To run the PPR up to the breakpoint, at a CASA command prompt,

    import os
    import pipeline.infrastructure.executeppr as eppr
    os.environ['SCIPIPE_ROOTDIR'] = os.getcwd()
    eppr.executeppr('../rawdata/PPR.xml', importonly=False, loglevel='debug', bpaction='break')
    

    From the current or a new CASA session, the PPR can be resumed,

    import os
    import pipeline.infrastructure.executeppr as eppr
    os.environ['SCIPIPE_ROOTDIR'] = os.getcwd()
    eppr.executeppr('../rawdata/PPR.xml', importonly=False, loglevel='debug', bpaction='resume')
    

    Note: if you try to run the PPR with bpaction=‘resume’ again, the subsequent call(s) will likely fail: executeppr is hardcoded to resume from the “last” context (i.e. a .context file with the latest timestamp from your working directory), but we need the context content from the “breakpoint” stage to resume. Also, the file states (e.g. MSs, caltables) have changed. The only safe option is making a copy of the working directory before trying resume in case you might tweak the PPR in another attempt.

    For development/test purposes, one workaround to avoid copying the entire working directory is to create a fresh copy of the context pickled from the “breakpoint” stage (where your loglevel=‘debug’ setting is crucial) in the existing working directory:

    First, you back up the context pickle files from different stages:

    cp -rf pipeline-20210421T172403/saved_state pipeline-20210421T172403/saved_state_backup
    

    Then, at a CASA command prompt,

    import os
    os.environ['SCIPIPE_ROOTDIR'] = os.getcwd()
    os.system('cp -rf pipeline-20210421T172403/saved_state_backup/context-stage26.pickle current.context')
    # we need some cleanup as this MS below is blocking the execution of the stage27 hif_mstransform() call.
    os.system('rm -rf uid___A002_Xc46ab2_X15ae_repSPW_spw16_17_small_target.ms*')
    import pipeline.infrastructure.executeppr as eppr
    eppr.executeppr('../rawdata/PPR.xml', importonly=False, loglevel='debug', bpaction='resume')
    

    Again, please note that the above workaround might yield scientifically meaningless results due to the changing file status and is only useful for testing under certain scenarios. All break/resume approaches use the current files (e.g. MSs/caltables) in your working directory. So be aware of the existence of files/versions that might be unexpected to the resumed PL workflow task call(s)!

  • With recipereducer, you can load context saved at a specific stage from the working directory and run/rerun the next PL task designed in the workflow (also see the demonstration in the last section).

    A full recipe run with the above test data example can be done via,

    import pipeline.recipereducer, os
    pipeline.recipereducer.reduce(vis=['../rawdata/uid___A002_Xc46ab2_X15ae_repSPW_spw16_17_small.ms'],
                                procedure='procedure_hifa_calimage.xml', loglevel='debug')
    

    With the context pickled from individual stages, you may pick and test a single pipeline stage in your development:

    task_to_run='hif_checkproductsize'
    task_keywords={'maxcubesize':40.0,'maxcubelimit':60,'maxproductsize':500.0}
    import pipeline
    from pipeline.infrastructure import task_registry, launcher
    context = launcher.Pipeline(context='pipeline-procedure_hifa_calimage/saved_state/context-stage24.pickle', loglevel='debug', plotlevel='default').context
    taskclass = task_registry.get_pipeline_class_for_task(task_to_run)
    inputs = pipeline.infrastructure.vdp.InputsContainer(taskclass, context, **task_keywords)
    task = taskclass(inputs)
    result = task.execute()
    result.accept(context)
    context.save('test-context-stage25.pickle')
    

    recipereducer doesn’t offer the “breakpoint” feature built into executeppr. However, a careful use of the startstage/exitstage/context keywords can achieve the same workflow-level “break/resume”:

    from pipeline.infrastructure import task_registry, launcher
    context = launcher.Pipeline(context='pipeline-procedure_hifa_calimage/saved_state/context-stage3.pickle', loglevel='debug', plotlevel='default').context
    import pipeline.recipereducer
    pipeline.recipereducer.reduce(vis=['../rawdata/uid___A002_Xc46ab2_X15ae_repSPW_spw16_17_small.ms'],
                                  procedure='procedure_hifa_calimage.xml', loglevel='debug',startstage=4,exitstage=20,
                                  context=context)
    

Re-render Weblog

A common development task is improving weblog. Without re-running a time-consuming pipeline task itself, you can only re-render the weblog using the existing context/result to test a small weblog-related change (e.g., minor tweaks in mako templates). Note: this will only rerun the weblog rendering portion of a pipeline stage (therefore limits your testing scope).

import os
import pipeline.infrastructure.renderer.htmlrenderer as hr
from pipeline.infrastructure import launcher

# Modify this to select which stage number to re-render weblog for.
os.environ['WEBLOG_RERENDER_STAGES'] = "16"

context = launcher.Pipeline(context='last', loglevel='debug', plotlevel='default').context
hr.WebLogGenerator.render(context)

Re-run QA heuristics

A common development task is to implement or improve the QA heuristics for a task. The QA step is normally run after the main task heuristics are completed and the task Results have been created.

As such, after having run the task at least once, one can rapidly re-run the QA heuristic on the existing task result in the context to iterate more rapidly on the heuristic without having to re-run the time-consuming pipeline task itself.

from pipeline.infrastructure import pipelineqa, launcher
context = launcher.Pipeline(context='last', loglevel='debug', plotlevel='default').context

# Modify this to select which stage to run QA for.
stage_number = 14

task_results = context.results[stage_number-1].read()
pipelineqa.qa_registry.do_qa(context, task_results)

Note, the snippet above only re-runs the QA heuristic on a copy of the results read in from disk, and updates the QA scores in this copy of the results in-place. However, this does not modify the results on disk nor does it attempt to re-run the weblog rendering; therefore re-running the QA this way will not update the QA scores that are displayed in the task weblog. Instead, the snippet is primarily useful for interactively debugging the QA heuristics. Once debugged, it would be recommended to re-run the entire stage.

Note on using executeppr/recipereducer and Pipeline CLI tasks

All the above use cases rely on a global pipeline context object that holds information about the pipeline processing state. Be careful when executeppr/recipereducer and CLI tasks are used together in a single CASA session, especially when running executeppr/recipereducer in the middle of interactive processing with CLI tasks, e.g.,

  1. h_init to start interactive processing (creates context #1 and registers it in the global scope)

  2. run various tasks interactively (with context #1)

  3. run executeppr (creates context #2 and overwrites global context #1)

  4. run some more tasks interactively (with context #2)

In this example, steps 2 and 4 are no longer continuous because they refer to different context objects. In such cases, save the context object to disk before running executeppr/recipereducer and resume it afterwards. The above workflow should be tweaked as follows:

  1. h_init to start interactive processing (creates context #1 and register it to global scope)

  2. run various tasks interactively (with context #1)

  3. save current state to disk with h_save('interactive.context') (save context #1 to disk)

  4. run executeppr (creates context #2 and overwrite global context #1)

  5. resume previous state from disk with h_resume('interactive.context') (load context #1 from disk and overwrite global context #2)

  6. run some more tasks interactively (with context #1)