# Source code for irfpy.npicommon.background
''' MEX/VEX NPI background data as proxy of SEP
MEX/VEX NPI background data is produced routinely at
* http://rhea.umea.irf.se/~peje/mex/hk/bkg_npi/
* http://rhea.umea.irf.se/~peje/vex/hk/bkg_npi/
'''
import os
import os as _os
import glob as _glob
import datetime
import dateutil.parser
import logging
import re
import numpy as np
from irfpy.util.timeseries import ScalarSeries
import irfpy.util.timeseriesdb
from irfpy.util import datacenter as _dc
_logger = logging.getLogger(__name__)
class DataCenterBackground(_dc.BaseDataCenter):
    """Data center for MEX/VEX NPI background files.

    Serves the files named ``E_MUyyyymm_npi.dat`` located directly under
    ``pathname``.

    :param pathname: Directory that holds the background data files.
    :param name: Name handed over to the base data center.
    """

    def __init__(self, pathname, name="NPI bakground (generic)"):
        # ``pathname`` is stored before the base initializer runs.
        # NOTE(review): presumably the base class calls ``search_files``
        # during initialization and needs the path already set -- confirm.
        self.pathname = pathname
        _dc.BaseDataCenter.__init__(self, name=name)

    def search_files(self):
        """Return the files under ``pathname`` matching ``E_MU??????_npi.dat``.

        A warning is logged when no file matches.

        :returns: List of matching paths, in the order ``glob`` reports them.
        """
        found = _glob.glob(_os.path.join(self.pathname, 'E_MU??????_npi.dat'))
        _logger.debug("Loaded: %s", found)
        if not found:
            _logger.warning(
                "Data base contains no data. Check if the path ({}) contains valid data.".format(self.pathname))
        return found

    def approximate_starttime(self, filename):
        """Return the first day of the month encoded in ``filename``.

        :param filename: Path to a file whose base name follows
            ``E_MUyyyymm_npi.dat``.
        :returns: ``datetime.datetime(yyyy, mm, 1)``.
        :raises ValueError: If the year/month characters are not a valid
            integer or date.
        """
        fname = _os.path.basename(filename)
        _logger.debug('filename %s', filename)
        _logger.debug('basename %s', fname)
        # Base name layout: "E_MU" (4 chars) + yyyy at [4:8] + mm at [8:10].
        year = int(fname[4:8])
        month = int(fname[8:10])
        exp_start = datetime.datetime(year, month, 1)
        _logger.debug('Start ~%s', exp_start)
        return exp_start

    def read_file(self, filename):
        """Read one data file.

        The first column is parsed with ``dateutil.parser.parse``; the second
        and third columns are returned as read by ``numpy.genfromtxt``
        (presumably the background and total levels -- verify against the
        data format).

        :param filename: Path of the file to read.
        :returns: ``(tlist, dataset)`` where ``tlist`` is the list of
            first-column values and ``dataset`` is the list of
            ``(column 1, column 2)`` tuples, one entry per record.
        """
        dat = np.genfromtxt(filename, converters={0: lambda s: dateutil.parser.parse(s)})
        # genfromtxt squeezes a single-record file into a 0-d array, which
        # cannot be iterated; promote it so one-line files are readable too.
        dat = np.atleast_1d(dat)
        tlist = [record[0] for record in dat]
        dataset = [(record[1], record[2]) for record in dat]
        return tlist, dataset
def loadfile(filename):
    """Load the data from ``filename`` and re-format it as time series.

    Every non-blank line is split on whitespace into three fields: a time
    stamp (parsed with ``dateutil.parser.parse``), the background value and
    the total value.  Blank lines are skipped.

    :param filename: Path of the file to read.
    :returns: Tuple ``(bg, tot)`` of ``ScalarSeries`` built on the same list
        of times.
    :raises IndexError: If a non-blank line has fewer than three fields.
    :raises ValueError: If a value field is not a valid float.
    """
    logger = logging.getLogger('background.loadfile')

    times = []
    bg_values = []
    tot_values = []
    with open(filename) as fp:
        for line in fp:
            fields = line.split()
            if not fields:
                # A blank line (typically a trailing newline) carries no
                # record; skip it instead of failing on fields[0].
                continue
            # A data record consists of 'time' 'bg' 'tot'.
            times.append(dateutil.parser.parse(fields[0]))
            bg_values.append(float(fields[1]))
            tot_values.append(float(fields[2]))

    logger.debug('LEN=%d', len(times))

    bg = ScalarSeries(times, bg_values)
    tot = ScalarSeries(times, tot_values)
    return bg, tot
