# Source code for pipeline.infrastructure.utils.conversion

"""Utility functions for data type conversion and string formatting.

The conversion module contains utility functions that convert between data
types and assist in formatting objects as strings for presentation to the
user.
"""
from __future__ import annotations

import collections
import collections.abc
import datetime
import decimal
import math
import os
import re
import string
from pathlib import Path
from typing import TYPE_CHECKING

import astropy.units as u
import cachetools
import pyparsing
from astropy.coordinates import SkyCoord

from pipeline import infrastructure
from pipeline.infrastructure import casa_tools

if TYPE_CHECKING:
    from collections.abc import Iterable, Iterator, Sequence
    from typing import Any

    from numpy import generic
    from numpy.typing import NDArray

    from pipeline.domain import Field, MeasurementSet

# Module-level logger, named after this module.
LOG = infrastructure.logging.get_logger(__name__)

# Names exported by this module.
__all__ = [
    'ant_arg_to_id',
    'commafy',
    'dequote',
    'field_arg_to_id',
    'flatten',
    'format_datetime',
    'format_timedelta',
    'get_epoch_as_datetime',
    'invert_dict',
    'mjd_seconds_to_datetime',
    'range_to_list',
    'record_to_quantity',
    'safe_split',
    'spw_arg_to_id',
    'to_CASA_intent',
    'to_pipeline_intent',
    'convert_paths_to_basenames',
    'human_file_size',
    ]

# By default we use CASA to parse arguments into spw/field/ant IDs. However, this
# requires access to the data. Setting this property to False uses the pipeline's
# legacy parsing routines, which operate off context data. This is useful when
# importing a remote context for debugging purposes, when we don't have access to
# the data.
USE_CASA_PARSING_ROUTINES = True

# Substrings that phasecenter_to_skycoord treats as an explicit angular unit
# when they occur inside an RA or Dec string.
_ANGLE_UNITS = ('rad', 'deg', 'arcmin', 'arcsec', 'amin', 'asec')


class LoggingLRUCache(cachetools.LRUCache):
    """LRU cache that writes a log entry every time an entry is evicted.

    An undersized cache is expensive: in PIPE-327 a lack of cached entries
    for the 33 EBs led to millions of avoidable MS tool open() calls, each
    taking several tens of milliseconds. Logging evictions makes it visible
    when the size limit is being hit.
    """

    def __init__(self, name: str, *args, **kwargs):
        # The name only labels the log messages; all remaining arguments are
        # handed to cachetools.LRUCache untouched.
        self.name = name
        super().__init__(*args, **kwargs)

    def popitem(self) -> tuple[Any, Any]:
        """Evict the least recently used entry and log that it happened.

        Returns:
            The evicted (key, value) pair.
        """
        evicted_key, evicted_value = super().popitem()
        LOG.info(f'Evicting cache entry for {self.name}. '
                 f'Cache size ({self.maxsize}) is too small!')
        LOG.trace(f'Key {evicted_key} evicted with value {evicted_value}')
        return evicted_key, evicted_value


# Cache for ms.msselectedindices calls. Without this cache, the MS tool would
# open and close the measurement set on each query, which is an expensive
# operation.
# Keys are absolute measurement set paths; each value is the per-MS cache
# that _convert_arg_to_id creates on first use and fills with query results.
MSTOOL_SELECTEDINDICES_CACHE: dict[str, LoggingLRUCache] = {}


