Source code for irfpy.cena.empirical_lib

''' This is a module to support :mod:`irfpy.cena.empirical`.

This module is a helping module for :mod:`irfpy.cena.empirical`,
thus users do not need to understand.
There is an interface to fit the observed count rate to the
parameters in :mod:`irfpy.cena.empirical`.

The module was copied from
``110301-Paper-CenaEnergySpectra/script/average_spectra_general/src/scripts/avs5_fitting2.py``.
Then, refactored.

Below is the original comments.

The count data is fitted by any parametric functions.
You can define the parametric functions as you want.
See :meth:`flux_function_maxwell` as an example.
'''
import sys

import logging
logger = logging.getLogger('empirical_lib')
#logger.setLevel(logging.DEBUG)

import numpy as np
import scipy as sp
import scipy.optimize
import scipy.interpolate

import irfpy.util.constant as k

import irfpy.cena.energy
import irfpy.cena.cena_flux


[docs]def flux_function_maxwell(parameters, energy_in_eV): r''' A general function following maxwell. The function returns the values corresponding the given parameters and energy. Both parameters should be the np.array or np.ma.array. Returned is the np.array or np.ma.array. The formulation in J(E) is: .. math:: J(E) = p_0^2 E \exp(-\frac{E}{p_1^2}) The squares limit the parameters positive. If you compare with the data measured in MKSA, .. math:: p_0^2 = \frac{2E}{m^2} n (\frac{m}{2 \pi k T})^{3/2} \\ p_1^2 = \frac{2kT}{m} If converting to eV and cm unit, which may be used more widely in the plasma physics, .. math:: p_0^2 = \frac{q^2}{m^2} 200 n (\frac{m}{2 \pi q T})^{3/2} \\ p_1^2 = T or, .. math:: T \mathrm{[eV]} = p_1^2 \\ n \mathrm{[cm}^{-3}\mathrm{]} = \frac{m^2}{200 q^2} (2 \pi q p_1^2)^{3/2} \times p_0^2 ''' return (parameters[0] **2) * energy_in_eV * np.exp(-energy_in_eV / (parameters[1] ** 2))
[docs]def maxwell_parameter_to_physical_values(p0, p1): ''' From the best fit paramter, you may get temperateure and density. See :meth:`flux_function_maxwell`. ''' T = p1 * p1 n = ((k.m_hydrogen / k.qe) ** 2) / 200. * ((2*np.pi*k.qe*(p1*p1))/(k.m_hydrogen))**(1.5) * (p0*p0) return (n, T)
[docs]class flux_cena_background: ''' To make function following the constant background. This is not physical, but sometimes embedded in the data. ``parameter`` should be np.array with (1, ) shape. The square of the parameter is the corresponding background. The function should return continuous function as energy_in_eV is arbitrary. However, in a practical use, energy_in_eV only receive the specific values which we get from energy.getEnergyE16() method. Thus, one count level using cena_flux.Count2Flux() is used, and the linear interpolation is used for the internal values. ''' def __init__(self, ch): self.c2j = irfpy.cena.cena_flux.Count2Flux() bg = [self.c2j.getFlux(1, ie, ch) for ie in range(16)] ene = irfpy.cena.energy.getEnergyE16() self.interpolfunc = scipy.interpolate.interp1d(ene, bg) def __call__(self, parameters, energy_in_eV): valarr = np.zeros_like(energy_in_eV) for index in range(len(energy_in_eV)): try: val = self.interpolfunc(energy_in_eV[index]) * (parameters[0]**2) valarr[index] = val except ValueError as e: valarr[index] = float('nan') return valarr
[docs]class flux_function_maxwell_bg: ''' A general function following maxwell, together with flat count rate. The function returns the values corresponding the given parameters and energy. Both parameters should be the np.array or np.ma.array. Returned is the np.array or np.ma.array. ''' def __init__(self, ch): self.flux_bg = flux_cena_background(ch) def __call__(self, parameters, energy_in_eV): return flux_function_maxwell((parameters[0], parameters[1]), energy_in_eV) + self.flux_bg((parameters[2], ), energy_in_eV)
[docs]def flux_thompson_sigmund_free(parameters, energy_in_eV): r''' Tompson sigmund formulae. Two parameters, which was defined indeed by the binding energy and the mass ratio between the projectile and the target physically. However, this function assumes they are parameters. Coefficient is also a parameter. .. math:: f(E) = p_0 \frac{E}{(E + p_1^2)^3} (1-\sqrt{\frac{E + p_1^2}{p2^2}}) This looks the distribution function, not the differential flux. .. math:: J(E) = \frac{v f(E)}{\sin\alpha} so, .. math:: J(E) = p_0 \frac{E^{3/2}}{(E + p_1^2)^3} (1-\sqrt{\frac{E + p_1^2}{p2^2}}) For practical reason, p_1 and p2 given are squared to keep the resulting paramters positive. ''' return (parameters[0]) * (energy_in_eV ** 1.5) / ((energy_in_eV + (parameters[1]**2)) ** 3) * (1-np.sqrt( (energy_in_eV + (parameters[1]**2)) / (parameters[2]**2)))
[docs]def flux_function_shifted_maxwell(parameters, energy): r''' A function following "shifted" maxwell. Three parameters. .. math:: p_0 \exp( - \frac{(\sqrt{E} - \sqrt{p2})^2}{p_1}) p0 includes information of temperature and density. p1 is the temperature itself (in the unit of eV). p2 is the bulk energy in eV. It can be converted to the velocity as vb = sqrt(2q/m * p[2]) ~ 13.6 km/s sqrt(E [eV]) in MKSA units for hydrogen. ''' sqrt2 = np.sqrt(energy) - np.sqrt(np.abs(parameters[2])) inexp = sqrt2 ** 2 / parameters[1] return parameters[0] * np.exp(-inexp)
[docs]def flux_bipower(parameters, energy): r''' Bipower law Four parameters. .. math:: J(E) = \mathrm{min}(p_0 E^{p_1}, p_2 E^{p_3}) ''' lower = parameters[0] * (energy ** parameters[1]) upper = parameters[2] * (energy ** parameters[3]) return np.ma.array([lower, upper]).min(0)
[docs]def flux_function_linear(parameters, energy_in_eV): ''' A general function following the linear function, as a test. ''' return parameters[0] + energy_in_eV * parameters[1]
class __WrapContinuousFunction: ''' A continuous function is wrapped, and make a function using energy index. It is used for wrapping the continuous function (regarding to the energy) to discrete the energy step. This means that the energy index for CENA is the key (input) of the differential flux function. This is only used to be called from the sp.optimize.leastsq method. ''' def __init__(self, continuous_function): ''' :param continuous_function: A function taking parameters and energy_in_eV arrays to return the corresponding differential flux. ''' self.func = continuous_function def __call__(self, parameters, energy_index): ''' Returns the flux corresponding to the parameter and energy index. :param parameters: Parameters for the ``continuous_function``. :type parameters: An array-like to give to the first argument of ``continuous_function``. :param energy_index: Energy index array, 0-15. :type energy_index: An array-like. This is equivalent to ``continuous_function(parameters, energy[energy_index])`` but callable form of ``sp.optimize.leastsq``. ''' if np.isscalar(energy_index): energy_index = [energy_index, ] # Energy index is converted to energy step eneev = np.array([irfpy.cena.energy.getEnergyE16()[eid] for eid in energy_index]) # Je at the CENA energy step. flx = self.func(parameters, eneev) return flx class __FluxFunc2CountFunc: ''' This wraps the flux function to count function. This is only used to be called from ``sp.optimize.leastsq``. ''' def __init__(self, fluxfunction, ch): self.flux_function = fluxfunction self.ch = ch def __call__(self, parameters, energy_index): ''' Fit function should take parameters and energy index as input. The function should take parameter and energy index. Energy index is an array that includes the integer 0 to 15. Returned is the count rate that has the same shape as ``energy_index``. Do not use masked array. ''' j2c = irfpy.cena.cena_flux.Flux2Count_Simple() if np.isscalar(energy_index): energy_index = [energy_index, ] # Energy index is converted to energy step eneev = np.array([irfpy.cena.energy.getEnergyE16()[eid] for eid in energy_index]) # # Maxwell Je. # flx = parameters[0] * eneev * np.exp(-eneev / parameters[1]) # Flux function. flx = self.flux_function(parameters, energy_index) # Je converted to C cnt = np.zeros(len(energy_index)) for idx, eid in enumerate(energy_index): c = j2c.getCount(flx[idx], eid, self.ch) cnt[idx] = c logger.debug('# %s %s' % (str(parameters), str(cnt))) return cnt
[docs]def run_fitting(cnts, ch, fitfunc, initial_parameter): ''' Main function to run the fitting. :param cnts: An array of counts, which is to be fitted. :type cnts: ``np.ma.MaskedArray`` :param ch: The channel observed. :type ch: ``int`` :param fitfunc: A parametric function to fit the data. :type fitfunc: ``function`` or ``callable``. :returns: (best_parameter, success_code, bestfit_function, loglikely) This fit the given counts by the given fitting function. ''' assert len(cnts) == 16 cnts = np.ma.masked_invalid(cnts) logger.debug(cnts) enestep = np.arange(16) cnts_valid = cnts.compressed() estep_valid = np.ma.masked_array(enestep, mask=cnts.mask).compressed() parametric_func = __WrapContinuousFunction(fitfunc) parametric_count = __FluxFunc2CountFunc(parametric_func, ch) errfunc = lambda p, x, y: parametric_count(p, x) - y p, s = sp.optimize.leastsq(errfunc, initial_parameter, args=(estep_valid, cnts_valid)) logger.debug('Best fit= %s (%d)' % (str(p), s)) # Output of the results. j2c = irfpy.cena.cena_flux.Flux2Count_Simple() c2j = irfpy.cena.cena_flux.Count2Flux() bestfitcnts = parametric_count(p, list(range(16))) logger.debug('Count(Obs) = %s' % str(cnts)) logger.debug('Count(Fit) = %s' % str(bestfitcnts)) obsflux = [c2j.getFlux(cnts[i], i, 3) for i in range(16)] bestfitflux = [c2j.getFlux(bestfitcnts[i], i, 3) for i in range(16)] logger.debug('Flux(Obs) = %s' % str(obsflux)) logger.debug('Flux(Fit) = %s' % str(bestfitflux)) residual = errfunc(p, estep_valid, cnts_valid) res2 = (residual ** 2).sum() n = len(estep_valid) loglikely = -n / 2.0 * (np.log(np.pi*2.0) + 1 + np.log(res2/n)) aic = -2 * loglikely + 2 * len(p) logger.debug('AIC= %f' % aic) bestfitfunction = lambda energy: fitfunc(p, energy) return p, s, bestfitfunction, loglikely
[docs]def main_sample(): ### As a sample. # Let's first take the sample data from the real data. ### Orbit = 1237 ### T0 = 2009-02-19 00:08:22.492000 ### T1 = 2009-02-19 00:28:06.516000 cnts = [float(v) for v in 'nan nan nan nan 0.041152 0.078189 0.164609 0.378601 0.460905 0.547325 0.242798 0.057613 nan nan nan nan'.split()] # Class for background (or constant count rate) bgfunc = flux_cena_background(3) # 3 is channel. retval = run_fitting(cnts, 3, bgfunc, (1,)) # Class for Maxwell+background. maxbgfunc = flux_function_maxwell_bg(3) # 3 is channel. retval = run_fitting(cnts, 3, maxbgfunc, (20, 10, 0.1)) print(retval) # Other fucntions are usual functions. retval = run_fitting(cnts, 3, flux_bipower, (1e4, 0., 1e9, -3)) retval = run_fitting(cnts, 3, flux_function_maxwell, (1, 100)) retval = run_fitting(cnts, 3, flux_function_shifted_maxwell, (1, 10, 100)) retval = run_fitting(cnts, 3, flux_thompson_sigmund_free, (1e8, 10, 20)) retval = run_fitting(cnts, 3, flux_function_linear, (1, 100))