Source code for irfpy.cena.cena_mass2

''' Class handling CENA mass mode data.

CENA mass mode module.  Better implementation than cena_mass
from the memory leak point of view as mass2 module uses ring-cache.

The memory cache is only valid for reading a CENA data file, which
is an orbit base.  Packet based cache is **not** supported in module level.
Thus, getter methods (:meth:`getdataE16`, :meth:`getdata` or whaterver)
are rather time consuming if the request is repeated.
In such a case, user may use cache by their own, or consider using
:mod:`cena_mass_time` module.
'''

import datetime

import logging as _logging

import urllib.request
import urllib.parse
import urllib.error
import numpy

import Sadppac

from irfpy.util.julday import Julday, JdObject, JdSeries
from irfpy.util.ringcache import SimpleRingCache
from irfpy.util import utc
import irfpy.cy1orb.Cy1OrbitNr

from irfpy.util.irfpyrc import Rc
import irfpy.cena.cena_flux
__cnt2flx = irfpy.cena.cena_flux.Count2Flux()

_logger = _logging.getLogger(__name__)

class _MassCache:
    ''' A cache implementation for CENA mass mode.

    Cached is the array of decompressed CenaPacket's.
    '''
    _mass_cache = None

    @classmethod
    def default_cache(cls):
        if cls._mass_cache is None:
            cls._mass_cache = _MassCache()
        return cls._mass_cache
        

    def __init__(self):
        self.__rc = Rc()
        __cs = self.__rc.get('cena', 'cena_mass2_cachesize')
        if __cs is None:
            __cs = 10
        __cs = int(__cs)
        self.__mass = SimpleRingCache(__cs)   # __cs data is stored.
        del __cs

        self.__cy1orbitnr = irfpy.cy1orb.Cy1OrbitNr.Cy1OrbitNr()
        self.__cy1orbitnr.setFromDefaultUri()

    def _getorbitnr(self):
        return self.__cy1orbitnr

    def _getcache(self, orb):
        ''' A helper function of the cache and accessing.

        This function hide the caching from the other code.
        '''
        logger = _logging.getLogger(self.__class__.__name__)

        __mass = self.__mass

        if not __mass.hasKey(orb):
            cenapac = JdSeries()

            br = Sadppac.CenaBmuReader()
            cenapath = self.__rc.get('cena', 'bmuobase')
            cenadatapath = cenapath + '/' + ('orb_%04d' % orb) + '/' + (
                                        'orb_%04d.cena' % orb)
            logger.info('Retrieve data from %s' % cenadatapath)
            try:
                f0, f1 = urllib.request.urlretrieve(cenadatapath)
                logger.info('Load data from %s' % f0)
            except IOError as e:
                logger.info('No file existing for %s' % cenadatapath)  # This happens frequently.
                __mass.add(orb, cenapac)
                return cenapac

            br.readFile(f0)
            cenaarray = br.getArray()

            for i in range(cenaarray.size()):
                pac = cenaarray.getPacket(i)
                if pac.getTMmode() == pac.TMMODE_MASS_ACC:
                    pac = pac.decompress()
                    if  pac is not None:
                        jd = utc.convert(pac.getUtc(), Julday)
                        cenapac.add(jd, pac)
                else:
                    logger.debug('NOT MASS MODE')
            __mass.add(orb, cenapac)

        return __mass.get(orb)

    def __str__(self):
        return '<irfpy.cena.cena_mass2._MassCache instance (cache size %d)>' % len(self.__mass)
        

def _packet2array(pac):
    ''' Input is the CenaPacket instance (Sadppac) and the output is the numpy.array.

    CenaPacket instance should be decompressed before.
    Only 8x7x128x1 array is supported.
    '''
    ### Evaluate CenaPacket
    nE = pac.getNrEne()
    nC = pac.getNrCh()
    nP = pac.getNrPhase()
    nM = pac.getNrMass()

    if nE!=8 or nC!=7 or nP!=1 or nM!=128:
        _logger.warning('Binning parameter not supported for observation of %s. nE=%d(!=8), nC=%d(!=7), nP=%d(!=1), nM=%d(!=128)\n'%(pac.getUtc(),nE,nC,nP,nM))
        raise RuntimeError('Binning parameter not supported for observation of %s. nE=%d(!=8), nC=%d(!=7), nP=%d(!=1), nM=%d(!=128)\n'%(pac.getUtc(),nE,nC,nP,nM))

    matx = pac.getCenaSciMassAcc()
    if matx is None:
        _logger.warning('Failed to get CenaSciMassAcc by unknown reason for %d at %s.'%(pac.get4sCycleCntr(), str(pac.getUtc())))
        raise RuntimeError("Matrix compressed?? %d %s" % (pac.get4sCycleCntr(), str(pac.getUtc())))
    nC2 = matx.getNrChIntr()  # Internal expression of channel number.  In fact, this is 8.
    data = matx.getCountAsArray()
    data = numpy.array(data)
    data=data.reshape(nE, nP, nC2, nM)
    data = data[:, :, :nC, :].sum(1)    # TODO: Inefficient way ???  Just to ignore the nP=1 axis.

    return data


