Source code for pipeline.hifv.tasks.exportvlassdata.exportvlassdata

import collections
import errno
import fnmatch
import os
import shutil
import tarfile
import re
import traceback

import numpy as np

import xml.etree.ElementTree as eltree
import astropy.io.fits as apfits

import pipeline as pipeline
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.vdp as vdp
from pipeline import environment
from pipeline.h.tasks.common import manifest
from pipeline.h.tasks.exportdata import exportdata
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import casa_tools
from pipeline.infrastructure import task_registry
from pipeline.infrastructure import utils
from pipeline.domain import DataType

from pipeline.infrastructure.utils import predict_kernel

LOG = infrastructure.get_logger(__name__)

StdFileProducts = collections.namedtuple('StdFileProducts', 'ppr_file weblog_file casa_commands_file casa_pipescript parameterlist')

def atoi(text):
    return int(text) if text.isdigit() else text


def natural_keys(text):
    """
    alist.sort(key=natural_keys) sorts in human order
    http://nedbatchelder.com/blog/200712/human_sorting.html
    (See Toothy's implementation in the comments)
    https://stackoverflow.com/questions/5967500/how-to-correctly-sort-a-string-with-a-number-inside
    """
    return [ atoi(c) for c in re.split(r'(\d+)', text) ]


class ExportvlassdataResults(basetask.Results):
    def __init__(self, final=[], pool=[], preceding=[]):
        super().__init__()
        self.pool = pool[:]
        self.final = final[:]
        self.preceding = preceding[:]
        self.error = set()

    def __repr__(self):
        return 'ExportvlassdataResults:'


class ExportvlassdataInputs(exportdata.ExportDataInputs):
    gainmap = vdp.VisDependentProperty(default=False)
    processing_data_type = [DataType.REGCAL_CONTLINE_ALL, DataType.RAW]

    # docstring and type hints: supplements hifv_exportvlassdata
    def __init__(self, context, output_dir=None, session=None, vis=None, exportmses=None, pprfile=None, calintents=None,
                 calimages=None, targetimages=None, products_dir=None, gainmap=None):
        """Initialize Inputs.

        Args:
            context: Pipeline context object containing state information.

            output_dir: Output directory.
                Defaults to None, which corresponds to the current working directory.

            session:

            vis: The list of input MeasurementSets. Defaults to the list of MeasurementSets specified in the hifv_importdata task.

            exportmses:

            pprfile:

            calintents:

            calimages:

            targetimages:

            products_dir:

            gainmap:

        """
        super().__init__(context, output_dir=output_dir, session=session, vis=vis,
                                                    exportmses=exportmses, pprfile=pprfile, calintents=calintents,
                                                    calimages=calimages, targetimages=targetimages,
                                                    products_dir=products_dir)
        self.gainmap = gainmap


class VlassPipelineManifest(manifest.PipelineManifest):
    """VLASS-specific PipelineManifest class."""

    @staticmethod
    def add_reimaging_resources(ous, reimaging_resources):
        """Add the reimaging_resources tarball to the OUS element."""
        eltree.SubElement(ous, "reimaging_resources", name=reimaging_resources)

    @staticmethod
    def add_parameter_list(ous, parameter_list):
        """Add a list of hif_editimlist parameter files to the OUS element."""
        for parameter_filename in parameter_list:
            eltree.SubElement(ous, "parameter_list", name=parameter_filename)


