Source code for pipeline.hifv.tasks.hanning.hanning

# Do not evaluate type annotations at definition time.
from __future__ import annotations

import os
import shutil
from typing import TYPE_CHECKING

from pipeline import infrastructure
from pipeline.domain.measures import FrequencyUnits
from pipeline.infrastructure import basetask, casa_tasks, casa_tools, task_registry, vdp
from pipeline.infrastructure.utils import conversion, find_ranges

LOG = infrastructure.logging.get_logger(__name__)

if TYPE_CHECKING:
    from collections.abc import Callable

    from pipeline.infrastructure.api import Results
    from pipeline.infrastructure.jobrequest import JobRequest
    from pipeline.infrastructure.launcher import Context


class HanningInputs(vdp.StandardInputs):
    """Inputs class for the hifv_hanning pipeline smoothing task.  Used on VLA measurement sets.

    The class inherits from vdp.StandardInputs.

    """
    maser_detection = vdp.VisDependentProperty(default=True)
    spws_to_smooth = vdp.VisDependentProperty(default=None)

    # docstring and type hints: supplements hifv_hanning
    def __init__(
            self,
            context: Context,
            vis: str | None = None,
            maser_detection: bool | None = None,
            spws_to_smooth: str | None = None,
            ):
        """
        Args:
            context: Pipeline context

            vis: The list of input MeasurementSets. Defaults to the list of MeasurementSets specified in the hifv_importdata task.

            maser_detection: Run maser detect algorithm on spectral line windows if spws_to_smooth is None. Defaults to True.

            spws_to_smooth: A CASA-style range of spw IDs indicating which ones to smooth.

                Example: '1,2~4,7' indicates spws 1, 2, 3, 4, and 7 should be smoothed.

        """
        super().__init__()
        self.context = context
        self.vis = vis
        self.maser_detection = maser_detection
        self.spws_to_smooth = spws_to_smooth

        if self.spws_to_smooth is not None:
            self.spws_to_smooth = conversion.range_to_list(self.spws_to_smooth)


class HanningResults(basetask.Results):
    """Results class for the hifv_hanning pipeline smoothing task.  Used on VLA measurement sets.

    The class inherits from basetask.Results

    """
    def __init__(
            self,
            task_successful: bool,
            qa_message: str,
            final: list | None = None,
            pool: list | None = None,
            preceding: list | None = None,
            smoothed_spws: dict[int, tuple[bool, str]] | None = None,
            ):
        """
        Args:
            task_successful: Indicates if the task completed successfully or not.
            qa_message: Information about the outcome of hanningsmooth task to be displayed in the weblog.
            final: Final list of tables (not used in this task)
            pool: Pool list (not used in this task)
            preceding: Preceding list (not used in this task)
            smoothed_spws: Information about spws, including whether they were smoothed and the reason for it.
        """
        if final is None:
            final = []
        if pool is None:
            pool = []
        if preceding is None:
            preceding = []
        if smoothed_spws is None:
            smoothed_spws = {}

        super().__init__()

        self.task_successful = task_successful
        self.qa_message = qa_message
        self.vis = None
        self.pool = pool[:]
        self.final = final[:]
        self.preceding = preceding[:]
        self.error = set()
        self.smoothed_spws = smoothed_spws

    def merge_with_context(self, context: Context) -> None:
        """
        Args:
            context: Pipeline context object
        """
        m = context.observing_run.measurement_sets[0]