def _packet2arrayE16(pac):
    ''' From the packet, count rate array is returned for generic energy table.

    CenaPacket instance should be decompressed before.
    Only 8x7x128x1 array is supported.
    The returned matrix is a masked_array.
    '''
    ### Evaluate CenaPacket
    nE = pac.getNrEne()
    nC = pac.getNrCh()
    nP = pac.getNrPhase()
    nM = pac.getNrMass()

    if nE!=8 or nC!=7 or nP!=1 or nM!=128:
        _logger.warning('Binning parameter not supported for observation of %s. nE=%d(!=8), nC=%d(!=7), nP=%d(!=1), nM=%d(!=128)\n'%(pac.getUtc(),nE,nC,nP,nM))
        raise RuntimeError('Binning parameter not supported for observation of %s. nE=%d(!=8), nC=%d(!=7), nP=%d(!=1), nM=%d(!=128)\n'%(pac.getUtc(),nE,nC,nP,nM))

    matx = pac.getCenaSciMassAccE16()
    if matx is None:
        _logger.warning('Failed to get CenaSciMassAcc by unknown reason for %d at %s.'%(pac.get4sCycleCntr(), str(pac.getUtc())))
        raise RuntimeError("Matrix compressed?? %d %s" % (pac.get4sCycleCntr(), str(pac.getUtc())))
    nC2 = matx.getNrChIntr()  # Internal expression of channel number.  In fact, this is 8.
    data = numpy.zeros([16, 7, 128])

    for ie in range(16):
        for ic in range(7):
            for im in range(128):
                data[ie, ic, im] = matx.get(ie, 0, ic, im)

    data = numpy.ma.masked_equal(data, 63488)

#    data=data.reshape(nE, nP, nC2, nM)
#    data = data[:, :, :nC, :].sum(1)    # TODO: Inefficient way ???  Just to ignore the nP=1 axis.

    return data

def _pac_getsv(pac):
    ''' Return the SV table value.

    :param pac: CenaPacket instance.
    :returns: 4 integer tuple corresponding SV_WAVE1, 2A, 2B, and LENS.
    '''
    fhp = pac.getFastHkPac()
    svs = (fhp.getSv1Table(), fhp.getSv2Table(), fhp.getSv3Table(),
            fhp.getSv4Table())
    
    return svs

def _pac_gethvmain(pac):
    ''' Return the HV value.

    :param pac: CenaPacket instance
    :returns: HV value.
    '''
    fhp = pac.getFastHkPac()
    hvmain = fhp.getHvMain()
    return hvmain

