Source code for pipeline.hsdn.tasks.exportdata.exportdata

"""
Single-Dish Exportdata task dedicated to NRO data.

Please see hsd/tasks/exportdata/exportdata.py for generic
description on how Exportdata task works.
"""
from __future__ import annotations

import collections
import os
from typing import TYPE_CHECKING

import pipeline.h.tasks.exportdata.exportdata as exportdata
import pipeline.hsd.tasks.exportdata.exportdata as sdexportdata
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
from pipeline.infrastructure import task_registry
from pipeline.hsdn.tasks.importdata.importdata import NROImportDataResults
from pipeline.hsdn.tasks.restoredata.restoredata import NRORestoreDataResults
from . import manifest
from . import nrotemplategenerator

# the logger for this module
LOG = infrastructure.logging.get_logger(__name__)

if TYPE_CHECKING:
    from pipeline.infrastructure.launcher import Context
    from pipeline.infrastructure.project import ProjectStructure


class NROPipelineNameBuilder(exportdata.PipelineProductNameBuilder):
    """Name(str) building utility methods class for NRO Pipeline.

    Methods in this class overrides those in PipelineProductNameBuilder and
    constructs names (paths) of products in the absence of valid OUS status ID
    nor session name in Nobeyama datasets."""

    @classmethod
    def _build_from_oussid(self, basename: str, ousstatus_entity_id: str | None=None,
                           output_dir: str | None=None) -> str:
        """Build a string for use as path.

        Args:
            basename : base name of path
            ousstatus_entity_id : OUS Status ID. not in use.
            output_dir : output directory path

        Returns:
            path string
        """
        return self._join_dir(basename, output_dir)

    @classmethod
    def _build_from_ps_oussid(self, basename: str,
                              project_structure: ProjectStructure | None=None,
                              ousstatus_entity_id: str | None=None,
                              output_dir: str | None=None) -> str:
        """Build a string for use as path.

        Args:
            basename : base name of path
            project_structure : project structure element, not in use.
            ousstatus_entity_id : OUS Status ID, not in use.
            output_dir : output directory path

        Returns:
            path string
        """
        return self._join_dir(basename, output_dir)

    @classmethod
    def _build_from_oussid_session(self, basename: str,
                                   ousstatus_entity_id: str | None=None,
                                   session_name: str | None=None,
                                   output_dir: str | None=None):
        """Build a string for use as path.

        Args:
            basename : base name of path
            ousstatus_entity_id : OUS Status ID, not in use.
            session_name : session name, not in use.
            output_dir : output directory path

        Returns:
            path string
        """
        return self._join_dir(basename, output_dir)


class NROExportDataInputs(sdexportdata.SDExportDataInputs):
    """Inputs class for NROExportData.

    Inputs class must be separated per task class even if
    it's effectively the same."""

    pass


[docs] @task_registry.set_equivalent_casa_task('hsdn_exportdata') @task_registry.set_casa_commands_comment('The output data products are computed.') class NROExportData(sdexportdata.SDExportData): """A class for exporting Nobeyama data to the products subdirectory. It performs the following additional operations after SDExportData does: - Export reduction template in a Python file - Export nroscale CSV file """ Inputs = NROExportDataInputs NameBuilder = NROPipelineNameBuilder
[docs] def prepare(self) -> exportdata.ExportDataResults: """Prepare and execute an export data job appropriate to the task inputs. Returns: object of exportdata.ExportDataResults """ results = super().prepare() # manifest file manifest_file = os.path.join(self.inputs.context.products_dir, results.manifest) # export NRO data reduction template template_script = \ self._export_reduction_template(self.inputs.products_dir) if template_script is not None: self._update_manifest(manifest_file, script=template_script) # export NRO scaling file template template_file = \ self._export_nroscalefile_template(self.inputs.products_dir) if template_file is not None: self._update_manifest(manifest_file, scalefile=template_file) return results
def _export_reduction_template(self, products_dir: str) -> str: """Export reduction template script. Exports rebase_and_image.py. It is a Python script to perform baseline subtraction and imaging by CASA tasks. Args: products_dir : product directory path Returns: script name """ script_name = 'rebase_and_image.py' config_name = 'rebase_and_image_config.py' script_path = os.path.join(products_dir, script_name) config_path = os.path.join(products_dir, config_name) status = nrotemplategenerator.generate_script(self.inputs.context, script_path, config_path) return script_name if status is True else None def _export_nroscalefile_template(self, products_dir: str) -> str: """Export nroscale CSV file. Args: products_dir : product directory path Returns: file name of nroscalefile, default:'nroscalefile.csv' """ datafile_name = 'nroscalefile.csv' datafile_path = os.path.join(products_dir, datafile_name) status = nrotemplategenerator.generate_csv(self.inputs.context, datafile_path) return datafile_name if status is True else None def _update_manifest(self, manifest_file: str, script: str=None, scalefile: str=None): """Add Nobeyama specific products to manifest file. Args: manifest_file : manifest file path script : the name of template reduction script scalefile : the name of NRO scale file """ pipemanifest = manifest.NROPipelineManifest('') pipemanifest.import_xml(manifest_file) ouss = pipemanifest.get_ous() if script: pipemanifest.add_reduction_script(ouss, script) pipemanifest.write(manifest_file) if scalefile: pipemanifest.add_scalefile(ouss, scalefile) pipemanifest.write(manifest_file) def _export_casa_restore_script(self, context: Context, script_name: str, products_dir: str, oussid: str, vislist: list[str], session_list: list[str]) -> str: """Generate and export CASA restore script. Args: context : pipeline context script_name : Name of the restore script products_dir : Name of the product directory oussid : OUS Status ID vislist : a list of MeasurementSet names session_list : a list of session id Returns: path of output CASA script file """ tmpvislist = list(map(os.path.basename, vislist)) restore_task_name = 'hsdn_restoredata' hm_rasterscan = self._get_hm_rasterscan_value(context) args = collections.OrderedDict(vis=tmpvislist, reffile='./nroscalefile.csv', hm_rasterscan=hm_rasterscan) return self._export_casa_restore_script_template(context, script_name, products_dir, oussid, restore_task_name, args) def _get_hm_rasterscan_value(self, context: Context) -> str: """Retrieve hm_rasterscan value from Results object. This method checks if either NROImportDataResults or NRORestoreDataResults object is registered to Pipeline context. If exists, hm_rasterscan value is retrieved from the results object. If no NROImportDataResults nor NRORestoreDataResults object is registered, 'time' will be returned. Args: context: Pipeline context Returns: The hm_rasterscan value. """ results_filter = filter( lambda x: isinstance(x, basetask.ResultsList) and isinstance(x[0], (NROImportDataResults, NRORestoreDataResults)), map(lambda x: x.read(), context.results) ) importdata_results = next(results_filter, None) if importdata_results: hm_rasterscan = importdata_results.inputs.get('hm_rasterscan', 'time') else: hm_rasterscan = 'time' return hm_rasterscan