""" Handling proton ghost.
Proton ghost is a problem for MEX data handling.
This module provide an algorithm to reduce the ghost.
Usage: To be written.
Algorithm:
The algorithm used is based on Hans' implementation of Matlab.
Some modifications have been done (both intentionally and unintentionally), but
I hope general tendency is kept.
There are 7 rules defined by Hans, but so far three of simple rules have been implemented
(Rules 2, 3, and 4).
The reduction is done for (E96,M32) matrix.
Usually, the matrix should already be corrected by
- background subtraction
- missing mass interpolation
- masking the blocked FOV
but more detailed explanations should be written.
"""
import sys as _sys
import logging as _logging
import numpy as _np
import matplotlib.pyplot as _plt
from irfpy.mima import massring as _massring
from irfpy.mima import energy as _ene
import copy as _copy
[docs]def maximax(arr1d):
""" Return a pair of (max, maxarg) for arr1d.
:param arr1d: 1-dimensional array
:return: (max, maxarg)
"""
maxval = _np.nanmax(arr1d)
imaxval = _np.nanargmax(arr1d)
return (maxval, imaxval)
[docs]def minimin(arr1d):
minval = _np.nanmin(arr1d)
iminval = _np.nanargmin(arr1d)
return (minval, iminval)
[docs]def indexes_energy_at(etable, energy):
""" Return the index pair where the given energy bisect.
:param etable: Energy array, nan allowed
:param energy: Enegy
:return: (i0, i1) where etable[i0] > energy > etable[i1]
(To check which contains the equal, but not very sensitive)
>>> from irfpy.mima import energy
>>> import numpy as np
>>> etable = energy.get_default_table_v5_late(negative=True)
>>> etable = np.where(etable > 0, etable, np.nan)
>>> i0, i1 = indexes_energy_at(etable, 200)
>>> print(i0, etable[i0])
49 215.7
>>> print(i1, etable[i1])
50 193.0
"""
higher = _np.where(etable > energy)[0]
lower = _np.where(etable <= energy)[0]
return (_np.nanmax(higher), _np.nanmin(lower))
[docs]def increment(value, by=1, max=_np.infty, min=-_np.infty):
""" A helper function to increment (and to decrement)
:param value: The original value
:param by: A value to add to the original. Default of 1 is increment.
If you specify -1, it corresponds to decrement.
:param max: Maximum of possible value
:param min: Minimum of possible value
:return: ``value + by`` is returned. If it exceed ``max``, max is returned.
If it becomes below ``min``, min is returned.
"""
newvalue = value + by
if newvalue > max:
newvalue = max
elif newvalue < min:
newvalue = min
return newvalue
[docs]class BusterBase:
def __init__(self, emmatrix, lowenergy_limit_eV=100, highenergy_limit_eV=2000):
"""
:param emmatrix:
:param lowenergy_limit_eV:
:param highenergy_limit_eV:
:param delta:
:param lim:
"""
self.emmatrix = emmatrix # Original EM matrix
# self.processed = _copy.deepcopy(emmatrix) # Processed EM matrix. Initialized by original as copy
self.lowenergy_limit_eV = lowenergy_limit_eV
self.highenergy_limit_eV = highenergy_limit_eV
self.massfilter = MassFilter(emmatrix.promsection, emmatrix.pacc)
lowenergylim, _i1 = indexes_energy_at(emmatrix.energy_table, lowenergy_limit_eV)
highenergylim, _i1 = indexes_energy_at(emmatrix.energy_table, highenergy_limit_eV)
self.min_estep = highenergylim # A bit conflusing, but minimum energy step is maximum energy in eV
self.max_estep = lowenergylim
[docs] def increment(self, i):
""" Increment.
:param i: Original value
:return: Incremented value, ``i+1``, or ``self.max_estep`` if ``i+1>self.max_estep``
>>> import numpy as np
>>> emmatrix = EMmatrix(np.zeros([96,32]), 15, 4) # prom=15 and pacc=4
>>> bbase = BusterBase(emmatrix, lowenergy_limit_eV=200)
>>> print(bbase.increment(30))
31
>>> print(bbase.max_estep)
49
>>> print(bbase.increment(49)) # It does not go to 50, but is kept 49
49
"""
i1 = i + 1 if (i + 1 <= self.max_estep) else self.max_estep
return i1
[docs] def decrement(self, i):
""" Decrement.
:param i: Original value
:return: Decremented value, ``i-1``, or ``self.min_estep`` if ``i-1 < max.minstep``.
>>> import numpy as np
>>> emmatrix = EMmatrix(np.zeros([96,32]), 15, 4) # prom=15 and pacc=4
>>> bbase = BusterBase(emmatrix, highenergy_limit_eV=1000)
>>> print(bbase.decrement(40))
39
>>> print(bbase.min_estep)
31
>>> print(bbase.decrement(30)) # It does not go to 29, but goes to 31
31
"""
i1 = i - 1 if (i - 1 >= self.min_estep) else self.min_estep
return i1
[docs] def split_proton(self):
""" Split the self.original EMmatrix to two EMmatrixes. Proton and Nonproton.
:return:
"""
# First, remove protons
emmatrix_proton, emmatrix_noproton = self.emmatrix.split(self.massfilter.condition_real_proton())
return emmatrix_proton, emmatrix_noproton
[docs] def get_noproton_matrix(self):
em0, em1 = self.split_proton()
return em1 # Return only for non-proton
[docs] def split_energy_range_noproton(self, e0, e1):
""" Split the self.original EMmatrix to two EMmatrixes. Inside or outside the energy step range [e0,e1]
:param e0:
:param e1:
:return: A pair of EMmatrix, inside matrix and outside matrix
"""
cond = condition_estep_range(e0, e1) # True for selected energy steps
cond2 = ~self.massfilter.condition_real_proton() # True for non-proton region
cond = (cond & cond2)
emmatrix_inside, emmatrix_outside = self.emmatrix.split(cond)
return emmatrix_inside, emmatrix_outside
[docs] def buster(self):
"""
:return: A pair of :class:`EMmatrix` (em1, em2). em1 is for the cleaned matrix, and em2 is for the matrix that was subtracted.
"""
raise NotImplementedError('')
[docs]class BusterSingleEnergySpillover(BusterBase):
def __init__(self, *args, **kwds):
BusterBase.__init__(self, *args, **kwds)
self.mass5curve = _massring.central_mass_channel(5, self.emmatrix.promsection, self.emmatrix.pacc)
self.mass12curve = _massring.central_mass_channel(12, self.emmatrix.promsection, self.emmatrix.pacc)
[docs] def buster(self, lim=14, delta=3, minimum_ratio_heavy_middle=5, minimum_ratio_ghost_proton=2, ):
_logger = _logging.getLogger(__name__)
# Take an original matrix
emmatrix = self.emmatrix
# Initialize the returned matrix
emmatrix_ghost = self.emmatrix.zeros_like()
emmatrix_real = self.emmatrix
massfilter = MassFilter(emmatrix.promsection, emmatrix.pacc)
# First, remove protons
emmatrix_proton, emmatrix_noproton = emmatrix.split(massfilter.condition_real_proton())
matrix = emmatrix_noproton.get_matrix() # Numpy array
ions = matrix # Alias
lowenergylim = self.max_estep
highenergylim = self.min_estep
Eilim = highenergylim
Espect = emmatrix_noproton.get_sum_energy_spectra() # Energy spectra
espect = Espect # Alias...
cH, iH = maximax(Espect)
iHup = self.increment(iH)
iHdown = self.decrement(iH)
tmpmax = _np.nanmax(ions, axis=0) # Maximum count for each mass channel
itmp = _np.nanargmax(ions, axis=0) # Energy step that gives the maximum count for each mass channel
tst = _np.where(_np.abs(_np.diff(itmp)) <= delta)[0] # Maximum energy is confined
ind = int(_np.nanmedian(itmp))
if iH < self.min_estep or iH > self.max_estep:
return emmatrix_real, emmatrix_ghost
if len(tst) >= lim:
# I wonder if jj is iH. In HN code, jj looks a (fixed) according to the for loop far before.
# Let's try.
#jj = a
jj = iH
while (Eilim < iHup < lowenergylim) & (espect[iHup] < espect[iHup - 1]):
iHup = self.increment(iHup)
while (Eilim < iHdown < lowenergylim) & (espect[iHdown] < espect[iHdown + 1]):
iHdown = self.decrement(iHdown)
maxE = _np.max(ions, axis=1)
maxcnt = max(ions[iH])
cnt32 = self.emmatrix.get_matrix(copy=False)[iH, 31]
ibb = int(self.mass12curve[jj]) # The last mass channel of M>12
icc = int(self.mass5curve[jj]) # The last mass channle of M>5
bb = _np.arange(0, ibb + 1)
cc = _np.arange(ibb + 1, icc + 1)
hvy = float(_np.nanmedian(ions[jj, bb]))
noions = float(_np.nanmedian(ions[jj, cc]))
median_heavy = hvy
median_middle = noions
_logger.debug(f'{Espect[iH]}')
_logger.debug(f'{Espect[iHdown]}')
_logger.debug(f'{Espect[iHup]}')
_logger.debug(f'{ind}')
_logger.debug(f'{iH}')
_logger.debug(f'{delta}')
_logger.debug(f'{Eilim}')
_logger.debug(f'{maxcnt}')
_logger.debug(f'{cnt32}')
if ((Espect[iH] > 2 * Espect[iHdown]) &
(Espect[iH] > 2 * Espect[iHup]) &
(abs(ind - iH) < delta) &
(iH > Eilim) &
(maxcnt > minimum_ratio_ghost_proton * cnt32) &
(noions > hvy / minimum_ratio_heavy_middle)):
emmatrix_real, emmatrix_ghost = self.split_energy_range_noproton(iHdown, iHup)
return emmatrix_real, emmatrix_ghost
[docs]class BusterSpillOverManyNonzeros(BusterBase):
def __init__(self, *args, **kwds):
BusterBase.__init__(self, *args, **kwds)
[docs] def buster(self, threshold=28, m0=0, m1=31):
""" Buster the spill over.
:param threshold: Threshold. If more than this number between [m0,m1], this rule is triggered.
:return: a pair of :class:`EMmatrix`
If many mass channel is occupied, this condition is triggered.
For example, let's try to make the following array.
>>> import numpy as np
>>> matrix = np.zeros([96, 32])
>>> matrix[40, :] = 1 # E=40, all the data is 1. Total 32.
>>> matrix[39, 3:-3] = 2 # E=39 , the data is 2. Total 2x26 = 52
>>> matrix[38, 6:-6] = 3 # E=38, the data is 3. Total 3x20 = 60
>>> matrix[37, 9:-9] = 4 # E=37, the data is 4. Total 4x14 = 56
>>> matrix[36, 12:-12] = 5 # #=36, the data is 5. Total 5x8 = 40
>>> matrix[35, 15:-15] = 6 # E=35, the data is 6, total, 6x2 = 12
>>> matrix[41, 3:-3] = 2 # E=39 , the data is 2. Total 2x26 = 52
>>> matrix[42, 6:-6] = 3 # E=38, the data is 3. Total 3x20 = 60
>>> matrix[43, 9:-9] = 4 # E=37, the data is 4. Total 4x14 = 56
>>> matrix[44, 12:-12] = 5 # #=36, the data is 5. Total 5x8 = 40
>>> matrix[45, 15:-15] = 6 # E=35, the data is 6, total, 6x2 = 12
>>> emmatrix = EMmatrix(matrix, 15, 4)
>>> buster_matrix = BusterSpillOverManyNonzeros(emmatrix)
>>> m1, m2 = buster_matrix.buster()
"""
nonproton_matrix = self.get_noproton_matrix()
nonzeros = nonproton_matrix.get_nonzero_count_energy_spectrum(m0=m0, m1=m1) # (96,) shape. Value is number of mass step that has counts
espectra = nonproton_matrix.get_sum_energy_spectra() # (96,) shape. Value is the total counts (mass collapsed)
_logger = _logging.getLogger(__name__)
if max(nonzeros) > threshold:
_logger.info('Criteria hit.')
# Which energy step is influenced?
# Energy step that has spilled over
estep_max_influenced = _np.nanargmax(nonzeros)
# Energy range that is influenced
estep_influed_min = self.decrement(estep_max_influenced)
estep_influed_max = self.increment(estep_max_influenced)
# Low energy step to extend according to the energy spectra
while (estep_influed_min > self.min_estep) & (espectra[estep_influed_min] <= espectra[estep_influed_min - 1]):
estep_influed_min = self.decrement(estep_influed_min)
while (estep_influed_max < self.max_estep) & (espectra[estep_influed_max] <= espectra[estep_influed_max + 1]):
estep_influed_max = self.increment((estep_influed_max))
# The influence energy range is extended by 1 for each direction according to HN, but not adopted here (TBD)
#estep_influed_max += 1
#estep_influed_min -= 1
# Split the emmatrix according to the energy steps
emmatrix_ghost, emmatrix_real = self.split_energy_range_noproton(estep_influed_min, estep_influed_max)
else:
emmatrix_ghost = self.emmatrix.zeros_like()
emmatrix_real = self.emmatrix
return emmatrix_real, emmatrix_ghost
[docs]class BusterWeakSolarwind(BusterBase):
def __init__(self, *args, **kwds):
BusterBase.__init__(self, *args, **kwds)
self._logger = _logging.getLogger(__name__)
[docs] def buster(self):
""" Buster weak SW
:return:
"""
emmatrix = self.get_noproton_matrix()
espectra = emmatrix.get_sum_energy_spectra()
max_espect, ie_max_espect = maximax(espectra)
nonzeros = emmatrix.get_nonzero_count_energy_spectrum() # (96,) shape. Value is number of mass step that has counts
max_nonzeros, ie_max_nonzeros = maximax(nonzeros)
max_m0, ie_max_m0 = maximax(emmatrix.get_matrix()[:, 0])
maxc, im_maxc = maximax(emmatrix.get_matrix()[ie_max_espect, 1:])
if (_np.count_nonzero(nonzeros > 2) <= 5) & max_nonzeros >=8 & im_maxc >= 15 & im_maxc <= 25:
# Which energy step is influenced?
# Energy step that has spilled over
estep_max_influenced = _np.nanargmax(nonzeros)
# Energy range that is influenced
estep_influed_min = self.decrement(estep_max_influenced)
estep_influed_max = self.increment(estep_max_influenced)
# Low energy step to extend according to the energy spectra
while (estep_influed_min > self.min_estep) & (espectra[estep_influed_min] <= espectra[estep_influed_min - 1]):
estep_influed_min = self.decrement(estep_influed_min)
while (estep_influed_max < self.max_estep) & (espectra[estep_influed_max] <= espectra[estep_influed_max + 1]):
estep_influed_max = self.increment((estep_influed_max))
# Split the emmatrix according to the energy steps
emmatrix_ghost, emmatrix_real = self.split_energy_range_noproton(estep_influed_min, estep_influed_max)
else:
emmatrix_ghost = self.emmatrix.zeros_like()
emmatrix_real = self.emmatrix
return emmatrix_real, emmatrix_ghost
[docs]class BusterTooManyCountInMiddle(BusterBase):
""" Criteria 6. If too many counts exist in amu 5-12 area, the count <5 amu to be ghost
"""
def __init__(self, *args, **kwds):
BusterBase.__init__(self, *args, **kwds)
self._logger = _logging.getLogger(__name__)
self.mass5curve = _massring.central_mass_channel(5, self.emmatrix.promsection, self.emmatrix.pacc)
self.mass12curve = _massring.central_mass_channel(12, self.emmatrix.promsection, self.emmatrix.pacc)
self.mask_heavy = _np.zeros([96, 32], dtype=bool)
self.mask_middle = _np.zeros([96, 32], dtype=bool)
self.mask_ghost = _np.zeros([96, 32], dtype=bool)
for ie in range(self.min_estep, self.max_estep + 1):
mass5 = self.mass5curve[ie]
mass12 = self.mass12curve[ie]
im_heavy = _np.arange(0, mass12, dtype=int)
im_middle = _np.arange(int(mass12), mass5, dtype=int)
self.mask_heavy[ie, im_heavy] = True
self.mask_middle[ie, im_middle] = True
[docs] def buster(self, threshould=10.0):
""" Buster the ghost.
:param threshould: The threshold, k.
:rtype: (:class:`EMmatrix`, :class:`EMmatrix`)
:returns: A pair of matrix. Real and ghost matrixes.
We use the following rule for each energy step in the ghost energy range (defined in __init__ as parameter).
Calculate
- ``<Cmiddle>``, a median count in the middle mass channel (mass 5-12 amu)
- ``<Cheavy>``, a median count in the heavy mass channel (mass 12 and above)
Then, if ``<Cmiddle> > <Cheavy> / k``, then this rule is activated.
The rule will then consider the counts at mass 5 amu and above as ghost.
"""
matrix = self.emmatrix.get_matrix() # It is (96,32 )array
condition = _np.zeros([96, 32], dtype=bool)
# Median value for each energy step
heavy_matrix = _np.ma.masked_array(matrix, mask=self.mask_heavy)
heavy_matrix_median = _np.ma.median(heavy_matrix, axis=1) # For each energy
heavy_matrix_median.fill(_np.nan)
middle_matrix = _np.ma.masked_array(matrix, mask=self.mask_middle)
middle_matrix_median = _np.ma.median(middle_matrix, axis=1)
middle_matrix_median.fill(_np.nan)
for ie in range(self.min_estep, self.max_estep + 1):
heavy_median = heavy_matrix_median[ie]
middle_median = middle_matrix_median[ie]
if (middle_median > 0) & (middle_median >= heavy_median / threshould):
im_ghost = _np.arange(0, self.mass5curve[ie], dtype=int)
condition[ie, im_ghost] = True
emmatrix_ghost, emmatrix_real = self.emmatrix.split(condition)
return emmatrix_real, emmatrix_ghost
[docs]class BusterTooManyCountInLowMassChannels(BusterBase):
""" Criteria 7. If too many counts in low mass channel than middle (5-12).
"""
def __init__(self, *args, **kwds):
BusterBase.__init__(self, *args, **kwds)
self._logger = _logging.getLogger(__name__)
self.mass5curve = _massring.central_mass_channel(5, self.emmatrix.promsection, self.emmatrix.pacc)
self.mass12curve = _massring.central_mass_channel(12, self.emmatrix.promsection, self.emmatrix.pacc)
[docs] def buster(self, threshold=2):
matrix = self.emmatrix.get_matrix()
condition = _np.zeros([96, 32], dtype=bool)
for ie in range(self.min_estep, self.max_estep + 1):
if sum(matrix[ie, 1:3]) > sum(matrix[ie, 3:5]) / threshold:
condition[ie, :15] = True
emmatrix_ghost, emmatrix_real = self.emmatrix.split(condition)
return emmatrix_real, emmatrix_ghost
[docs]class EMmatrix:
""" Energy-Mass matrix data & associated table.
Making EMmatrix by hand.
>>> import numpy as np
>>> em = np.arange(96)[:, np.newaxis] + np.zeros([96, 32]) # (96, 32) array, with energy index as value
>>> em[85:] = np.nan
>>> matx = EMmatrix(em, 15, 4)
Getting data.
>>> em1 = matx.get_matrix() # Just return the matrix. Should be the same to em
>>> (em == em1)[:85].all()
True
Collapsing the matrix.
>>> espect = matx.get_max_energy_spectra() # Energy spectra, maximum along mass.
>>> espect[30] # As defined above, it should be the index.
30.0
>>> mspect = matx.get_max_mass_spectra() # Mass spectra. This case, constantly 84.
>>> mspect[15]
84.0
>>> espect = matx.get_sum_energy_spectra()
>>> espect[10] # Sum shall be the index time 32.
320.0
>>> mspect = matx.get_sum_mass_spectra()
>>> mspect[10] # Sum shall be constantly the sum from 0 to 84.
3570.0
"""
def __init__(self,
emmatrix: _np.ndarray,
promsection: int,
pacc: int,):
"""
:param emmatrix: E-M matrix with (96, 32) shape.
:type emmatrix: `numpy.ndarray`
:param promsection: PROM section. 0 to 16.
:param pacc: PACC. 0 to 7.
:keyword acceptance_proton: Acceptance mass channel for protons. ``mass channel > Rm + Dm * acceptace_proton``
is considered as real proton.
>>> em = EMmatrix(_np.zeros([96, 32]), 16, 4)
"""
_logger = _logging.getLogger(__name__)
if emmatrix.ndim != 2:
raise ValueError('Given EM matrix has dimension of {}, while EM matrix should be 2 dim.'.format(emmatrix.ndim))
self.emmatrix = emmatrix
""" The E-M matrix.
"""
self.promsection = promsection
self.pacc = pacc
self.energy_table = _ene.get_default_table(promsection, keep_negative=True)
self.energy_table = _np.where(self.energy_table < 0, _np.nan, self.energy_table) # Mask unused energy table.
[docs] def get_matrix(self, *args, **kwds):
return _np.array(self.emmatrix, *args, **kwds)
[docs] def get_sum_energy_spectra(self):
return _np.nansum(self.emmatrix, axis=1)
[docs] def get_sum_mass_spectra(self):
return _np.nansum(self.emmatrix, axis=0)
[docs] def get_max_energy_spectra(self):
return _np.nanmax(self.emmatrix, axis=1)
[docs] def get_max_mass_spectra(self):
return _np.nanmax(self.emmatrix, axis=0)
[docs] def get_nonzero_count_mass_spectrum(self):
""" Return the number of nonzero energy step for each mass channel.
:return: Number of nonzero energy step for each mass channel. (32,) shaped
"""
matrix = self.emmatrix
# Count non-zeros along energy.
nonzero_counts = _np.count_nonzero(matrix > 0, axis=0) # >0 for treating nan value.
return nonzero_counts
[docs] def get_nonzero_count_energy_spectrum(self, m0=0, m1=31):
""" Return the number of nonzero mass channels for each energy step.
:return: Number of nonzero mass channel for each energy step. (96,) shaped
"""
matrix = self.emmatrix[:, m0:m1+1]
nonzero_counts = _np.count_nonzero(matrix > 0, axis=1) # >0 needed for handling nan values properly.
return nonzero_counts
[docs] def split(self, condition):
""" Return two new :class:`EMmatrix`, which do and do not satisfy condition
:param condition: (96,32) shaped bool array
:return: (m1, m2) where both m1 and m2 is :class`EMmatrix` object. m1 contains bins where condtions are True,
and m2 for bins where condtions are False. Otherwise, 0 are filled. (nan is kept)
>>> import numpy as np
>>> em = np.arange(96)[:, np.newaxis] + np.zeros([96, 32]) # (96, 32) array, with energy index as value
>>> em[85:] = np.nan
>>> matx = EMmatrix(em, 15, 4)
>>> meven, modd = matx.split(matx.get_matrix() % 2 == 0) # Select even values to meven and otherwise modd
>>> print(np.nanmax(meven.get_matrix()))
84.0
>>> print(np.nanmax(modd.get_matrix()))
83.0
>>> print(np.nansum(meven.get_matrix())) # sum of 2 to 84 is 1806, multiplied by 32 => 57792
57792.0
>>> (modd.get_matrix() + meven.get_matrix() == matx.get_matrix())[:85].all() # Sum of two results shall be the original.
True
>>> np.isfinite(modd.get_matrix()[85:]).any() # All the data E>=85 should not be a finite number.
False
"""
m1 = _np.where(condition, self.emmatrix, 0)
m2 = _np.where(condition, 0, self.emmatrix)
m1 = _np.where(_np.isfinite(self.emmatrix), m1, self.emmatrix)
m2 = _np.where(_np.isfinite(self.emmatrix), m2, self.emmatrix)
m1 = EMmatrix(m1, self.promsection, self.pacc)
m2 = EMmatrix(m2, self.promsection, self.pacc)
return (m1, m2)
[docs] def ascii_dump(self, fp=None):
""" Ascii dump
>>> import numpy as np
>>> arr = np.random.poisson(0.2, size=(96,32))
>>> em = EMmatrix(arr, 15, 4)
>>> em.ascii_dump() # doctest: +SKIP
"""
if fp is None:
fp = _sys.stdout
ne, nm = self.emmatrix.shape
for ie in range(ne)[::-1]:
print(f'{ie:02d} | ', file=fp, end='')
for im in range(nm):
print(' {:4g}'.format(self.emmatrix[ie, im]), file=fp, end='')
print(file=fp)
print('-' * 165, file=fp)
print(' ', file=fp, end='')
for im in range(nm):
print(f' {im:4g}', file=fp, end='')
def __add__(self, other):
""" Adding.
:param other: Either :class:`EMmatrix`, or a numpy array.
>>> import numpy as np
>>> m1 = EMmatrix(np.ones([96, 32]), 15, 4) # A matrix with all values 1
>>> m2 = EMmatrix(np.ones([96, 32]), 15, 4) # Another matrix, as well all values 1
>>> m3 = m1 + m2 # Should be all values 2.
>>> print(m3.get_matrix().sum()) # Thus, the sum should be 2 * 32 * 96 ~6144
6144.0
>>> m4 = m1 + np.ones([96, 32]) # One can also add an array
>>> print(m4.get_matrix().sum())
6144.0
"""
try:
promsection = other.promsection
pacc = other.pacc
matrix = other.emmatrix
except AttributeError: # In this case, other is assumed to be numpy array.
promsection = self.promsection
pacc = self.pacc
matrix = other
if promsection != self.promsection:
raise ValueError('Promsection mismatch')
if pacc != self.pacc:
raise ValueError('PACC mismatch')
return EMmatrix(matrix + self.emmatrix, promsection, pacc)
[docs] def zeros_like(self):
""" Produce a new EMmatrix, with matrix filled by zero.
:return: Zero-filled EMmatrix. Other parameters are the same as ``self``.
>>> import numpy as np
>>> em = np.arange(96)[:, np.newaxis] + np.zeros([96, 32]) # (96, 32) array, with energy index as value
>>> em[85:] = np.nan
>>> matx = EMmatrix(em, 15, 4)
>>> zeromatx = matx.zeros_like()
>>> print(zeromatx.emmatrix.max())
0.0
>>> print(zeromatx.pacc)
4
>>> print(zeromatx.promsection)
15
"""
return EMmatrix(_np.zeros_like(self.emmatrix), self.promsection, self.pacc)
[docs]def buster_rule1(emmatrix: EMmatrix):
buster_obj = BusterSingleEnergySpillover(emmatrix)
emmatrix_residual, emmatrix_ghost = buster_obj.buster()
return emmatrix_residual, emmatrix_ghost
[docs]def buster_rule2(emmatrix: EMmatrix):
buster_obj = BusterSpillOverManyNonzeros(emmatrix)
emmatrix_residual, emmatrix_ghost = buster_obj.buster()
return emmatrix_residual, emmatrix_ghost
[docs]def buster_rule3(emmatrix: EMmatrix):
buster_obj = BusterSpillOverManyNonzeros(emmatrix)
emmatrix_residual, emmatrix_ghost = buster_obj.buster(threshold=10, m0=14, m1=28)
return emmatrix_residual, emmatrix_ghost
[docs]def buster_rule4(emmatrix: EMmatrix):
buster_obj = BusterSpillOverManyNonzeros(emmatrix)
emmatrix_residual, emmatrix_ghost = buster_obj.buster(threshold=7, m0=10, m1=20)
return emmatrix_residual, emmatrix_ghost
[docs]def buster_rule5(emmatrix: EMmatrix):
buster_obj = BusterWeakSolarwind(emmatrix)
emmatrix_residual, emmatrix_ghost = buster_obj.buster()
return emmatrix_residual, emmatrix_ghost
[docs]def buster_rule6(emmatrix: EMmatrix):
buster_obj = BusterTooManyCountInMiddle(emmatrix)
emmatrix_residual, emmatrix_ghost = buster_obj.buster()
return emmatrix_residual, emmatrix_ghost
[docs]def buster_rule7(emmatrix: EMmatrix):
buster_obj = BusterTooManyCountInLowMassChannels(emmatrix)
emmatrix_residual, emmatrix_ghost = buster_obj.buster()
return emmatrix_residual, emmatrix_ghost
[docs]def condition_estep_range(estep0, estep1):
""" Make a condition matrix.
:param estep0: Minimum energy step. Inclusive.
:param estep1: Maximum energy step. Inclusive.
:return: Bool array of (96,32), with [estep0:estep1+1, :] as True, otherwise false.
>>> cond = condition_estep_range(30, 50)
>>> cond.shape
(96, 32)
>>> print(cond[30, :].all()) # Estep-30 is True for all
True
>>> print(cond[50, :].all()) # Estep=50 is True for all
True
>>> print(cond[29, :].any()) # Estep-29 is False for all
False
>>> print(cond[51, :].any()) # Estep=51 is false for all
False
"""
boolmatrix = _np.zeros([96, 32], dtype=bool)
boolmatrix[estep0:estep1 + 1] = True
return boolmatrix
[docs]class MassFilter:
""" Handles mass in the EM matrix.
"""
def __init__(self, promsection, pacc):
_logger = _logging.getLogger(__name__)
self.promsection = promsection
self.pacc = pacc
self.energy_table = _ene.get_default_table(promsection, keep_negative=True)
self.energy_table = _np.where(self.energy_table < 0, _np.nan, self.energy_table) # Mask unused energy table.
[docs] def rm(self, m):
# return _np.array([_massring.rm(_, m, self.pacc) for _ in self.energy_table])
return _np.array(_massring.central_mass_channel(m, self.promsection, self.pacc))
[docs] def dm(self, m):
dmlist = _np.zeros_like(self.energy_table) + _massring.dm0() # always 12 for MEX
return dmlist
[docs] def massline_proton_minimum(self, acceptance_proton=2.0):
massline_proton_minimum_list = self.rm(1) - acceptance_proton * self.dm(1)
return massline_proton_minimum_list
[docs] def condition_real_proton(self, acceptance_proton=2.0):
""" Return a bool array specifying the real proton
:return: (96,32) bool array. True for the real proton, False for non-trivial proton.
>>> filter = MassFilter(15, 4)
>>> matx = filter.condition_real_proton()
>>> matx.shape
(96, 32)
>>> print(matx[19, 24:26]) # Energy step 17, Mass step 24 is False, but 25 is True
[False True]
>>> print('%.2f' % filter.massline_proton_minimum()[19]) # ... as the minimum proton line sits 24.63.
24.63
"""
boolmatrix = _np.zeros((96, 32), dtype=bool)
massline_thresh = self.massline_proton_minimum(acceptance_proton=acceptance_proton) # (96,) float array
massline_thresh = _np.where(_np.isnan(massline_thresh), 32., _np.ceil(massline_thresh))
massline_thresh = massline_thresh.astype(int)
for ie in range(96):
boolmatrix[ie, massline_thresh[ie]:] = True
return boolmatrix
[docs] def condition_shouldbemin(self, m0=5, m1=12):
""" ``shouldbemin`` parameter by HN.
My understanding is that this area no much foregound is expected, so that ghost dominates.
:param m0: Lowest mass in amu
:param m1: Highest mass in amu
:return: A conditon matrix (96, 32)
"""
m0line = self.rm(m0)
m1line = self.rm(m1)
boolmatrix = _np.zeros((96, 32), dtype=bool)
for ie in range(96):
try:
minimum_index = int(_np.ceil(m1line[ie]))
maximum_index = int(_np.ceil(m0line[ie]))
if maximum_index > 32:
maximum_index = 32
if minimum_index < 32:
boolmatrix[ie, minimum_index:maximum_index] = True
except ValueError: # NaN should be handled here.
pass
return boolmatrix
[docs]def simple_plotting(em: _np.ndarray, title=""):
""" Simple plotting.
:param em: (96, 32) array.
"""
elist = _np.linspace(-0.5, 95.5, 97)
mlist = _np.linspace(-0.5, 31.5, 33)
_plt.subplot(111, xlim=(-0.5, 31.5), ylim=(95.5, -0.5), xlabel='Mass channel', ylabel='Energy step', title=title)
_plt.pcolormesh(mlist, elist, em)
if __name__ == "__main__":
import numpy as np
em = np.arange(96)[:, np.newaxis] + np.zeros([96, 32]) # (96, 32) array, with energy index as value
em[85:] = np.nan
_plt.figure()
# The original data
simple_plotting(em)
promsection = 15
pacc = 4
# Split test, based on real proton
emmatrix = EMmatrix(em, promsection, pacc)
filter = MassFilter(promsection, pacc)
condition_real_proton = filter.condition_real_proton()
em_real, em_nonreal = emmatrix.split(condition_real_proton)
m1line = filter.rm(1)
d1line = filter.dm(1)
m2line = filter.rm(2)
_plt.figure()
simple_plotting(em_real.emmatrix)
_plt.plot(m1line, np.arange(96), color='b')
_plt.plot(m1line - d1line * 2, np.arange(96), color='w')
_plt.plot(m2line, np.arange(96), color='b')
_plt.figure()
simple_plotting(em_nonreal.emmatrix)
_plt.plot(m1line, np.arange(96), color='b')
_plt.plot(m1line - d1line * 2, np.arange(96), color='w')
_plt.plot(m2line, np.arange(96), color='b')
# Split test, based on shouldbemin
m5line = filter.rm(5)
m12line = filter.rm(12)
em_empty, em_nonempty = em_nonreal.split(filter.condition_shouldbemin())
_plt.figure()
simple_plotting(em_empty.emmatrix)
_plt.plot(m5line, np.arange(96), color='b')
_plt.plot(m12line, np.arange(96), color='b')
_plt.figure()
simple_plotting(em_nonempty.emmatrix)
_plt.plot(m5line, np.arange(96), color='b')
_plt.plot(m12line, np.arange(96), color='b')
# Split test, based on energy steps
condition_e40_60 = condition_estep_range(40, 60)
em_inside, em_outside = emmatrix.split(condition_e40_60)
_plt.figure()
simple_plotting(em_inside.emmatrix, title="E40-60")
_plt.figure()
simple_plotting(em_outside.emmatrix, title="Outside E40-60")
# buster test (1) : Not implemented
# buster test (2-4)
matrix = np.zeros([96, 32])
matrix[40, :] = 1 # E=40, all the data is 1. Total 32.
matrix[39, 3:-3] = 2 # E=39 , the data is 2. Total 2x26 = 52
matrix[38, 6:-6] = 3 # E=38, the data is 3. Total 3x20 = 60
matrix[37, 9:-9] = 4 # E=37, the data is 4. Total 4x14 = 56
matrix[36, 12:-12] = 5 # #=36, the data is 5. Total 5x8 = 40
matrix[35, 15:-15] = 6 # E=35, the data is 6, total, 6x2 = 12
matrix[41, 3:-3] = 2 # E=39 , the data is 2. Total 2x26 = 52
matrix[42, 6:-6] = 3 # E=38, the data is 3. Total 3x20 = 60
matrix[43, 9:-9] = 4 # E=37, the data is 4. Total 4x14 = 56
matrix[44, 12:-12] = 5 # #=36, the data is 5. Total 5x8 = 40
matrix[45, 15:-15] = 6 # E=35, the data is 6, total, 6x2 = 12
_plt.figure(); simple_plotting(emmatrix.get_matrix(), title='rule2-4 / original')
emmatrix = EMmatrix(matrix, 15, 4)
m1, m2 = buster_rule2(emmatrix)
_plt.figure(); simple_plotting(m1.get_matrix(), 'rule2, residual')
_plt.figure(); simple_plotting(m2.get_matrix(), 'rule2, ghost')
m1, m2 = buster_rule3(emmatrix)
_plt.figure(); simple_plotting(m1.get_matrix(), 'rule3, residual')
_plt.figure(); simple_plotting(m2.get_matrix(), 'rule3, ghost')
m1, m2 = buster_rule4(emmatrix)
_plt.figure(); simple_plotting(m1.get_matrix(), 'rule4, residual')
_plt.figure(); simple_plotting(m2.get_matrix(), 'rule4, ghost')
_plt.show()