def commafy(l: Iterable, quotes: bool = True, multi_prefix: str = '', separator: str = ', ',
            conjunction: str = 'and') -> str:
    """Convert a sequence of items into a textual enumeration.

    Example:
    >>> commafy(['a','b','c'])
    "'a', 'b' and 'c'"

    Args:
        l: Items to describe. Any iterable is accepted; each item is
            rendered with str().
        quotes: If True, each item is enclosed in single quotes.
        multi_prefix: Text placed in front of the result, followed by a
            space, when there are two or more items. For a single item only
            the separating space is emitted (so that
            ``'Field%s' % commafy(x, multi_prefix='s')`` reads correctly in
            both the singular and plural case).
        separator: Text placed between all items except the last two.
        conjunction: Word placed between the last two items.

    Returns:
        The textual description of the given items; '' when there are none.
    """
    if not isinstance(l, list) and isinstance(l, collections.abc.Iterable):
        l = list(l)

    length = len(l)
    if length == 0:
        return ''

    # Render every item once up front. Building the result with a single
    # join keeps this linear and avoids the recursion limit on long lists.
    template = "'%s'" if quotes else '%s'
    items = [template % (item,) for item in l]

    if length == 1:
        # a single item takes no plural prefix, only the separating space
        return (' ' if multi_prefix else '') + items[0]

    # turn 's' into 's '
    if multi_prefix:
        multi_prefix += ' '

    return '%s%s %s %s' % (multi_prefix, separator.join(items[:-1]), conjunction, items[-1])
def flatten(l: Sequence[Any]) -> Iterator[Any]:
    """Lazily yield the leaf items of an arbitrarily nested iterable.

    Strings are treated as leaves and are never split into characters.

    Example:
    >>> list(flatten([1,2,['c',4,['e',6]],7]))
    [1, 2, 'c', 4, 'e', 6, 7]

    Args:
        l: An iterable whose items may themselves be iterables.

    Yields:
        Each non-iterable item (or string), in depth-first order.
    """
    for item in l:
        if isinstance(item, str) or not isinstance(item, collections.abc.Iterable):
            yield item
        else:
            yield from flatten(item)
def unix_seconds_to_datetime(unix_secs: list[int | float]) -> list[datetime.datetime]:
    """Convert UNIX epoch times to timezone-aware (UTC) datetime objects.

    Args:
        unix_secs: Elapsed seconds since 1970-01-01, one entry per time.

    Returns:
        One UTC datetime per input value, in the same order.
    """
    utc = datetime.timezone.utc
    converted = []
    for seconds in unix_secs:
        converted.append(datetime.datetime.fromtimestamp(seconds, utc))
    return converted
def mjd_seconds_to_datetime(mjd_secs: list[int | float]) -> list[datetime.datetime]:
    """Convert times given as seconds since the MJD epoch to datetime objects.

    Each value is shifted onto the UNIX epoch and then converted with
    unix_seconds_to_datetime.

    Args:
        mjd_secs: Elapsed seconds since the MJD epoch, one entry per time.

    Returns:
        One datetime per input value, in the same order.
    """
    # 1970-01-01 is MJD 40587 and a day holds 86400 seconds
    mjd_to_unix_shift = 40587 * 86400
    return unix_seconds_to_datetime([secs - mjd_to_unix_shift for secs in mjd_secs])
def get_epoch_as_datetime(epoch: dict) -> datetime.datetime:
    """Convert a CASA 'epoch' measure into a timezone-aware (UTC) datetime.

    Args:
        epoch: CASA 'epoch' measure dictionary.

    Returns:
        The equivalent Python datetime.
    """
    me = casa_tools.measures
    qa = casa_tools.quanta

    # reference point: the epoch at 40587.0d UTC, floored to a whole number
    # of days
    reference = me.epoch('UTC', '40587.0d')
    reference_days = qa.floor(qa.convert(me.getvalue(reference)['m0'], 'd'))

    # express the input in UTC, subtract the reference and convert the
    # difference to seconds
    utc_epoch = me.measure(epoch, 'UTC')
    elapsed = qa.sub(me.getvalue(utc_epoch)['m0'], reference_days)
    elapsed_secs = qa.getvalue(qa.convert(elapsed, 's'))[0]

    return datetime.datetime.fromtimestamp(elapsed_secs, datetime.timezone.utc)
