Running Pipeline¶
PPR: pipeline processing request¶
At the highest level of abstraction, we can execute a pipeline processing request (PPR), which is how Pipeline is run in operations at the observatory processing centers (ALMA, VLA).
This can be done at the command line, or at a CASA command prompt.
Execute PPR from the command line:
casa --nologger --nogui --log2term --agg -c $SCIPIPE_HEURISTICS/pipeline/runvlapipeline.py PPRnew_VLAT003.xml
At a CASA command prompt:
# execute a pipeline processing request (PPR)
CASA <1>: import pipeline.infrastructure.executevlappr as eppr
CASA <2>: eppr.executeppr('PPR_VLAT003.xml', importonly=False)
Series of steps invoking CASA Pipeline tasks¶
A pipeline run generates an executable script summarizing the steps performed. An older reference for VLA is the VLA CASA Pipeline casaguide (CASA 4.5.3 era).
A pipeline run will generate a file like the following:
pipeline_test_data/VLAT003/working/pipeline-20161014T172229/html/casa_pipescript.py
It contains a series of pipeline task calls. We can execute this script from CASA:
CASA <1>: exec(open('casa_pipescript.py').read())
Or we can run it from the command line:
casa --nogui --log2term -c casa_pipescript.py
We can edit the script and turn on memory usage for each task:
CASA <1>: h_init()
CASA <2>: import pipeline
CASA <3>: pipeline.infrastructure.utils.enable_memstats()
We can also turn the weblog off:
CASA <1>: h_init(loglevel='info', plotlevel='summary', weblog=False)
Or turn debug logging on with the weblog enabled:
CASA <1>: h_init(loglevel='debug', plotlevel='summary', weblog=True)
Full example of running Pipeline importdata task on CASA prompt:
CASA <1>: h_init()
CASA <2>: h_save()
CASA <3>: import pipeline
CASA <4>: hifv_importdata(vis=['../rawdata/13A-537.sb24066356.eb24324502.56514.05971091435'], session=['session_1'], overwrite=False)
CASA <5>: h_save()
CASA <6>: exit
Resuming from a previous Pipeline run on CASA prompt:
casa
CASA <1>: context = h_resume(filename='last')
Creating and running Pipeline tasks in Python, bypassing the task interface¶
At the lowest level of abstraction, we can bypass the CASA Pipeline Task interface, and work directly within
CASA / Python, by instantiating a Pipeline InputsContainer object for the Pipeline Task, using it to instantiate a
Pipeline Task object, and then running its execute method to get the task result, as shown in the example below. Here,
the example should be run in a directory where the Pipeline has been partly run, i.e. a context already exists.
from pipeline.infrastructure import launcher
context = launcher.Pipeline(context='last').context
inputs = pipeline.infrastructure.vdp.InputsContainer(pipeline.hifv.tasks.hanning.Hanning, context)
task = pipeline.hifv.tasks.hanning.Hanning(inputs)
result = task.execute()
result.accept(context)
context.save()
If we don’t have a PPR or an executable script available.
casa
import pipeline.recipes.hifv as hifv
from pipeline.infrastructure import launcher
# the next line will only importevla and save a context, b/c importonly=True
hifv.hifv(['../rawdata/13A-537.foofoof.eb.barbar.2378.2934723984397'], importonly=True)
context = launcher.Pipeline(context='last').context
vis = '13A-537.foofoof.eb.barbar.2378.2934723984397.ms'
# get the domain object
m = context.observing_run.get_ms(vis)
type(m)
# study this m object for INTENTS
# <class 'pipeline.domain.measurementset.MeasurementSet'>
m.intents # shows a python set of the MS intents
m.polarization # show a list of polarization objects
Running Pipeline with the “recipereducer”¶
The recipereducer module is a tool for Pipeline developers to run the Pipeline without using the Pipeline Processing Request (PPR) system used in observatory operations.
All that you need to pass along to the recipereducer is the recipe name and the
path to a directory containing the raw ASDM datasets.
Simple “recipereducer” example¶
Run pipeline for your ASDM, with default pipeline recipe (hifa_calimage):
import pipeline.recipereducer
pipeline.recipereducer.reduce(
vis=['../rawdata/yourasdm'],
# procedure='procedure_hifa_calimage.xml',
)
About the procedure parameter¶
For the procedure parameter, you can specify either:
the filename of one of the standard recipes (procedure files) located in
pipeline/recipesa relative/absolute path to a file containing your own (customized) recipe file, e.g.
procedure='/path/to/local/procedure.xml', orprocedure='../test_procedure_for_PIPE-1234.xml'
If the procedure parameter is not specified, recipereducer will by default
run the full ALMA calibration + imaging recipe, procedure_hifa_calimage.xml.
Examples with a different procedure file¶
Run Single-Dish calibration + imaging on 2 ASDMs:
import pipeline.recipereducer
pipeline.recipereducer.reduce(
vis=['../rawdata/yourasdm', '../rawdata/yourasdm_2'],
procedure='procedure_hsd_calimage.xml',
)
Run VLA standard recipe:
import pipeline.recipereducer
pipeline.recipereducer.reduce(
vis=['../rawdata/yourasdm'],
procedure='procedure_hifv.xml',
)
Run with a local copy of a recipe that you may have modified for development:
import pipeline.recipereducer
pipeline.recipereducer.reduce(
vis=['../rawdata/yourasdm'],
procedure='../test_procedure_for_PIPE-1234.xml',
)
Examples of stopping / starting at specific stages¶
Stop after stage 3 has completed. It will depend on the recipe which task this stage number corresponds to.
import pipeline.recipereducer
pipeline.recipereducer.reduce(
vis=['../rawdata/yourasdm'],
procedure='procedure_hifa_calimage.xml',
exitstage=3,
)
Assuming the developer developed / debugged stage 4, once that is done, it is
possible to use recipereducer to resume with the remainder of the recipe, with:
import pipeline.recipereducer
from pipeline.infrastructure import launcher
context = launcher.Pipeline(context='last').context
pipeline.recipereducer.reduce(
vis=['../rawdata/yourasdm'],
context=context,
procedure='procedure_hifa_calimage.xml',
startstage=5,
)
Here, since Pipeline is resuming from an earlier run, the existing context must be passed into recipereducer.
Example with sessions¶
Some datasets will have measurement sets grouped together in separate sessions, which can be specified with:
import pipeline.recipereducer
pipeline.recipereducer.reduce(
vis=['../rawdata/yourasdm_1', '../rawdata/yourasdm_2', '../rawdata/yourasdm_3'],
session=['session1', 'session3', 'session3'],
procedure='procedure_hifa_calimage.xml',
exitstage=3,
)
Here, the 1st ASDM belongs to “session1” while the latter 2 ASDMs are both in “session3”. Note, session names are assigned during operations, will normally be populated in the PPR files, and may not always appear to have sequential numbers.
Example changing log level¶
If not specified, the loglevel is info by default, but this can be change to e.g. debug or trace to get
(a lot) more diagnostic information during the run:
import pipeline.recipereducer
pipeline.recipereducer.reduce(
vis=['../rawdata/yourasdm_1'],
loglevel='debug',
plotlevel='summary',
)
Example workflows for a Pipeline developer¶
For developers, a common workflow is to iteratively run a specific task to reproduce an issue, debug a specific task, and/or implement a new stage.
Pipeline does not currently support snapshotting of the state after each run, and various pipeline tasks do permanently change the stage of the measurement set (flagging, corrected, and/or model column) or calibration state in the context. As a result, it would normally not be representative to keep re-running the same stage multiple times in a row, as each subsequent execution starts from a different state.
Since a full Pipeline run can take a significant amount of time (hours to days depending on the size and complexity of the dataset), a recommended workflow for developers would be to:
run with recipereducer and the
exitstageoption up to the start of the stage you want to debug / implementcreate a tarball of the
workingdirectory, or restore theworkingdirectory from a previously created tarballrun the stage-to-develop/debug individually (regular or in interactive debugger mode)
To this end, it may help to create a couple of custom Python scripts:
A template.script that runs pipeline up to just before the stage where you wish to implement / debug a task.
import pipeline.recipereducer
pipeline.recipereducer.reduce(
vis=['../rawdata/yourasdm'],
loglevel='info',
# exitstage=1, # uncomment this line and set correct stage to stop after.
)
Place it in the parent of the working directory, and run with:
cd working
casa -c ../template.script
A debug.script that executes an individual task:
task_to_run = 'hifa_tsysflag'
import pipeline
from pipeline.infrastructure import task_registry, launcher
context = launcher.Pipeline(context='last', loglevel='info', plotlevel='default').context
taskclass = task_registry.get_pipeline_class_for_task(task_to_run)
inputs = pipeline.infrastructure.vdp.InputsContainer(taskclass, context)
# Optionally override input parameter(s) for debugging, e.g.:
# inputs.normalize_tsys = True
task = taskclass(inputs)
result = task.execute()
result.accept(context)
context.save()
Place it in the parent of the working directory, and run with:
casa -c ../debug.script
Note: as an alternative to re-running an entire task for investigating and debugging issues, see the
infrastructure.utils.utils.function_io_dumper decorator functionality that is documented in FunctionIODumper.md,
introduced in PIPE-2354 (2025 release).
Note: a Pipeline run is relocatable, so one can resume from a working directory (and context) that is located at a different path from where the run was originally started. Depending on the ticket at hand, this can allow a developer a couple of short-cuts:
Create a run up through stage N, tarball
workingto snapshot, and optionally unpack to multiple copies (working1, working2, working3) to run (in parallel) a given stage with different checkouts of the Pipeline.When working on a shared cluster such as at NRAO Charlottesville, one can copy the run from a different developer / PWG member into their own workspace, and resume the context (optionally even one of the pickled results from the stage, located under
pipeline-[...]/saved_state) to inspect the context, re-run a stage with the interactive debugger, etc. This can be particularly useful for times where one has to urgently debug an issue that occurred for a large dataset.
Create custom Pipeline processing recipes from SRDP Mustache templates¶
The SRDP templates are mustache templates that can be used to generate custom SRDP processing recipe XML files. All templates are located in the pipeline/recipes directory (template_*.xml). A mustache template contains both the XML and the mustache tags, the latter of which allows the insertion of values from the JSON file into the XML during rendering.
To generate a custom SRDP recipe, you will need to create a JSON file that contains the custom mustache tag values. Once the JSON file is prepared, the SRDP recipes can be rendered for testing. Two recommended rendering options are below:
Use the demo page on the mustache site. You can copy and paste the text from the JSON file and template into the boxes and click the “Render Template” button.
Use a Python Library, e.g. pystache or chevron, you could do something like this:
import json, pystache
with open('pipeline/recipes/tests/test_hifa_cubeimage.json') as f_json, open('template_hifa_cubeimage.xml') as f_template:
d = json.load(f_json)
t = f_template.read()
with open('recipe.xml', 'w') as out:
out.write(pystache.render(t,d))
Workflow Break/Resume¶
Context-by-Stage¶
The context content at individual stages can be pickled after the completion of each PL task.
The implementation is in infrastructure.basetask and will only be switched on if the pipeline is in the DEBUG (or lower) logging level. This feature works for both PPR and recipereducer runs.
The path of pickled context files is: output_dir/context_name/saved_state/context-stage*.pickle, saved along with result-stage*.pickle which is always present in the same directory.
Break/Resume¶
executepproffers a “break/resume” feature at the workflow level (see the optional keywordsbpactionandbreakpoint). One practical example is below, which is based on the test data available in the pipeline-testdata repository:pl-unittest/uid___A002_Xc46ab2_X15ae_repSPW_spw16_17_small.ms
pl-regressiontest/uid___A002_Xc46ab2_X15ae_repSPW_spw16_17_small/PPR.xml
We first put the above MeasurementSet and PPR in the
rawdatadirectory of your workspace. This specific PPR file already has a breakpoint set after the firsthifa_exportdatacall:To run the PPR up to the breakpoint, at a CASA command prompt,
import os import pipeline.infrastructure.executeppr as eppr os.environ['SCIPIPE_ROOTDIR'] = os.getcwd() eppr.executeppr('../rawdata/PPR.xml', importonly=False, loglevel='debug', bpaction='break')
From the current or a new CASA session, the PPR can be resumed,
import os import pipeline.infrastructure.executeppr as eppr os.environ['SCIPIPE_ROOTDIR'] = os.getcwd() eppr.executeppr('../rawdata/PPR.xml', importonly=False, loglevel='debug', bpaction='resume')
Note: if you try to run the PPR with bpaction=‘resume’ again, the subsequent call(s) will likely fail:
executeppris hardcoded to resume from the “last” context (i.e. a.contextfile with the latest timestamp from your working directory), but we need the context content from the “breakpoint” stage to resume. Also, the file states (e.g. MSs, caltables) have changed. The only safe option is making a copy of the working directory before trying resume in case you might tweak the PPR in another attempt.For development/test purposes, one workaround to avoid copying the entire working directory is to create a fresh copy of the context pickled from the “breakpoint” stage (where your loglevel=‘debug’ setting is crucial) in the existing working directory:
First, you back up the context pickle files from different stages:
cp -rf pipeline-20210421T172403/saved_state pipeline-20210421T172403/saved_state_backupThen, at a CASA command prompt,
import os os.environ['SCIPIPE_ROOTDIR'] = os.getcwd() os.system('cp -rf pipeline-20210421T172403/saved_state_backup/context-stage26.pickle current.context') # we need some cleanup as this MS below is blocking the execution of the stage27 hif_mstransform() call. os.system('rm -rf uid___A002_Xc46ab2_X15ae_repSPW_spw16_17_small_target.ms*') import pipeline.infrastructure.executeppr as eppr eppr.executeppr('../rawdata/PPR.xml', importonly=False, loglevel='debug', bpaction='resume')
Again, please note that the above workaround might yield scientifically meaningless results due to the changing file status and is only useful for testing under certain scenarios. All break/resume approaches use the current files (e.g. MSs/caltables) in your working directory. So be aware of the existence of files/versions that might be unexpected to the resumed PL workflow task call(s)!
With
recipereducer, you can load context saved at a specific stage from the working directory and run/rerun the next PL task designed in the workflow (also see the demonstration in the last section).A full recipe run with the above test data example can be done via,
import pipeline.recipereducer, os pipeline.recipereducer.reduce(vis=['../rawdata/uid___A002_Xc46ab2_X15ae_repSPW_spw16_17_small.ms'], procedure='procedure_hifa_calimage.xml', loglevel='debug')
With the context pickled from individual stages, you may pick and test a single pipeline stage in your development:
task_to_run='hif_checkproductsize' task_keywords={'maxcubesize':40.0,'maxcubelimit':60,'maxproductsize':500.0} import pipeline from pipeline.infrastructure import task_registry, launcher context = launcher.Pipeline(context='pipeline-procedure_hifa_calimage/saved_state/context-stage24.pickle', loglevel='debug', plotlevel='default').context taskclass = task_registry.get_pipeline_class_for_task(task_to_run) inputs = pipeline.infrastructure.vdp.InputsContainer(taskclass, context, **task_keywords) task = taskclass(inputs) result = task.execute() result.accept(context) context.save('test-context-stage25.pickle')
recipereducerdoesn’t offer the “breakpoint” feature built intoexecuteppr. However, a careful use of thestartstage/exitstage/contextkeywords can achieve the same workflow-level “break/resume”:from pipeline.infrastructure import task_registry, launcher context = launcher.Pipeline(context='pipeline-procedure_hifa_calimage/saved_state/context-stage3.pickle', loglevel='debug', plotlevel='default').context import pipeline.recipereducer pipeline.recipereducer.reduce(vis=['../rawdata/uid___A002_Xc46ab2_X15ae_repSPW_spw16_17_small.ms'], procedure='procedure_hifa_calimage.xml', loglevel='debug',startstage=4,exitstage=20, context=context)
Re-render Weblog¶
A common development task is improving weblog. Without re-running a time-consuming pipeline task itself, you can only re-render the weblog using the existing context/result to test a small weblog-related change (e.g., minor tweaks in mako templates). Note: this will only rerun the weblog rendering portion of a pipeline stage (therefore limits your testing scope).
import os
import pipeline.infrastructure.renderer.htmlrenderer as hr
from pipeline.infrastructure import launcher
# Modify this to select which stage number to re-render weblog for.
os.environ['WEBLOG_RERENDER_STAGES'] = "16"
context = launcher.Pipeline(context='last', loglevel='debug', plotlevel='default').context
hr.WebLogGenerator.render(context)
Re-run QA heuristics¶
A common development task is to implement or improve the QA heuristics for a task.
The QA step is normally run after the main task heuristics are completed and the
task Results have been created.
As such, after having run the task at least once, one can rapidly re-run the QA heuristic on the existing task result in the context to iterate more rapidly on the heuristic without having to re-run the time-consuming pipeline task itself.
from pipeline.infrastructure import pipelineqa, launcher
context = launcher.Pipeline(context='last', loglevel='debug', plotlevel='default').context
# Modify this to select which stage to run QA for.
stage_number = 14
task_results = context.results[stage_number-1].read()
pipelineqa.qa_registry.do_qa(context, task_results)
Note, the snippet above only re-runs the QA heuristic on a copy of the results read in from disk, and updates the QA scores in this copy of the results in-place. However, this does not modify the results on disk nor does it attempt to re-run the weblog rendering; therefore re-running the QA this way will not update the QA scores that are displayed in the task weblog. Instead, the snippet is primarily useful for interactively debugging the QA heuristics. Once debugged, it would be recommended to re-run the entire stage.
Note on using executeppr/recipereducer and Pipeline CLI tasks¶
All the above use cases rely on a global pipeline context object that holds information about the pipeline processing state. Be careful when executeppr/recipereducer and CLI tasks are used together in a single CASA session, especially when running executeppr/recipereducer in the middle of interactive processing with CLI tasks, e.g.,
h_initto start interactive processing (creates context #1 and registers it in the global scope)run various tasks interactively (with context #1)
run
executeppr(creates context #2 and overwrites global context #1)run some more tasks interactively (with context #2)
In this example, steps 2 and 4 are no longer continuous because they refer to different context objects. In such cases, save the context object to disk before running executeppr/recipereducer and resume it afterwards. The above workflow should be tweaked as follows:
h_initto start interactive processing (creates context #1 and register it to global scope)run various tasks interactively (with context #1)
save current state to disk with
h_save('interactive.context')(save context #1 to disk)run executeppr (creates context #2 and overwrite global context #1)
resume previous state from disk with
h_resume('interactive.context')(load context #1 from disk and overwrite global context #2)run some more tasks interactively (with context #1)