Source code for pipeline.infrastructure.utils.parallactic_range
# - functions to measure parallactic angle coverage of calibrators
# Do not evaluate type annotations at definition time.
from __future__ import annotations
from typing import TYPE_CHECKING
from pipeline import infrastructure
from pipeline.infrastructure import casa_tools, utils
if TYPE_CHECKING:
from collections.abc import Callable
from pipeline.domain import MeasurementSet, Field
LOG = infrastructure.logging.get_logger(__name__)
__all__ = ['ous_parallactic_range']
# Type aliases for the parallactic angle computations.
ParallacticAngle = float
SignedAngle = float
PositiveDefiniteAngle = float
[docs]
def ous_parallactic_range(
mses: list[MeasurementSet],
field_name: str,
intent: str
) -> tuple[float, float] | None:
"""
Get the parallactic angle range across all measurement sets for field when observed with the specified intent.
Args:
mses: MeasurementSets to process
field_name: Field to inspect
intent: observing intent to consider
Returns:
angular range expressed as (min angle, max angle) tuple or None
"""
angles = []
for ms in mses:
fields = ms.get_fields(task_arg=field_name, intent=intent)
if not fields:
LOG.error('No field found for for field: %s intent: %s in ms: %s; cannot determine '
'parallactic angle.', field_name, intent, ms.basename)
continue
if len(fields) > 1:
LOG.warning('Multiple fields found for field: %s intent: %s in ms: %s; using first one for '
'the parallactic angle range calculation.', field_name, intent, ms.basename)
field = fields[0]
try:
angles.extend(_parallactic_range_for_field(ms, field, intent))
except ValueError as e:
LOG.exception('Could not determine parallactic angle', exc_info=e)
continue
if not angles:
return
signed_range = _range_after_processing(angles, _to_signed)
pd_range = _range_after_processing(angles, _to_positive_definite)
return min((signed_range, pd_range))
def _parallactic_range_for_field(ms: MeasurementSet, f: Field, intent: str) -> tuple[float, float]:
"""
Get the parallactic angle range for field f when observed with the specified intent.
Args:
ms: MeasurementSet to process
f: Field to inspect
intent: observing intent to consider
Returns:
angular range expressed as (min angle, max angle) tuple
Raises:
ValueError: if no scans are found in the MS for specified field and intent
"""
# get the scans when the field was observed with the required intent
scans = ms.get_scans(field=f.id, scan_intent=intent)
if not scans:
raise ValueError(f'No scans detected for field {f} with intent {intent}')
# This uses the earliest start and latest end times of the scan domain
# objects. In theory, times within a scan can be field and spw dependent so
# they may not exactly match those of the field in question, but they
# should be accurate enough for our purposes
min_utc = min([s.start_time for s in scans], key=utils.get_epoch_as_datetime)
max_utc = max([s.end_time for s in scans], key=utils.get_epoch_as_datetime)
observatory = ms.antenna_array.name
min_pa = _parallactic_angle_at_epoch(f, min_utc, observatory)
max_pa = _parallactic_angle_at_epoch(f, max_utc, observatory)
if min_pa > max_pa:
min_pa, max_pa = max_pa, min_pa
return min_pa, max_pa
def _parallactic_angle_at_epoch(f: Field, e: dict, observatory: str) -> float:
"""
Get the instantaneous parallactic angle for field f at epoch e.
Args:
f: Field domain object
e: CASA epoch
Returns
angular separation in degrees
"""
me = casa_tools.measures
qa = casa_tools.quanta
try:
me.doframe(me.observatory(observatory))
me.doframe(e)
pole_direction = me.direction('J2000', '0deg', '+90deg')
pole_azel = me.measure(pole_direction, 'AZEL')
field_azel = me.measure(f.mdirection, 'AZEL')
separation = me.posangle(field_azel, pole_azel)
sep_degs = qa.convert(separation, 'deg')
return sep_degs['value']
finally:
me.done()
def _range_after_processing(fs: list[float], g: Callable[[float], float]):
"""
Get the range of a list of floats (fs) once processed by function g.
"""
processed = [g(f) for f in fs]
return max(processed) - min(processed)
def _to_signed(angle: ParallacticAngle) -> SignedAngle:
if angle > 180:
return angle - 360
return angle
def _to_positive_definite(angle: ParallacticAngle) -> PositiveDefiniteAngle:
if angle < 0:
return angle + 360
return angle