def range_to_list(arg: str) -> list[int]:
    """Expand a CASA-syntax numeric range into the list of its members.

    Example:
    >>> range_to_list('1~5,7~9')
    [1, 2, 3, 4, 5, 7, 8, 9]

    Args:
        arg: Comma-separated items, each either 'start~end' (inclusive), a
            single integer, or a run of other characters, which is matched
            as text.

    Returns:
        The expanded list; an empty list for an empty string.
    """
    if arg == '':
        return []

    tilde = pyparsing.Suppress('~')

    # '123' -> 123
    integer = pyparsing.Word(pyparsing.nums).set_parse_action(lambda tokens: int(tokens[0]))

    # '1~10' -> 1, 2, ..., 10
    inclusive_range = integer('start') + tilde + integer('end')
    inclusive_range.set_parse_action(lambda tokens: list(range(tokens.start, tokens.end + 1)))

    # anything else made of printable, non-delimiter characters
    excluded = ',;"/' + string.whitespace
    text = pyparsing.Word(''.join(c for c in string.printable if c not in excluded))

    # try the range first so that '1~10' is not consumed as the number 1
    item = inclusive_range | integer | text

    items = pyparsing.DelimitedList(item, delim=',')('atoms')
    return items.parse_string(str(arg)).asList()
def to_CASA_intent(ms: MeasurementSet, intents: str) -> str:
    """Translate pipeline intents into the intents recorded in the measurement set.

    Example:
    > to_CASA_intent(ms, 'PHASE,BANDPASS')
    'CALIBRATE_PHASE_ON_SOURCE,CALIBRATE_BANDPASS_ON_SOURCE'

    Args:
        ms: MeasurementSet object; its get_original_intent method does the lookup.
        intents: Pipeline intents to convert.

    Returns:
        The original intents joined with commas.
    """
    return ','.join(ms.get_original_intent(intents))
def to_pipeline_intent(ms: MeasurementSet, intents: str) -> str:
    """Convert CASA intents to pipeline intents.

    Each comma-separated CASA intent has leading/trailing '*' wildcards
    removed and is then matched as a substring against the obs modes of the
    first state of the measurement set.

    Args:
        ms: MeasurementSet object.
        intents: Comma-separated CASA intents to convert.

    Returns:
        The matching pipeline intents, de-duplicated, sorted alphabetically
        and joined with commas.
    """
    casa_intents = {intent.strip('*') for intent in intents.split(',')}

    # the mapping is read from the first state only
    obs_mode_mapping = ms.states[0].obs_mode_mapping

    pipeline_intents = {pipeline_intent
                        for casa_intent in casa_intents
                        for obsmode, pipeline_intent in obs_mode_mapping.items()
                        if casa_intent in obsmode}

    # sort so that the result does not depend on set iteration order
    return ','.join(sorted(pipeline_intents))
def field_arg_to_id(ms_path: str, field_arg: str | int, all_fields) -> list[int]:
    """Resolve a CASA field selection to the matching field IDs.

    Args:
        ms_path: A path to the measurement set.
        field_arg: A field selection in CASA format.
        all_fields: All Field objects, for use when CASA msselect is not used.

    Returns:
        A list of field IDs.
    """
    if not USE_CASA_PARSING_ROUTINES:
        return _parse_field(field_arg, all_fields)

    selection = str(field_arg)
    try:
        return _convert_arg_to_id('field', ms_path, selection)['field'].tolist()
    except RuntimeError:
        # SCOPS-1666
        # msselect throws exceptions with numeric field names beginning with
        # zero. Retry with the argument wrapped in double quotes.
        quoted_selection = '"%s"' % selection
        return _convert_arg_to_id('field', ms_path, quoted_selection)['field'].tolist()
def spw_arg_to_id(ms_path: str, spw_arg: str | int, all_spws) -> list[tuple[int, int, int, int]]:
    """Resolve a CASA spw selection to spectral window IDs and channel ranges.

    Args:
        ms_path: A path to the measurement set.
        spw_arg: A spw selection in CASA format.
        all_spws: List of all SpectralWindow objects, for use when CASA
            msselect is not used.

    Returns:
        A list of (spw, chan_start, chan_end, step) tuples.
    """
    if not USE_CASA_PARSING_ROUTINES:
        # Legacy path: any channel selection is ignored and every selected
        # spw is reported with a (0, number of channels, 1) channel range.
        selected = []
        for atom in _parse_spw(spw_arg, all_spws):
            candidates = [window for window in all_spws if window.id == atom.spw]
            window = candidates.pop()
            selected.append((window.id, 0, len(window.channels.chan_freqs), 1))
        return selected

    indices = _convert_arg_to_id('spw', ms_path, str(spw_arg))
    channel_rows = indices['channel']

    # keep only the channel tuples whose spw is present in the spw entry
    return [(spw_id, first, last, stride)
            for (spw_id, first, last, stride) in channel_rows
            if spw_id in indices['spw']]
