Source code for pipeline.domain.spectralwindow

"""
Store frequency information of MeasurementSet.

This module provides classes to store logical representation of spectral
windows, channel frequency information, and channel selection.
"""

# Do not evaluate type annotations at definition time.
from __future__ import annotations

import decimal
import itertools
import operator
from typing import TYPE_CHECKING

import numpy

import pipeline.infrastructure as infrastructure
from pipeline.infrastructure import casa_tools

from . import measures

if TYPE_CHECKING:
    from collections.abc import Iterable, Iterator, Sequence

LOG = infrastructure.logging.get_logger(__name__)


class ArithmeticProgression:
    """
    A representation of an arithmetic progression that can generate sequence
    elements on demand.
    """
    __slots__ = ('start', 'delta', 'num_terms')

    def __getstate__(self) -> tuple[int | float, int | float, int]:
        return self.start, self.delta, self.num_terms

    def __setstate__(self, state: tuple[int | float, int | float, int]) -> None:
        self.start, self.delta, self.num_terms = state

    def __init__(self, start: int | float, delta: int | float, num_terms: int) -> None:
        """
        Initialize an ArithmeticProgression object.

        Args:
            start: Value of first element in the progression.
            delta: Common difference between each element in the progression.
            num_terms: Number of elements in the progression.
        """
        self.start = start
        self.delta = delta
        self.num_terms = num_terms

    def __iter__(self):
        g = itertools.count(self.start, self.delta)
        return itertools.islice(g, 0, self.num_terms)

    def __len__(self) -> int:
        return self.num_terms

    def __getitem__(self, index: int):
        if abs(index) >= self.num_terms:
            raise IndexError
        if index < 0:
            index += self.num_terms
        return self.start + index * self.delta


def compress(values: Sequence[int | float]) -> Sequence[int | float] | ArithmeticProgression:
    """
    Compress (if possible) a sequence of values.

    If the numbers in the given list constitute an arithmetic progression,
    return an ArithmeticProgression object that summarises it as such. If
    the list cannot be summarised as a simple arithmetic progression,
    return the list as given.
    """
    deltas = set(numpy.diff(values))
    if len(deltas) == 1:
        delta = deltas.pop()
        return ArithmeticProgression(values[0], delta, len(values))
    else:
        return values


class ChannelList:
    """
    A container/generator for Channel objects.

    A spectral window can contain thousands of channels. Rather than store all of
    these objects, a ChannelList generates and returns them lazily, on-demand.
    """
    def __init__(self,
                 chan_freqs: Sequence[int | float] | ArithmeticProgression,
                 chan_widths: Sequence[int | float] | ArithmeticProgression,
                 effbw: Sequence[int | float] | ArithmeticProgression) -> None:
        """
        Initialize a ChannelList object.

        Args:
            chan_freqs: Sequence or ArithmeticProgression of center frequency of
                channel (in Hertz) for all channels in ChannelList.
            chan_widths: Sequence or ArithmeticProgression of channel width (in
                Hertz) for all channels in ChannelList.
            effbw: Sequence or ArithmeticProgression of effective bandwidth (in
                Hertz) for all channels in ChannelList.
        """
        assert len(chan_freqs) == len(chan_widths) == len(effbw)
        self.chan_freqs = chan_freqs
        self.chan_widths = chan_widths
        self.chan_effbws = effbw

    def __iter__(self) -> Iterator[Channel]:
        raw_channel_data = list(zip(self.chan_freqs, self.chan_widths, self.chan_effbws))
        for chan_centre, chan_width, chan_effective_bw in raw_channel_data:
            yield self.__create_channel(chan_centre, chan_width, chan_effective_bw)

    def __len__(self) -> int:
        return len(self.chan_freqs)

    def __getitem__(self, index: int) -> Channel:
        return self.__create_channel(self.chan_freqs[index],
                                     self.chan_widths[index],
                                     self.chan_effbws[index])

    @staticmethod
    def __create_channel(centre: int | float, width: int | float, effective_bw: int | float) -> Channel:
        """
        Return a Channel object for given parameters.

        Args:
            centre: Center frequency of the channel (in Hertz).
            width: Frequency width of the channel (in Hertz).
            effective_bw: Effective bandwidth of the channel (in Hertz).

        Returns:
            Channel object.
        """
        dec_centre = decimal.Decimal(centre)
        dec_width = decimal.Decimal(width)
        # The abs() call is not strictly necessary as Channel extends
        # FrequencyRange, whose .set() method swaps low and high when
        # low > high.
        delta = abs(dec_width) / decimal.Decimal(2)

        f_lo = measures.Frequency(dec_centre - delta,
                                  measures.FrequencyUnits.HERTZ)
        f_hi = measures.Frequency(dec_centre + delta,
                                  measures.FrequencyUnits.HERTZ)
        f_bw = measures.Frequency(effective_bw,
                                  measures.FrequencyUnits.HERTZ)
        return Channel(f_lo, f_hi, f_bw)


