Source code for irfpy.mima.ghostbuster

""" Handling proton ghost.

Proton ghost is a problem for MEX data handling.
This module provide an algorithm to reduce the ghost.

Usage: To be written.

Algorithm:
The algorithm used is based on Hans' implementation of Matlab.
Some modifications have been done (both intentionally and unintentionally), but
I hope general tendency is kept.

There are 7 rules defined by Hans, but so far three of simple rules have been implemented
(Rules 2, 3, and 4).

The reduction is done for (E96,M32) matrix.

Usually, the matrix should already be corrected by
- background subtraction
- missing mass interpolation
- masking the blocked FOV
but more detailed explanations should be written.

"""
import sys as _sys
import logging as _logging
import numpy as _np
import matplotlib.pyplot as _plt
from irfpy.mima import massring as _massring
from irfpy.mima import energy as _ene
import copy as _copy


[docs]def maximax(arr1d): """ Return a pair of (max, maxarg) for arr1d. :param arr1d: 1-dimensional array :return: (max, maxarg) """ maxval = _np.nanmax(arr1d) imaxval = _np.nanargmax(arr1d) return (maxval, imaxval)
[docs]def minimin(arr1d): minval = _np.nanmin(arr1d) iminval = _np.nanargmin(arr1d) return (minval, iminval)
[docs]def indexes_energy_at(etable, energy): """ Return the index pair where the given energy bisect. :param etable: Energy array, nan allowed :param energy: Enegy :return: (i0, i1) where etable[i0] > energy > etable[i1] (To check which contains the equal, but not very sensitive) >>> from irfpy.mima import energy >>> import numpy as np >>> etable = energy.get_default_table_v5_late(negative=True) >>> etable = np.where(etable > 0, etable, np.nan) >>> i0, i1 = indexes_energy_at(etable, 200) >>> print(i0, etable[i0]) 49 215.7 >>> print(i1, etable[i1]) 50 193.0 """ higher = _np.where(etable > energy)[0] lower = _np.where(etable <= energy)[0] return (_np.nanmax(higher), _np.nanmin(lower))
[docs]def increment(value, by=1, max=_np.infty, min=-_np.infty): """ A helper function to increment (and to decrement) :param value: The original value :param by: A value to add to the original. Default of 1 is increment. If you specify -1, it corresponds to decrement. :param max: Maximum of possible value :param min: Minimum of possible value :return: ``value + by`` is returned. If it exceed ``max``, max is returned. If it becomes below ``min``, min is returned. """ newvalue = value + by if newvalue > max: newvalue = max elif newvalue < min: newvalue = min return newvalue
[docs]class BusterBase: def __init__(self, emmatrix, lowenergy_limit_eV=100, highenergy_limit_eV=2000): """ :param emmatrix: :param lowenergy_limit_eV: :param highenergy_limit_eV: :param delta: :param lim: """ self.emmatrix = emmatrix # Original EM matrix # self.processed = _copy.deepcopy(emmatrix) # Processed EM matrix. Initialized by original as copy self.lowenergy_limit_eV = lowenergy_limit_eV self.highenergy_limit_eV = highenergy_limit_eV self.massfilter = MassFilter(emmatrix.promsection, emmatrix.pacc) lowenergylim, _i1 = indexes_energy_at(emmatrix.energy_table, lowenergy_limit_eV) highenergylim, _i1 = indexes_energy_at(emmatrix.energy_table, highenergy_limit_eV) self.min_estep = highenergylim # A bit conflusing, but minimum energy step is maximum energy in eV self.max_estep = lowenergylim
[docs] def increment(self, i): """ Increment. :param i: Original value :return: Incremented value, ``i+1``, or ``self.max_estep`` if ``i+1>self.max_estep`` >>> import numpy as np >>> emmatrix = EMmatrix(np.zeros([96,32]), 15, 4) # prom=15 and pacc=4 >>> bbase = BusterBase(emmatrix, lowenergy_limit_eV=200) >>> print(bbase.increment(30)) 31 >>> print(bbase.max_estep) 49 >>> print(bbase.increment(49)) # It does not go to 50, but is kept 49 49 """ i1 = i + 1 if (i + 1 <= self.max_estep) else self.max_estep return i1
[docs] def decrement(self, i): """ Decrement. :param i: Original value :return: Decremented value, ``i-1``, or ``self.min_estep`` if ``i-1 < max.minstep``. >>> import numpy as np >>> emmatrix = EMmatrix(np.zeros([96,32]), 15, 4) # prom=15 and pacc=4 >>> bbase = BusterBase(emmatrix, highenergy_limit_eV=1000) >>> print(bbase.decrement(40)) 39 >>> print(bbase.min_estep) 31 >>> print(bbase.decrement(30)) # It does not go to 29, but goes to 31 31 """ i1 = i - 1 if (i - 1 >= self.min_estep) else self.min_estep return i1
[docs] def split_proton(self): """ Split the self.original EMmatrix to two EMmatrixes. Proton and Nonproton. :return: """ # First, remove protons emmatrix_proton, emmatrix_noproton = self.emmatrix.split(self.massfilter.condition_real_proton()) return emmatrix_proton, emmatrix_noproton
[docs] def get_noproton_matrix(self): em0, em1 = self.split_proton() return em1 # Return only for non-proton
[docs] def split_energy_range_noproton(self, e0, e1): """ Split the self.original EMmatrix to two EMmatrixes. Inside or outside the energy step range [e0,e1] :param e0: :param e1: :return: A pair of EMmatrix, inside matrix and outside matrix """ cond = condition_estep_range(e0, e1) # True for selected energy steps cond2 = ~self.massfilter.condition_real_proton() # True for non-proton region cond = (cond & cond2) emmatrix_inside, emmatrix_outside = self.emmatrix.split(cond) return emmatrix_inside, emmatrix_outside
[docs] def buster(self): """ :return: A pair of :class:`EMmatrix` (em1, em2). em1 is for the cleaned matrix, and em2 is for the matrix that was subtracted. """ raise NotImplementedError('')
[docs]class BusterSingleEnergySpillover(BusterBase): def __init__(self, *args, **kwds): BusterBase.__init__(self, *args, **kwds) self.mass5curve = _massring.central_mass_channel(5, self.emmatrix.promsection, self.emmatrix.pacc) self.mass12curve = _massring.central_mass_channel(12, self.emmatrix.promsection, self.emmatrix.pacc)
[docs] def buster(self, lim=14, delta=3, minimum_ratio_heavy_middle=5, minimum_ratio_ghost_proton=2, ): _logger = _logging.getLogger(__name__) # Take an original matrix emmatrix = self.emmatrix # Initialize the returned matrix emmatrix_ghost = self.emmatrix.zeros_like() emmatrix_real = self.emmatrix massfilter = MassFilter(emmatrix.promsection, emmatrix.pacc) # First, remove protons emmatrix_proton, emmatrix_noproton = emmatrix.split(massfilter.condition_real_proton()) matrix = emmatrix_noproton.get_matrix() # Numpy array ions = matrix # Alias lowenergylim = self.max_estep highenergylim = self.min_estep Eilim = highenergylim Espect = emmatrix_noproton.get_sum_energy_spectra() # Energy spectra espect = Espect # Alias... cH, iH = maximax(Espect) iHup = self.increment(iH) iHdown = self.decrement(iH) tmpmax = _np.nanmax(ions, axis=0) # Maximum count for each mass channel itmp = _np.nanargmax(ions, axis=0) # Energy step that gives the maximum count for each mass channel tst = _np.where(_np.abs(_np.diff(itmp)) <= delta)[0] # Maximum energy is confined ind = int(_np.nanmedian(itmp)) if iH < self.min_estep or iH > self.max_estep: return emmatrix_real, emmatrix_ghost if len(tst) >= lim: # I wonder if jj is iH. In HN code, jj looks a (fixed) according to the for loop far before. # Let's try. #jj = a jj = iH while (Eilim < iHup < lowenergylim) & (espect[iHup] < espect[iHup - 1]): iHup = self.increment(iHup) while (Eilim < iHdown < lowenergylim) & (espect[iHdown] < espect[iHdown + 1]): iHdown = self.decrement(iHdown) maxE = _np.max(ions, axis=1) maxcnt = max(ions[iH]) cnt32 = self.emmatrix.get_matrix(copy=False)[iH, 31] ibb = int(self.mass12curve[jj]) # The last mass channel of M>12 icc = int(self.mass5curve[jj]) # The last mass channle of M>5 bb = _np.arange(0, ibb + 1) cc = _np.arange(ibb + 1, icc + 1) hvy = float(_np.nanmedian(ions[jj, bb])) noions = float(_np.nanmedian(ions[jj, cc])) median_heavy = hvy median_middle = noions _logger.debug(f'{Espect[iH]}') _logger.debug(f'{Espect[iHdown]}') _logger.debug(f'{Espect[iHup]}') _logger.debug(f'{ind}') _logger.debug(f'{iH}') _logger.debug(f'{delta}') _logger.debug(f'{Eilim}') _logger.debug(f'{maxcnt}') _logger.debug(f'{cnt32}') if ((Espect[iH] > 2 * Espect[iHdown]) & (Espect[iH] > 2 * Espect[iHup]) & (abs(ind - iH) < delta) & (iH > Eilim) & (maxcnt > minimum_ratio_ghost_proton * cnt32) & (noions > hvy / minimum_ratio_heavy_middle)): emmatrix_real, emmatrix_ghost = self.split_energy_range_noproton(iHdown, iHup) return emmatrix_real, emmatrix_ghost
[docs]class BusterSpillOverManyNonzeros(BusterBase): def __init__(self, *args, **kwds): BusterBase.__init__(self, *args, **kwds)
[docs] def buster(self, threshold=28, m0=0, m1=31): """ Buster the spill over. :param threshold: Threshold. If more than this number between [m0,m1], this rule is triggered. :return: a pair of :class:`EMmatrix` If many mass channel is occupied, this condition is triggered. For example, let's try to make the following array. >>> import numpy as np >>> matrix = np.zeros([96, 32]) >>> matrix[40, :] = 1 # E=40, all the data is 1. Total 32. >>> matrix[39, 3:-3] = 2 # E=39 , the data is 2. Total 2x26 = 52 >>> matrix[38, 6:-6] = 3 # E=38, the data is 3. Total 3x20 = 60 >>> matrix[37, 9:-9] = 4 # E=37, the data is 4. Total 4x14 = 56 >>> matrix[36, 12:-12] = 5 # #=36, the data is 5. Total 5x8 = 40 >>> matrix[35, 15:-15] = 6 # E=35, the data is 6, total, 6x2 = 12 >>> matrix[41, 3:-3] = 2 # E=39 , the data is 2. Total 2x26 = 52 >>> matrix[42, 6:-6] = 3 # E=38, the data is 3. Total 3x20 = 60 >>> matrix[43, 9:-9] = 4 # E=37, the data is 4. Total 4x14 = 56 >>> matrix[44, 12:-12] = 5 # #=36, the data is 5. Total 5x8 = 40 >>> matrix[45, 15:-15] = 6 # E=35, the data is 6, total, 6x2 = 12 >>> emmatrix = EMmatrix(matrix, 15, 4) >>> buster_matrix = BusterSpillOverManyNonzeros(emmatrix) >>> m1, m2 = buster_matrix.buster() """ nonproton_matrix = self.get_noproton_matrix() nonzeros = nonproton_matrix.get_nonzero_count_energy_spectrum(m0=m0, m1=m1) # (96,) shape. Value is number of mass step that has counts espectra = nonproton_matrix.get_sum_energy_spectra() # (96,) shape. Value is the total counts (mass collapsed) _logger = _logging.getLogger(__name__) if max(nonzeros) > threshold: _logger.info('Criteria hit.') # Which energy step is influenced? # Energy step that has spilled over estep_max_influenced = _np.nanargmax(nonzeros) # Energy range that is influenced estep_influed_min = self.decrement(estep_max_influenced) estep_influed_max = self.increment(estep_max_influenced) # Low energy step to extend according to the energy spectra while (estep_influed_min > self.min_estep) & (espectra[estep_influed_min] <= espectra[estep_influed_min - 1]): estep_influed_min = self.decrement(estep_influed_min) while (estep_influed_max < self.max_estep) & (espectra[estep_influed_max] <= espectra[estep_influed_max + 1]): estep_influed_max = self.increment((estep_influed_max)) # The influence energy range is extended by 1 for each direction according to HN, but not adopted here (TBD) #estep_influed_max += 1 #estep_influed_min -= 1 # Split the emmatrix according to the energy steps emmatrix_ghost, emmatrix_real = self.split_energy_range_noproton(estep_influed_min, estep_influed_max) else: emmatrix_ghost = self.emmatrix.zeros_like() emmatrix_real = self.emmatrix return emmatrix_real, emmatrix_ghost
[docs]class BusterWeakSolarwind(BusterBase): def __init__(self, *args, **kwds): BusterBase.__init__(self, *args, **kwds) self._logger = _logging.getLogger(__name__)
[docs] def buster(self): """ Buster weak SW :return: """ emmatrix = self.get_noproton_matrix() espectra = emmatrix.get_sum_energy_spectra() max_espect, ie_max_espect = maximax(espectra) nonzeros = emmatrix.get_nonzero_count_energy_spectrum() # (96,) shape. Value is number of mass step that has counts max_nonzeros, ie_max_nonzeros = maximax(nonzeros) max_m0, ie_max_m0 = maximax(emmatrix.get_matrix()[:, 0]) maxc, im_maxc = maximax(emmatrix.get_matrix()[ie_max_espect, 1:]) if (_np.count_nonzero(nonzeros > 2) <= 5) & max_nonzeros >=8 & im_maxc >= 15 & im_maxc <= 25: # Which energy step is influenced? # Energy step that has spilled over estep_max_influenced = _np.nanargmax(nonzeros) # Energy range that is influenced estep_influed_min = self.decrement(estep_max_influenced) estep_influed_max = self.increment(estep_max_influenced) # Low energy step to extend according to the energy spectra while (estep_influed_min > self.min_estep) & (espectra[estep_influed_min] <= espectra[estep_influed_min - 1]): estep_influed_min = self.decrement(estep_influed_min) while (estep_influed_max < self.max_estep) & (espectra[estep_influed_max] <= espectra[estep_influed_max + 1]): estep_influed_max = self.increment((estep_influed_max)) # Split the emmatrix according to the energy steps emmatrix_ghost, emmatrix_real = self.split_energy_range_noproton(estep_influed_min, estep_influed_max) else: emmatrix_ghost = self.emmatrix.zeros_like() emmatrix_real = self.emmatrix return emmatrix_real, emmatrix_ghost
[docs]class BusterTooManyCountInMiddle(BusterBase): """ Criteria 6. If too many counts exist in amu 5-12 area, the count <5 amu to be ghost """ def __init__(self, *args, **kwds): BusterBase.__init__(self, *args, **kwds) self._logger = _logging.getLogger(__name__) self.mass5curve = _massring.central_mass_channel(5, self.emmatrix.promsection, self.emmatrix.pacc) self.mass12curve = _massring.central_mass_channel(12, self.emmatrix.promsection, self.emmatrix.pacc) self.mask_heavy = _np.zeros([96, 32], dtype=bool) self.mask_middle = _np.zeros([96, 32], dtype=bool) self.mask_ghost = _np.zeros([96, 32], dtype=bool) for ie in range(self.min_estep, self.max_estep + 1): mass5 = self.mass5curve[ie] mass12 = self.mass12curve[ie] im_heavy = _np.arange(0, mass12, dtype=int) im_middle = _np.arange(int(mass12), mass5, dtype=int) self.mask_heavy[ie, im_heavy] = True self.mask_middle[ie, im_middle] = True
[docs] def buster(self, threshould=10.0): """ Buster the ghost. :param threshould: The threshold, k. :rtype: (:class:`EMmatrix`, :class:`EMmatrix`) :returns: A pair of matrix. Real and ghost matrixes. We use the following rule for each energy step in the ghost energy range (defined in __init__ as parameter). Calculate - ``<Cmiddle>``, a median count in the middle mass channel (mass 5-12 amu) - ``<Cheavy>``, a median count in the heavy mass channel (mass 12 and above) Then, if ``<Cmiddle> > <Cheavy> / k``, then this rule is activated. The rule will then consider the counts at mass 5 amu and above as ghost. """ matrix = self.emmatrix.get_matrix() # It is (96,32 )array condition = _np.zeros([96, 32], dtype=bool) # Median value for each energy step heavy_matrix = _np.ma.masked_array(matrix, mask=self.mask_heavy) heavy_matrix_median = _np.ma.median(heavy_matrix, axis=1) # For each energy heavy_matrix_median.fill(_np.nan) middle_matrix = _np.ma.masked_array(matrix, mask=self.mask_middle) middle_matrix_median = _np.ma.median(middle_matrix, axis=1) middle_matrix_median.fill(_np.nan) for ie in range(self.min_estep, self.max_estep + 1): heavy_median = heavy_matrix_median[ie] middle_median = middle_matrix_median[ie] if (middle_median > 0) & (middle_median >= heavy_median / threshould): im_ghost = _np.arange(0, self.mass5curve[ie], dtype=int) condition[ie, im_ghost] = True emmatrix_ghost, emmatrix_real = self.emmatrix.split(condition) return emmatrix_real, emmatrix_ghost
[docs]class BusterTooManyCountInLowMassChannels(BusterBase): """ Criteria 7. If too many counts in low mass channel than middle (5-12). """ def __init__(self, *args, **kwds): BusterBase.__init__(self, *args, **kwds) self._logger = _logging.getLogger(__name__) self.mass5curve = _massring.central_mass_channel(5, self.emmatrix.promsection, self.emmatrix.pacc) self.mass12curve = _massring.central_mass_channel(12, self.emmatrix.promsection, self.emmatrix.pacc)
[docs] def buster(self, threshold=2): matrix = self.emmatrix.get_matrix() condition = _np.zeros([96, 32], dtype=bool) for ie in range(self.min_estep, self.max_estep + 1): if sum(matrix[ie, 1:3]) > sum(matrix[ie, 3:5]) / threshold: condition[ie, :15] = True emmatrix_ghost, emmatrix_real = self.emmatrix.split(condition) return emmatrix_real, emmatrix_ghost
[docs]class EMmatrix: """ Energy-Mass matrix data & associated table. Making EMmatrix by hand. >>> import numpy as np >>> em = np.arange(96)[:, np.newaxis] + np.zeros([96, 32]) # (96, 32) array, with energy index as value >>> em[85:] = np.nan >>> matx = EMmatrix(em, 15, 4) Getting data. >>> em1 = matx.get_matrix() # Just return the matrix. Should be the same to em >>> (em == em1)[:85].all() True Collapsing the matrix. >>> espect = matx.get_max_energy_spectra() # Energy spectra, maximum along mass. >>> espect[30] # As defined above, it should be the index. 30.0 >>> mspect = matx.get_max_mass_spectra() # Mass spectra. This case, constantly 84. >>> mspect[15] 84.0 >>> espect = matx.get_sum_energy_spectra() >>> espect[10] # Sum shall be the index time 32. 320.0 >>> mspect = matx.get_sum_mass_spectra() >>> mspect[10] # Sum shall be constantly the sum from 0 to 84. 3570.0 """ def __init__(self, emmatrix: _np.ndarray, promsection: int, pacc: int,): """ :param emmatrix: E-M matrix with (96, 32) shape. :type emmatrix: `numpy.ndarray` :param promsection: PROM section. 0 to 16. :param pacc: PACC. 0 to 7. :keyword acceptance_proton: Acceptance mass channel for protons. ``mass channel > Rm + Dm * acceptace_proton`` is considered as real proton. >>> em = EMmatrix(_np.zeros([96, 32]), 16, 4) """ _logger = _logging.getLogger(__name__) if emmatrix.ndim != 2: raise ValueError('Given EM matrix has dimension of {}, while EM matrix should be 2 dim.'.format(emmatrix.ndim)) self.emmatrix = emmatrix """ The E-M matrix. """ self.promsection = promsection self.pacc = pacc self.energy_table = _ene.get_default_table(promsection, keep_negative=True) self.energy_table = _np.where(self.energy_table < 0, _np.nan, self.energy_table) # Mask unused energy table.
[docs] def get_matrix(self, *args, **kwds): return _np.array(self.emmatrix, *args, **kwds)
[docs] def get_sum_energy_spectra(self): return _np.nansum(self.emmatrix, axis=1)
[docs] def get_sum_mass_spectra(self): return _np.nansum(self.emmatrix, axis=0)
[docs] def get_max_energy_spectra(self): return _np.nanmax(self.emmatrix, axis=1)
[docs] def get_max_mass_spectra(self): return _np.nanmax(self.emmatrix, axis=0)
[docs] def get_nonzero_count_mass_spectrum(self): """ Return the number of nonzero energy step for each mass channel. :return: Number of nonzero energy step for each mass channel. (32,) shaped """ matrix = self.emmatrix # Count non-zeros along energy. nonzero_counts = _np.count_nonzero(matrix > 0, axis=0) # >0 for treating nan value. return nonzero_counts
[docs] def get_nonzero_count_energy_spectrum(self, m0=0, m1=31): """ Return the number of nonzero mass channels for each energy step. :return: Number of nonzero mass channel for each energy step. (96,) shaped """ matrix = self.emmatrix[:, m0:m1+1] nonzero_counts = _np.count_nonzero(matrix > 0, axis=1) # >0 needed for handling nan values properly. return nonzero_counts
[docs] def split(self, condition): """ Return two new :class:`EMmatrix`, which do and do not satisfy condition :param condition: (96,32) shaped bool array :return: (m1, m2) where both m1 and m2 is :class`EMmatrix` object. m1 contains bins where condtions are True, and m2 for bins where condtions are False. Otherwise, 0 are filled. (nan is kept) >>> import numpy as np >>> em = np.arange(96)[:, np.newaxis] + np.zeros([96, 32]) # (96, 32) array, with energy index as value >>> em[85:] = np.nan >>> matx = EMmatrix(em, 15, 4) >>> meven, modd = matx.split(matx.get_matrix() % 2 == 0) # Select even values to meven and otherwise modd >>> print(np.nanmax(meven.get_matrix())) 84.0 >>> print(np.nanmax(modd.get_matrix())) 83.0 >>> print(np.nansum(meven.get_matrix())) # sum of 2 to 84 is 1806, multiplied by 32 => 57792 57792.0 >>> (modd.get_matrix() + meven.get_matrix() == matx.get_matrix())[:85].all() # Sum of two results shall be the original. True >>> np.isfinite(modd.get_matrix()[85:]).any() # All the data E>=85 should not be a finite number. False """ m1 = _np.where(condition, self.emmatrix, 0) m2 = _np.where(condition, 0, self.emmatrix) m1 = _np.where(_np.isfinite(self.emmatrix), m1, self.emmatrix) m2 = _np.where(_np.isfinite(self.emmatrix), m2, self.emmatrix) m1 = EMmatrix(m1, self.promsection, self.pacc) m2 = EMmatrix(m2, self.promsection, self.pacc) return (m1, m2)
[docs] def ascii_dump(self, fp=None): """ Ascii dump >>> import numpy as np >>> arr = np.random.poisson(0.2, size=(96,32)) >>> em = EMmatrix(arr, 15, 4) >>> em.ascii_dump() # doctest: +SKIP """ if fp is None: fp = _sys.stdout ne, nm = self.emmatrix.shape for ie in range(ne)[::-1]: print(f'{ie:02d} | ', file=fp, end='') for im in range(nm): print(' {:4g}'.format(self.emmatrix[ie, im]), file=fp, end='') print(file=fp) print('-' * 165, file=fp) print(' ', file=fp, end='') for im in range(nm): print(f' {im:4g}', file=fp, end='')
def __add__(self, other): """ Adding. :param other: Either :class:`EMmatrix`, or a numpy array. >>> import numpy as np >>> m1 = EMmatrix(np.ones([96, 32]), 15, 4) # A matrix with all values 1 >>> m2 = EMmatrix(np.ones([96, 32]), 15, 4) # Another matrix, as well all values 1 >>> m3 = m1 + m2 # Should be all values 2. >>> print(m3.get_matrix().sum()) # Thus, the sum should be 2 * 32 * 96 ~6144 6144.0 >>> m4 = m1 + np.ones([96, 32]) # One can also add an array >>> print(m4.get_matrix().sum()) 6144.0 """ try: promsection = other.promsection pacc = other.pacc matrix = other.emmatrix except AttributeError: # In this case, other is assumed to be numpy array. promsection = self.promsection pacc = self.pacc matrix = other if promsection != self.promsection: raise ValueError('Promsection mismatch') if pacc != self.pacc: raise ValueError('PACC mismatch') return EMmatrix(matrix + self.emmatrix, promsection, pacc)
[docs] def zeros_like(self): """ Produce a new EMmatrix, with matrix filled by zero. :return: Zero-filled EMmatrix. Other parameters are the same as ``self``. >>> import numpy as np >>> em = np.arange(96)[:, np.newaxis] + np.zeros([96, 32]) # (96, 32) array, with energy index as value >>> em[85:] = np.nan >>> matx = EMmatrix(em, 15, 4) >>> zeromatx = matx.zeros_like() >>> print(zeromatx.emmatrix.max()) 0.0 >>> print(zeromatx.pacc) 4 >>> print(zeromatx.promsection) 15 """ return EMmatrix(_np.zeros_like(self.emmatrix), self.promsection, self.pacc)
[docs]def buster_rule1(emmatrix: EMmatrix): buster_obj = BusterSingleEnergySpillover(emmatrix) emmatrix_residual, emmatrix_ghost = buster_obj.buster() return emmatrix_residual, emmatrix_ghost
[docs]def buster_rule2(emmatrix: EMmatrix): buster_obj = BusterSpillOverManyNonzeros(emmatrix) emmatrix_residual, emmatrix_ghost = buster_obj.buster() return emmatrix_residual, emmatrix_ghost
[docs]def buster_rule3(emmatrix: EMmatrix): buster_obj = BusterSpillOverManyNonzeros(emmatrix) emmatrix_residual, emmatrix_ghost = buster_obj.buster(threshold=10, m0=14, m1=28) return emmatrix_residual, emmatrix_ghost
[docs]def buster_rule4(emmatrix: EMmatrix): buster_obj = BusterSpillOverManyNonzeros(emmatrix) emmatrix_residual, emmatrix_ghost = buster_obj.buster(threshold=7, m0=10, m1=20) return emmatrix_residual, emmatrix_ghost
[docs]def buster_rule5(emmatrix: EMmatrix): buster_obj = BusterWeakSolarwind(emmatrix) emmatrix_residual, emmatrix_ghost = buster_obj.buster() return emmatrix_residual, emmatrix_ghost
[docs]def buster_rule6(emmatrix: EMmatrix): buster_obj = BusterTooManyCountInMiddle(emmatrix) emmatrix_residual, emmatrix_ghost = buster_obj.buster() return emmatrix_residual, emmatrix_ghost
[docs]def buster_rule7(emmatrix: EMmatrix): buster_obj = BusterTooManyCountInLowMassChannels(emmatrix) emmatrix_residual, emmatrix_ghost = buster_obj.buster() return emmatrix_residual, emmatrix_ghost
[docs]def condition_estep_range(estep0, estep1): """ Make a condition matrix. :param estep0: Minimum energy step. Inclusive. :param estep1: Maximum energy step. Inclusive. :return: Bool array of (96,32), with [estep0:estep1+1, :] as True, otherwise false. >>> cond = condition_estep_range(30, 50) >>> cond.shape (96, 32) >>> print(cond[30, :].all()) # Estep-30 is True for all True >>> print(cond[50, :].all()) # Estep=50 is True for all True >>> print(cond[29, :].any()) # Estep-29 is False for all False >>> print(cond[51, :].any()) # Estep=51 is false for all False """ boolmatrix = _np.zeros([96, 32], dtype=bool) boolmatrix[estep0:estep1 + 1] = True return boolmatrix
[docs]class MassFilter: """ Handles mass in the EM matrix. """ def __init__(self, promsection, pacc): _logger = _logging.getLogger(__name__) self.promsection = promsection self.pacc = pacc self.energy_table = _ene.get_default_table(promsection, keep_negative=True) self.energy_table = _np.where(self.energy_table < 0, _np.nan, self.energy_table) # Mask unused energy table.
[docs] def rm(self, m): # return _np.array([_massring.rm(_, m, self.pacc) for _ in self.energy_table]) return _np.array(_massring.central_mass_channel(m, self.promsection, self.pacc))
[docs] def dm(self, m): dmlist = _np.zeros_like(self.energy_table) + _massring.dm0() # always 12 for MEX return dmlist
[docs] def massline_proton_minimum(self, acceptance_proton=2.0): massline_proton_minimum_list = self.rm(1) - acceptance_proton * self.dm(1) return massline_proton_minimum_list
[docs] def condition_real_proton(self, acceptance_proton=2.0): """ Return a bool array specifying the real proton :return: (96,32) bool array. True for the real proton, False for non-trivial proton. >>> filter = MassFilter(15, 4) >>> matx = filter.condition_real_proton() >>> matx.shape (96, 32) >>> print(matx[19, 24:26]) # Energy step 17, Mass step 24 is False, but 25 is True [False True] >>> print('%.2f' % filter.massline_proton_minimum()[19]) # ... as the minimum proton line sits 24.63. 24.63 """ boolmatrix = _np.zeros((96, 32), dtype=bool) massline_thresh = self.massline_proton_minimum(acceptance_proton=acceptance_proton) # (96,) float array massline_thresh = _np.where(_np.isnan(massline_thresh), 32., _np.ceil(massline_thresh)) massline_thresh = massline_thresh.astype(int) for ie in range(96): boolmatrix[ie, massline_thresh[ie]:] = True return boolmatrix
[docs] def condition_shouldbemin(self, m0=5, m1=12): """ ``shouldbemin`` parameter by HN. My understanding is that this area no much foregound is expected, so that ghost dominates. :param m0: Lowest mass in amu :param m1: Highest mass in amu :return: A conditon matrix (96, 32) """ m0line = self.rm(m0) m1line = self.rm(m1) boolmatrix = _np.zeros((96, 32), dtype=bool) for ie in range(96): try: minimum_index = int(_np.ceil(m1line[ie])) maximum_index = int(_np.ceil(m0line[ie])) if maximum_index > 32: maximum_index = 32 if minimum_index < 32: boolmatrix[ie, minimum_index:maximum_index] = True except ValueError: # NaN should be handled here. pass return boolmatrix
[docs]def simple_plotting(em: _np.ndarray, title=""): """ Simple plotting. :param em: (96, 32) array. """ elist = _np.linspace(-0.5, 95.5, 97) mlist = _np.linspace(-0.5, 31.5, 33) _plt.subplot(111, xlim=(-0.5, 31.5), ylim=(95.5, -0.5), xlabel='Mass channel', ylabel='Energy step', title=title) _plt.pcolormesh(mlist, elist, em)
if __name__ == "__main__": import numpy as np em = np.arange(96)[:, np.newaxis] + np.zeros([96, 32]) # (96, 32) array, with energy index as value em[85:] = np.nan _plt.figure() # The original data simple_plotting(em) promsection = 15 pacc = 4 # Split test, based on real proton emmatrix = EMmatrix(em, promsection, pacc) filter = MassFilter(promsection, pacc) condition_real_proton = filter.condition_real_proton() em_real, em_nonreal = emmatrix.split(condition_real_proton) m1line = filter.rm(1) d1line = filter.dm(1) m2line = filter.rm(2) _plt.figure() simple_plotting(em_real.emmatrix) _plt.plot(m1line, np.arange(96), color='b') _plt.plot(m1line - d1line * 2, np.arange(96), color='w') _plt.plot(m2line, np.arange(96), color='b') _plt.figure() simple_plotting(em_nonreal.emmatrix) _plt.plot(m1line, np.arange(96), color='b') _plt.plot(m1line - d1line * 2, np.arange(96), color='w') _plt.plot(m2line, np.arange(96), color='b') # Split test, based on shouldbemin m5line = filter.rm(5) m12line = filter.rm(12) em_empty, em_nonempty = em_nonreal.split(filter.condition_shouldbemin()) _plt.figure() simple_plotting(em_empty.emmatrix) _plt.plot(m5line, np.arange(96), color='b') _plt.plot(m12line, np.arange(96), color='b') _plt.figure() simple_plotting(em_nonempty.emmatrix) _plt.plot(m5line, np.arange(96), color='b') _plt.plot(m12line, np.arange(96), color='b') # Split test, based on energy steps condition_e40_60 = condition_estep_range(40, 60) em_inside, em_outside = emmatrix.split(condition_e40_60) _plt.figure() simple_plotting(em_inside.emmatrix, title="E40-60") _plt.figure() simple_plotting(em_outside.emmatrix, title="Outside E40-60") # buster test (1) : Not implemented # buster test (2-4) matrix = np.zeros([96, 32]) matrix[40, :] = 1 # E=40, all the data is 1. Total 32. matrix[39, 3:-3] = 2 # E=39 , the data is 2. Total 2x26 = 52 matrix[38, 6:-6] = 3 # E=38, the data is 3. Total 3x20 = 60 matrix[37, 9:-9] = 4 # E=37, the data is 4. Total 4x14 = 56 matrix[36, 12:-12] = 5 # #=36, the data is 5. Total 5x8 = 40 matrix[35, 15:-15] = 6 # E=35, the data is 6, total, 6x2 = 12 matrix[41, 3:-3] = 2 # E=39 , the data is 2. Total 2x26 = 52 matrix[42, 6:-6] = 3 # E=38, the data is 3. Total 3x20 = 60 matrix[43, 9:-9] = 4 # E=37, the data is 4. Total 4x14 = 56 matrix[44, 12:-12] = 5 # #=36, the data is 5. Total 5x8 = 40 matrix[45, 15:-15] = 6 # E=35, the data is 6, total, 6x2 = 12 _plt.figure(); simple_plotting(emmatrix.get_matrix(), title='rule2-4 / original') emmatrix = EMmatrix(matrix, 15, 4) m1, m2 = buster_rule2(emmatrix) _plt.figure(); simple_plotting(m1.get_matrix(), 'rule2, residual') _plt.figure(); simple_plotting(m2.get_matrix(), 'rule2, ghost') m1, m2 = buster_rule3(emmatrix) _plt.figure(); simple_plotting(m1.get_matrix(), 'rule3, residual') _plt.figure(); simple_plotting(m2.get_matrix(), 'rule3, ghost') m1, m2 = buster_rule4(emmatrix) _plt.figure(); simple_plotting(m1.get_matrix(), 'rule4, residual') _plt.figure(); simple_plotting(m2.get_matrix(), 'rule4, ghost') _plt.show()