def ant_arg_to_id(ms_path: str, ant_arg: str | int, all_antennas) -> list[int]:
    """Resolve a CASA antenna selection to the matching antenna IDs.

    Args:
        ms_path: A path to the measurement set.
        ant_arg: An antenna selection in CASA format.
        all_antennas: All antenna domain objects, for use when CASA msselect
            is disabled.

    Returns:
        A list of antenna IDs.
    """
    if not USE_CASA_PARSING_ROUTINES:
        return _parse_antenna(ant_arg, all_antennas)

    # the selection is passed to msselect as a 'baseline' expression and the
    # IDs are read from its 'antenna1' entry
    selected = _convert_arg_to_id('baseline', ms_path, str(ant_arg))
    return selected['antenna1'].tolist()
def _convert_arg_to_id(arg_name: str, ms_path: str, arg_val: str) -> dict[str, NDArray[generic]]:
    """Parse a CASA selection argument and return the selected indices.

    Results are cached per measurement set in MSTOOL_SELECTEDINDICES_CACHE,
    keyed on (arg_name, arg_val), so that the measurement set is only opened
    the first time a given selection is requested.

    Originally the cache was set on this function with the cache size fixed
    at import time (originally 1000). In PIPE-327 that proved too small due
    to the number of EBs (33) and data shape, so a separate cache is now
    created for each measurement set on first use.

    Args:
        arg_name: Name of selection argument to use in MS selection query.
        ms_path: A path to the measurement set.
        arg_val: Value for selection argument to use in MS selection query,
            formatted with CASA syntax.

    Returns:
        The result of ms.msselectedindices() for the selection.
    """
    ms_abspath = os.path.abspath(ms_path)
    if ms_abspath not in MSTOOL_SELECTEDINDICES_CACHE:
        # PIPE-327:
        # Historically, a cache size of 1000 entries per EB has been
        # sufficient to avoid cache eviction. It would be possible to
        # calculate a more accurate required cache size (some function of
        # spws, fields, field combinations, etc.) but it's probably not
        # worth the effort as cache entries left unoccupied should take
        # minimal space.
        # PIPE-1008:
        # increase maxsize to 40k entries for VLASS calibration
        # A typical VLASS observation can have 15-20k fields
        MSTOOL_SELECTEDINDICES_CACHE[ms_abspath] = LoggingLRUCache(ms_abspath, maxsize=40000)

    cache_for_ms = MSTOOL_SELECTEDINDICES_CACHE[ms_abspath]
    cache_key = (arg_name, arg_val)

    try:
        return cache_for_ms[cache_key]
    except KeyError:
        # Cache miss. The query itself runs outside this handler so that an
        # msselect failure is not reported as happening "during handling"
        # of the KeyError.
        pass

    taql = {arg_name: str(arg_val)}
    LOG.trace('Executing msselect({%r:%r}) on %s', arg_name, arg_val, ms_path)
    with casa_tools.MSReader(ms_path) as ms:
        # onlyparse=True: only the selection expression is evaluated
        ms.msselect(taql, onlyparse=True)
        result = ms.msselectedindices()

    cache_for_ms[cache_key] = result
    return result
def safe_split(fields: str) -> list[str]:
    """Split a comma-separated string of field names into a list.

    The split is delegated to pyparsing's comma_separated_list so that
    field names within quotes are taken into account.

    Args:
        fields: A string containing field names.

    Returns:
        The individual field names.
    """
    splitter = pyparsing.pyparsing_common.comma_separated_list
    parsed = splitter.parse_string(str(fields))
    return parsed.asList()
def dequote(s: str) -> str:
    """Strip every single and double quote character from a string.

    Quotes are removed wherever they occur, not just at the ends, which
    makes quoted and unquoted names directly comparable.

    Args:
        s: A string.

    Returns:
        The string with all ' and " characters removed.
    """
    quote_chars = '"' + "'"
    return s.translate(str.maketrans('', '', quote_chars))