class Channel:
    """
    Representation of a channel within a spectral window.

    This object can be considered as a FrequencyRange object plus an effective
    bandwidth property. It provides the same interface as a FrequencyRange.

    Attributes:
        frequency_range: FrequencyRange object denoting the low- and high-end
            frequencies of this channel (in Hertz).
        effective_bw: The effective bandwidth of this channel (in Hertz).
    """
    __slots__ = ('frequency_range', 'effective_bw')

    def __init__(self, start: measures.Frequency, end: measures.Frequency, effective_bw: measures.Frequency) -> None:
        """Creates a new Channel with the given start and end frequency and
        effective bandwidth.

        Args:
            start: Start frequency of channel (in Hertz).
            end: End frequency of channel (in Hertz).
            effective_bw: Effective bandwidth of channel (in Hertz).
        """
        self.frequency_range = measures.FrequencyRange(start, end)
        self.effective_bw = effective_bw

    def __getstate__(self) -> tuple[measures.FrequencyRange, measures.Frequency]:
        return self.frequency_range, self.effective_bw

    def __setstate__(self, state: tuple[measures.FrequencyRange, measures.Frequency]) -> None:
        self.frequency_range, self.effective_bw = state

    def __eq__(self, other: Channel) -> bool:
        if not isinstance(other, Channel):
            return False

        if other.frequency_range != self.frequency_range:
            return False
        if other.effective_bw != self.effective_bw:
            return False

        return True

    def __ne__(self, other: Channel) -> bool:
        return not self.__eq__(other)

    def __repr__(self) -> str:
        return 'Channel(%s, %s, %s)' % (self.frequency_range.low,
                                        self.frequency_range.high,
                                        self.effective_bw)

    @property
    def low(self) -> measures.Frequency:
        """Return the low end frequency of the Channel."""
        return self.frequency_range.low

    @property
    def high(self) -> measures.Frequency:
        """Returns the high end frequency of the Channel."""
        return self.frequency_range.high

    def contains(self, frequency: measures.Frequency | measures.FrequencyRange) -> bool:
        """Returns whether the Channel contains given frequency (range)."""
        return self.frequency_range.contains(frequency)

    def convert_to(self, newUnits: dict = measures.FrequencyUnits.GIGAHERTZ) -> measures.FrequencyRange:
        """Converts both endpoints of this Channel (frequency range) to the
        given units.

        This converts the endpoint frequency of the endpoints in-place, as well
        as returns a reference to the frequency range of this Channel.

        Args:
            newUnits: New units to convert the frequencies to. By default, this
                will convert to Gigahertz.
        """
        return self.frequency_range.convert_to(newUnits)

    def getCentreFrequency(self) -> measures.Frequency:
        """Returns the center frequency of the Channel."""
        return self.frequency_range.getCentreFrequency()

    def getOverlapWith(self, other: measures.FrequencyRange) -> measures.FrequencyRange:
        """Return a new frequency range that represents the region of overlap
        between this Channel (frequency range) and "other". If there is no
        overlap, None is returned.

        Args:
            other: Another frequency range that may overlap this Channel.

        Returns:
            FrequencyRange object representing the overlapping region of this
            range and other, or None if there is no overlap.
        """
        return self.frequency_range.getOverlapWith(other)

    def getGapBetween(self, other: measures.FrequencyRange) -> measures.FrequencyRange | None:
        """Returns a new frequency range that represents the region of frequency
        space between this Channel (frequency range) and ``other``. If the other
        range is coincident with, or overlaps, this Channel, None is returned.
        If the other range is None, None is returned.

        Args:
            other: Channel/frequency range for which to assess frequency gap
                with this Channel.

        Returns:
            The frequency gap between this range and ``other``."""
        return self.frequency_range.getGapBetween(other)

    def getWidth(self) -> measures.Frequency:
        """Returns the width of the Channel."""
        return self.frequency_range.getWidth()

    def overlaps(self, other: Channel) -> bool:
        """
        Returns whether this Channel overlaps with given ``other`` Channel.

        The Channel spans a frequency range that is a closed interval, that is,
        one that contains both of its endpoints.

        Args:
            other: Another Channel (frequency range) that may overlap this one.

        Returns:
            True if this Channel overlaps with ``other``. If ``other`` is not a
            Channel, the return value is False.
        """
        if isinstance(other, Channel):
            return self.frequency_range.overlaps(other.frequency_range)
        return False

    def set(self, frequency1: measures.Frequency, frequency2: measures.Frequency) -> None:
        """
        Set frequency range for this Channel based on given frequency endpoints.

        Args:
            frequency1: One endpoint of frequency range for this Channel.
            frequency2: Other endpoint of frequency range for this Channel.
        """
        self.frequency_range.set(frequency1, frequency2)


