Source code for pipeline.domain.state

import datetime

import pipeline.infrastructure as infrastructure

LOG = infrastructure.get_logger(__name__)


[docs] class State: """A logical representation of rows in the STATE table. Relates STATE_ID (in the MAIN table) to the observing mode(s) and corresponding pipeline intent(s). Attributes: id: Numerical identifier of this State. obs_mode: Unique obs_mode values associated with this State. obs_mode_mapping: Class-level dictionary mapping obs_mode strings to pipeline intent strings. """ obs_mode_mapping = {} __slots__ = ('id', 'obs_mode') def __getstate__(self) -> tuple[int, str]: return self.id, self.obs_mode def __setstate__(self, state) -> None: self.id, self.obs_mode = state def __init__(self, state_id: int, obs_mode: str) -> None: """ Initialize a State object. Args: state_id: Numerical identifier of this State. obs_mode: Unique obs_mode values associated with this State. """ self.id = state_id # work around NumPy bug with empty strings # http://projects.scipy.org/numpy/ticket/1239 self.obs_mode = str(obs_mode) def __repr__(self) -> str: return '{0}({1!r}, {2!r})'.format( self.__class__.__name__, self.id, self.obs_mode) @property def intents(self) -> set[str]: """Return all intents associated with this State.""" # return all intents return {intent for mode, intent in self.obs_mode_mapping.items() if self.obs_mode.find(mode) != -1}
[docs] def get_obs_mode_for_intent(self, intent: str) -> list[str]: """Return list of obs_mode values associated with given intent.""" intents = {i.strip('*') for i in intent.split(',') if i is not None} return [mode for mode, pipeline_intent in self.obs_mode_mapping.items() if pipeline_intent in intents and self.obs_mode.find(mode) != -1]
def __str__(self) -> str: return '{0}(id={1}, intents={2})'.format(self.__class__.__name__, self.id, self.intents)
class StateALMA(State): """State representation for ALMA Observatory measurement sets. Extends State with ALMA-specific obs_mode to pipeline intent mappings. Attributes: id: Numerical identifier of this State. obs_mode: Unique obs_mode values associated with this State. """ # dictionary to map from STATE table obs_mode to pipeline intent obs_mode_mapping = { 'CALIBRATE_POLARIZATION#ON_SOURCE' : 'POLARIZATION', 'CALIBRATE_POLARIZATION.ON_SOURCE' : 'POLARIZATION', 'CALIBRATE_POLARIZATION_ON_SOURCE' : 'POLARIZATION', 'CALIBRATE_POL_ANGLE#ON_SOURCE' : 'POLANGLE', 'CALIBRATE_POL_ANGLE.ON_SOURCE' : 'POLANGLE', 'CALIBRATE_POL_ANGLE_ON_SOURCE' : 'POLANGLE', 'CALIBRATE_POL_LEAKAGE#ON_SOURCE' : 'POLLEAKAGE', 'CALIBRATE_POL_LEAKAGE.ON_SOURCE' : 'POLLEAKAGE', 'CALIBRATE_POL_LEAKAGE_ON_SOURCE' : 'POLLEAKAGE', 'CALIBRATE_BANDPASS#ON_SOURCE' : 'BANDPASS', 'CALIBRATE_BANDPASS.ON_SOURCE' : 'BANDPASS', 'CALIBRATE_BANDPASS_ON_SOURCE' : 'BANDPASS', 'CALIBRATE_AMPLI#ON_SOURCE' : 'AMPLITUDE', 'CALIBRATE_AMPLI.ON_SOURCE' : 'AMPLITUDE', 'CALIBRATE_AMPLI_ON_SOURCE' : 'AMPLITUDE', 'CALIBRATE_FLUX#ON_SOURCE' : 'AMPLITUDE', 'CALIBRATE_FLUX.ON_SOURCE' : 'AMPLITUDE', 'CALIBRATE_FLUX_ON_SOURCE' : 'AMPLITUDE', 'CALIBRATE_PHASE#ON_SOURCE' : 'PHASE', 'CALIBRATE_PHASE.ON_SOURCE' : 'PHASE', 'CALIBRATE_PHASE_ON_SOURCE' : 'PHASE', 'CALIBRATE_TARGET#ON_SOURCE' : 'TARGET', 'CALIBRATE_TARGET.ON_SOURCE' : 'TARGET', 'CALIBRATE_TARGET_ON_SOURCE' : 'TARGET', 'CALIBRATE_POINTING#ON_SOURCE' : 'POINTING', 'CALIBRATE_POINTING.ON_SOURCE' : 'POINTING', 'CALIBRATE_POINTING_ON_SOURCE' : 'POINTING', 'CALIBRATE_FOCUS#ON_SOURCE' : 'FOCUS', 'CALIBRATE_FOCUS.ON_SOURCE' : 'FOCUS', 'CALIBRATE_FOCUS_ON_SOURCE' : 'FOCUS', 'CALIBRATE_WVR#ON_SOURCE' : 'WVR', 'CALIBRATE_WVR.ON_SOURCE' : 'WVR', 'CALIBRATE_WVR_ON_SOURCE' : 'WVR', 'CALIBRATE_ATMOSPHERE#ON_SOURCE' : 'ATMOSPHERE', 'CALIBRATE_ATMOSPHERE.ON_SOURCE' : 'ATMOSPHERE', 'CALIBRATE_ATMOSPHERE_ON_SOURCE' : 'ATMOSPHERE', 'CALIBRATE_ATMOSPHERE#AMBIENT' : 'ATMOSPHERE', 'CALIBRATE_ATMOSPHERE#HOT' : 'ATMOSPHERE', 'CALIBRATE_ATMOSPHERE#OFF_SOURCE' : 'ATMOSPHERE', 'CALIBRATE_ATMOSPHERE.OFF_SOURCE' : 'ATMOSPHERE', 'CALIBRATE_ATMOSPHERE_OFF_SOURCE' : 'ATMOSPHERE', 'CALIBRATE_ATMOSPHERE#TEST' : 'ATMOSPHERE', 'CALIBRATE_SIDEBAND_RATIO#ON_SOURCE' : 'SIDEBAND', 'CALIBRATE_SIDEBAND_RATIO.ON_SOURCE' : 'SIDEBAND', 'CALIBRATE_SIDEBAND_RATIO_ON_SOURCE' : 'SIDEBAND', 'CALIBRATE_SIDEBAND_RATIO#OFF_SOURCE': 'SIDEBAND', 'CALIBRATE_SIDEBAND_RATIO.OFF_SOURCE': 'SIDEBAND', 'CALIBRATE_SIDEBAND_RATIO_OFF_SOURCE': 'SIDEBAND', 'CALIBRATE_DIFFGAIN#REFERENCE' : 'DIFFGAINREF', 'CALIBRATE_DIFFGAIN#ON_SOURCE' : 'DIFFGAINSRC', 'CALIBRATE_DELAY#ON_SOURCE' : 'CHECK', 'CALIBRATE_DELAY.ON_SOURCE' : 'CHECK', 'CALIBRATE_DELAY_ON_SOURCE' : 'CHECK', 'OBSERVE_CHECK_SOURCE#ON_SOURCE' : 'CHECK', 'OBSERVE_CHECK_SOURCE.ON_SOURCE' : 'CHECK', 'OBSERVE_CHECK_SOURCE_ON_SOURCE' : 'CHECK', 'OBSERVE_TARGET#ON_SOURCE' : 'TARGET', 'OBSERVE_TARGET.ON_SOURCE' : 'TARGET', 'OBSERVE_TARGET_ON_SOURCE' : 'TARGET', 'OBSERVE_TARGET#OFF_SOURCE' : 'REFERENCE', 'OBSERVE_TARGET.OFF_SOURCE' : 'REFERENCE', 'OBSERVE_TARGET_OFF_SOURCE' : 'REFERENCE' } def __init__(self, state_id: int, obs_mode: str) -> None: """ Initialize a StateALMA object. Args: state_id: Numerical identifier of this State. obs_mode: Unique obs_mode values associated with this State. """ super().__init__(state_id, obs_mode) if 'CALIBRATE_FLUX' in obs_mode: LOG.trace('Translating %s intent to AMPLITUDE for state #%s' '' % (obs_mode, state_id)) class StateALMACycle0(StateALMA): """State representation for ALMA Cycle 0 measurement sets. Extends StateALMA with workarounds for mislabeled Cycle 0 data, including removal of spurious PHASE intents when co-existing with BANDPASS or AMPLITUDE intents. Attributes: id: Numerical identifier of this State. obs_mode: Unique obs_mode values associated with this State. """ # Check whether these states co-exist with PHASE _PHASE_BYPASS_INTENTS = frozenset(('BANDPASS', 'AMPLITUDE')) def __init__(self, state_id: int, obs_mode: str) -> None: """ Initialize a StateALMACycle0 object. Args: state_id: Numerical identifier of this State. obs_mode: Unique obs_mode values associated with this State. """ super().__init__(state_id, obs_mode) # For Cycle 0, check whether this state has PHASE and another cal # intent. If so, the PHASE obsmode will be removed. # First collect the intents using the raw obsmodes recorded in the # state table.. intents = self.intents # .. and test to see if any of these intents require phase removal has_bypass_intent = intents.isdisjoint(StateALMACycle0._PHASE_BYPASS_INTENTS) # if so, and PHASE is indeed included as an intent, .. if 'PHASE' in intents and not has_bypass_intent: LOG.info('Cycle 0 mislabeled data workaround: removing PHASE ' 'intent for State %s' % self.id) # .. find the obs_mode(s) responsible for the addition of the # phase intent.. phase_obs_modes = [k for k, v in self.obs_mode_mapping.items() if v == 'PHASE'] # and remove them from the obsmodes we should register dephased_obs_modes = [m for m in obs_mode.split(',') if m not in phase_obs_modes] LOG.trace('Before: %s' % self.__repr__()) # .. so that in resetting this object's obs_modes to the # corrected value, we remove the registration of the pipeline # PHASE intent self.obs_mode = ','.join(dephased_obs_modes) LOG.trace('After: %s' % self.__repr__()) class StateVLA(State): """State representation for VLA Observatory measurement sets. Extends State with VLA-specific obs_mode to pipeline intent mappings. Attributes: id: Numerical identifier of this State. obs_mode: Unique obs_mode values associated with this State. """ # dictionary to map from STATE table obs_mode to pipeline intent obs_mode_mapping = { 'CALIBRATE_POLARIZATION#ON_SOURCE' : 'POLARIZATION', 'CALIBRATE_POLARIZATION.ON_SOURCE' : 'POLARIZATION', 'CALIBRATE_POLARIZATION_ON_SOURCE' : 'POLARIZATION', 'CALIBRATE_POLARIZATION#UNSPECIFIED' : 'POLARIZATION', 'CALIBRATE_POL_ANGLE#ON_SOURCE' : 'POLANGLE', 'CALIBRATE_POL_ANGLE.ON_SOURCE' : 'POLANGLE', 'CALIBRATE_POL_ANGLE_ON_SOURCE' : 'POLANGLE', 'CALIBRATE_POL_ANGLE#UNSPECIFIED' : 'POLANGLE', 'CALIBRATE_POL_LEAKAGE#ON_SOURCE' : 'POLLEAKAGE', 'CALIBRATE_POL_LEAKAGE.ON_SOURCE' : 'POLLEAKAGE', 'CALIBRATE_POL_LEAKAGE_ON_SOURCE' : 'POLLEAKAGE', 'CALIBRATE_POL_LEAKAGE#UNSPECIFIED' : 'POLLEAKAGE', 'CALIBRATE_BANDPASS#ON_SOURCE' : 'BANDPASS', 'CALIBRATE_BANDPASS.ON_SOURCE' : 'BANDPASS', 'CALIBRATE_BANDPASS_ON_SOURCE' : 'BANDPASS', 'CALIBRATE_AMPLI#ON_SOURCE' : 'PHASE', # Was amplitude 'CALIBRATE_AMPLI.ON_SOURCE' : 'PHASE', # Was amplitude 'CALIBRATE_AMPLI_ON_SOURCE' : 'PHASE', # Was amplitude 'CALIBRATE_FLUX#ON_SOURCE' : 'AMPLITUDE', 'CALIBRATE_FLUX.ON_SOURCE' : 'AMPLITUDE', 'CALIBRATE_FLUX_ON_SOURCE' : 'AMPLITUDE', 'CALIBRATE_PHASE#ON_SOURCE' : 'PHASE', 'CALIBRATE_PHASE.ON_SOURCE' : 'PHASE', 'CALIBRATE_PHASE_ON_SOURCE' : 'PHASE', 'CALIBRATE_TARGET#ON_SOURCE' : 'TARGET', 'CALIBRATE_TARGET.ON_SOURCE' : 'TARGET', 'CALIBRATE_TARGET_ON_SOURCE' : 'TARGET', 'CALIBRATE_POINTING#ON_SOURCE' : 'POINTING', 'CALIBRATE_POINTING.ON_SOURCE' : 'POINTING', 'CALIBRATE_POINTING_ON_SOURCE' : 'POINTING', 'CALIBRATE_FOCUS#ON_SOURCE' : 'FOCUS', 'CALIBRATE_FOCUS.ON_SOURCE' : 'FOCUS', 'CALIBRATE_FOCUS_ON_SOURCE' : 'FOCUS', 'CALIBRATE_WVR#ON_SOURCE' : 'WVR', 'CALIBRATE_WVR.ON_SOURCE' : 'WVR', 'CALIBRATE_WVR_ON_SOURCE' : 'WVR', 'CALIBRATE_ATMOSPHERE#ON_SOURCE' : 'ATMOSPHERE', 'CALIBRATE_ATMOSPHERE.ON_SOURCE' : 'ATMOSPHERE', 'CALIBRATE_ATMOSPHERE_ON_SOURCE' : 'ATMOSPHERE', 'CALIBRATE_SIDEBAND_RATIO#ON_SOURCE' : 'SIDEBAND', 'CALIBRATE_SIDEBAND_RATIO.ON_SOURCE' : 'SIDEBAND', 'CALIBRATE_SIDEBAND_RATIO_ON_SOURCE' : 'SIDEBAND', 'OBSERVE_TARGET#ON_SOURCE' : 'TARGET', 'OBSERVE_TARGET.ON_SOURCE' : 'TARGET', 'OBSERVE_TARGET_ON_SOURCE' : 'TARGET', 'OBSERVE_TARGET#UNSPECIFIED' : 'TARGET', 'OBSERVE_TARGET#OFF_SOURCE' : 'REFERENCE', 'OBSERVE_TARGET.OFF_SOURCE' : 'REFERENCE', 'OBSERVE_TARGET_OFF_SOURCE' : 'REFERENCE', 'CALIBRATE_BANDPASS#UNSPECIFIED' : 'BANDPASS', 'CALIBRATE_FLUX#UNSPECIFIED' : 'AMPLITUDE', 'CALIBRATE_PHASE#UNSPECIFIED' : 'PHASE', 'CALIBRATE_AMPLI#UNSPECIFIED' : 'PHASE', # Was amplitude 'UNSPECIFIED#UNSPECIFIED' : 'UNSPECIFIED#UNSPECIFIED', 'SYSTEM_CONFIGURATION' : 'SYSTEM_CONFIGURATION', 'SYSTEM_CONFIGURATION#UNSPECIFIED' : 'SYSTEM_CONFIGURATION' } def __init__(self, state_id: int, obs_mode: str) -> None: """ Initialize a StateVLA object. Args: state_id: Numerical identifier of this State. obs_mode: Unique obs_mode values associated with this State. """ super().__init__(state_id, obs_mode) class StateAPEX(State): """State representation for APEX Observatory measurement sets. Extends State with APEX-specific obs_mode to pipeline intent mappings. Attributes: id: Numerical identifier of this State. obs_mode: Unique obs_mode values associated with this State. """ # dictionary to map from STATE table obs_mode to pipeline intent obs_mode_mapping = { 'OBSERVE_TARGET#ON_SOURCE': 'TARGET' } def __init__(self, state_id: int, obs_mode: str) -> None: """ Initialize a StateAPEX object. Args: state_id: Numerical identifier of this State. obs_mode: Unique obs_mode values associated with this State. """ super().__init__(state_id, obs_mode) class StateSMT(State): """State representation for SMT Observatory measurement sets. Extends State with SMT-specific obs_mode to pipeline intent mappings. Attributes: id: Numerical identifier of this State. obs_mode: Unique obs_mode values associated with this State. """ # dictionary to map from STATE table obs_mode to pipeline intent obs_mode_mapping = { 'OBSERVE_TARGET#ON_SOURCE': 'TARGET' } def __init__(self, state_id: int, obs_mode: str) -> None: """ Initialize a StateSMT object. Args: state_id: Numerical identifier of this State. obs_mode: Unique obs_mode values associated with this State. """ super().__init__(state_id, obs_mode) class StateNAOJ(State): """State representation for Nobeyama or ASTE Observatory measurement sets. Extends State with NAOJ-specific obs_mode to pipeline intent mappings. Attributes: id: Numerical identifier of this State. obs_mode: Unique obs_mode values associated with this State. """ # dictionary to map from STATE table obs_mode to pipeline intent obs_mode_mapping = { 'OBSERVE_TARGET#ON_SOURCE' : 'TARGET', 'OBSERVE_TARGET#OFF_SOURCE' : 'REFERENCE', 'CALIBRATE_ATMOSPHERE#R_SOURCE' : 'ATMOSPHERE', 'CALIBRATE_ATMOSPHERE#SKY_SOURCE' : 'ATMOSPHERE', 'CALIBRATE_ATMOSPHERE#ZERO_SOURCE' : 'ATMOSPHERE' } def __init__(self, state_id: int, obs_mode: str) -> None: """ Initialize a StateNAOJ object. Args: state_id: Numerical identifier of this State. obs_mode: Unique obs_mode values associated with this State. """ super().__init__(state_id, obs_mode) class StateFactory: """Factory for creating observatory-specific State objects. Creates the appropriate State subclass based on the observatory name and observation start time. """ def __init__(self, observatory: str, start: datetime.datetime | None = None) -> None: """ Initialize a StateFactory object. Args: observatory: name of observatory to create State(s) for. start: start time of observation / measurement set; this is used to distinguish between Cycle 0 and later ALMA datasets. """ if observatory == 'ALMA': if start and start < datetime.datetime(2013, 1, 21, tzinfo=datetime.timezone.utc): self._constructor = StateALMACycle0 else: self._constructor = StateALMA elif observatory == 'VLA' or observatory == 'EVLA': self._constructor = StateVLA elif observatory == 'APEX': self._constructor = StateAPEX elif observatory == 'SMT': self._constructor = StateSMT elif observatory == 'NRO' or observatory == 'ASTE': self._constructor = StateNAOJ else: raise KeyError('%s has no matching State class' % observatory) def create_state(self, state_id: int, obs_mode: str) -> State: """Return a State object with given ID and obs_mode.""" return self._constructor(state_id, obs_mode)