def format_datetime(dt: datetime.datetime, dp: int = 0) -> str:
    """Convert a datetime to a formatted string representation.

    The result has the form 'YYYY-MM-DD HH:MM:SS', optionally followed by
    the fractional seconds rounded to the requested number of decimal
    places. With dp=0 the fractional part is dropped, not rounded.

    Args:
        dt: Python datetime.
        dp: Number of decimal places for the fractional seconds
            (0=do not show).

    Returns:
        Formatted string representation of datetime.

    Raises:
        ValueError: If dp is greater than 6.
    """
    if dp > 6:
        raise ValueError('Cannot exceed 6 decimal places as datetime stores to microsecond precision')

    if dp <= 0:
        return dt.strftime('%Y-%m-%d %H:%M:%S')

    # '0.123' for 123456 microseconds at dp=3
    fraction = '{0:.{1}f}'.format(dt.microsecond / 1e6, dp)

    if fraction.startswith('1'):
        # The fraction rounded up to a whole second (e.g. .9996 -> 1.000).
        # Carry that second into the timestamp; otherwise the result would
        # read a full second too early.
        dt = dt.replace(microsecond=0) + datetime.timedelta(seconds=1)

    # drop the leading integer digit, keeping '.ddd'
    return dt.strftime('%Y-%m-%d %H:%M:%S') + fraction[1:]
def format_timedelta(td: datetime.timedelta, dp: int = 0) -> str:
    """Convert a timedelta to a formatted string representation.

    The seconds are rounded to the requested precision and the result is the
    default timedelta string with the fractional digits appended.

    Args:
        td: A timedelta object.
        dp: Number of decimal places for the fractional seconds
            (0=do not show). Should be a non-negative integer.

    Returns:
        Formatted string representation of timedelta.

    Raises:
        ValueError: If dp is greater than 6.
    """
    # exact seconds-within-day as a Decimal, then rounded to dp places
    exact_secs = decimal.Decimal(td.seconds) + decimal.Decimal(td.microseconds) / decimal.Decimal('1e6')
    quantum = decimal.Decimal(10) ** -dp
    rounded_secs = exact_secs.quantize(quantum)

    # whole-second part, rendered by timedelta itself
    whole = datetime.timedelta(days=td.days, seconds=math.floor(rounded_secs))

    # fractional part as a zero-padded six digit microsecond string
    micro_digits = format(int((rounded_secs % 1).shift(6)), '06d')

    if dp > 6:
        raise ValueError('Cannot exceed 6 decimal places as datetime stores to microsecond precision')
    if dp <= 0:
        return str(whole)
    return str(whole) + '.' + str(micro_digits[0:dp])
