Source code for pipeline.hifv.tasks.fixpointing.fixpointing

import shutil
import os
import numpy as np
import math
from datetime import datetime, timezone

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.vdp as vdp
from pipeline.infrastructure import casa_tools, task_registry


LOG = infrastructure.get_logger(__name__)


def fixpointing_offset_vlass(vis, intable='POINTING', antlist=[], timeoffset=[0.45, 0.95], dofilter=False,
                             usetarget=True, dointerval=True, dolookahead=True, dodirectiononly=False):
    """
    Version 0.0  STM 2019-02-05 from KG fixpointing
    Version 1.0  STM 2019-02-06 extrapolate using single-diff derivative on TARGET
    Version 1.01 STM 2019-02-12 usetarget
    Version 1.1  STM 2019-02-21 include filtering on DIRECTION-TARGET, intable
    Version 1.2  STM 2019-04-10 option to correct only DIRECTION (based on TARGET)
    Version 1.2  STM 2019-05-03 inster st.close commands
    this function will apply time offsets to the pointing in the table for the selected antennas

    Example to correct the pointing on a subset of antennas, and to filter the encoder data:
    fixpointing_offset_vlass(vis='VLASS1.1.sb34523741.eb34557765.58027.86045872685.ms',antlist=[2,3,4,5,6,8,9,10,11,13,14,16,17,18,20,21,22,23,24,25])

    To also copy the TARGET column (after fixing) to DIRECTION:
    fixpointing_offset_vlass(vis='VLASS1.1.sb34523741.eb34557765.58027.86045872685.ms',antlist=[2,3,4,5,6,8,9,10,11,13,14,16,17,18,20,21,22,23,24,25],usetarget=True)

    To only copy the TARGET column (after fixing) to DIRECTION:
    fixpointing_offset_vlass(vis='VLASS1.1.sb34523741.eb34557765.58027.86045872685.ms',antlist=[],dofilter=False,dointerval=False,usetarget=True)

    Another example (different dataset)
    fixpointing_offset_vlass(vis='VLASS1.1.sb34901451.eb34995355.58154.327259618054.ms',antlist=[2,3,4,5,7,8,9,10,11,13,14,16,17,18,20,21,22,23,24])

    For data taken in VLASS1.2 onward without the timing offset to pointing, but for which you
    want to construct a valid POINTING table (using the TARGET column):
    fixpointing_offset_vlass(vis='TSKY0001.sb36243279.eb36252233.58521.307180902775_OTFM_TARGET.ms',dofilter=False,usetarget=True,dointerval=True)

    Note that backup versions of the existing POINTING table will be created when run.

    For VLASS1.1 the dates with sets of old ACUs are (from VLASS Memo 12):

    9/7/17-10/23/17
    ea03, ea04, ea05, ea06, ea07, ea09, ea10, ea11, ea12, ea13, ea15, ea16, ea18, ea19, ea20, ea22, ea23, ea24, ea25, ea26, ea27, ea28

    10/23/17-2/7/18
    ea03, ea04, ea05, ea06, ea07, ea09, ea10, ea11, ea12, ea13, ea15, ea16, ea18, ea19, ea20, ea22, ea23, ea24, ea25, ea26, ea27

    2/7/18-2/20/18
    ea03, ea04, ea05, ea06, ea09, ea10, ea11, ea12, ea13, ea15, ea16, ea18, ea19, ea20, ea22, ea23, ea24, ea25, ea26, ea27

    """
    r2d = 180.0 / np.pi
    datestring = datetime.now(timezone.utc).isoformat()
    if not os.path.exists(vis + '/POINTING_ORIG'):
        shutil.copytree(vis + '/POINTING', vis + '/POINTING_ORIG')
        LOG.info('Copying POINTING to POINTING_ORIG')
    else:
        backupname = 'POINTING_BACKUP_' + datestring
        shutil.copytree(vis + '/POINTING', vis + '/' + backupname)
        LOG.info('Copying POINTING to ' + backupname)
    if intable != 'POINTING':
        if os.path.exists(vis + '/' + intable):
            shutil.copytree(vis + '/' + intable, vis + '/POINTING')
            LOG.info('Copying ' + intable + ' to POINTING')
        else:
            LOG.error('ERROR: could not find intable = ' + vis + '/' + intable)
            raise NameError('InTable not found')

    with casa_tools.TableReader(vis + '/POINTING', nomodify=False) as tb:
        # tb.open(vis + '/POINTING', nomodify=False)
        ants = tb.getcol('ANTENNA_ID')
        ants = np.unique(ants)
        dref = 'J2000'
        eref = tb.getcolkeyword('TIME', 'MEASINFO')['Ref']
        #
        # Need to get the columns
        col = 'DIRECTION'
        colkeyw = tb.getcolkeyword(col, 'MEASINFO')
        pref = tb.getcolkeyword(col, 'MEASINFO')['Ref']
        tcol = 'TARGET'
        tcolkeyw = tb.getcolkeyword(tcol, 'MEASINFO')
        tref = tb.getcolkeyword(tcol, 'MEASINFO')['Ref']
        # colkeyw['Ref']=dref
        # tb.putcolkeyword(col, 'MEASINFO', colkeyw)
        casa_tools.measures.doframe(casa_tools.measures.observatory('VLA'))
        alltimes = tb.getcol('TIME')
        firsttime = alltimes[0]
        del (alltimes)
        #
        delta_time = 0.1  # expected DT in seconds
        integ_offset_x = int(timeoffset[0] / delta_time)
        integ_offset_y = int(timeoffset[1] / delta_time)
        if dodirectiononly:
            LOG.info('Will correct only the DIRECTION column (based on TARGET motions)')
        else:
            LOG.info('Will correct both TARGET and DIRECTION columns (based on TARGET motions)')
        if dolookahead:
            LOG.info('Will lookahead (%i,%i) integrations in (AZ,EL)' % (integ_offset_x, integ_offset_y))
        #
        if len(antlist) > 0:
            # Correct for timing offsets in pointing for these antennas
            for ant in antlist:
                st = tb.query('ANTENNA_ID==' + str(ant))
                nrows = st.nrows()
                LOG.info('Ant %i processing nrows = %i' % (ant, nrows))
                # print(st.nrows())
                ptimes = st.getcol('TIME')
                dt = np.diff(ptimes, 1)
                ntimes = len(ptimes)

                targ_orig = st.getcol(tcol)
                dirv_orig = st.getcol(col)

                sh = dirv_orig.shape
                targ = targ_orig.reshape((sh[0], sh[len(sh) - 1]))
                dirv = dirv_orig.reshape((sh[0], sh[len(sh) - 1]))

                x = targ[0, :]
                y = targ[1, :]
                dirx = dirv[0, :]
                diry = dirv[1, :]

                if not dolookahead:
                    dx = np.diff(x, 1)
                    dy = np.diff(y, 1)
                    dxdt = dx / dt
                    dydt = dy / dt

                xoff = np.zeros(ntimes)
                yoff = np.zeros(ntimes)
                xoff_good = 0.0
                yoff_good = 0.0
                # extrapolate derivatives, check for gaps
                i_lo = 0
                for i in range(ntimes - 1):
                    #
                    # Get derivative and extrapolate (original scheme)
                    i_up = i
                    if not dolookahead:
                        if dx[i] >= np.pi:
                            dx[i] = dx[i] - 2.0 * np.pi
                            dxdt[i] = dx[i] / dt[i]
                        elif dx[i] < -np.pi:
                            dx[i] = dx[i] + 2.0 * np.pi
                            dxdt[i] = dx[i] / dt[i]
                        if dy[i] >= np.pi:
                            dy[i] = dy[i] - 2.0 * np.pi
                            dydt[i] = dy[i] / dt[i]
                        elif dy[i] < -np.pi:
                            dy[i] = dy[i] + 2.0 * np.pi
                            dydt[i] = dy[i] / dt[i]
                        if dt[i] <= 1.0:
                            # 1sec or less gap to next time, use derivative
                            xoff[i] = dxdt[i] * timeoffset[0]
                            yoff[i] = dydt[i] * timeoffset[1]
                            xoff_good = xoff[i]
                            yoff_good = yoff[i]
                        else:
                            # more than 1sec gap to the next time, found end of a scan
                            # use best previous offset
                            xoff[i] = xoff_good
                            yoff[i] = yoff_good
                            i_lo = i + 1
                    else:
                        # lookahead a number of samples closer to offset (new scheme)
                        if dt[i] > 1.0 or i == (ntimes - 2):
                            # more than 1sec gap to the next time, found end of a scan
                            for j in range(i_lo, i):
                                j_offset_x = min(i, j + integ_offset_x)
                                del_t_x = ptimes[j_offset_x] - ptimes[j]
                                del_x = x[j_offset_x] - x[j]
                                del_x_del_t = del_x / del_t_x
                                xoff[j] = del_x_del_t * timeoffset[0]
                                #
                                j_offset_y = min(i, j + integ_offset_y)
                                del_t_y = ptimes[j_offset_y] - ptimes[j]
                                del_y = y[j_offset_y] - y[j]
                                del_y_del_t = del_y / del_t_y
                                yoff[j] = del_y_del_t * timeoffset[1]
                                #
                                xoff_good = xoff[j]
                                yoff_good = yoff[j]
                            # final point in scan
                            xoff[i] = xoff_good
                            yoff[i] = yoff_good
                            #
                            i_lo = i + 1
                # final entry
                xoff[ntimes - 1] = xoff_good
                yoff[ntimes - 1] = yoff_good
                ###
                if (len(dirv_orig.shape) == 3):
                    targ_orig[0, 0, :] += xoff
                    targ_orig[1, 0, :] += yoff
                    dirv_orig[0, 0, :] += xoff
                    dirv_orig[1, 0, :] += yoff
                else:
                    targ_orig[0, :] += xoff
                    targ_orig[1, :] += yoff
                    dirv_orig[0, :] += xoff
                    dirv_orig[1, :] += yoff
                st.putcol(col, dirv_orig)
                if not dodirectiononly:
                    st.putcol(tcol, targ_orig)
                #
                mad_xoff = np.median(np.absolute(xoff))
                mad_yoff = np.median(np.absolute(yoff))
                logstr = 'AntID %i : MAD CORRECTIONS AZ=%.3f EL=%.3f arcmin' % (
                ant, mad_xoff * r2d * 60.0, mad_yoff * r2d * 60.0)
                LOG.info(logstr)
                st.close()
        #
        if dofilter:
            n_sigma_thresh = 5.0
            n_sigma_thresh_ddiff = math.sqrt(1.5) * n_sigma_thresh
            n_sigma_thresh_diff = math.sqrt(2.0) * 2.0 * n_sigma_thresh
            max_rate_degpersec = 5.0
            max_rate = max_rate_degpersec * np.pi / 180.0
            # for all ants filter glitches in DIRECTION-TARGET
            for ant in ants:
                st = tb.query('ANTENNA_ID==' + str(ant))
                nrows = st.nrows()
                LOG.info('Ant %i glitch filtering nrows = %i' % (ant, nrows))
                # print(st.nrows())
                ptimes = st.getcol('TIME')
                dt = np.diff(ptimes, 1)
                ntimes = len(ptimes)

                targ_orig = st.getcol(tcol)
                dirv_orig = st.getcol(col)
                sh = dirv_orig.shape
                targ = targ_orig.reshape((sh[0], sh[len(sh) - 1]))
                dirv = dirv_orig.reshape((sh[0], sh[len(sh) - 1]))
                offs = dirv - targ

                offx_raw = offs[0, :]
                if offx_raw[0] >= np.pi:
                    offx_raw -= 2.0 * np.pi
                elif offx_raw[0] < -np.pi:
                    offx_raw += 2.0 * np.pi
                offx = np.unwrap(offx_raw)  # there seems to be a wrap issue in AZ
                offy = offs[1, :]

                mad_offx = np.median(np.absolute(offx))
                mad_offy = np.median(np.absolute(offy))
                logstr = 'AntID %i : MAD DIR-TARG AZ=%.3f EL=%.3f arcmin' % (
                ant, mad_offx * r2d * 60.0, mad_offy * r2d * 60.0)
                LOG.info(logstr)
                madrms_offx = 1.4826 * mad_offx
                madrms_offy = 1.4826 * mad_offy
                #
                doffx = np.diff(offx, 1)
                doffy = np.diff(offy, 1)
                logstr = 'AntID %i : will correct differences > AZ=%.3f EL=%.3f arcmin' % (
                ant, mad_offx * n_sigma_thresh * r2d * 60.0, mad_offy * n_sigma_thresh * r2d * 60.0)
                LOG.info(logstr)

                nfilter = 0
                nxfilter = 0
                nyfilter = 0
                # find scan breaks
                i_lo = 0
                for i in range(ntimes - 1):
                    # Filter
                    ixfilter = 0
                    iyfilter = 0
                    if dt[i] <= 1.0:
                        # 1sec or less gap to next time, use derivative
                        if i > i_lo:
                            # check if value discrepant from average of neighbors
                            # f_i - 0.5(f_i-1 + f_i+1) = 0.5*( off[i-1]-off[i])
                            ddiffx = 0.5 * abs(doffx[i - 1] - doffx[i])
                            if ddiffx >= n_sigma_thresh_ddiff * madrms_offx:
                                # deemed a "glitch", use average
                                offx[i] = 0.5 * (offx[i - 1] + offx[i + 1])
                                doffx[i - 1] = offx[i] + offx[i - 1]
                                doffx[i] = offx[i + 1] + offx[i]
                                ixfilter += 1
                            ddiffy = 0.5 * abs(doffy[i - 1] - doffy[i])
                            if ddiffy >= n_sigma_thresh_ddiff * madrms_offy:
                                # deemed a "glitch", use average
                                offy[i] = 0.5 * (offy[i - 1] + offy[i + 1])
                                doffy[i - 1] = offy[i] + offy[i - 1]
                                doffy[i] = offy[i + 1] + offy[i]
                                iyfilter += 1
                            max_rate_x = max_rate * dt[i - 1]  # prob shouldnt modify by cos(el)
                            if abs(doffx[i - 1]) > min(max_rate_x, n_sigma_thresh_diff * madrms_offx):
                                # check for unphysical changes in single diff
                                # choose the least discrepant
                                if abs(offx[i - 1]) < abs(offx[i]):
                                    offx[i] = offx[i - 1]
                                    doffx[i - 1] = 0.0
                                    ixfilter += 1
                                elif abs(offx[i]) < abs(offx[i - 1]):
                                    offx[i - 1] = offx[i]
                                    doffx[i - 1] = 0.0
                                    ixfilter += 1
                            max_rate_y = max_rate * dt[i - 1]
                            if abs(doffy[i - 1]) > min(max_rate_y, n_sigma_thresh_diff * madrms_offy):
                                # check for unphysical changes in single diff
                                # choose the least discrepant
                                if abs(offy[i - 1]) < abs(offy[i]):
                                    offy[i] = offy[i - 1]
                                    doffy[i - 1] = 0.0
                                    iyfilter += 1
                                elif abs(offy[i]) < abs(offy[i - 1]):
                                    offy[i - 1] = offy[i]
                                    doffy[i - 1] = 0.0
                                    iyfilter += 1
                                    #
                    else:
                        # more than 1sec gap to the next time, found end of a scan
                        if i > i_lo:
                            max_rate_x = max_rate * dt[i - 1]  # prob shouldnt modify by cos(el)
                            if abs(doffx[i - 1]) > min(max_rate_x, n_sigma_thresh_diff * madrms_offx):
                                # check for unphysical changes in single diff
                                # choose the least discrepant
                                if abs(offx[i - 1]) < abs(offx[i]):
                                    offx[i] = offx[i - 1]
                                    doffx[i - 1] = 0.0
                                    ixfilter += 1
                                elif abs(offx[i]) < abs(offx[i - 1]):
                                    offx[i - 1] = offx[i]
                                    doffx[i - 1] = 0.0
                                    ixfilter += 1
                            max_rate_y = max_rate * dt[i - 1]
                            if abs(doffy[i - 1]) > min(max_rate_y, n_sigma_thresh_diff * madrms_offy):
                                # check for unphysical changes in single diff
                                # choose the least discrepant
                                if abs(offy[i - 1]) < abs(offy[i]):
                                    offy[i] = offy[i - 1]
                                    doffy[i - 1] = 0.0
                                    iyfilter += 1
                                elif abs(offy[i]) < abs(offy[i - 1]):
                                    offy[i - 1] = offy[i]
                                    doffy[i - 1] = 0.0
                                    iyfilter += 1
                        i_lo = i + 1
                    #
                    # update counters for this i
                    if ixfilter > 0:
                        nxfilter += 1
                    if iyfilter > 0:
                        nyfilter += 1
                    if ixfilter > 0 or iyfilter > 0:
                        nfilter += 1

                # Final entry
                ixfilter = 0
                iyfilter = 0
                i = ntimes - 1
                if i > i_lo:
                    max_rate_x = max_rate * dt[i - 1]  # prob shouldnt modify by cos(el)
                    if abs(doffx[i - 1]) > min(max_rate_x, n_sigma_thresh_diff * madrms_offx):
                        # check for unphysical changes in single diff
                        # choose the least discrepant
                        if abs(offx[i - 1]) < abs(offx[i]):
                            offx[i] = offx[i - 1]
                            doffx[i - 1] = 0.0
                            ixfilter += 1
                        elif abs(offx[i]) < abs(offx[i - 1]):
                            offx[i - 1] = offx[i]
                            doffx[i - 1] = 0.0
                            ixfilter += 1
                    max_rate_y = max_rate * dt[i - 1]
                    if abs(doffy[i - 1]) > min(max_rate_y, n_sigma_thresh_diff * madrms_offy):
                        # check for unphysical changes in single diff
                        # choose the least discrepant
                        if abs(offy[i - 1]) < abs(offy[i]):
                            offy[i] = offy[i - 1]
                            doffy[i - 1] = 0.0
                            iyfilter += 1
                        elif abs(offy[i]) < abs(offy[i - 1]):
                            offy[i - 1] = offy[i]
                            doffy[i - 1] = 0.0
                            iyfilter += 1
                if ixfilter > 0:
                    nxfilter += 1
                if iyfilter > 0:
                    nyfilter += 1
                if ixfilter > 0 or iyfilter > 0:
                    nfilter += 1
                #
                LOG.info('Ant %i glitch filtering modified N events: tot=%i x=%i y=%i' % (ant, nfilter, nxfilter, nyfilter))
                ###
                # Put filtered DIRECTION back in POINTING table
                dirvx = targ[0, :] + offx
                dirvy = targ[1, :] + offy
                if (len(dirv_orig.shape) == 3):
                    dirv_orig[0, 0, :] = dirvx
                    dirv_orig[1, 0, :] = dirvy
                else:
                    dirv_orig[0, :] = dirvx
                    dirv_orig[1, :] = dirvy
                st.putcol(col, dirv_orig)
                #
                # Done with this ant
                st.close()
                #
                # Done with all ants

        if usetarget:
            # for all ants replace DIRECTION with TARGET
            for ant in ants:
                st = tb.query('ANTENNA_ID==' + str(ant))
                nrows = st.nrows()
                LOG.info('Ant %i replacing DIRECTION with TARGET for nrows = %i' % (ant, nrows))
                targ_orig = st.getcol(tcol)
                dirv_orig = st.getcol(col)
                st.putcol(col, targ_orig)
                #
                if dointerval:
                    LOG.info('Ant %i setting interval to -1 for nrows = %i' % (ant, nrows))
                    inter = st.getcol('INTERVAL')
                    inter[:] = -1.0
                    st.putcol('INTERVAL', inter)
                st.close()
        elif dointerval:
            for ant in ants:
                st = tb.query('ANTENNA_ID==' + str(ant))
                nrows = st.nrows()
                LOG.info('Ant %i setting interval to -1 for nrows = %i' % (ant, nrows))
                inter = st.getcol('INTERVAL')
                inter[:] = -1.0
                st.putcol('INTERVAL', inter)
                st.close()

        # tb.done()