class Database:
    """A database of NPI background files.

    Assumes the files are downloaded locally.  Files are registered at
    construction; their contents are loaded lazily and cached in
    ``data_series`` as requests arrive.
    """

    def __init__(self, datapath):
        """
        :param datapath: Path to the data (locally downloaded)
        """
        fnames = os.listdir(datapath)
        self.localdb = irfpy.util.timeseriesdb.DB()

        # File name should be "E_MUyyyymm_npi.dat" where yyyy is year
        # and mm is month.
        pattern = re.compile(r'E_MU(\d\d\d\d)(\d\d)_npi\.dat$')
        for fname in fnames:
            found = pattern.search(fname)
            if found is None:
                continue
            year, month = (int(group) for group in found.groups())
            # Each file is registered under the first day of its month.
            self.localdb.append(os.path.join(datapath, fname),
                                datetime.datetime(year, month, 1))

        # Background level data: the loaded and concatenated series
        # (an irfpy.util.timeseries.ScalarSeries instance), or None until
        # the first load.
        self.data_series = None

        # File names at the earliest and latest ends of what has been loaded.
        self._fn0 = None
        self._fn1 = None

    def get_background_level(self, t0, t1):
        """Return the background level between ``t0`` and ``t1``.

        Files needed to cover the interval are loaded first.

        :param t0: Start of the interval.
        :param t1: End of the interval.
        :returns: Whatever ``data_series.get_data(t0, t1)`` returns.
        """
        self.load_background_files(t0, t1)
        return self.data_series.get_data(t0, t1)

    def load_background_files(self, t0, t1):
        """Extend the cached series until it spans ``t0`` to ``t1``.

        On the first call the file registered for ``t0`` is loaded.  Then
        earlier files are added while ``t0`` precedes the start of the cached
        series, and later files while ``t1`` exceeds its end.

        NOTE(review): termination at either end of the file list relies on
        ``previousof`` / ``nextof`` raising when no neighbour exists --
        confirm against ``irfpy.util.timeseriesdb.DB``.

        :param t0: Start of the interval to cover.
        :param t1: End of the interval to cover.
        """
        logger = logging.getLogger(self.__class__.__name__)

        if self.data_series is None:
            fnamn = self.localdb.get(t0)
            logger.info('%s -> %s', t0, fnamn)
            # loadfile returns (bg, tot); only the background is kept.
            self.data_series = loadfile(fnamn)[0]
            self._fn0 = fnamn
            self._fn1 = fnamn

        while t0 < self.data_series.t0():
            self._fn0 = self.localdb.previousof(self._fn0)
            logger.info('%s -> %s', t0, self._fn0)
            ds = loadfile(self._fn0)[0]
            self.data_series = self.data_series.concatenate(ds)

        while t1 > self.data_series.t1():
            self._fn1 = self.localdb.nextof(self._fn1)
            logger.info('%s -> %s', t1, self._fn1)
            ds = loadfile(self._fn1)[0]
            self.data_series = self.data_series.concatenate(ds)

    def __len__(self):
        """Return the length of the loaded series; 0 before any load."""
        # Nothing is loaded until the first request; report an empty
        # database instead of failing on len(None).
        if self.data_series is None:
            return 0
        return len(self.data_series)
import unittest
import doctest
def doctests():
    """Return a test suite wrapping the doctests of this module."""
    module_suite = doctest.DocTestSuite()
    return unittest.TestSuite([module_suite])
if __name__ == "__main__":
    # Executed as a script: run the suite produced by ``doctests``.
    unittest.main(defaultTest="doctests")