# Source code for pipeline.hifa.tasks.antpos.almaantpos

from __future__ import annotations

import json
import os
import traceback
from typing import TYPE_CHECKING

import numpy as np

from pipeline import infrastructure
from pipeline.hif.tasks.antpos import antpos
from pipeline.hif.tasks.antpos.antpos import AntposResults
from pipeline.infrastructure import casa_tasks, casa_tools, exceptions, task_registry, utils, vdp

if TYPE_CHECKING:
    from typing import Literal

    from numpy import floating
    from numpy.typing import NDArray

    from pipeline.infrastructure.launcher import Context

# Names exported by ``from ... import *``: the ALMA antenna position task and
# its inputs class, both defined below.
__all__ = [
    'ALMAAntpos',
    'ALMAAntposInputs'
]

# Module-level logger, obtained from the pipeline infrastructure under this
# module's name.
LOG = infrastructure.logging.get_logger(__name__)


class ALMAAntposInputs(antpos.AntposInputs):
    """
    ALMAAntposInputs defines the inputs for the ALMAAntpos pipeline task.

    On top of the parameters handled by ``antpos.AntposInputs`` it adds
    ``threshold``, ``snr`` and ``search``, and it knows how to build the
    argument dictionaries for the gencal call (``to_casa_args``) and for the
    getantposalma call (``to_antpos_args``).
    """
    # These are ALMA specific settings and override the defaults in
    # the base class.

    hm_antpos = vdp.VisDependentProperty(default='online')

    @vdp.VisDependentProperty
    def antposfile(self) -> str:
        # Default file name used when no antposfile is supplied.
        return "antennapos.json"

    @antposfile.convert
    def antposfile(self, value: str | None) -> str | None:
        # An empty/None value is only replaced in 'online' mode, where a file
        # name is needed to build online_antpos_filename; in the other modes
        # the value is passed through unchanged.
        if not value and self.hm_antpos == 'online':
            value = "antennapos.json"
            LOG.info("Input parameter antposfile cannot be empty when hm_antpos='online', using antposfile=%s instead.", value)
        return value

    # Weblog highlighting threshold, in wavelengths (see __init__ docstring).
    threshold = vdp.VisDependentProperty(default=1.0)
    # Passed to getantposalma as `snr`; note the default is the string
    # "default", not a number.
    snr = vdp.VisDependentProperty(default="default")
    # Passed to getantposalma as `search`.
    search = vdp.VisDependentProperty(default='auto')

    def __init__(
            self,
            context: Context,
            output_dir: str | None = None,
            vis: list[str] | None = None,
            caltable: list[str] | None = None,
            hm_antpos: Literal['online', 'manual', 'file'] | None = None,
            antposfile: str | None = None,
            antenna: str | None = None,
            offsets: list[float] | None = None,
            threshold: float | None = None,
            snr: float | str | None = None,
            search: Literal['both_latest', 'both_closest', 'auto'] | None = None,
            ):
        """
        Initializes the pipeline input parameters for antenna position corrections.

        Args:
            context: The pipeline execution context.

            output_dir: Directory where output files will be stored.
                Defaults to the current working directory.

            vis: List of input MeasurementSets.
                Defaults to those specified in the pipeline context.

                Example: ['ngc5921.ms']

            caltable: List of output calibration table names.
                Defaults to the standard pipeline naming convention.

                Example: ['ngc5921.gcal']

            hm_antpos:
                - `'online'` : Query ALMA database through CASA task `getantposalma` or reuse
                  a pre-existing queried/downloaded JSON file. So that multi-MS pipeline runs
                  get one file per MS, the file name is `{ms_name}.{antposfile}`, where
                  `ms_name` is the MS name with its `.ms` suffix removed
                  (e.g., `uid___A002_X123_X4567.antennapos.json`).
                - `'manual'` : Use user-provided corrections.
                - `'file'` : Load corrections from a single old-style CSV antenna position file.

                Example: 'manual'

            antposfile:
                Path to a csv file containing antenna position offsets for `hm_antpos='file'` (required), or the
                base name of the outfile created by `getantposalma` for `hm_antpos='online'`. With
                `hm_antpos='online'` the MS name (without `.ms`) is prefixed to this name
                (e.g. 'uid___A002_X123_X4567.antennapos.json'). An empty value is replaced by the default
                when `hm_antpos='online'`.

                Default: 'antennapos.json'

            antenna:
                A comma-separated string of antennas whose positions are to be corrected (if `hm_antpos` is 'manual'
                or 'online').

                Example: 'DV05,DV07'

            offsets:
                A flat list of floating-point offsets (X, Y, Z) for all specified antennas.
                The length of the list must be three times the number of antennas.

                Example (for two antennas): `[0.01, 0.02, 0.03, 0.03, 0.02, 0.01]`

            threshold:
                Threshold value (in wavelengths) above which antenna position offsets are highlighted in the weblog.
                Defaults to 1.0.

                Example: 1.0

            snr:
                Signal-to-noise threshold used by the getantposalma task: either a float, or the string
                'default'. Antennas with snr below the threshold will not be retrieved.
                Only used with `hm_antpos='online'`. Defaults to 'default'.

                Example: 5.0

            search:
                Search algorithm used by the getantposalma task. Supports 'both_latest', 'both_closest', and 'auto'.
                Only used with `hm_antpos='online'`. Defaults to 'auto'.

                Example: 'both_closest'
        """
        super().__init__(
            context,
            output_dir=output_dir,
            vis=vis,
            caltable=caltable,
            hm_antpos=hm_antpos,
            antposfile=antposfile,
            antenna=antenna,
            offsets=offsets
            )
        self.threshold = threshold
        self.snr = snr
        self.search = search

    def to_casa_args(self) -> dict[str, str | list[float]]:
        """Configure gencal task arguments and return them in dictionary format.

        The returned dictionary has these keys:

        Returns:
            vis: Name of the input visibility file (MS).
            caltable: Name of the calibration table, taken from the inputs.
            infile: Antenna positions file obtained with getantposalma task
                (``online_antpos_filename``) when ``hm_antpos='online'``; an empty string otherwise.
            antenna: Filter data selection based on antenna/baseline.
            parameter: List of calibration values; For this purpose, the offsets for all specified antennas.

            With ``hm_antpos='file'`` the ``antenna`` and ``parameter`` values are read from the
            antenna position file in ``output_dir``; otherwise they are the ``antenna`` and
            ``offsets`` inputs.
        """
        infile = ''
        if self.hm_antpos == 'online':
            infile = self.online_antpos_filename

        # Get the antenna and offset lists.
        if self.hm_antpos == 'file':
            filename = os.path.join(self.output_dir, self.antposfile)
            # _read_antpos_csvfile is not defined in this class (presumably
            # inherited from antpos.AntposInputs); it is given the MS basename.
            antenna, offsets = self._read_antpos_csvfile(filename, os.path.basename(self.vis))
        else:
            antenna = self.antenna
            offsets = self.offsets

        return {'vis': self.vis, 'caltable': self.caltable, 'infile': infile, 'antenna': antenna, 'parameter': offsets}

    def to_antpos_args(self) -> dict[str, str | bool | float | list[str] | None]:
        """Configure getantposalma task arguments and return them in dictionary format.

        Returns:
            outfile: Name of file to write antenna positions retrieved from DB.
            overwrite: Tells `getantposalma` whether to overwrite the file if it exists. If False and the file exists,
                it will throw an error. Always True here.
            asdm: The execution block ID (ASDM) of the dataset.
            snr: The signal-to-noise threshold (a float, or the string 'default'). Antennas with snr below the
                threshold will not be retrieved.
            search: Search algorithm to use. Supports 'both_latest', 'both_closest' and 'auto'.
            hosts: Priority-ranked list of hosts to query. Can be customized with ANTPOS_SERVICE_URL environment
                variable or ``pipeconfig.services.antpos_url`` in config.yaml.
        """
        from pipeline.config import get_pipeline_config
        default_url = get_pipeline_config('services.antpos_url', '')
        # A single configured URL is wrapped in a list before being handed to
        # utils.get_valid_url.
        if isinstance(default_url, str):
            default_url = [default_url]
        hosts = utils.get_valid_url('ANTPOS_SERVICE_URL', default_url)
        return {'outfile': self.online_antpos_filename,
                'overwrite': True,
                'asdm': self.context.observing_run.get_ms(self.vis).execblock_id,
                'snr': self.snr,
                'search': self.search,
                'hosts': hosts}

    @property
    def online_antpos_filename(self) -> str:
        """Path of the per-MS JSON file used with ``hm_antpos='online'``.

        The name is the part of ``vis`` before its first ``'.ms'``, followed by
        ``'.'`` and ``antposfile``; the result is joined onto ``output_dir``.
        """
        eb_antposfile = f"{self.vis.split('.ms')[0]}.{self.antposfile}"
        return os.path.join(self.output_dir, eb_antposfile)

    def __str__(self) -> str:
        # NOTE: the 'antposfile' line shows online_antpos_filename (the per-MS
        # JSON path) whatever the value of hm_antpos.
        return (
            f"AlmaAntposInputs:\n"
            f"\tvis: {self.vis}\n"
            f"\tcaltable: {self.caltable}\n"
            f"\thm_antpos: {self.hm_antpos}\n"
            f"\tantposfile: {self.online_antpos_filename}\n"
            f"\tantenna: {self.antenna}\n"
            f"\toffsets: {self.offsets}\n"
            f"\tthreshold: {self.threshold}\n"
            f"\tsnr: {self.snr}\n"
            f"\tsearch: {self.search}"
        )