def _parse_spw(task_arg: str, all_spw_ids: tuple = None) -> list[tuple[str, list[Any, Any]]]: """Convert the CASA-style spw argument to a list of spw IDs. Channel limits are also parsed in this function but are not currently used. The channel limits may be found as the channels property of an atom. Example: > _parse_spw('0:0~6^2,2:6~38^4 (0, 1, 4, 5, 6, 7)') <result> <atom> <spws> <ITEM>0</ITEM> </spws> <channels> <ITEM>0</ITEM> <ITEM>2</ITEM> <ITEM>4</ITEM> <ITEM>6</ITEM> </channels> </atom> <atom> <spws> <ITEM>2</ITEM> </spws> <channels> <ITEM>6</ITEM> <ITEM>10</ITEM> <ITEM>14</ITEM> <ITEM>18</ITEM> <ITEM>22</ITEM> <ITEM>26</ITEM> <ITEM>30</ITEM> <ITEM>34</ITEM> <ITEM>38</ITEM> </channels> </atom> </result> Args: task_arg: all_spw_ids: Returns: """ if task_arg in (None, ''): return all_spw_ids if all_spw_ids is None: all_spw_ids = [] # recognise but suppress the mode-switching tokens TILDE, LESSTHAN, CARET, COLON, ASTERISK = list(map(pyparsing.Suppress, '~<^:*')) # recognise '123' as a number, converting to an integer number = pyparsing.Word(pyparsing.nums).set_parse_action(lambda tokens: int(tokens[0])) # convert '1~10' to a range rangeExpr = number('start') + TILDE + number('end') rangeExpr.set_parse_action(lambda tokens: list(range(tokens.start, tokens.end + 1))) # convert '1~10^2' to a range with the given step size rangeWithStepExpr = number('start') + TILDE + number('end') + CARET + number('step') rangeWithStepExpr.set_parse_action(lambda tokens: list(range(tokens.start, tokens.end + 1, tokens.step))) # convert <10 to a range ltExpr = LESSTHAN + number('max') ltExpr.set_parse_action(lambda tokens: list(range(0, tokens.max))) # convert * to all spws allExpr = ASTERISK.set_parse_action(lambda tokens: all_spw_ids) # spw and channel components can be any of the above patterns numExpr = rangeWithStepExpr | rangeExpr | ltExpr | allExpr | number # recognise and group multiple channel definitions separated by semi-colons channelsExpr = 
pyparsing.Group(pyparsing.DelimitedList(numExpr, delim=';')) # group the number so it converted to a node, spw in this case spwsExpr = pyparsing.Group(numExpr) # the complete expression is either spw or spw:chan atomExpr = pyparsing.Group(spwsExpr('spws') + COLON + channelsExpr('channels') | spwsExpr('spws')) # and we can have multiple items separated by commas finalExpr = pyparsing.DelimitedList(atomExpr('atom'), delim=',')('result') parse_result = finalExpr.parse_string(str(task_arg)) results = {} for atom in parse_result.result: for spw in atom.spws: if spw not in results: results[spw] = set(atom.channels) else: results[spw].update(atom.channels) Atom = collections.namedtuple('Atom', ['spw', 'channels']) return [Atom(spw=k, channels=v) for k, v in results.items()] def _parse_field(task_arg: str | None, fields: Field | None = None) -> list[int]: """Convert the field section in CASA format to list of field IDs. Inner method. Args: task_arg: The field selection in CASA format. fields: Field objects Returns: A list of field IDs that matches field selection criteria """ if task_arg in (None, ''): return [f.id for f in fields] if fields is None: fields = [] # recognise but suppress the mode-switching tokens TILDE = pyparsing.Suppress('~') # recognise '123' as a number, converting to an integer number = pyparsing.Word(pyparsing.nums).set_parse_action(lambda tokens: int(tokens[0])) # convert '1~10' to a range rangeExpr = number('start') + TILDE + number('end') rangeExpr.set_parse_action(lambda tokens: list(range(tokens.start, tokens.end + 1))) boundary = ''.join([c for c in pyparsing.printables if c not in (' ', ',')]) field_id = pyparsing.WordStart(boundary) + (rangeExpr | number) + pyparsing.WordEnd(boundary) casa_chars = ''.join([c for c in string.printable if c not in string.whitespace]) field_name = pyparsing.Word(casa_chars + ' ') def get_ids_for_matching(tokens): search_term = tokens[0] if '*' in search_term: regex = search_term.replace('*', '.*') + '$' return 
[f.id for f in fields if re.match(regex, f.name)] return [f.id for f in fields if f.name == search_term] field_name.set_parse_action(get_ids_for_matching) results = set() for atom in pyparsing.pyparsing_common.comma_separated_list.parse_string(str(task_arg)): for parser in [field_name('fields'), field_id('fields')]: for match in parser.search_string(atom): results.update(match.asList()) return sorted(list(results)) def _parse_antenna(task_arg: str | None, antennas: dict[str, NDArray[generic]] | None = None) -> list[int]: """Convert the antenna selection in CASA format to a list of antenna IDs. Inner method. Args: task_arg: The antenna selection in CASA format. antennas: Antenna domain objects. Returns: List of antenna IDs that matches antenna selection criteria. """ if task_arg in (None, ''): return [a.id for a in antennas] if antennas is None: antennas = [] # recognise but suppress the mode-switching tokens TILDE = pyparsing.Suppress('~') # recognise '123' as a number, converting to an integer number = pyparsing.Word(pyparsing.nums).set_parse_action(lambda tokens: int(tokens[0])) # convert '1~10' to a range rangeExpr = number('start') + TILDE + number('end') rangeExpr.set_parse_action(lambda tokens: list(range(tokens.start, tokens.end + 1))) # antenna-oriented 'by ID' expressions can be any of the above patterns boundary = ''.join([c for c in pyparsing.printables if c not in (' ', ',')]) numExpr = pyparsing.WordStart(boundary) + (rangeExpr | number) + pyparsing.WordEnd(boundary) # group the number so it converted to a node, fields in this case antenna_id_expr = pyparsing.Group(numExpr) casa_chars = ''.join([c for c in string.printable if c not in ',;"/' + string.whitespace]) antenna_name = pyparsing.Word(casa_chars) def get_antenna(tokens): search_term = tokens[0] if '*' in search_term: regex = search_term.replace('*', '.*') + '$' return [a.id for a in antennas if re.match(regex, a.name)] return [a.id for a in antennas if a.name == search_term] 
antenna_name.set_parse_action(get_antenna) antenna_name_expr = pyparsing.Group(antenna_name) # the complete expression atomExpr = pyparsing.Group(antenna_id_expr('antennas') | antenna_name_expr('antennas')) results = set() for substr in pyparsing.pyparsing_common.comma_separated_list.parse_string(str(task_arg)): atoms = atomExpr.parse_string(substr) for atom in atoms: for ant in atom.antennas: results.add(ant) return sorted(list(results))
def record_to_quantity(
        record: dict | list[dict] | tuple[dict]
) -> u.Quantity | list[u.Quantity] | tuple[u.Quantity]:
    """Convert a CASA record, or a list/tuple of them, to Astropy quantities.

    A single record is a mapping with 'value' and 'unit' entries. Lists and
    tuples are converted element by element (recursively) and the container
    type is kept: a list gives a list, a tuple gives a tuple.
    """
    if not isinstance(record, (list, tuple)):
        return record['value'] * u.Unit(record['unit'])

    converted = [record_to_quantity(item) for item in record]
    return tuple(converted) if isinstance(record, tuple) else converted
