''' IMA science packet emulation
IMA packet is either 192s or 32s.
See also :ref:`tutorial_vima_scidata2`.
*32s packet*
Not yet supported (since we have not run this mode for nominal operations)
*192s packet*
192s is the default mode for IMA.
In the usual mode, the energy is swept every 12s over 96 steps.
Then, the deflection is swept, if sweeping mode, over 16 steps.
Thus, each 12s data represent the 2-D data (cone-shaped).
This 2-D data is :class:`CntMatrix2D`.
'''
import logging
import numpy as np
from irfpy.imacommon import imamode
from irfpy.util import exception as ex
from irfpy.util import tdict
[docs]class CntMatrix(object):
''' Represent a single 192s packet with deflection sweeping.
'''
def __init__(self, t, matrix, mode, mode_data=None, name='unknown', hk=None):
''' Constructor of count matrix for 192s packet.
:param t: (``datetime``) The start time of the packet
:param matrix: The matrix in MAEP order. This will be modified
by operation.
:param matrix_data: The matrix in MAEP order, the original matrix.
This should be kept for any further operations.
:param mode: The mode. (:class:`irfpy.imacommon.imamode.Mode` object)
:param mode_data: The mode at observation.
:param name: The name of the sensor. Default is 'unknown'.
:param hk: The :class:`HK` object. Default is HK object with all variables filled by None.
'''
self.t = t
if matrix.shape != (mode.m, mode.a, mode.e, mode.p):
raise ex.IrfpyError('\n'.join(
['Given matrix shape does not match to the given mode.',
' Matrix: {}'.format(str(matrix.shape)),
' Mode: {}'.format(str(mode)),
]))
self.matrix = matrix
self.matrix_data = matrix.copy()
self.mode = mode
if mode_data is None:
self.mode_data = mode
else:
self.mode_data = mode_data
self.name = name
if hk is None:
self.hk = HK()
else:
self.hk = hk
def __str__(self):
return "<%s(%s)@%s:MOD=%02d >>%02d<<:CNTmax=%d>" % (
self.__class__,
self.name,
self.t.strftime('%FT%T.%f'),
self.mode.i,
self.mode_data.i,
self.matrix.max())
[docs]class TimeSeriesCntMatrix(tdict.TimeDict):
''' Container for time series of :class:`CntMatrix`.
Instance of this class preserves the time series of
the :class:`CntMatrix` member.
Users do not need to instance any object, but helper functions
will generate those objects.
This class is an extension from :class:`util:irfpy.util.tdict.TimeDict`.
'''
def __init__(self):
tdict.TimeDict.__init__(self)
[docs] def append(self, t, data):
''' Append the data.
Append the data.
Users do not need to add the data.
'''
if not isinstance(data, CntMatrix):
raise TypeError('Given data is not CntMatrix instance')
if t != data.t:
raise ValueError('Given date is not consistent with data date')
tdict.TimeDict.append(self, t, data)
[docs] def getarray(self):
""" Return the matrix as a big numpy array.
:return: (M32, A16, E96, P16, Tn) masked numpy array.
:raises: Exception raised if the mode changed in the dataset.
(Consider using emulation mode to read such data.)
"""
arr = []
for time, data in self:
arr.append(data.matrix)
return np.ma.masked_array(arr).transpose(1, 2, 3, 4, 0) # To be MAEPT order.
[docs]class CntMatrix2D(object):
''' Container for 2-D slice of IMA science count.
Instance of this class preserves the 2D information
of the IMA count data together with some aux data.
Users do not need to instance the object.
This is mostly used as a member of :class:`TimeSeriesCntMatrix2D`.
The data is accessed via the attributes:
- :attr:`t` for time
- :attr:`matrix` for matrix (counts) in ``np.ma.masked_array``.
- :attr:`matrix_data` for matrix (counts) in ``np.ma.masked_array``.
- :attr:`polar` polar index (start, stop) inclusive.
- :attr:`mode` mode (:class:`irfpy.imacommon.imamode.Mode` object).
- :attr:`mode_data` for the original mode (for emulation functionality)
- :attr:`name` for the name of the sensor
- :attr:`hk` for :class:`HK`, the housekeeping data
IMA science data is obtained "simultaneously" in 2D velocity space,
namely energy (velocity) and azimuth.
Additionally, mass direction data is appended.
The shape of the matrix depends on the
:mod:`mode <irfpy.imacommon.imamode>`.
Two commonly used modes are
:attr:`mode 24 <irfpy.imacommon.imamode.M24>` and
:attr:`mode 25 <irfpy.imacommon.imamode.M25>` and the corresponding
shapes are the same (M32, A16, E96).
The difference is the time resolution and elevation coverage.
- For :attr:`mode 24 <irfpy.imacommon.imamode.M24>`,
the time resolution is 12sec and elevation coverage is every 1 step.
- For :attr:`mode 25 <irfpy.imacommon.imamode.M25>`,
the time resolution is 24sec and elevation coverage is every 2 steps.
Note that the energy direction information is obtained from the
sweep of high voltage. Thus, in the reality the obtained data is not
"instantly" observed. However, this class assums the observations
are done at the time of the start.
'''
def __init__(self, t, matrix, polar, mode, mode_data=None, name='unknown', hk=None):
self.t = t
self.matrix = matrix
self.matrix_data = matrix.copy()
self.polar = polar
self.mode = mode
if mode_data is None:
self.mode_data = mode
else:
self.mode_data = mode_data
self.name = name
if hk is None:
self.hk = HK()
else:
self.hk = hk
def __str__(self):
return "<%s(%s)@%s:MOD=%02d >>%02d<<:POL=%02d/%02d:CNTmax=%d>" % (
self.__class__,
self.name,
self.t.strftime('%FT%T.%f'),
self.mode.i,
self.mode_data.i,
self.polar[0],
self.polar[1],
self.matrix.max())
[docs]class TimeSeriesCntMatrix2D(tdict.TimeDict):
''' Container for time series of :class:`CntMatrix2D`.
Instance of this class preserves the time series of
the :class:`CntMatrix2D` member.
Users do not need to instance any object, but helper functions
will generate those objects.
This class is an extensin from :class:`util:irfpy.util.tdict.TimeDict`.
'''
def __init__(self):
tdict.TimeDict.__init__(self)
[docs] def append(self, t, data):
''' Append the data.
Append the data.
This is a developer's code, so that
users do not need to add the data by themselves.
'''
if not isinstance(data, CntMatrix2D):
raise TypeError('Given data is not CntMatrix2D instance')
if t != data.t:
raise ValueError('Given date is not consistent with data date')
tdict.TimeDict.append(self, t, data, dup='update')
[docs] def getarray(self):
""" Return the matrix as a big numpy array.
:return: (M32, A16, E96, Tn) shaped masked numpy array.
:raises: Exception raised if the mode changed in the dataset.
(Consider using emulation mode to read such data.)
"""
arr = []
for time, data in self:
arr.append(data.matrix)
return np.ma.masked_array(arr).transpose(1, 2, 3, 0) # To be MAET order.
[docs]def timeseries2d_to_energytime_simple(timeser2d, azim=None):
""" Convert :class:`TimeSeriesCntMatrix2D` object to (96, N_t) matrix.
:param timeser2d: :class:`TimeSeriesCntMatrix2D` object
:keyword azim: Azimuthal angle to be accounted. A list/tuple of the number
should be given. For example, ``(1,)`` will give the azimuth channel 1 only.
``(0, 4, 7, 9)`` will give the azimuth channels 0, 4, 7, and 9 (although
this is non-sense for physics point of view). If *None* is given,
all the channels are used.
:returns: Simply add over the mass and the azimuth, and returns
(96, N_t) shaped matrix.
"""
arr = timeser2d
ntime = len(arr)
etarr = np.ma.zeros([96, ntime])
if azim is None:
azim = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
for i, (t, mat2d) in enumerate(arr):
### t is datetime object, mat2d is CntMatrix2D object with (M32, A16, E96) shape
etarr[:, i] = mat2d.matrix[:, azim, :].sum(1).sum(0)
return etarr
[docs]def timeseries2d_to_energytime_max(timeser2d, azim=None):
""" Convert :class:`TimeSeriesCntMatrix2D` object to (96, N_t) matrix.
:param timeser2d: :class:`TimeSeriesCntMatrix2D` object
:keyword azim: Azimuthal angle to be accounted. A list/tuple of the number
should be given. For example, ``(1,)`` will give the azimuth channel 1 only.
``(0, 4, 7, 9)`` will give the azimuth channels 0, 4, 7, and 9 (although
this is non-sense for physics point of view). If *None* is given,
all the channels are used.
:returns: Simply take the maximum of the azimuth channle, after the sum over the mass index, and returns
(96, N_t) shaped matrix.
"""
arr =timeser2d
ntime = len(arr)
etarr = np.ma.zeros([96, ntime])
if azim is None:
azim = list(range(16))
for i, (t, mat2d) in enumerate(arr):
cnts = mat2d.matrix.sum(0) # Add over the mass. Now (A16, E96) array
cnts = cnts[azim, :].max(0) # Now take only the given azimuth channel, and get the maximum along this axis.
etarr[:, i] = cnts
return etarr
[docs]def convert2to3(timeser_cntmat2d, emulate_full=False):
''' Convert 2D time series to 3D.
:param timeser_cntmat2d: :class:`TimeSeriesCntMatrix2D` object
:keywrod emulate_full: Data is emulated to represent the full matrix,
namely (M32, A16, E96, P16) size matrix.
To use this functionality, the 2D object should be full size,
or emulated full size (i.e. the shape should be (M32, A16, E96)).
For mode such as 28, where the original data is (M32, A8, E96) is
not accepted. It should be the mode 27 (using emulation functionality)
where (M32, A16, E96) sized 2D object is available.
For mode 27, the original shape of (M32, A16, E96, P2)
will be expanded to (M32, A16, E96, P16) with each value 1/8.
:returns: :class:`TimeSeriesCntMatrix` object
The first and last data could contain a partial data,
unless the first data of 2D array is synchronized to the start of the measurement
(i.e. polar[0] == 0).
'''
if emulate_full:
return convert2to3_emu(timeser_cntmat2d)
else:
return convert2to3_noemu(timeser_cntmat2d)
[docs]def convert2to3_noemu(timeser_cntmat2d):
logger = logging.getLogger('convert2to3_noemu')
# logger.setLevel(logging.DEBUG)
tser = timeser_cntmat2d # For alias...
ntser = len(tser)
if ntser == 0:
return TimeSeriesCntMatrix() # Being no data in input, no data goes to output
tser3 = TimeSeriesCntMatrix() # This is a container to save the CntMatrix object.
t0, d0 = tser[0] # t0 is datetime object, d0 is the first CntMatrix2D object
# p00 = d0.polar[0] # p00 is the
# p01 = d0.polar[1]
# mat0 = d0.matrix
# m0 = d0.mode
# md0 = d0.mode_data
# n0 = d0.name
### Prepare the matrix initialized by -9999.
mat3d = np.zeros([d0.mode.m, d0.mode.a, d0.mode.e, d0.mode.p]) - 9999.
npol = 16 // d0.mode.p # Number of each polar step. e.g. npol=1 for M24, =2 for M25.
### The polar index for 3D is calculated, and matrix updated.
p0 = d0.polar[0] // npol # Index of the given polar step.
mat3d[:, :, :, p0] = d0.matrix
for i in range(ntser):
ti, di = tser[i] # The current 2D spectra is di
# pi0 = di.polar[0]
# pi1 = di.polar[1]
# pi = pi0 // npol
# mati = di.matrix
# mi = di.mode
# mdi = di.mode_data
# ni = di.name
### If t0 is set to None, the 3D data is flushed, thus a new 3D data to be prepared
if t0 is None:
t0 = ti # Start of the 3D data.
d0 = di # Start 2D packet.
mat3d = np.zeros([d0.mode.m, d0.mode.a, d0.mode.e, d0.mode.p]) - 9999.
npol = 16 // d0.mode.p # npol=1 for M24, =2 for M25.
### The polar index for 3D is calculated, and matrix updated.
pi = di.polar[0] // npol # Index of the given polar step.
mat3d[:, :, :, pi] = di.matrix
### When the polar angle reached the maximum (15), the 3D data is flushed.
if di.polar[1] == 15:
cntmatx = CntMatrix(t0, np.ma.masked_less(mat3d, 0), d0.mode, mode_data=d0.mode_data, name=d0.name, hk=d0.hk)
tser3.append(t0, cntmatx)
t0 = None
### Flush the buffer.
if not t0 is None:
cntmatx = CntMatrix(t0, np.ma.masked_less(mat3d, 0), d0.mode, mode_data=d0.mode_data, name=d0.name, hk=d0.hk)
tser3.append(t0, cntmatx)
return tser3
[docs]def convert2to3_emu(timeser_cntmat2d):
tser = convert2to3_noemu(timeser_cntmat2d) # First, non-emulated matrix is made.
tserfull = TimeSeriesCntMatrix() # Emulated matrix container.
for tlabel, dat3d in tser: # For each instance...
t = dat3d.t
matrix = dat3d.matrix # Shape according to the mode.
shape = matrix.shape # (M, A, E, P)
mode = dat3d.mode
mode_data = dat3d.mode_data
name = dat3d.name
hk = dat3d.hk
if shape[:3] != (32, 16, 96):
raise ex.IrfpyError('Conversion to 3D with emulation failed. The 2D data should be read in emulated mode. Mode={}/Shape={} is given'.format(mode.i, str(shape)))
ematrix = np.ma.zeros([32, 16, 96, 16]) - 9999
if mode == imamode.M24:
ematrix = matrix
elif mode == imamode.M25: # (M32, A16, E96, P8)
ematrix[:, :, :, ::2] = matrix / 2.
ematrix[:, :, :, 1::2] = matrix / 2.
elif mode == imamode.M26: # (M32, A16, E96, P4)
ematrix[:, :, :, ::4] = matrix / 4.
ematrix[:, :, :, 1::4] = matrix / 4.
ematrix[:, :, :, 2::4] = matrix / 4.
ematrix[:, :, :, 3::4] = matrix / 4.
elif mode == imamode.M27: # (M32, A16, E96, P2)
ematrix[:, :, :, ::8] = matrix / 8.
ematrix[:, :, :, 1::8] = matrix / 8.
ematrix[:, :, :, 2::8] = matrix / 8.
ematrix[:, :, :, 3::8] = matrix / 8.
ematrix[:, :, :, 4::8] = matrix / 8.
ematrix[:, :, :, 5::8] = matrix / 8.
ematrix[:, :, :, 6::8] = matrix / 8.
ematrix[:, :, :, 7::8] = matrix / 8.
else:
raise ex.IrfpyError('Only the mode 24, 25, 26, and 27 support.')
edat3d = CntMatrix(t, ematrix, imamode.M24, mode_data=mode_data, name=name, hk=hk)
tserfull.append(t, edat3d)
return tserfull
[docs]class HK:
""" Represents HK data embedded in science data.
HK data is stored. This is very primitive class and the user can directly access the variable
from the attirbutes.
- ``mode`` The mode
- ``pacclevel`` PAC level
- ``promsection`` (EE)PROM section
- ``fifofill`` FIFO fill
- More. TBC
.. note::
This is for embedded HK data. For real HK data, also stored in ``full`` dataset has not yet
been implemented. This is obtained in different time than the science data packet.
"""
def __init__(self):
self.mode = None
self.pacclevel = None
self.promsection = None
self.fifofill = None
def __str__(self):
return '<IMA/HK>'