cena_energy_spectra
ΒΆ
A script to plot CENA energy spectra
USAGE: python cena_energy_spectra.py [opts] start_time stop_time
#!/usr/bin/env python
''' A script to plot CENA energy spectra
USAGE: python cena_energy_spectra.py [opts] start_time stop_time
'''
import sys
from pylab import *
import numpy as np
import numpy.ma as ma
from optparse import OptionParser
import datetime
import dateutil.parser
import logging
logging.basicConfig()
try:
import irfpy.cena.cena_mass2 as cenam
except IOError as e:
logging.error('Failed to load CENA module. ' +
'Most probable reason of this failure is ' +
'"CENA database or orbit number file not found." ' +
'Check the ~/.irfpyrc settings and the corresponding database location.')
sys.exit(-3)
from irfpy.cena import energy
from irfpy.util import utc
from irfpy.util import bipower
import irfpy.cena.cena_flux
clr = ['b', 'g', 'r', 'c', 'm', 'y', 'k']
def mkdata(t0, t1, *args, **kwds):
''' Make the data of the energy spectra.
Returned is ((energy_eV, errorbar), (spectra, errorbar))
:param t0: Start of the time of accumulation
:type t0: ``datetime.datetime`` is recommended.
:param t1: End of the time of accumulation
:type t1: ``datetime.datetime`` is recommended.
:keyword type: 'count' or 'flux'. 'count' is default.
:type type: ``string``
:return: A tuple of the data.
((energy_eV, errorbar_eV), (spectra, errorbar_spectra) x 7).
All the data is ``numpy.ma.masked_array``.
If "type" is "count", errorbar is filled by None.
Error bar consists of 2 tuples, i.e. lower and upper.
'''
type = kwds.get('type', 'count')
type = type.lower()
# First read the data.
logging.info('Loading time of observation')
tlist = cenam.getobstime(timerange=[t0, t1])
logging.info('.. done: Length=%d' % (len(tlist)))
# Energy talb.
etbl = energy.getEnergyE16()
# Energy error
eerr = energy.getFwhmRangeE16()
eerr[0, :] = etbl - eerr[0, :]
eerr[1, :] = - etbl + eerr[1, :]
# Prepared for return values.
retval = []
retval.append((ma.masked_array(etbl), ma.masked_array(eerr)))
if type == 'count':
logging.info('Reading data')
datalist = []
# average = np.ma.zeros([16, 7])
for t in tlist:
try:
d = cenam.getdataE16(t) # 16x7x128 JdObject
except RuntimeError as e:
logging.warn('Skipped non-supported data %s' % utc.convert(t, datetime.datetime))
continue
datalist.append(d.getData())
# average += d.getData()[:, :, 0:64].sum(2) # Integrate over mass
logging.info('.. done: Length=%d' % len(datalist))
if len(datalist) == 0:
# Retval
for i in range(7):
sp = zeros([16])
retval.append((sp, None))
else:
datalist = np.ma.array(datalist) # T x 16x7x128 array
datalist = datalist[:, :, :, 0:64].sum(3) # Sum over the mass, 16x7
datalist = datalist.mean(0)
for i in range(7):
retval.append((datalist[:, i], None))
elif type == 'flux':
cnt2flx = irfpy.cena.cena_flux.Count2Flux()
datalist = []
for t in tlist:
try:
d = cenam.getdataE16(t).getData() # 16x7x128
except RuntimeError as e:
logging.warn('Skipped non-supported data %s' % utc.convert(t, datetime.datetime))
continue
d = d[:, :, 0:64].sum(2) # Sum over the mass for H, then 16x7
# Set to a very strange negative value.
flx = np.ma.ones((16, 7)) * (-1e+50)
for ie in range(16):
for id in range(7):
if not np.ma.is_masked(d[ie, id]):
flx[ie, id] = cnt2flx.getFlux(d[ie, id], ie, id)
# Now flx is the flux
# mask it
flx = np.ma.masked_less(flx, 0)
datalist.append(flx)
logging.info('.. done: Length=%d' % len(datalist))
# Masked array in 2-D. Nx16x7 where N is the data number.
ndat = len(datalist)
datalist = np.ma.array(datalist)
# Take average.
datalist = datalist.mean(0)
if ndat == 0:
# In case no data available for the time period.
for i in range(7):
sp = zeros([16])
retval.append((sp, None))
else:
for i in range(7):
spectra = datalist[:, i]
# Errorbar calculation by hard coded way.
# Errorbar is three times.
# Errorbar in lower bounds 66.7%, and upper bounds 200%
yerr0 = 2. / 3. * spectra
yerr1 = 2. * spectra
yerr = [yerr0, yerr1]
retval.append((spectra, yerr))
elif type == 'vdf':
raise NotImplementedError('type vdf is not yet implemented')
else:
raise KeyError("The keyword should be either 'count', 'flux' or 'vdf'")
retval.append(tlist)
return retval
def simple_fitting(etbl, spectra):
''' Energy table to spectra fitting by bi-power law.
:param etbl: Energy table.
:type etbl: ``numpy.ma.MaskedArray``
:param spectra: Energy spectra
:type spectra: ``numpy.ma.MaskedArray``
>>> etbl = ma.masked_array([1, 2, 3])
'''
# If spectra value is 0, it is disregarded.
spec0 = ma.masked_less_equal(spectra, 0)
etbl0 = ma.masked_array(etbl, spec0.mask).compressed()
spec0 = spec0.compressed()
print(spec0)
if len(etbl0) != len(spec0):
raise ValueError('!!!! Unmatched length of valid data. (%d vs %d)'
% (len(etbl0), len(spec0)))
if len(etbl0) < 4:
# At least 4 data is needed for the fitting.
raise ValueError('!!!! No valid data. (%d [>=4])' % len(etbl0))
(k0, r0, k1, r1), success = bipower.fitting(etbl0, spec0)
return (k0, r0, k1, r1), success
def main(t0, t1, *args, **kwds):
''' A script to plot CENA energy spectra.
Returned is [(energy_eV, errorbar), (spectra, errorbar)x7, tlist]
:param t0: Start of the time of accumulation
:type t0: ``datetime.datetime`` is recommended.
:param t1: End of the time of accumulation
:type t1: ``datetime.datetime`` is recommended.
:keyword type: 'count' or 'flux'. 'count' is default.
:type type: ``string``
:keyword errbar: Enable error bar. False is default.
:type errbar: ``boolean``
:keyword fitting: Enable fitting by bi-power law. False is default.
Only enabled when type is 'flux'.
:type fitting: ``boolean``
:keyword output: Output file name. PNG is recommended.
Default is ``None``.
If ``None`` is given, no output graphic is produced (only X output).
:return: A tuple of the data. [(energy_eV, errorbar_eV), (spectra, errorbar_spectra) x 7, tlist].
If "type" is "count", errorbar is filled by None.
Error bar consists of 2 tuples, i.e. lower and upper.
'''
figure(figsize=(8, 10))
type = kwds.get('type', 'count')
type = type.lower()
errbar = kwds.get('errbar', False)
fitting = kwds.get('fitting', False)
output = kwds.get('output', None)
logging.debug('Type of the plot: %s' % type)
# Make data.
dat = mkdata(t0, t1, type=type)
# Fitting
fitprms = None
if fitting and type == "flux":
logging.info('.. fitting started')
fitprms = []
for ch in range(7):
# Fitting. prm is ((k0, r0, k1, r1), success)
try:
prm = simple_fitting(dat[0][0], dat[ch+1][0])
fitprms.append(prm)
except ValueError as e:
logging.error('No good data. Skipped.')
fitprms.append(None)
continue
# Instance the bipower function.
fnc = bipower.mkfunc(prm[0][0], prm[0][1], prm[0][2], prm[0][3])
eneax = arange(10., 3000)
valax = [fnc(e) for e in eneax]
plot(eneax, valax, clr[ch]+'o-')
# dat is 8 element array.
etbl, eerr = dat[0]
# d0, d0e = dat[1]
# d1, d1e = dat[2]
# d2, d2e = dat[3]
# d3, d3e = dat[4]
# d4, d4e = dat[5]
# d5, d5e = dat[6]
# d6, d6e = dat[7]
tlist = dat[8]
ndat = len(tlist)
if type == 'count':
logging.info('.. done: Length=%d' % ndat)
for i in range(7):
errorbar(etbl, dat[i+1][0], fmt=clr[i], xerr=eerr, label='CH-%1d' % i)
ylabel('Count (H) (#/accum)')
elif type == 'flux':
# One count level
cnt2flx = irfpy.cena.cena_flux.Count2Flux()
ocn = np.array([[cnt2flx.getFlux(1, ie, id) for id in range(7)]
for ie in range(16)])
for i in range(7):
if errbar:
xerr = eerr
yerr = dat[i+1][1]
else:
xerr = None
yerr = None
errorbar(etbl, dat[i+1][0], fmt=clr[i] + 'o-', xerr=xerr, yerr=yerr,
label='CH-%1d' % i)
plot(etbl, ocn[:, 4], 'k--', label='1 cnt/4s (CH4)')
if ndat != 0:
plot(etbl, ocn[:, 4] / ndat, 'k--', label='1 cnt/%ds (CH4)'
% (4 * ndat))
ylabel('Flux (H) (#/cm2 sr s eV)')
yscale('log')
ylim(1e-1, 1e+7)
elif type == 'vdf':
raise NotImplementedError('type vdf is not yet implemented')
else:
raise KeyError("The keyword should be either 'count', 'flux' or 'vdf'")
xlabel('Energy')
title('CENA %s-%s' % (t0, t1))
xlim(10, 4000)
xscale('log')
legend(loc='best')
# legend()
if output == None:
show()
else:
plt.savefig(output)
return dat, fitprms
def mainexec():
usage = "USAGE: %prog [options] start_time stop_time"
parser = OptionParser(usage=usage)
parser.add_option('-v', action='store_true', dest='verbose',
default=False)
parser.add_option('-t', '--type', action='store', dest='type',
default='count',
help='Type of the plot, one of count|flux|vdf. Deafult count.')
parser.add_option('-e', '--errorbar', action='store_true', dest='errbar',
default=False, help='Plot errorbar as well.')
parser.add_option('-o', '--output', action='store', dest='filename',
default=None, help='Plot errorbar as well.')
parser.add_option('-f', '--fitting', action='store_true', dest='fitting',
default=False, help='Fit by bi-power law.')
options, args = parser.parse_args()
if len(args) != 2:
logging.error('!!!! Illegal number of argument. t0 and t1 should be specified.')
parser.print_help()
sys.exit(-5)
t0 = dateutil.parser.parse(args[0])
t1 = dateutil.parser.parse(args[1])
if options.verbose:
logging.getLogger().setLevel(logging.DEBUG)
logging.debug('Start = %s' % t0)
logging.debug('End = %s' % t1)
cena_spectra = main(t0, t1, type=options.type, errbar=options.errbar,
output=options.filename, fitting=options.fitting)
if __name__ == '__main__':
mainexec()