def phasecenter_to_skycoord(phasecenter: str) -> SkyCoord:
    """Convert a CASA-style coordinate string to an Astropy SkyCoord object.

    The string is split on whitespace and must consist of either 'RA Dec'
    (ICRS is assumed) or 'REFCODE RA Dec'.

    Raises:
        ValueError: If the string does not have two or three parts.
    """
    parts = phasecenter.split()
    if len(parts) == 2:
        refcode = 'icrs'
        ra, dec = parts
    elif len(parts) == 3:
        refcode, ra, dec = parts
    else:
        raise ValueError(f"Cannot parse phasecenter string: {phasecenter}")

    frame = refcode_to_skyframe(refcode)

    # Handle the common case of Dec expressed with two dots instead of
    # colons. Excluding any 'd' also excludes 'deg' and 'rad'.
    if dec.count('.') >= 2 and ':' not in dec and 'd' not in dec:
        dec = dec.replace('.', ':', 2)

    def names_a_unit(text: str) -> bool:
        return any(unit_name in text for unit_name in _ANGLE_UNITS)

    # determine RA unit
    if names_a_unit(ra):
        # units are part of the string: let astropy handle it
        ra_unit = None
    elif 'h' in ra or ':' in ra:
        ra_unit = u.hourangle
    else:
        try:
            float(ra)
        except ValueError:
            ra_unit = u.hourangle
            LOG.info("Unable to determine RA unit, assuming hourangle for RA value %s", ra)
        else:
            # a bare number is taken to be degrees
            ra_unit = u.deg

    # determine Dec unit
    if names_a_unit(dec):
        # units are part of the string: let astropy handle it
        dec_unit = None
    else:
        dec_unit = u.deg
        LOG.info("Unable to determine Dec unit, assuming degrees for Dec value %s", dec)

    return SkyCoord(ra, dec, unit=(ra_unit, dec_unit), frame=frame)