@task_registry.set_equivalent_casa_task('hifa_antpos')
class ALMAAntpos(antpos.Antpos):
    """ALMA antenna position correction task (registered for 'hifa_antpos').

    With ``hm_antpos='online'`` the task obtains a per-MS JSON file of antenna
    positions through ``getantposalma`` (or reuses one already on disk), drops
    antennas that are not in the MS from it, and afterwards reports the
    antennas whose JSON position differs from the MS ANTENNA table. All other
    work is delegated to ``antpos.Antpos``.
    """
    Inputs = ALMAAntposInputs

    def prepare(self):
        """Fetch and clean the online antenna position file, then run the base task.

        Returns:
            The result of ``antpos.Antpos.prepare()``, or ``AntposResults(offsets=None)``
            when ``hm_antpos='online'`` and no JSON file is available.

        Raises:
            exceptions.PipelineException: If getantposalma raises RuntimeError and
                failures are not allowed by the
                ``heuristics.allow_getantposalma_failure`` configuration value.
        """
        inputs = self.inputs

        if inputs.hm_antpos == 'online':
            # PIPE-51: retrieve json file for MS to include in the call to gencal
            antpos_args = inputs.to_antpos_args()
            outfile = antpos_args['outfile']

            if not os.path.exists(outfile):
                antpos_job = casa_tasks.getantposalma(**antpos_args)
                try:
                    # PIPE-2868: catch RuntimeError from getantposalma task and continue with the current session.
                    self._executor.execute(antpos_job)
                except RuntimeError as ex:
                    LOG.warning('CASA/getantposalma task failed without generating %s: %s', outfile, ex)
                    traceback_msg = traceback.format_exc()
                    from pipeline.config import get_pipeline_config
                    _allow = get_pipeline_config(
                        'heuristics.allow_getantposalma_failure', False,
                        env_var='ALLOW_GETANTPOSALMA_FAILURE', as_type=bool)
                    if _allow:
                        LOG.info(traceback_msg)
                    else:
                        # Chain the original error so its traceback is kept as __cause__.
                        raise exceptions.PipelineException(traceback_msg) from ex
            else:
                LOG.warning('Antenna position file %s exists. Skipping getantposalma task.', outfile)

            if os.path.exists(outfile):
                # PIPE-2653 remove antennas from JSON file that are missing from the MS
                self._remove_missing_antennas_from_json(outfile)
            else:
                # PIPE-2868: likely getantposalma failed and no json file was created.
                # set result offsets to None instead of empty list [] and continue with the current session.
                return AntposResults(offsets=None)

        return super().prepare()

    def _remove_missing_antennas_from_json(self, antposfile: str) -> None:
        """Removes antennas from JSON file that are missing from the measurement set.

        Handles inconsistencies between antennas in the MS and JSON file, which
        can occur due to online database issues or truncated datasets. When
        antennas are removed, the file as read is first renamed to
        ``<antposfile>.original`` and the reduced content is written to
        ``antposfile``. If the file cannot be read or parsed, or has no 'data'
        key, an error is logged and the file is left untouched.

        Args:
            antposfile: Path to the antenna position JSON file to modify.
        """
        try:
            with open(antposfile, 'r', encoding='utf-8') as f:
                query_dict = json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            LOG.error('Failed to read JSON file %s: %s', antposfile, e)
            return

        if 'data' not in query_dict:
            LOG.error("The antpos JSON file %s missing required 'data' key.", antposfile)
            return

        # Antenna names on each side; the keys of 'data' are the antenna names.
        ants_from_json = set(query_dict['data'].keys())
        ants_from_ms = {antenna.name for antenna in self.inputs.ms.antennas}

        # PIPE-2652: issue warnings if some antennas from MS are missing in JSON.
        ants_not_in_json = sorted(ants_from_ms - ants_from_json)
        if ants_not_in_json:
            LOG.warning(
                'Antenna(s) from %s are not found in the corresponding antpos JSON file %s : %s',
                self.inputs.vis,
                antposfile,
                utils.commafy(ants_not_in_json, quotes=False),
            )

        # Remove missing antennas (in JSON but not in MS) and update file
        remove_ants = sorted(ants_from_json - ants_from_ms)
        if not remove_ants:
            return

        for ant in remove_ants:
            query_dict['data'].pop(ant)

        LOG.warning(
            'Removed antenna(s) that are missing in %s from the corresponding antpos JSON file %s : %s',
            self.inputs.vis,
            antposfile,
            utils.commafy(remove_ants, quotes=False),
        )
        # Keep the file as it was read next to the rewritten one.
        os.rename(antposfile, antposfile + '.original')
        with open(antposfile, 'w', encoding='utf-8') as f:
            json.dump(query_dict, f, separators=(', ', ': '))

    def analyse(self, result):
        """Run the base analysis and, for online queries, record the corrected antennas.

        When ``hm_antpos='online'`` and the per-MS JSON file exists, antennas
        whose JSON position differs from the MS ANTENNA table are stored on the
        result as a comma-separated ``antenna`` string and a flat ``offsets`` list.

        Args:
            result: The result object to analyse; it is first passed to
                ``antpos.Antpos.analyse()``.

        Returns:
            The (possibly updated) result returned by the base class.
        """
        result = super().analyse(result)
        inputs = self.inputs

        # Only the online mode has a JSON file to compare against. The path is
        # read directly from the inputs rather than through to_antpos_args(),
        # which would also resolve the service hosts and look up the MS.
        if inputs.hm_antpos != 'online' or not os.path.exists(inputs.online_antpos_filename):
            return result

        # add the offsets to the result for online query
        offsets_dict = self._get_antenna_offsets()
        antenna_names, offsets = self._get_antennas_with_significant_offset(offsets_dict)
        if antenna_names:
            result.antenna = ','.join(antenna_names)
            result.offsets = offsets
            LOG.info('Antennas with non-zero corrections applied: %s', result.antenna)

        return result

    def _get_antenna_offsets(self) -> dict[np.str_, NDArray[floating]]:
        """Retrieves the antenna names and positions from the vis ANTENNA table and computes the offsets.

        The offset of an antenna is its position in the online JSON file minus
        its position in the ANTENNA table. Antennas that are not present in the
        JSON file are left out.

        Returns:
            Dictionary mapping antenna names to offset arrays
            (presumably the three x, y, z components).
        """
        with casa_tools.TableReader(self.inputs.vis + '/ANTENNA') as tb:
            antennas = tb.getcol('NAME')
            # Retrieve antenna positions from the ANTENNA table
            # For ALMA, this is in meters / ITRF.
            tb_positions = tb.getcol('POSITION')
        # Transposed so that each zipped item is the position of one antenna.
        tb_antpos_dict = dict(zip(antennas, tb_positions.T))

        # Retrieve antenna corrections from antennapos.json
        with open(self.inputs.online_antpos_filename, 'r', encoding='utf-8') as f:
            query_dict = json.load(f)
        db_antpos_dict = query_dict['data']

        # calculate offsets
        offsets_dict = {}
        for antenna in antennas:
            name = str(antenna)
            if name in db_antpos_dict:
                # The JSON position is converted to an array so the subtraction
                # is element-wise; the key keeps the type read from the table.
                offsets_dict[antenna] = np.asarray(db_antpos_dict[name]) - tb_antpos_dict[antenna]
        return offsets_dict

    def _get_antennas_with_significant_offset(
            self,
            offsets_dict: dict[np.str_, NDArray[floating]],
            ) -> tuple[list[np.str_], list[floating]]:
        """
        Identifies antennas with significant non-zero coordinate offsets.

        An antenna is selected as soon as any component of its offset has a
        non-zero absolute value.

        Args:
            offsets_dict: Dictionary mapping antenna names to offset arrays.

        Returns:
            A list of antenna names with significant offsets, and a flattened list of their corresponding
            offset values, in the same order as the names.
        """
        names = []
        flattened_offsets = []
        for antenna, offsets in offsets_dict.items():
            if np.any(np.abs(offsets) > 0):
                names.append(antenna)
                flattened_offsets.extend(offsets)
        return names, flattened_offsets