[docs] class SpectralWindow: """A logical representation of a spectral window (spw). Attributes: id: The numerical identifier of this spectral window within the SPECTRAL_WINDOW subtable of the MeasurementSet. band: Frequency band. bandwidth: The total bandwidth. baseband: The baseband. channels: Frequency information of each channel in spectral window, i.e., frequencies, channel width, effective bandwidth. freq_lo: A list of LO frequencies. intents: the observing intents that have been observed using this spectral window. mean_frequency: Mean frequency of spectral window. name: Spectral window name. receiver: Receiver type, e.g., 'TSB'. ref_frequency: The reference frequency. sideband: Side band. transitions: Spectral transitions recorded associated with spectral window. type: Spectral window type, e.g., 'TDM'. sdm_num_bin: Number of bins for online spectral averaging. correlation_bits: Number of bits used for correlation. median_receptor_angle: Median feed receptor angle. specline_window: Whether spw is intended for spectral line or continuum (VLA only). grouping_id: Grouping ID to uniquely identify this spw in different MOUS within the same GOUS (ALMA-only, introduced in Cycle 12). """ __slots__ = ('id', 'band', 'bandwidth', 'type', 'intents', 'ref_frequency', 'name', 'baseband', 'sideband', 'receiver', 'freq_lo', 'mean_frequency', '_min_frequency', '_max_frequency', '_centre_frequency', 'channels', '_ref_frequency_frame', 'spectralspec', 'transitions', 'sdm_num_bin', 'correlation_bits', 'median_receptor_angle', 'specline_window', 'grouping_id') def __init__( self, spw_id: int, name: str, spw_type: str, bandwidth: float, ref_freq: dict, mean_freq: float, chan_freqs: numpy.ndarray, chan_widths: numpy.ndarray, chan_effective_bws: numpy.ndarray, sideband: int, baseband: int, receiver: str | None, freq_lo: list[float] | numpy.ndarray | None, band: str = 'Unknown', spectralspec: str | None = None, transitions: list[str] | None = None, sdm_num_bin: int | None = None, correlation_bits: str | None = None, median_receptor_angle: numpy.ndarray | None = None, specline_window: bool = False, grouping_id: str | None = None ): """ Initialize SpectralWindow class. Args: spw_id: Spw ID. name: Spw name. spw_type: Spectral window type, e.g., 'TDM'. bandwidth: The total bandwidth. ref_freq: The reference frequency, as a CASA 'frequency' measure dictionary. mean_freq: Mean frequency of spectral window in Hz. chan_freqs: A list of frequency of each channel in spw in Hz. chan_widths: A list of channel width of each channel in spw in Hz. chan_effective_bws: A list of effective bandwidth of each channel in spw in Hz. sideband: Side band. baseband: The baseband. receiver: Receiver type, e.g., 'TSB'. freq_lo: A list of LO frequencies in Hz. band: Frequency band. spectralspec: SpectralSpec name. transitions: Spectral transitions recorded associated with spectral window. sdm_num_bin: Number of bins for online spectral averaging. correlation_bits: Number of bits used for correlation. median_receptor_angle: Median feed receptor angle. specline_window: Whether spw is intended for spectral line or continuum (VLA only). grouping_id: GOUS ID used to identify project group level (new in Cycle 12 ALMA data) TODO improve description """ if transitions is None: transitions = ['Unknown'] self.id = spw_id self.bandwidth = measures.Frequency(bandwidth, measures.FrequencyUnits.HERTZ) ref_freq_hz = casa_tools.quanta.convertfreq(ref_freq['m0'], 'Hz') ref_freq_val = casa_tools.quanta.getvalue(ref_freq_hz)[0] self.ref_frequency = measures.Frequency(ref_freq_val, measures.FrequencyUnits.HERTZ) self._ref_frequency_frame = ref_freq['refer'] self.mean_frequency = measures.Frequency(mean_freq, measures.FrequencyUnits.HERTZ) self.band = band self.type = spw_type self.spectralspec = spectralspec self.intents = set() # work around NumPy bug with empty strings # http://projects.scipy.org/numpy/ticket/1239 self.name = str(name) self.sideband = str(sideband) self.baseband = str(baseband) self.receiver = receiver if freq_lo is not None: self.freq_lo = [measures.Frequency(freq, measures.FrequencyUnits.HERTZ) for freq in freq_lo] else: self.freq_lo = freq_lo chan_freqs = compress(chan_freqs) chan_widths = compress(chan_widths) chan_effective_bws = compress(chan_effective_bws) self.channels = ChannelList(chan_freqs, chan_widths, chan_effective_bws) self._min_frequency: measures.Frequency = min(self.channels, key=lambda r: r.low).low self._max_frequency: measures.Frequency = max(self.channels, key=lambda r: r.high).high self._centre_frequency: measures.Frequency = (self._min_frequency + self._max_frequency) / 2.0 self.transitions = transitions self.sdm_num_bin = sdm_num_bin self.correlation_bits = correlation_bits self.median_receptor_angle = median_receptor_angle self.specline_window = specline_window self.grouping_id = grouping_id def __getstate__(self) -> tuple: """Define what to pickle as a class instance.""" return (self.id, self.band, self.bandwidth, self.type, self.intents, self.ref_frequency, self.name, self.baseband, self.sideband, self.receiver, self.freq_lo, self.mean_frequency, self._min_frequency, self._max_frequency, self._centre_frequency, self.channels, self._ref_frequency_frame, self.spectralspec, self.transitions, self.sdm_num_bin, self.correlation_bits, self.median_receptor_angle, self.specline_window, self.grouping_id) def __setstate__(self, state: tuple) -> None: """Define how to unpickle a class instance.""" (self.id, self.band, self.bandwidth, self.type, self.intents, self.ref_frequency, self.name, self.baseband, self.sideband, self.receiver, self.freq_lo, self.mean_frequency, self._min_frequency, self._max_frequency, self._centre_frequency, self.channels, self._ref_frequency_frame, self.spectralspec, self.transitions, self.sdm_num_bin, self.correlation_bits, self.median_receptor_angle, self.specline_window, self.grouping_id) = state def __repr__(self) -> str: chan_freqs = self.channels.chan_freqs if isinstance(chan_freqs, ArithmeticProgression): chan_freqs = numpy.array(list(chan_freqs)) chan_widths = self.channels.chan_widths if isinstance(chan_widths, ArithmeticProgression): chan_widths = numpy.array(list(chan_widths)) chan_effective_bws = self.channels.chan_effbws if isinstance(chan_effective_bws, ArithmeticProgression): chan_effective_bws = numpy.array(list(chan_effective_bws)) return ('SpectralWindow({0!r}, {1!r}, {2!r}, {3!r}, {4!r}, {5!r}, {6}, {7}, {8}, {9!r}, {10!r}, {11!r}, ' '{12!r}, {13}, {14}, {15!r}, {16}, {17})').format( self.id, self.name, self.type, float(self.bandwidth.to_units(measures.FrequencyUnits.HERTZ)), dict(m0={'unit': 'Hz', 'value': float(self.ref_frequency.to_units(measures.FrequencyUnits.HERTZ))}, refer=self._ref_frequency_frame, type='frequency'), float(self.mean_frequency.to_units(measures.FrequencyUnits.HERTZ)), 'numpy.array(%r)' % chan_freqs.tolist(), 'numpy.array(%r)' % chan_widths.tolist(), 'numpy.array(%r)' % chan_effective_bws.tolist(), self.sideband, self.baseband, self.band, self.transitions, self.sdm_num_bin, self.correlation_bits, self.median_receptor_angle, self.specline_window, self.grouping_id ) @property def centre_frequency(self) -> measures.Frequency: """Return the center frequency of the SpectralWindow.""" return self._centre_frequency
[docs] def channel_range(self, minfreq: measures.Frequency, maxfreq: measures.Frequency) \ -> tuple[int, int] | tuple[None, None]: """ Return the channel range for given minimum/maximum frequency. Args: minfreq: Minimum frequency as measures.Frequency object. maxfreq: Maximum frequency as measures.Frequency object. Returns: 2-Tuple of integers representing indices (min, max) of channel range end-points corresponding to given min/max frequency. """ freqmin = minfreq freqmax = maxfreq # Check for the no overlap case. nchan = self.num_channels if freqmax < self.min_frequency: return None, None if freqmin > self.max_frequency: return None, None # Find the minimum channel chan_min = 0 if self.channels[0].low < self.channels[nchan-1].low: for i in range(nchan): if self.channels[i].low > freqmin: break chan_min = i else: for i in range(nchan): if self.channels[i].high < freqmax: break chan_min = i # Find the maximum channel chan_max = nchan - 1 if self.channels[0].low < self.channels[nchan-1].low: for i in range(nchan-1, -1, -1): if self.channels[i].high < freqmax: break chan_max = i else: for i in range(nchan-1, -1, -1): if self.channels[i].low > freqmin: break chan_max = i return chan_min, chan_max
@property def frame(self) -> str: """Return the reference frame code of the SpectralWindow.""" return self._ref_frequency_frame @property def min_frequency(self) -> measures.Frequency: """Return the minimum frequency of the SpectralWindow.""" return self._min_frequency @property def max_frequency(self) -> measures.Frequency: """Return the maximum frequency of the SpectralWindow.""" return self._max_frequency @property def num_channels(self) -> int: """Return the number of channels in the SpectralWindow.""" return len(self.channels) def __str__(self) -> str: args = [str(x) for x in [self.id, self.centre_frequency, self.bandwidth, self.type]] return 'SpectralWindow({0})'.format(', '.join(args))
[docs] class SpectralWindowWithChannelSelection: """Wrapper for SpectralWindow that includes channel selection in the ID. This class wraps a SpectralWindow and modifies its ID property to include a channel selection string (e.g., '0:10~20;30~40'). Attributes: id: Spectral window ID with appended channel selection string. """ def __init__(self, subject: SpectralWindow, channels: Iterable[int]) -> None: self._subject = subject channels = sorted(list(channels)) # prepare a string representation of the number of channels. If all # channels are specified for this spw, just specify the spw in the # string representation if set(channels).issuperset(set(range(0, subject.num_channels))): ranges = [] else: ranges = [] for _, g in itertools.groupby(enumerate(channels), lambda i_x: i_x[0] - i_x[1]): rng = list(map(operator.itemgetter(1), g)) if len(rng) == 1: ranges.append('%s' % rng[0]) else: ranges.append('%s~%s' % (rng[0], rng[-1])) self._channels = ';'.join(ranges) def __getattr__(self, name): return getattr(self._subject, name) @property def id(self) -> str: """Returns the spectral window ID including channel selection.""" channels = ':%s' % self._channels if self._channels else '' return f'{self._subject.id}{channels}'
def match_spw_basename(name1: str, name2: str) -> bool: """Determines if one SPW name is a suffix of the other, indicating equivalent SPW configurations. This function identifies spectral window settings that are functionally identical across different observation cycles, despite variations in naming conventions due to evolving metadata convention. The comparison is performed bidirectionally to accommodate naming changes where prefixes are usually added in newer cycles. ALMA naming convention change examples: Cycle 1-2 to Cycle 3-11 (partID addition for spectralspec identifiers): Before: `ALMA_RB_03#BB_1#SW-01#FULL_RES` After: `X425992413#ALMA_RB_03#BB_2#SW-01#FULL_RES` Cycle 3-11 to Cycle 12 (groupID addition): Before: `X870829884#ALMA_RB_03#BB_1#SW-01#FULL_RES` After: `X127/X45069401/X3370#X870829884#ALMA_RB_03#BB_1#SW-01#FULL_RES` Note: Cycle 0 data have fallback SPW names ("spw_i" where `i` is the SPW ID) in the `name` property of `SpectralWindow` domain objects since no specific names are assigned in the MS subtables. Args: name1: First spectral window name for comparison name2: Second spectral window name for comparison Returns: True if either name is a suffix of the other, False otherwise Raises: ValueError: If either name is shorter than 5 characters (insufficient for reliable matching) """ # Validate minimum length requirement based on Pipeline fallback naming (spw_i) min_length = 5 if len(name1) < min_length or len(name2) < min_length: msg = ( f"SPW names too short for comparison: name1='{name1}', name2='{name2}'. " f"Minimum length is {min_length} characters." ) LOG.error(msg) raise ValueError(msg) return name1.endswith(name2) or name2.endswith(name1)