[docs]def getobstime(timerange=None, orbit=None): ''' Get the observation time of CENA as array of datetime instances. Returned is an array of datetime instance. ''' logger = _logging.getLogger(__name__ + ".getobstime") if timerange is None: if orbit is None: raise RuntimeError('No time specified') else: t0 = _MassCache.default_cache()._getorbitnr().getStartTime(orbit) t1 = _MassCache.default_cache()._getorbitnr().getStopTime(orbit) elif len(timerange) != 2: raise RuntimeError('Time range should be 2-element') else: t0, t1 = timerange t0 = utc.convert(t0, datetime.datetime) t1 = utc.convert(t1, datetime.datetime) orb0 = _MassCache.default_cache()._getorbitnr().getOrbitNr(t0) orb1 = _MassCache.default_cache()._getorbitnr().getOrbitNr(t1) logger.info('T0 = %s @ %d' % (t0, orb0)) logger.info('T1 = %s @ %d' % (t1, orb1)) tlist = [] for orb in range(orb0, orb1 + 1): logger.info('Orbit %d processing' % orb) logger.debug(' Cache status = %s' % str(_MassCache.default_cache())) cenadata = _MassCache.default_cache()._getcache(orb) for pacdata in cenadata: pac = pacdata.getData() t = utc.convert(pac.getUtc(), datetime.datetime) logger.debug('%f %s' % (pac.getUtc(), t)) if t >= t0 and t <= t1: tlist.append(t) tlist.sort() return tlist
[docs]def getdata(t, filter=None): ''' Get CENA mass mode date for the specified time t. :param t: Time :type t: `datetime.datetime` :param filter: Filter(s) to apply the data. **Not supported.** :returns: JdObject that includes data of (8,7,128) shaped numpy.array :rtype: `irfpy.util.julday.JdObject` >>> jddat = getdata(datetime.datetime(2009,4,18,1,30,0)) >>> dat = jddat.getData() # jddat is an instance of JdObject >>> print(jddat.getJd().juld()) # doctest: +ELLIPSIS 2454939.56251... >>> print(dat.sum()) 5.0 >>> print(dat.shape) (8, 7, 128) ''' logger =_logging.getLogger(__name__) t0 = utc.convert(t, datetime.datetime) jd0 = utc.convert(t, Julday) orb0 = _MassCache.default_cache()._getorbitnr().getOrbitNr(t0) logger.info('T = %s @ %d' % (t0, orb0)) cenadata4orb0 = _MassCache.default_cache()._getcache(orb0) cenadata = cenadata4orb0.getNeighbor(jd0) cenaarray = _packet2array(cenadata.getData()) return JdObject(cenadata.getJd(), numpy.array(cenaarray))
[docs]def getSvTable(t): ''' Return the SV table. >>> import dateutil.parser >>> t0 = dateutil.parser.parse('2009-02-06 05:13:11.078000') >>> print(numpy.array(getSvTable(t0).getData())) [2 2 2 2] >>> t0 = dateutil.parser.parse('2009-02-06 05:13:11.078000') >>> print(numpy.array(getSvTable(t0).getData())) [2 2 2 2] >>> t0 = dateutil.parser.parse('2009-07-29 06:21:07.265000') >>> print(numpy.array(getSvTable(t0).getData())) [1 1 1 1] >>> t0 = dateutil.parser.parse('2009-07-29 06:42:05.303000') >>> print(numpy.array(getSvTable(t0).getData())) [3 3 3 3] >>> t0 = dateutil.parser.parse('2009-07-29 08:29:11.415000') >>> print(numpy.array(getSvTable(t0).getData())) [3 3 3 3] >>> t0 = dateutil.parser.parse('2009-07-29 08:50:00.121000') >>> print(numpy.array(getSvTable(t0).getData())) [2 2 2 2] ''' t0 = utc.convert(t, datetime.datetime) jd0 = utc.convert(t, Julday) orb0 = _MassCache.default_cache()._getorbitnr().getOrbitNr(t0) cenadata4orb0 = _MassCache.default_cache()._getcache(orb0) cenadata = cenadata4orb0.getNeighbor(jd0) sv = _pac_getsv(cenadata.getData()) return JdObject(cenadata.getJd(), sv)
[docs]def getHvMain(t): ''' Return the high voltage. >>> print(getHvMain(datetime.datetime(2009, 6, 13, 8, 0, 0)).getData()) 0 >>> print(getHvMain(datetime.datetime(2009, 6, 13, 8, 2, 24)).getData()) 807 >>> print(getHvMain(datetime.datetime(2009, 6, 13, 8, 4, 52)).getData()) 2957 ''' t0 = utc.convert(t, datetime.datetime) jd0 = utc.convert(t, Julday) orb0 = _MassCache.default_cache()._getorbitnr().getOrbitNr(t0) cenadata4orb0 = _MassCache.default_cache()._getcache(orb0) cenadata = cenadata4orb0.getNeighbor(jd0) hv = _pac_gethvmain(cenadata.getData()) return JdObject(cenadata.getJd(), hv)
[docs]def getdataE16(t, filter=None, validation=None): ''' Get CENA mass mode data in generic energy level (E16). :param t: Time :type t: `datetime.datetime` :param filter: Filter to apply. **Not supported.** :param validation: Validation to apply. See also :class:`invalid_mask` class. :returns: masked_array of the count rate data. Count rate data is an instance of `numpy.np.MaskedArray` with (16, 7, 128) shape. :rtype: `irfpy.util.julday.JdObject` The method returns the masked ndarray that includes the energy and directional spectra in generic 16 energy step mode. This is very frequently used function, but rather time consuming. Thus, it is recommended to use a cache by user, or use :mod:`cena_mass_time` module that returns the "list" of the data. >>> jddat = getdataE16(datetime.datetime(2009, 4, 18, 1, 30, 0)) >>> dat = jddat.getData() >>> print(jddat.getJd().juld()) # doctest: +ELLIPSIS 2454939.56251... >>> print(dat.sum()) 5.0 >>> print(dat.shape) (16, 7, 128) >>> print(dat[:,:,:64].sum(2).sum(1)) [-- -- -- -- 0.0 0.0 0.0 0.0 1.0 1.0 2.0 1.0 -- -- -- --] Validation may exclude suspicious data by machine. An example is shown. Try to examine the data near 2009-07-13T11:17:00. >>> t0 = datetime.datetime(2009, 7, 13, 11, 17, 00) Usual data can get via getdataE16 function. You can get masked array with a shape of 16 x 7 x 128. >>> rawcount = getdataE16(t0).getData() # getData() is for extracting JdObject >>> rawcount.shape (16, 7, 128) Total count you can get, as this case, 140. (May be changeable for future database change) >>> rawcount.sum() 140.0 Average counts with data which has not masked is ~0.0195 >>> print('%.4f' % rawcount.mean()) 0.0195 Then, the number of 'non-masked' bin is 140.0 / 0.0195 = 7168.0 >>> rawcount.sum() / rawcount.mean() 7168.0 This is consistent with the number of the data 8x7x128 = 7168. Ok. Now we try a suspicious data. You can get less suspicious data with validation keyword. Simplest example (and most robust now) is to exclude >100 cnts hydrogen count or 10 times more oxygen count. This is too much strong criteria, probably, but well, we go with it. >>> proccount = getdataE16(t0, validation=[invalid_mask.high_heavy_mask, invalid_mask.high_count_mask]).getData() Shape does not change. >>> proccount.shape (16, 7, 128) However, the total count should change. >>> proccount.sum() 79.0 This probably means most of the data is rejected. Let's check it. Ok, now the average count rate can be calculated as >>> print('%.6f' % proccount.mean()) 0.015430 and thus, the number of valid bins are 79.0 / 0.015430 = 5120. >>> proccount.sum() / proccount.mean() 5120.0 This means that 2048 data is rejected in this case. Ok, let's check also directly. You can use compressed() function. >>> print(len(proccount.compressed())) 5120 ''' logger = _logging.getLogger(__name__ + ".getdataE16") t0 = utc.convert(t, datetime.datetime) jd0 = utc.convert(t, Julday) orb0 = _MassCache.default_cache()._getorbitnr().getOrbitNr(t0) cenadata4orb0 = _MassCache.default_cache()._getcache(orb0) cenadata = cenadata4orb0.getNeighbor(jd0) cenaarray = _packet2arrayE16(cenadata.getData()) if validation is not None: for v in validation: logger.debug('---> Before validation, %d' % len(cenaarray.compressed())) mask = v(cenaarray) cenaarray.mask = mask | cenaarray.mask logger.debug('<--- After validation, %d' % len(cenaarray.compressed())) return JdObject(cenadata.getJd(), cenaarray)
[docs]def getHdefluxE16(t, filter=None, validation=None): ''' Calculate the differential flux (#/eV sr cm2 s) of H-ENA from CENA data. Calculate the differential flux of the H-ENA. The flux is calucalted by the help of ``cena_flux`` module. Simple to call this function. Returned is the masked array-data JdObject. >>> f = getHdefluxE16(datetime.datetime(2009, 7, 13, 11, 17)) >>> f = f.getData() # To extract JdObject. >>> f.shape (16, 7) The valid data is only 8x7 due to the energy table. >>> len(f.compressed()) # Number of valid data 56 What is validation? Validation is to exclude the suspective data set. See also :class:`invalid_mask` class. >>> f = getHdefluxE16(datetime.datetime(2009, 7, 13, 11, 17), ... validation=[invalid_mask.high_heavy_mask, invalid_mask.high_count_mask]) This returns the masked dataset in which the low mass channel data with >100 cnts or counts in high mass channel has >10% of the low mass channel is masked. >>> f = f.getData() # Just to extract JdObject >>> f.shape (16, 7) >>> len(f.compressed()) # Number of valid data 40 You can change the threshold, if you want. >>> mask1 = invalid_mask.make_high_heavy_mask(0.05) # Remove data if O counts has >5% of H. >>> mask2 = invalid_mask.make_high_count_mask(10) # Remove data if H counts has >10 cnts. >>> f = getHdefluxE16(datetime.datetime(2009, 7, 13, 11, 17), validation=[mask1, mask2]) >>> len(f.getData().compressed()) 24 ''' # First, get the count rate data. espec = getdataE16(t, filter=filter, validation=validation) espec_t = espec.getJd() espec = espec.getData() # Third, getting only H data. especH = espec[:, :, 0:64].sum(2) # 16x7 array # Last, convert count rate to energy spectra flx = numpy.ma.ones((16, 7)) * (-1e+50) # Initialize by negative number for ie in range(16): for id in range(7): if not numpy.ma.is_masked(especH[ie, id]): flx[ie, id] = __cnt2flx.getFlux(especH[ie, id], ie, id) # mask it flx = numpy.ma.masked_less(flx, 0) return JdObject(espec_t, flx)
[docs]class invalid_mask: ''' A collection of validation routines. Validation routine should get a count rate data with a shape of (16, 7, 128), and returns a boolean array with a shape of (16, 7, 128). If the data is invalid, ``True`` is set. (Thus it is used for ``masked_array.mask``.) It is recommended to use the following set: [high_heavy_mask, high_count_mask] '''
[docs] @classmethod def allok(self, data): ''' All the data is validated. All the data is validated, thus 16x7x128 False array is returned. This does not change the data. For debug purpose, mainly. >>> import random >>> dat = numpy.array([random.randint(0, 100) for i in range(16*7*128)]) >>> dat = dat.reshape((16, 7, 128)) >>> inval = invalid_mask.allok(dat) >>> print(inval.shape) (16, 7, 128) >>> print(inval.any()) # All the inval is False. (=No any inval is True.) False ''' logger = _logging.getLogger(self.__class__.__name__ + ".allok") if data.shape != (16, 7, 128): raise ValueError('Shape of data should be (16, 7, 128).') inval = numpy.zeros([16, 7, 128]) return inval != 0
[docs] @classmethod def allng(self, data): ''' All the data is invalid. By some reason, all the data obtained in the moment is set to invalid. All the elements is set to True, i.e. invalid. Mainly for debugging purpose. ''' logger = _logging.getLogger(self.__class__.__name__ + ".allng") if data.shape != (16, 7, 128): raise ValueError('Shape of data should be (16, 7, 128).') inval = numpy.zeros([16, 7, 128]) return inval == 0
[docs] @classmethod def high_heavy_mask(self, data): ''' Mask the data with count rate (O/H) is >10%. In case heavy channel has more than 10% of the counts, the channel is rejected. ''' fnc = self.make_high_heavy_mask(0.1) return fnc(data)
[docs] @classmethod def make_high_heavy_mask(self, threshold): ''' Return the similar mask filter of high_heavy_mask with specific threshold. ''' def f(data): if data.shape != (16, 7, 128): raise ValueError('Shape of data should be (16, 7, 128).') # sum over the energy. d = data.sum(axis=0) # 7x128 # H is sum over 0 to 63. dH = d[:, :64].sum(axis=1) # shape = 7 # O is sum oveer 64 to 125. dO = d[:, 64:126].sum(axis=1) # shape = 7 mask = (dO > dH * threshold) # shape = 7 masks = [] for i in range(16): masks.append(mask) # 16 x 7 masks2 = [] for i in range(128): masks2.append(masks) masks2 = numpy.array(masks2) # 128 x 16 x 7 masks2 = masks2.transpose(1, 2, 0) return masks2 return f
[docs] @classmethod def high_count_mask(self, data): ''' Mask the data with count rate of H is >100 cnts. ` ''' fnc = self.make_high_count_mask(100) return fnc(data)
[docs] @classmethod def make_high_count_mask(self, threshold): ''' Return the mask filder to maks the channel with very high count rate. ''' def f(data): if data.shape != (16, 7, 128): raise ValueError('Shape of data should be (16, 7, 128).') # Integrate over the energy d = data.sum(axis=0) # 7x128 dH = d[:, :64].sum(axis=1) # H counts, shape = (7,) mask = (dH > threshold) # shape = 7 masks = [] for i in range(16): masks.append(mask) # 16 x 7 masks2 = [] for i in range(128): masks2.append(masks) masks2 = numpy.array(masks2) # 128 x 16 x 7 masks2 = masks2.transpose(1, 2, 0) return masks2 return f