[docs] @task_registry.set_equivalent_casa_task('hifv_exportvlassdata') class Exportvlassdata(basetask.StandardTaskTemplate): Inputs = ExportvlassdataInputs is_multi_vis_task = True NameBuilder = exportdata.PipelineProductNameBuilder def __init__(self, inputs): super().__init__(inputs) self.selfcaltable_list: list[str] = [] self.flagversion_list: list[str] = []
[docs] def prepare(self): LOG.info("This Exportvlassdata class is running.") # Create a local alias for inputs, so we're not saying # 'self.inputs' everywhere inputs = self.inputs try: LOG.trace('Creating products directory: {!s}'.format(inputs.products_dir)) os.makedirs(inputs.products_dir) except OSError as exc: if exc.errno == errno.EEXIST: pass else: raise # Initialize the standard ous is string. oussid = inputs.context.get_oussid() # Define the results object result = ExportvlassdataResults() # Make the standard vislist and the sessions lists. session_list, session_names, session_vislists, vislist = self._make_lists(inputs.context, inputs.session, inputs.vis) # Export the standard per OUS file products # The pipeline processing request # A compressed tarfile of the weblog # The pipeline processing script # The CASA commands log recipe_name = self.get_recipename(inputs.context) if not recipe_name: prefix = oussid else: prefix = oussid + '.' + recipe_name stdfproducts = self._do_standard_ous_products(inputs.context, prefix, inputs.pprfile, inputs.output_dir, inputs.products_dir) if stdfproducts.ppr_file: result.pprequest = os.path.basename(stdfproducts.ppr_file) result.weblog = os.path.basename(stdfproducts.weblog_file) result.pipescript = os.path.basename(stdfproducts.casa_pipescript) result.commandslog = os.path.basename(stdfproducts.casa_commands_file) if stdfproducts.parameterlist: result.parameterlist = [os.path.basename(parameterlist) for parameterlist in stdfproducts.parameterlist] else: result.parameterlist = [] imlist = self.inputs.context.subimlist.get_imlist() img_mode = self.inputs.context.imaging_mode images_list = [] for imageitem in imlist: if imageitem['multiterm']: pbcor_image_name = imageitem['imagename'].replace('subim', 'pbcor.tt0.subim') rms_image_name = imageitem['imagename'].replace('subim', 'pbcor.tt0.rms.subim') image_bundle = [pbcor_image_name, rms_image_name] # PIPE-592: save VLASS SE alpha and alpha error images if type(img_mode) is str and img_mode.startswith('VLASS-SE-CONT'): alpha_image_name = imageitem['imagename'].replace('.image.subim', '.alpha.subim') alpha_image_error_name = imageitem['imagename'].replace('.image.subim', '.alpha.error.subim') image_bundle.extend([alpha_image_name, alpha_image_error_name]) # PIPE-1038: Adding new export products pbcor_image_name = imageitem['imagename'].replace('subim', 'pbcor.tt1.subim') rms_image_name = imageitem['imagename'].replace('subim', 'pbcor.tt1.rms.subim') image_bundle.extend([pbcor_image_name, rms_image_name]) # No FITS file created tt0_initial_models = utils.glob_ordered('*iter1.model.tt0*') tt0_initial_models.sort(key=natural_keys) tt0_initial_model_name = tt0_initial_models[0] # tt0_initial_model_name = imageitem['imagename'].replace('iter3.image.subim', 'iter1.model.tt0') # tt0_initial_model_name = tt0_initial_model_name.replace('s13', 's5') tt1_initial_models = utils.glob_ordered('*iter1.model.tt1*') tt1_initial_models.sort(key=natural_keys) tt1_initial_model_name = tt1_initial_models[0] # tt1_initial_model_name = imageitem['imagename'].replace('iter3.image.subim', 'iter1.model.tt1') # tt1_initial_model_name = tt1_initial_model_name.replace('s13', 's5') # Create list for tar file self.initial_models = [tt0_initial_model_name, tt1_initial_model_name] # No FITS files created tt0_final_model_name = imageitem['imagename'].replace('image.subim', 'model.tt0') tt1_final_model_name = imageitem['imagename'].replace('image.subim', 'model.tt1') # Create list for tar file self.final_models = [tt0_final_model_name, tt1_final_model_name] else: pbcor_image_name = imageitem['imagename'].replace('subim', 'pbcor.subim') rms_image_name = imageitem['imagename'].replace('subim', 'pbcor.rms.subim') image_bundle = [pbcor_image_name, rms_image_name] images_list.extend(image_bundle) # Add masks for PIPE-1038 self.masks = [] if type(img_mode) is str and img_mode.startswith('VLASS-SE-CONT'): QLmasks = utils.glob_ordered('*.QLcatmask-tier1.mask') QLmasks.sort(key=natural_keys) QLmask = QLmasks[-1] secondmasks = utils.glob_ordered('*.secondmask.mask') secondmasks.sort(key=natural_keys) secondmask = secondmasks[-1] finalmasks = utils.glob_ordered('*.combined-tier2.mask') finalmasks.sort(key=natural_keys) finalmask = finalmasks[-1] # Create list for tar file self.masks = [QLmask, secondmask, finalmask] if type(img_mode) is str and img_mode.startswith('VLASS-SE-CUBE'): # PIPE-1434: split VLASS-SE-CUBE full-Stokes images into IQU and V. images_list = self._split_vlass_cube_stokes(images_list) fits_list = [] for image in images_list: fitsfile = os.path.join(inputs.products_dir, image + '.fits') # PIPE-1182: Strip stage number off exported image fits files # Look for "sX_Y.", where X and Y are one or more digits at the start of the image name pattern = r'^s\d+_\d*\.' mm = re.search(pattern, image) if mm: LOG.info(f'Removing "{mm.group()}" from "{image}" before exporting to FITS.') fitsfile = fitsfile.replace(mm.group(), '') task = casa_tasks.exportfits(imagename=image, fitsimage=fitsfile) self._executor.execute(task) LOG.info('Wrote {ff}'.format(ff=fitsfile)) fits_list.append(fitsfile) # PIPE-587/PIPE-641/PIPE-1134/PIPE-2423: apply position corrections for modes that don't use # aw-projection with nplane=32. if img_mode.startswith('VLASS-') and img_mode not in ('VLASS-SE-CONT-AWPHPG', 'VLASS-SE-CONT-AWPHPG-P032', 'VLASS-SE-CONT-AWP2', 'VLASS-SE-CONT-AWP2-P032', 'VLASS-SE-CONT', 'VLASS-SE-CONT-AWP', 'VLASS-SE-CONT-AWP-P032'): # Mean antenna geographic coordinates observatory = casa_tools.measures.observatory(self.inputs.context.project_summary.telescope) # Mean observing date start_time = self.inputs.context.observing_run.start_datetime end_time = self.inputs.context.observing_run.end_datetime mid_time = start_time + (end_time - start_time) / 2 mid_time = casa_tools.measures.epoch('utc', mid_time.isoformat()) # Correction utils.positioncorrection.do_wide_field_pos_cor(fitsfile, date_time=mid_time, obs_long=observatory['m0'], obs_lat=observatory['m1']) # Update FITS header self._fix_vlass_fits_header(self.inputs.context, fitsfile) # SE Cont imaging mode export for VLASS if type(img_mode) is str and img_mode.startswith('VLASS-SE-CONT'): reimaging_resources = self._export_reimaging_resources(inputs.context, inputs.products_dir, oussid) else: reimaging_resources = None # Export the pipeline manifest file # TBD Remove support for auxiliary data products to the individual pipelines pipemanifest = self._make_pipe_manifest(inputs.context, oussid, stdfproducts, {}, {}, [], fits_list, reimaging_resources, result.parameterlist) casa_pipe_manifest = self._export_pipe_manifest('pipeline_manifest.xml', inputs.products_dir, pipemanifest) result.manifest = os.path.basename(casa_pipe_manifest) # PIPE-1434: produce a set of VLASS-SE-CUBE sci. images in the common resolution/WCS. # Note that we use full-Stokes cubes for the commom beam calculation and image smoothing input. # Using V images might give slightly different results due to CAS-13799 and CAS-13827. if type(img_mode) is str and img_mode.startswith('VLASS-SE-CUBE'): # image_refimage_list: each element is a pair of image names (a, b) # a: the IQUV CASA image without position correction or header fixes # b: the IQU FITS image inside products/, with position corrections and header fixes applied. # We exlclude .rms. images here and start from the original IQUV CASA images. image_refimage_list = [(images_list[idx].replace('.IQU.iter', '.IQUV.iter'), fits_list[idx]) for idx, image in enumerate(images_list) if '.IQU.iter' in image and '.rms.' not in image] common_beam = self._get_common_beam([image_refimage[0] for image_refimage in image_refimage_list]) for imagename, refimagename in image_refimage_list: try: LOG.info('') LOG.info(f'start to smooth and regrid {imagename}, and split it into IQU and V.') self._smooth_and_regrid(imagename, refimagename, common_beam) except Exception as e: LOG.warning(f'Exception while getting the common beam image(s) for {imagename}: {e}') LOG.info(traceback.format_exc()) # Return the results object, which will be used for the weblog return result
def _smooth_and_regrid(self, imagename, ref_imagename, target_beam): """Perform the operation to get a sci image in the commom beam/WCS.""" cme = casa_tools.measures cqa = casa_tools.quanta with casa_tools.ImageReader(ref_imagename) as image: crval_corrected = image.coordsys().referencevalue(type='direction', format='n') dr_corrected = image.coordsys().referencevalue(type='direction', format='m') cdelt = np.degrees(np.abs(image.coordsys().increment()['numeric'][0:2]))*3600.0 pixdiag_arcsec = ((cdelt[0])**2+(cdelt[1])**2)**0.5 # pixel diagonal length in arcsec LOG.debug(f'pixel diagonal length: {pixdiag_arcsec} arcsec') with casa_tools.ImageReader(imagename) as image: # We checked the kernel size and position correction offeset again the below tolerenaces but # the results won't affect the smooth/regrid execution. pstol = pixdiag_arcsec*0.2 # the tolerance for considering the convolution kernel as a point-source sptol = pixdiag_arcsec*0.1 # the tolerance to consider regridding less meaningful beam = image.restoringbeam(channel=0, polarization=0) _, kn_code = predict_kernel(beam, target_beam, pstol=pstol) if kn_code: LOG.info(f'{imagename} already reaches the target beam, but it will still be passed on to ia.convolve2d().') else: LOG.info(f'{imagename} has a restoring beam of {beam} and will be smoothed to the target beam of {target_beam}.') image_smo = image.convolve2d(beam=target_beam, targetres=True, scale=-1, major='', minor='', pa='') cs_smo = image_smo.coordsys() cs_smo.setreferencevalue(crval_corrected, type='direction') image_smo.setcoordsys(cs_smo.torecord()) # now image_smo has the corrected crval cs_nominal = image.coordsys() crval_nominal = cs_nominal.referencevalue(type='direction', format='n') dr_nominal = cs_nominal.referencevalue(type='direction', format='m') LOG.info(f"crval1,2 [rad] = {crval_corrected['numeric']}from {ref_imagename}") LOG.info(f"crval1,2 [rad] = {crval_nominal['numeric']} from {imagename}") sep_arcsec = cqa.convert(cme.separation( dr_corrected['measure']['direction'], dr_nominal['measure']['direction']), 'arcsec')['value'] LOG.info( f'crval separation: {sep_arcsec} arcsec, with a regridding tolerence warning at {sptol} arcsec = 0.1 * pixdiag_size') image_rgd = image_smo.regrid(csys=cs_nominal.torecord(), overwrite=True, axes=[0, 1]) rgTool = casa_tools.regionmanager for stokes in ['IQU', 'V']: region = rgTool.frombcs(csys=image_rgd.coordsys().torecord(), shape=image_rgd.shape(), stokes=stokes, stokescontrol='a') fits_name = ref_imagename.replace('.IQU.iter', f'.{stokes}.iter').replace( '.fits', '.com.fits') # avoid per-plane beam in IQU CASA images if stokes == 'IQU': beam = image_rgd.restoringbeam() if 'beams' in beam: beam0 = beam['beams']['*0']['*0'] image_rgd.setrestoringbeam(remove=True) image_rgd.setrestoringbeam(beam=beam0) image_rgd.tofits(fits_name, region=region, overwrite=True) self._fix_vlass_fits_header(self.inputs.context, fits_name) LOG.info(f'write the commom beam/frame FITS image: {fits_name}') image_smo.done() image_rgd.done()
[docs] def analyse(self, results): return results
[docs] def get_recipename(self, context): """ Get the recipe name """ # Get the parent ous ousstatus name. This is the sanitized ous # status uid ps = context.project_structure if ps is None: recipe_name = '' elif ps.recipe_name == 'Undefined': recipe_name = '' else: recipe_name = ps.recipe_name return recipe_name
def _has_imaging_data(self, context, vis): """ Check if the given vis contains any imaging data. """ imaging_datatypes = [DataType.SELFCAL_CONTLINE_SCIENCE, DataType.REGCAL_CONTLINE_SCIENCE, DataType.SELFCAL_LINE_SCIENCE, DataType.REGCAL_LINE_SCIENCE] ms_object = context.observing_run.get_ms(name=vis) return any(ms_object.get_data_column(datatype) for datatype in imaging_datatypes) def _make_lists(self, context, session, vis, imaging=False): """ Create the vis and sessions lists """ # Force inputs.vis to be a list. vislist = vis if isinstance(vislist, str): vislist = [vislist, ] if imaging: vislist = [vis for vis in vislist if self._has_imaging_data(context, vis)] else: vislist = [vis for vis in vislist if not self._has_imaging_data(context, vis)] # Get the session list and the visibility files associated with # each session. session_list, session_names, session_vislists = self._get_sessions(context, session, vislist) return session_list, session_names, session_vislists, vislist def _do_standard_ous_products(self, context, oussid, pprfile, output_dir, products_dir): """ Generate the per ous standard products """ # Locate and copy the pipeline processing request. # There should normally be at most one pipeline processing request. # In interactive mode there is no PPR. ppr_files = self._export_pprfile(context, output_dir, products_dir, oussid, pprfile) if ppr_files != []: ppr_file = os.path.basename(ppr_files[0]) else: ppr_file = None # Export a tar file of the web log weblog_file = self._export_weblog(context, products_dir, oussid) # Export the processing log independently of the web log casa_commands_file = self._export_casa_commands_log(context, context.logs['casa_commands'], products_dir, oussid) # Export the processing script independently of the web log casa_pipescript = self._export_casa_script(context, context.logs['pipeline_script'], products_dir, oussid) # Export the parameter list independently of the weblog for the QL, SE-CONT, SE-CUBE Imaging recipes parameterlist_set = set() for result in context.results: result_meta = result.read() if hasattr(result_meta, 'pipeline_casa_task') and result_meta.pipeline_casa_task.startswith('hif_editimlist'): parameter_filename = result_meta.inputs['parameter_file'] if parameter_filename and parameter_filename not in parameterlist_set: parameterlist_set.add(parameter_filename) _ = self._export_parameterlist(context, parameter_filename, products_dir, oussid) # SE Cont imaging mode export for VLASS img_mode = self.inputs.context.imaging_mode if isinstance(img_mode, str) and img_mode.startswith('VLASS-SE-CONT'): # Identify self cal table selfcal_results: list | None = None for result in context.results: task_result = result.read() if task_result.taskname == 'hifv_selfcal': selfcal_results = task_result break if selfcal_results is not None: for selfcal_result in selfcal_results: if selfcal_result and selfcal_result.caltable and os.path.exists(selfcal_result.caltable): self.selfcaltable_list.append(selfcal_result.caltable) # Identify flagversion vis_list = [self.inputs.vis] if isinstance(self.inputs.vis, str) else self.inputs.vis self.flagversion_list: list[str] = [f'{os.path.basename(vis)}.flagversions' for vis in vis_list] return StdFileProducts(ppr_file, weblog_file, casa_commands_file, casa_pipescript, parameterlist_set) def _make_pipe_manifest(self, context, oussid, stdfproducts, sessiondict, visdict, calimages, targetimages, reimaging_resources, parameterlist): """Generate the manifest file.""" # Initialize the manifest document and the top level ous status. pipemanifest = self._init_pipemanifest(oussid) ouss = pipemanifest.set_ous(oussid) pipemanifest.add_casa_version(ouss, environment.casa_version_string) pipemanifest.add_pipeline_version(ouss, pipeline.revision) pipemanifest.add_procedure_name(ouss, context.project_structure.recipe_name) if stdfproducts.ppr_file: pipemanifest.add_pprfile(ouss, os.path.basename(stdfproducts.ppr_file), oussid) # Add the flagging and calibration products for session_name in sessiondict: session = pipemanifest.set_session(ouss, session_name) pipemanifest.add_caltables(session, sessiondict[session_name][1], session_name) for vis_name in sessiondict[session_name][0]: pipemanifest.add_asdm(session, vis_name, visdict[vis_name][0], visdict[vis_name][1], oussid) # Add a tar file of the web log pipemanifest.add_weblog(ouss, os.path.basename(stdfproducts.weblog_file), oussid) # Add the processing log independently of the web log pipemanifest.add_casa_cmdlog(ouss, os.path.basename(stdfproducts.casa_commands_file), oussid) # Add the processing script independently of the web log pipemanifest.add_pipescript(ouss, os.path.basename(stdfproducts.casa_pipescript), oussid) # Add the calibrator images pipemanifest.add_images(ouss, calimages, 'calibrator') # Add the target images pipemanifest.add_images(ouss, targetimages, 'target') # Add reimaging resources if reimaging_resources is not None: pipemanifest.add_reimaging_resources(ouss, reimaging_resources) # Add parameter list file(s) pipemanifest.add_parameter_list(ouss, parameterlist) return pipemanifest def _init_pipemanifest(self, oussid): """ Initialize the pipeline manifest """ pipemanifest = VlassPipelineManifest(oussid) return pipemanifest def _export_pprfile(self, context, output_dir, products_dir, oussid, pprfile): # Prepare the search template for the pipeline processing request file. # Was a template in the past # Forced to one file now but keep the template structure for the moment if pprfile == '': ps = context.project_structure if ps is None or ps.ppr_file == '': pprtemplate = None else: pprtemplate = os.path.basename(ps.ppr_file) else: pprtemplate = os.path.basename(pprfile) # Locate the pipeline processing request(s) and generate a list # to be copied to the data products directory. Normally there # should be only one match but if there are more copy them all. pprmatches = [] if pprtemplate is not None: for file in os.listdir(os.path.abspath(output_dir)): if fnmatch.fnmatch(file, pprtemplate): LOG.debug('Located pipeline processing request %s' % file) pprmatches.append(os.path.join(output_dir, file)) # Copy the pipeline processing request files. pprmatchesout = [] for file in pprmatches: if oussid: outfile = os.path.join(products_dir, oussid + '.pprequest.xml') else: outfile = file pprmatchesout.append(outfile) LOG.info('Copying pipeline processing file %s to %s' % (os.path.basename(file), os.path.basename(outfile))) shutil.copy(file, outfile) return pprmatchesout def _get_sessions(self, context, sessions, vis): """ Return a list of sessions where each element of the list contains the vis files associated with that session. In future this routine will be driven by the context but for now use the user defined sessions """ # If the input session list is empty put all the visibility files # in the same session. if len(sessions) == 0: wksessions = [] for visname in vis: session = context.observing_run.get_ms(name=visname).session wksessions.append(session) else: wksessions = sessions # Determine the number of unique sessions. session_seqno = 0 session_dict = {} for i in range(len(wksessions)): if wksessions[i] not in session_dict: session_dict[wksessions[i]] = session_seqno session_seqno = session_seqno + 1 # Initialize the output session names and visibility file lists session_names = [] session_vis_list = [] for key, value in sorted(session_dict.items(), key=lambda k_v: (k_v[1], k_v[0])): session_names.append(key) session_vis_list.append([]) # Assign the visibility files to the correct session for j in range(len(vis)): # Match the session names if possible if j < len(wksessions): for i in range(len(session_names)): if wksessions[j] == session_names[i]: session_vis_list[i].append(vis[j]) # Assign to the last session else: session_vis_list[len(session_names) - 1].append(vis[j]) # Log the sessions for i in range(len(session_vis_list)): LOG.info('Visibility list for session %s is %s' % (session_names[i], session_vis_list[i])) return wksessions, session_names, session_vis_list def _export_weblog(self, context, products_dir, oussid): """ Save the processing web log to a tarfile """ # Save the current working directory and move to the pipeline # working directory. This is required for tarfile IO cwd = os.getcwd() os.chdir(os.path.abspath(context.output_dir)) # Define the name of the output tarfile ps = context.project_structure tarfilename = self.NameBuilder.weblog(project_structure=ps, ousstatus_entity_id=oussid) # if ps is None: # tarfilename = 'weblog.tgz' # elif ps.ousstatus_entity_id == 'unknown': # tarfilename = 'weblog.tgz' # else: # tarfilename = oussid + '.weblog.tgz' LOG.info('Saving final weblog in %s' % tarfilename) # Create the tar file tar = tarfile.open(os.path.join(products_dir, tarfilename), "w:gz") tar.add(os.path.join(os.path.basename(os.path.dirname(context.report_dir)), 'html')) tar.close() # Restore the original current working directory os.chdir(cwd) return tarfilename def _export_reimaging_resources(self, context, products_dir, oussid): """ Tar up the reimaging resources for archiving (tarfile) """ # Save the current working directory and move to the pipeline # working directory. This is required for tarfile IO cwd = os.getcwd() os.chdir(os.path.abspath(context.output_dir)) # Define the name of the output tarfile tarfilename = 'reimaging_resources.tgz' LOG.info('Saving reimaging resources in %s...' % tarfilename) # Create the tar file tar = tarfile.open(os.path.join(products_dir, tarfilename), "w:gz") for mask in self.masks: tar.add(mask, mask) LOG.info('....Adding %s', mask) for initial_model in self.initial_models: tar.add(initial_model, initial_model) LOG.info('....Adding %s', initial_model) for final_model in self.final_models: tar.add(final_model, final_model) LOG.info('....Adding %s', final_model) n_selfcaltable = 0 for selfcaltable in self.selfcaltable_list: if os.path.exists(selfcaltable): tar.add(selfcaltable, selfcaltable) LOG.info('....Adding %s', selfcaltable) n_selfcaltable += 1 if n_selfcaltable == 0: LOG.warning('No selfcal table found to add to the reimaging resources tarball.') n_flagversion = 0 for flagversion in self.flagversion_list: if os.path.exists(flagversion): tar.add(flagversion, flagversion) LOG.info('....Adding %s', flagversion) n_flagversion += 1 else: LOG.warning('Flagversions directory %s not found; skipping.', flagversion) if n_flagversion == 0: LOG.warning('No flagversions directories found to add to the reimaging resources tarball.') tar.close() # Restore the original current working directory os.chdir(cwd) return tarfilename def _export_parameterlist(self, context, parameterlist_name, products_dir, oussid): """Export one parameter list file.""" out_parameterlist_file = os.path.join(products_dir, os.path.basename(parameterlist_name)) LOG.info('Copying parameter list file %s to %s' % (parameterlist_name, out_parameterlist_file)) shutil.copy(parameterlist_name, out_parameterlist_file) return os.path.basename(out_parameterlist_file) def _export_table(self, context, table_name, products_dir, oussid): """Save directories, either cal table or flag versions.""" table_file = table_name out_table_file = os.path.join(products_dir, table_file) LOG.info('Copying product from %s to %s' % (table_file, out_table_file)) shutil.copytree(table_file, out_table_file) return os.path.basename(out_table_file) def _export_casa_commands_log(self, context, casalog_name, products_dir, oussid): """Save the CASA commands file.""" ps = context.project_structure casalog_file = os.path.join(context.report_dir, casalog_name) out_casalog_file = self.NameBuilder.casa_script(casalog_name, project_structure=ps, ousstatus_entity_id=oussid, output_dir=products_dir) # if ps is None: # casalog_file = os.path.join(context.report_dir, casalog_name) # out_casalog_file = os.path.join(products_dir, casalog_name) # elif ps.ousstatus_entity_id == 'unknown': # casalog_file = os.path.join(context.report_dir, casalog_name) # out_casalog_file = os.path.join(products_dir, casalog_name) # else: # casalog_file = os.path.join(context.report_dir, casalog_name) # out_casalog_file = os.path.join(products_dir, oussid + '.' + casalog_name) LOG.info('Copying casa commands log %s to %s' % (casalog_file, out_casalog_file)) shutil.copy(casalog_file, out_casalog_file) return os.path.basename(out_casalog_file) def _export_casa_script(self, context, casascript_name, products_dir, oussid): """ Save the CASA script. """ ps = context.project_structure casascript_file = os.path.join(context.report_dir, casascript_name) out_casascript_file = self.NameBuilder.casa_script(casascript_name, project_structure=ps, ousstatus_entity_id=oussid, output_dir=products_dir) # if ps is None: # casascript_file = os.path.join(context.report_dir, casascript_name) # out_casascript_file = os.path.join(products_dir, casascript_name) # elif ps.ousstatus_entity_id == 'unknown': # casascript_file = os.path.join(context.report_dir, casascript_name) # out_casascript_file = os.path.join(products_dir, casascript_name) # else: # # ousid = ps.ousstatus_entity_id.translate(str.maketrans(':/', '__')) # casascript_file = os.path.join(context.report_dir, casascript_name) # out_casascript_file = os.path.join(products_dir, oussid + '.' + casascript_name) LOG.info('Copying casa script file %s to %s' % (casascript_file, out_casascript_file)) shutil.copy(casascript_file, out_casascript_file) return os.path.basename(out_casascript_file) def _export_pipe_manifest(self, manifest_name, products_dir, pipemanifest): """ Save the manifest file. """ out_manifest_file = os.path.join(products_dir, manifest_name) LOG.info('Creating manifest file %s' % out_manifest_file) pipemanifest.write(out_manifest_file) return out_manifest_file def _fix_vlass_fits_header(self, context, fitsname): """ Update VLASS FITS product header according to PIPE-641. Should be called in the following imaging modes: 'VLASS-QL', 'VLASS-SE-CONT-MOSAIC', 'VLASS-SE-CONT-AWP-P001', and 'VLASS-SE-CUBE'. The following keywords are changed: DATE-OBS, DATE-END, RADESYS, OBJECT. """ if os.path.exists(fitsname): # Open FITS image and obtain header hdulist = apfits.open(fitsname, mode='update') header = hdulist[0].header # DATE-OBS and DATE-END keywords # Note: the new DATE-OBS value (first scan start time) might differ from the original value # (first un-flagged scan start time). header['date-obs'] = (infrastructure.utils.get_epoch_as_datetime( context.observing_run.start_time).isoformat(), 'First scan started') date_end = ('date-end', infrastructure.utils.get_epoch_as_datetime( context.observing_run.end_time).isoformat(), 'Last scan finished') if 'date-end' in [k.lower() for k in header.keys()]: header['date-end'] = date_end[1:] else: pos = header.index('date-obs') header.insert(pos, date_end, after=True) # RADESYS if header['radesys'].upper() == 'FK5': header['radesys'] = 'ICRS' # OBJECT # We assume that the FITS name follows the convention described in PIPE-968 (minus the stage # prefixes) and directly extract the 'OBJECT' name (first FIELD name of the image) from it. filename_components = os.path.basename(fitsname).split('.') object_name = filename_components[4] if object_name != '' and header['object'].upper() != object_name.upper(): header['object'] = object_name # Save changes and inform log hdulist.flush() LOG.info("Header updated in {}".format(fitsname)) # Close FITS file hdulist.close() else: LOG.warning('FITS header cannot be updated: image {} does not exist.'.format(fitsname)) return def _split_vlass_cube_stokes(self, image_list): """Split full-Stokes images into the IQU and V images and return a new image list.""" new_image_list = [] for imagename in image_list: if '.IQUV.' in imagename: for stokes_select in ['IQU', 'V']: outfile = imagename.replace('.IQUV.', '.'+stokes_select+'.') job = casa_tasks.imsubimage(imagename, outfile=outfile, stokes=stokes_select, overwrite=True, dropdeg=False) self._executor.execute(job) LOG.info(f'Wrote {outfile}') new_image_list.append(outfile) else: new_image_list.append(imagename) return new_image_list def _get_common_beam(self, image_list): """Get the common beam from the supplied "cube" image list.""" myia = casa_tools.image mycs = casa_tools.casatools.coordsys() cs = mycs.newcoordsys(direction=True, spectral=True) myia.fromshape(shape=[128, 128, len(image_list)], csys=cs.torecord()) cs.done() for idx, imagename in enumerate(image_list): with casa_tools.ImageReader(imagename) as image: bm = image.restoringbeam(channel=0, polarization=0) LOG.info(f'set the restoring beam of {imagename} in chan={idx} of the multibeam image template.') myia.setrestoringbeam(beam=bm, channel=idx) beam_target = myia.commonbeam() # ia.commonbeam() returns the bpa value under the key 'pa' rather than 'positionangle', as it is # from ia.restoringbeam(). beam_target['positionangle'] = beam_target.pop('pa') myia.close() # PIPE-1434: increase the target beam from ia.commonbeam() by a small margin to avoid potential # failures of ia.convolve2d() when the convolution kernel is too small on the minor axis near the # numerical precision limit. beam_target['major']['value'] *= 1.00001 beam_target['minor']['value'] *= 1.00001 LOG.info(f'use {beam_target} as the target beam for the common-beam image set.') return beam_target