Source code for pipeline.infrastructure.eventbus.events

"""
The events module contains classes representing various stages in the
lifecycle of a pipeline reduction.

Events are published on a topic. The implementing class defines at a minimum
the event topic. The class may also define additional attributes identifying
and/or describing the event. These extra metadata can be interpreted by the
event listeners downstream.
"""


class Event:
    """
    Base class inherited by all pipeline events.
    """
    topic = ''


class ContextLifecycleEvent(Event):
    """
    Base class for events related to the lifecycle of the pipeline context.
    """
    topic = 'lifecycle.context'

    def __init__(self, context_name=None, output_dir=None):
        super().__init__()
        # Output directory should be set but can be an empty string.
        if output_dir is None:
            raise ValueError(f'output_dir unspecified when creating {self.__class__.__name__}')
        self.output_dir = output_dir
        if not context_name:
            raise ValueError(f'context_name unspecified when creating {self.__class__.__name__}')
        self.context_name = context_name


[docs] class ContextCreatedEvent(ContextLifecycleEvent): """ Emitted when a pipeline Context is created. """ topic = 'lifecycle.context.created' def __init__(self, context_name=None, output_dir=None): super().__init__(context_name=context_name, output_dir=output_dir)
[docs] class ContextResumedEvent(ContextLifecycleEvent): """ Emitted when a pipeline Context is resumed. """ topic = 'lifecycle.context.resumed' def __init__(self, context_name=None, output_dir=None): super().__init__(context_name=context_name, output_dir=output_dir)
class TaskLifecycleEvent(Event): """ Base class for events related to the pipeline task lifecycle. """ topic = 'lifecycle.task' def __init__(self, context_name, stage_number, state): self.context_name = context_name self.stage_number = stage_number self.state = state class TaskStartedEvent(TaskLifecycleEvent): """ Emitted when a pipeline Task is started. """ topic = 'lifecycle.task.started' def __init__(self, context_name, stage_number): super().__init__(context_name, stage_number, 'started') class TaskCompleteEvent(TaskLifecycleEvent): """ Emitted when a pipeline Task completes. """ topic = 'lifecycle.task.complete' def __init__(self, context_name, stage_number): super().__init__(context_name, stage_number, 'complete') class TaskAbnormalExitEvent(TaskLifecycleEvent): """ Emitted when a pipeline Task terminates due to an unexpected error. """ topic = 'lifecycle.task.aborted' def __init__(self, context_name, stage_number): super().__init__(context_name, stage_number, 'abnormal exit') class WebLogLifecycleEvent(Event): """ Base class for events related to weblog generation. This class represents the overall lifecycle encompassing generation of all weblog artifacts (all plots, all stages, etc.). """ topic = 'lifecycle.weblog' def __init__(self, context_name, state): super().__init__() self.context_name = context_name self.state = state class WebLogRenderingStartedEvent(WebLogLifecycleEvent): """ Emitted when weblog rendering begins. """ topic = 'lifecycle.weblog.rendering.started' def __init__(self, context_name): super().__init__(context_name, 'started') class WebLogRenderingCompleteEvent(WebLogLifecycleEvent): """ Emitted when weblog rendering ends. """ topic = 'lifecycle.weblog.rendering.complete' def __init__(self, context_name): super().__init__(context_name, 'complete') class WebLogStageLifecycleEvent(WebLogLifecycleEvent): """ Base class for events related to weblog rendering of a specific stage. """ topic = 'lifecycle.weblog.stage' def __init__(self, context_name, stage_number, state): super().__init__(context_name, state) self.stage_number = stage_number class WebLogStageRenderingStartedEvent(WebLogStageLifecycleEvent): """ Emitted when stage rendering begins. """ topic = 'lifecycle.weblog.stage.rendering.started' def __init__(self, context_name, stage_number): super().__init__(context_name, stage_number, 'started') class WebLogStageRenderingCompleteEvent(WebLogStageLifecycleEvent): """ Emitted when stage rendering ends. """ topic = 'lifecycle.weblog.stage.rendering.complete' def __init__(self, context_name, stage_number): super().__init__(context_name, stage_number, 'complete') class WebLogStageRenderingAbnormalExitEvent(WebLogStageLifecycleEvent): """ Emitted when stage rendering exits due to error. """ topic = 'lifecycle.weblog.stage.rendering.aborted' def __init__(self, context_name, stage_number): super().__init__(context_name, stage_number, 'abnormal exit') class ResultLifecycleEvent(Event): """ Base class for events related to the acceptance of a pipeline Result. """ topic = 'lifecycle.result' def __init__(self, context_name, stage_number, state): self.context_name = context_name self.stage_number = stage_number self.state = state class ResultAcceptingEvent(ResultLifecycleEvent): """ Emitted when a Result is accepted into the Context. """ topic = 'lifecycle.result.accepting' def __init__(self, context_name, stage_number): super().__init__(context_name, stage_number, 'accepting') class ResultAcceptedEvent(ResultLifecycleEvent): """ Emitted when a Result has been successfully incorporated into the Context. """ topic = 'lifecycle.result.accepted' def __init__(self, context_name, stage_number): super().__init__(context_name, stage_number, 'accepted') class ResultAcceptErrorEvent(TaskLifecycleEvent): """ Emitted when Results acceptance fails due to error. """ topic = 'lifecycle.result.error' def __init__(self, context_name, stage_number): super().__init__(context_name, stage_number, 'error')