class FixpointingResults(basetask.Results):
    def __init__(self):
        super().__init__()
        self.pipeline_casa_task = 'Fixpointing'

    def merge_with_context(self, context):
        """
        See :method:`~pipeline.infrastructure.api.Results.merge_with_context`
        """
        return

    def __repr__(self):
        #return 'FixpointingResults:\n\t{0}'.format(
        #    '\n\t'.join([ms.name for ms in self.mses]))
        return 'FixpointingResults:'


class FixpointingInputs(vdp.StandardInputs):
    # docstring and type hints: supplements hifv_fixpointing
    def __init__(self, context, vis=None):
        """Initialize Inputs.

        Args:
            context: Pipeline context object containing state information.

            vis: The list of input MeasurementSets. Defaults to the list of MeasurementSets specified in the hifv_importdata task.

        """
        self.context = context
        self.vis = vis

[docs] @task_registry.set_equivalent_casa_task('hifv_fixpointing') @task_registry.set_casa_commands_comment('Add your task description for inclusion in casa_commands.log') class Fixpointing(basetask.StandardTaskTemplate): Inputs = FixpointingInputs
[docs] def prepare(self): m = self.inputs.context.observing_run.get_ms(self.inputs.vis) obs_start = self.inputs.context.observing_run.start_datetime obs_end = self.inputs.context.observing_run.end_datetime # For VLASS1.1 the dates with sets of old ACUs are (from VLASS Memo 12): # 9/7/17 - 10/23/17 # ea03, ea04, ea05, ea06, ea07, ea09, ea10, ea11, ea12, ea13, ea15, ea16, ea18, ea19, ea20, ea22, ea23, ea24, ea25, ea26, ea27, ea28 # 10/23/17 - 2/7/18 # ea03, ea04, ea05, ea06, ea07, ea09, ea10, ea11, ea12, ea13, ea15, ea16, ea18, ea19, ea20, ea22, ea23, ea24, ea25, ea26, ea27 # 2/7/18 - 2/20/18 # ea03, ea04, ea05, ea06, ea09, ea10, ea11, ea12, ea13, ea15, ea16, ea18, ea19, ea20, ea22, ea23, ea24, ea25, ea26, ea27 antnames = [] antlist = [] if datetime(2017, 9, 7, 0, 0, 0, tzinfo=timezone.utc) < obs_start <= datetime( 2017, 10, 23, 0, 0, 0, tzinfo=timezone.utc ): antnames = ['ea03', 'ea04', 'ea05', 'ea06', 'ea07', 'ea09', 'ea10', 'ea11', 'ea12', 'ea13', 'ea15', 'ea16', 'ea18', 'ea19', 'ea20', 'ea22', 'ea23', 'ea24', 'ea25', 'ea26', 'ea27', 'ea28'] if datetime(2017, 10, 23, 0, 0, 0, tzinfo=timezone.utc) < obs_start <= datetime( 2018, 2, 7, 0, 0, 0, tzinfo=timezone.utc ): antnames = ['ea03', 'ea04', 'ea05', 'ea06', 'ea07', 'ea09', 'ea10', 'ea11', 'ea12', 'ea13', 'ea15', 'ea16', 'ea18', 'ea19', 'ea20', 'ea22', 'ea23', 'ea24', 'ea25', 'ea26', 'ea27'] if datetime(2018, 2, 7, 0, 0, 0, tzinfo=timezone.utc) < obs_start <= datetime( 2018, 2, 20, 0, 0, 0, tzinfo=timezone.utc ): antnames = ['ea03', 'ea04', 'ea05', 'ea06', 'ea09', 'ea10', 'ea11', 'ea12', 'ea13', 'ea15', 'ea16', 'ea18', 'ea19', 'ea20', 'ea22', 'ea23', 'ea24', 'ea25', 'ea26', 'ea27'] if antnames != []: antobjectslist = m.get_antenna() # index identifiers for VLA antennas antlist = [antenna.id for antenna in antobjectslist if antenna.name in antnames] # Example translation of antenna names to index numbers # antnames = ['ea03', 'ea04', 'ea05', 'ea06', 'ea09', 'ea10', 'ea11', 'ea12', 'ea13', 'ea15', 'ea16', 'ea18', # 'ea19', 'ea20', 'ea22', 'ea23', 'ea24', 'ea25', 'ea26', 'ea27'] # antlist = [2,3,4,5,6,8,9,10,11,13,14,16,17,18,20,21,22,23,24,25] fixpointing_offset_vlass(self.inputs.vis, antlist=antlist) return FixpointingResults()
[docs] def analyse(self, results): return results