[docs] @task_registry.set_equivalent_casa_task('hifv_hanning') class Hanning(basetask.StandardTaskTemplate): """Class for the hifv_hanning pipeline smoothing task. Used on VLA measurement sets. The class inherits from basetask.StandardTaskTemplate """ Inputs = HanningInputs
[docs] def prepare(self) -> HanningResults: """Method where the hanning smoothing operation is executed. The MS SPECTRAL_WINDOW table is examined to see if the SDM_NUM_BIN value is greater than 1. If the value is great than 1, then hanning smoothing does not proceed. The CASA task hanningsmooth() is executed on the data, creating a temporary measurement set (MS). The original MS is removed from disk, and the temporary MS is renamed to the original MS. An exception in thrown if an error occurs. Returns: HanningResults() type object """ with casa_tools.TableReader(self.inputs.vis + '/SPECTRAL_WINDOW') as table: if 'OFFLINE_HANNING_SMOOTH' in table.colnames(): qa_message = "MS has already had offline hanning smoothing applied. Skipping this stage." LOG.warning(qa_message) return HanningResults(task_successful=True, qa_message=qa_message) spws = self.inputs.context.observing_run.get_ms(self.inputs.vis).get_spectral_windows(science_windows_only=True) smoothing_dict = {} # Smooth input spws only if applicable. Overrides everything else if self.inputs.spws_to_smooth is not None: for spw in spws: if spw.id in self.inputs.spws_to_smooth: smoothing_dict[spw.id] = (True, "restored or user-defined smoothing") else: smoothing_dict[spw.id] = (False, "") else: # Retrieve SPWs information and determine which to smooth if not self.inputs.maser_detection: LOG.info("Maser detection turned off.") # If any spws had online smoothing applied, do not smooth any spws if any([spw.sdm_num_bin > 1 for spw in spws]): for spw in spws: smoothing_dict[spw.id] = (False, "online smoothing applied") else: for spw in spws: smoothing_dict[spw.id] = (False, "") if spw.specline_window: if self.inputs.maser_detection and self._checkmaserline(str(spw.id)): smoothing_dict[spw.id] = (True, "spectral line, maser line") else: smoothing_dict[spw.id] = (False, "spectral line") else: smoothing_dict[spw.id] = (True, "continuum") hs_dict = {key: val[0] for key, val in smoothing_dict.items()} # Update spws_to_smooth if it was automatically calculated (not user-provided) if self.inputs.spws_to_smooth is None: self.inputs.spws_to_smooth = [spw_id for spw_id, should_smooth in hs_dict.items() if should_smooth] task_successful = True qa_message = "Hanning smoothing task completed successfully." if not any(hs_dict.values()): qa_message = "None of the science spectral windows were selected for smoothing." LOG.info(qa_message) self._track_hsmooth(hs_dict) return HanningResults(task_successful=True, qa_message=qa_message, smoothed_spws=smoothing_dict) # Only proceed with smoothing if needed if all(hs_dict.values()): LOG.info("All science spectral windows were selected for hanning smoothing.") else: smoothing_windows = [str(x) for x, y in hs_dict.items() if y] message = find_ranges(smoothing_windows) LOG.info("Smoothing spectral window(s) %s.", message) try: self._do_hanningsmooth() temp_ms = 'temphanning.ms' # Verify temp MS was created if not os.path.exists(temp_ms): raise RuntimeError(f'Expected output {temp_ms} was not created') LOG.info("Removing original VIS %s", self.inputs.vis) shutil.rmtree(self.inputs.vis) LOG.info("Renaming %s to %s", temp_ms, self.inputs.vis) os.rename(temp_ms, self.inputs.vis) except Exception as ex: qa_message = f'Problem encountered with hanning smoothing task: {ex}' LOG.warning(qa_message) task_successful = False # Cleanup: if temp MS exists but original is gone, try to restore if os.path.exists(temp_ms) and not os.path.exists(self.inputs.vis): try: LOG.warning("Attempting to recover by renaming temporary MS") os.rename(temp_ms, self.inputs.vis) except Exception as cleanup_ex: LOG.error("Failed to recover MS: %s", cleanup_ex) # If both exist, remove temp to avoid confusion elif os.path.exists(temp_ms) and os.path.exists(self.inputs.vis): try: LOG.info("Removing incomplete temporary MS") shutil.rmtree(temp_ms) except Exception as cleanup_ex: LOG.warning("Failed to cleanup temporary MS: %s", cleanup_ex) # Adding column to SPECTRAL_WINDOW table to indicate whether the SPW was smoothed (True) or not (False) self._track_hsmooth(hs_dict) return HanningResults(task_successful=task_successful, qa_message=qa_message, smoothed_spws=smoothing_dict)
[docs] def analyse(self, results: Results) -> Results: """Method to analyse the results of the hanning smoothing operation. Args: results: Results object from the prepare() method. Returns: The same Results object passed in, unmodified. """ return results
def _do_hanningsmooth(self) -> Callable[[JobRequest], Results]: """Execute the CASA task hanningsmooth Returns: The `execute` function of an Executor class, which returns a result dictionary """ task = casa_tasks.hanningsmooth(vis=self.inputs.vis, datacolumn='data', outputvis='temphanning.ms', smooth_spw=self.inputs.spws_to_smooth) return self._executor.execute(task) def _checkmaserline(self, spw: str) -> bool: """Confirm if known maser line(s) appear in frequency range of spectral window Args: spw: spectral window number Returns: True if maser line may exist in window; False otherwise """ LOG.debug("Checking for maser line contamination in spw %s.", spw) def freq_to_vel(rest_freq: float, obs_freq: float) -> float: c_kms = 2.99792458e5 return ((rest_freq - obs_freq) / rest_freq) * c_kms maser_dict = { 'OH (1)': 1612231000, 'OH (2)': 1665401800, 'OH (3)': 1667359000, 'OH (4)': 1720530000, 'H2O': 22235080000, 'CH3OH (1)': 6668519200, 'CH3OH (2)': 1217859700, 'SiOv0': 43423858000, 'SiOv1': 43122079000, 'SiOv2': 42820582000, 'SiOv3': 42519373000, '29SiOv0': 42879916000, '30SiOv0': 42373359000, 'SiS': 18154880000, } qaTool = casa_tools.quanta suTool = casa_tools.synthesisutils ms_info = self.inputs.context.observing_run.get_ms(self.inputs.vis) if ms_info is None: LOG.error("Measurement set %s not found in observing run.", self.inputs.vis) return False spw_info = ms_info.get_spectral_window(spw) freq_low = spw_info._min_frequency.convert_to(newUnits=FrequencyUnits.HERTZ).value freq_high = spw_info._max_frequency.convert_to(newUnits=FrequencyUnits.HERTZ).value if spw_info._ref_frequency_frame != 'TOPO': LOG.info("Spectral window reference frame not TOPO. Skipping maser detection.") return False to_lsrk = suTool.advisechansel(msname=ms_info.name, spwselection=spw, getfreqrange=True, freqframe='LSRK') freq_low = float(qaTool.getvalue(qaTool.convert(to_lsrk['freqstart'], 'Hz'))[0]) freq_high = float(qaTool.getvalue(qaTool.convert(to_lsrk['freqend'], 'Hz'))[0]) LOG.debug("Freq low: %f; Freq high: %f", freq_low, freq_high) for value in maser_dict.values(): vel_low = freq_to_vel(value, freq_low) vel_high = freq_to_vel(value, freq_high) if (freq_low <= value <= freq_high) or abs(vel_low) <= 200 or abs(vel_high) <= 200: LOG.info("Maser line possible in spw %s. Hanning smoothing will be applied.", spw) return True return False def _track_hsmooth(self, hs_dict: dict[int, bool]) -> None: """Modify SPECTRAL_WINDOW table to track hanning smoothing Args: hs_dict: hanning smoothing dictionary to write in SPECTRAL_WINDOW table """ LOG.info("Writing Hanning smoothing information to SPECTRAL_WINDOW table of MS %s.", self.inputs.vis) desc = {'OFFLINE_HANNING_SMOOTH': {'comment': 'Offline Hanning Smooth Flag', 'dataManagerGroup': 'StandardStMan', 'dataManagerType': 'StandardStMan', 'keywords': {}, 'maxlen': 0, 'option': 0, 'valueType': 'boolean'}} with casa_tools.TableReader(self.inputs.vis + '/SPECTRAL_WINDOW', nomodify=False) as tb: tb.addcols(desc) for spw, value in hs_dict.items(): tb.putcell('OFFLINE_HANNING_SMOOTH', spw, value)