def refcode_to_skyframe(refcode: str) -> str:
    """Convert a CASA coordsys refcode to an Astropy SkyCoord frame name.

    Limitations:
        Only the common cases are translated: J2000 becomes 'fk5' and B1950
        becomes 'fk4'. Any other code is returned lower-cased.

    To get a list of built-in astropy.coordinates frame names:
        from astropy.coordinates import frame_transform_graph
        print(frame_transform_graph.get_names())

    To get a list of CASA csys reference code:
        csys = cs.newcoordsys(direction=True)
        clist = csys.referencecode('dir', True)
    """
    lowered = refcode.lower()
    return {'j2000': 'fk5', 'b1950': 'fk4'}.get(lowered, lowered)
def invert_dict(input_dict: dict) -> dict:
    """Group the keys of a dictionary by their value.

    Args:
        input_dict: The original dictionary. Its values must be hashable.

    Returns:
        A new dictionary that maps each value of the original to the list
        of keys that held it.
    """
    grouped = {}
    for original_key, original_value in input_dict.items():
        grouped.setdefault(original_value, []).append(original_key)
    return grouped
def convert_paths_to_basenames(command_string: str) -> str:
    """Replace quoted file paths in a command string with their basenames.

    Works line by line. Lines whose first non-blank character is '#' are
    left alone. On every other line, each single- or double-quoted string
    that contains a '/' is replaced by its final path component, keeping the
    original quote character. Quoted strings that contain another quote
    character, or no '/', are left unchanged, as is all text outside quotes.

    Args:
        command_string: CASA command string(s) with file paths to convert.

    Returns:
        Command string with all paths converted to basenames only.
    """
    quoted_string = re.compile(r"'[^']*'|\"[^\"]*\"")

    def to_basename(match: re.Match) -> str:
        literal = match.group(0)
        quote, inner = literal[0], literal[1:-1]
        has_nested_quote = "'" in inner or '"' in inner
        if has_nested_quote or '/' not in inner:
            return literal
        return f'{quote}{Path(inner).name}{quote}'

    return '\n'.join(
        line if line.strip().startswith('#') else quoted_string.sub(to_basename, line)
        for line in command_string.split('\n')
    )
def human_file_size(size_in_bytes: int | float) -> str:
    """Convert a file size in bytes to a human-readable string.

    Uses binary prefixes (KiB, MiB, GiB, TiB, PiB, EiB, ZiB, YiB) where
    1 KiB = 1024 bytes. Sizes below 1 KiB are shown as a whole number of
    bytes (fractions are truncated); larger sizes are shown with one decimal
    place. Sizes beyond the YiB range are expressed in YiB.

    Args:
        size_in_bytes: The size in bytes (integer or float).

    Returns:
        A string representing the human-readable file size
        (e.g., "1.5 KiB", "2.4 MiB").

    Raises:
        ValueError: If the input size_in_bytes is negative.

    Examples:
        >>> human_file_size(0)
        '0 Bytes'
        >>> human_file_size(500)
        '500 Bytes'
        >>> human_file_size(1024)
        '1.0 KiB'
        >>> human_file_size(1500)
        '1.5 KiB'
        >>> human_file_size(1024 * 1024)
        '1.0 MiB'
        >>> human_file_size(2500000)
        '2.4 MiB'
        >>> human_file_size(1024**3 * 1.5)
        '1.5 GiB'
        >>> human_file_size(1024**6)
        '1.0 EiB'
    """
    if size_in_bytes < 0:
        raise ValueError("File size cannot be negative.")
    if size_in_bytes == 0:
        return "0 Bytes"

    # Define the units and the base (1024 for binary prefixes)
    unit_labels = ("Bytes", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
    base = 1024

    # log2 is exact for powers of two, so sizes that sit exactly on a unit
    # boundary always land in the higher unit.
    unit_index = int(math.log2(size_in_bytes) // math.log2(base))

    # Clamp to the available units. The lower bound matters for sizes
    # between 0 and 1, whose logarithm is negative and would otherwise index
    # the unit table from the end.
    unit_index = max(0, min(unit_index, len(unit_labels) - 1))

    human_readable_size = size_in_bytes / (base**unit_index)
    unit = unit_labels[unit_index]

    if unit == "Bytes":
        return f"{int(human_readable_size)} {unit}"
    return f"{human_readable_size:.1f} {unit}"