# Source code for irfpy.npicommon.background

''' MEX/VEX NPI background data as proxy of SEP

MEX/VEX NPI background data is produced routinely; the files are expected
to be downloaded locally and are read by this module.
'''

import os
import os as _os
import glob as _glob
import datetime
import dateutil.parser
import logging

import re

import numpy as np

from irfpy.util.timeseries import ScalarSeries
import irfpy.util.timeseriesdb

from irfpy.util import datacenter as _dc

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

class DataCenterBackground(_dc.BaseDataCenter):
    """ Data center for MEX/VEX NPI background

    Monthly files named ``E_MUyyyymm_npi.dat`` are looked up under
    ``pathname``.  The three methods below are presumably the hooks
    that ``BaseDataCenter`` calls -- confirm against the base class.
    """

    def __init__(self, pathname, name="NPI bakground (generic)"):
        """
        :param pathname: Directory that holds the ``E_MU??????_npi.dat`` files.
        :param name: Name handed over to the base class initializer.
        """
        # The path is stored before the base class is initialized,
        # keeping the original statement order.
        self.pathname = pathname
        _dc.BaseDataCenter.__init__(self, name=name)

    def search_files(self):
        """ Return the list of data files found under ``self.pathname``.

        A warning is logged when the glob pattern matches nothing; the
        (empty) list is returned in that case as well.
        """
        glob_pattern = _os.path.join(self.pathname, 'E_MU??????_npi.dat')
        found = _glob.glob(glob_pattern)
        _logger.debug("Loaded: " + str(found))
        if len(found) == 0:
            message = "Data base contains no data. Check if the path ({}) contains valid data."
            _logger.warning(message.format(self.pathname))
        return found

    def approximate_starttime(self, filename):
        """ Return the first day of the month encoded in the file name.

        :param filename: Path of a data file.  Characters 4-7 of its base
            name are read as the year and characters 8-9 as the month.
        :returns: ``datetime.datetime(year, month, 1)``.
        """
        base = _os.path.basename(filename)
        _logger.debug('filename {}'.format(filename))
        _logger.debug('basename {}'.format(base))
        year = int(base[4:8])
        month = int(base[8:10])
        month_start = datetime.datetime(year, month, 1)
        _logger.debug('Start ~{}'.format(month_start))
        return month_start

    def read_file(self, filename):
        """ Read one data file.

        :param filename: Path of the data file.
        :returns: ``(tlist, dataset)`` where ``tlist`` holds the first
            column parsed by ``dateutil.parser.parse`` and ``dataset``
            holds ``(second column, third column)`` tuples, one per row.
        """
        def parse_time(text):
            return dateutil.parser.parse(text)

        records = np.genfromtxt(filename, converters={0: parse_time})
        tlist = []
        dataset = []
        for record in records:
            tlist.append(record[0])
            dataset.append((record[1], record[2]))
        return tlist, dataset
def loadfile(filename):
    ''' From ``filename`` load the data and re-format it.

    Every non-blank line is split on white space.  The first field is
    parsed as a time stamp with ``dateutil.parser.parse``; the second and
    third fields are converted to ``float``.  Blank lines are skipped.

    :param filename: Path of the text file to read.
    :returns: ``(bg, tot)``, two ``ScalarSeries`` built from the second
        and the third column respectively.  Both are constructed from the
        same list of time stamps.
    :raises IndexError: If a non-blank line has fewer than three fields.
    :raises ValueError: If the second or third field is not a number
        (and presumably also when the time stamp cannot be parsed --
        depends on the installed dateutil version).
    '''
    logger = logging.getLogger('background.loadfile')

    t = []
    bg = []
    tot = []
    with open(filename) as fp:
        # A data record consists of 'time' 'bg' 'tot'.
        for line in fp:
            fields = line.split()
            if not fields:
                # A blank (e.g. trailing) line carries no record.
                continue
            t.append(dateutil.parser.parse(fields[0]))
            bg.append(float(fields[1]))
            tot.append(float(fields[2]))

    logger.debug('LEN=%d', len(t))

    return ScalarSeries(t, bg), ScalarSeries(t, tot)
class Database:
    ''' A data base.

    Assumes the files are downloaded locally.  Files are registered at
    construction time and loaded lazily when a time range is requested.
    '''

    def __init__(self, datapath):
        '''
        :param datapath: Path to the data (locally downloaded)
        '''
        self.localdb = irfpy.util.timeseriesdb.DB()

        # File name should be "E_MUyyyymm_npi.dat" where yyyy is year
        # and mm is month.
        pattern = re.compile(r'E_MU(\d\d\d\d)(\d\d)_npi\.dat$')
        for fname in os.listdir(datapath):
            found = pattern.search(fname)
            if found is None:
                # Not a background data file; ignore it.
                continue
            year, month = (int(group) for group in found.groups())
            self.localdb.append(os.path.join(datapath, fname),
                                datetime.datetime(year, month, 1))

        # Background level data.  This is a loaded and concatenated
        # background level data, a irfpy.util.timeseries.ScalarSeries
        # instance.  ``None`` until the first load.
        self.data_series = None

        # Earliest and latest file loaded so far; they are the starting
        # points when the loaded range has to be extended.
        self._fn0 = None
        self._fn1 = None

    def get_background_level(self, t0, t1):
        ''' Return the background level.

        :param t0: Start of the requested time range.
        :param t1: End of the requested time range.
        :returns: ``self.data_series.get_data(t0, t1)`` after the files
            needed for the range have been loaded.
        '''
        self.load_background_files(t0, t1)
        return self.data_series.get_data(t0, t1)

    def load_background_files(self, t0, t1):
        ''' Load files until the loaded series spans ``t0`` to ``t1``.

        The first call loads the file that the local data base returns
        for ``t0``.  After that, earlier files are added while ``t0`` is
        before the start of the loaded series, and later files are added
        while ``t1`` is after its end.

        :param t0: Start of the requested time range.
        :param t1: End of the requested time range.
        '''
        # NOTE(review): the level of the trace messages below is assumed
        # to be DEBUG -- confirm against the project history.
        logger = logging.getLogger(self.__class__.__name__)

        if self.data_series is None:
            fnamn = self.localdb.get(t0)
            logger.debug('%s -> %s', t0, fnamn)
            self.data_series = loadfile(fnamn)[0]
            self._fn0 = fnamn
            self._fn1 = fnamn

        while t0 < self.data_series.t0():
            self._fn0 = self.localdb.previousof(self._fn0)
            logger.debug('%s -> %s', t0, self._fn0)
            ds = loadfile(self._fn0)[0]
            self.data_series = self.data_series.concatenate(ds)

        while t1 > self.data_series.t1():
            self._fn1 = self.localdb.nextof(self._fn1)
            logger.debug('%s -> %s', t1, self._fn1)
            ds = loadfile(self._fn1)[0]
            self.data_series = self.data_series.concatenate(ds)

    def __len__(self):
        ''' Return the length of the loaded series, 0 if nothing is loaded.
        '''
        if self.data_series is None:
            return 0
        return len(self.data_series)
import unittest import doctest
def doctests():
    ''' Return a test suite that wraps the doctests of this module.
    '''
    suite = unittest.TestSuite()
    suite.addTest(doctest.DocTestSuite())
    return suite
if __name__ == '__main__':
    # Executed as a script: run the suite returned by ``doctests``.
    unittest.main(defaultTest='doctests')