Source code for irfpy.imacommon.imaextra2

''' Common functionality and data structures for IMA extra datasets.

IMA extra is a file that Andrei prepared.
This module provides classes of the netcdf formatted IMA extra datasets (gzipped).
This replaces :mod:`irfpy.imacommon.imaextra` module.

This module provides low-level functionality and data structures commonly
used for ASPERA-3 and -4 IMA extra. The users do not need to use this module directly.

*For developer*

This module contains several classes, but most of them do not need to be used by user.
The classes in this module are extended at :mod:`mima.imaextra` or :mod:`vima.imaextra` modules.

Nevertheless, it is worthwhile to create a list of class with a bit high-level explanation.

- :class:`ImaDdNcFileCommon` represents a single file.
- :class:`ImaExtraFileCommon` represents a single file of ``imaextra`` data, extending :class:`ImaDdNcFileCommon`
- :class:`ImaParamFileCommon` represents a single file of ``imaparam`` data, extending :class:`ImaDdNcFileCommon`
- :class:`ImaDdNcDatabase` represents a tree of dataset, both for ``imaextra`` and ``imaparam``
- :class:`ImaDdNcFileFactory` is a caching system, namely, from the data tree (:class:`ImaDdNcDatabase`),
  a single file object (:class:`ImaExtraFileCommon` or :class:`ImaParamFileCommon`) is produced
- :class:`ImaParamCommon` represents a time series of ``imaparam`` data.
- :class:`ImaExtraCommon` represents a time series of ``imaextra`` data.

*(END for developer)*
'''

import os
import sys
import logging
_logger = logging.getLogger(__name__)

import datetime

import gzip
import tempfile

import numpy as np

from netCDF4 import MFDataset

import irfpy.asperacommon.dd


[docs]class ImaDdNcFileCommon: ''' A common functionality of IMA DD/NC file. This will be extened to classes :class:`irfpy.mima.imaextra.ImaExtraFile`, :class:`irfpy.vima.imaextra.ImaExtraFile`, etc. It has a member variable ``dim`` and ``var``, which respectively have informationon dimensions and variables. ''' logger = logging.getLogger(__name__ + '.ImaDdNcFileCommon') def __init__(self, filename, gunzip=True): ''' Open the file and read the data. ''' self.filename = filename self.gunzip = gunzip if gunzip: with gzip.open(filename) as unc: s = unc.read() else: with open(filename) as unc: s = unc.read() with tempfile.NamedTemporaryFile(delete=False) as ncf: ncfname = ncf.name ncf.write(s) (self.dim, self.var) = self.__read_ncdf(ncfname) #self.logger.debug(self.var['StartTime']) self.logger.debug('Removing temporary file %s' % ncfname) try: os.remove(ncfname) except: self.logger.warning('Removing file {} failed. Not critical. Continue.'.format(ncfname)) def __read_ncdf(self, fn): self.logger.info(fn) with MFDataset(fn) as f: dimkey = list(f.dimensions.keys()) varkey = list(f.variables.keys()) dim = {} var = {} for d in dimkey: self.logger.debug('dimemsions["%s"]' % d) dim[d] = f.dimensions[d] for v in varkey: self.logger.debug('variables["%s"]' % v) var[v] = f.variables[v][:] return (dim, var) def __get_ddtimes(self): ''' Return the DD time as an array of string. ''' ddtime = self.var['Time'] # List of np.array (dtype=byte) #self.logger.debug('Time Original:') #self.logger.debug(ddtime) ddtime_array = [(b''.join(dd)).decode('latin=1') for dd in ddtime] #self.logger.debug('Time Converted:') #self.logger.debug(ddtime_array) return ddtime_array
[docs] def getobstime(self): ''' Return the array of the observation time as ``datetime.datetime`` instance. ''' ts = self.__get_ddtimes() td = [irfpy.asperacommon.dd.ddtime2datetime(t) for t in ts] return td
[docs] def t0(self): ''' Return the start time. ''' return irfpy.asperacommon.dd.ncdfdd2datetime(self.var['StartTime'])
[docs] def t1(self): ''' Return the stop time. ''' return irfpy.asperacommon.dd.ncdfdd2datetime(self.var['StopTime'])
[docs]class ImaExtraFileCommon(ImaDdNcFileCommon): """ IMAEXTRA file """ def __init__(self, *args, **kwds): ImaDdNcFileCommon.__init__(self, *args, **kwds)
[docs]class ImaDdTimeSeries: ''' Represents a time series of DD data. ''' def __init__(self, data_key_list): self.data_keys = data_key_list self.initialize()
[docs] def initialize(self): self.ndat = 0 self.obstime = [] self.data = {} for k in self.data_keys: self.data[k] = None
[docs] def tabulate(self): raise NotImplementedError()
[docs] def append(self, other): raise NotImplementedError()
[docs] def trim(self, t0, t1): raise NotImplementedError()
[docs]class ImaParamCommon(ImaDdTimeSeries): ''' IMA parameter data class. A common algorithm tha handles IMA parameter data. It is used for both MEX and VEX. ''' def __init__(self, key_list): ImaDdTimeSeries.__init__(self, key_list)
[docs] def tabulate(self, fp=sys.stdout): tab = '\t' header = 'serial\ttime\t' header = header + '\t'.join(self.data_keys) print('#' + header, file=fp) for i in range(self.ndat): print(i, tab, end=' ', file=fp) print(self.obstime[i].strftime('%FT%T'), tab, end=' ', file=fp) for k in self.data_keys: if self.data[k][i].shape == (3,): for j in range(3): print(self.data[k][i][j], tab, end=' ', file=fp) else: print(self.data[k][i], tab, end=' ', file=fp) print(file=fp)
[docs] def append(self, other): # First, concatenate the ImaParam self.ndat = self.ndat + other.ndat if self.ndat == 0: return self self.obstime = self.obstime + other.obstime for k in self.data_keys: if self.data[k] is None: self.data[k] = other.data[k] elif other.data[k] is None: pass # Do nothing on self.data[k] else: self.data[k] = np.concatenate((self.data[k], other.data[k])) # Second, check the duplication. # Index for unique data according to the obstime. self.obstime, unique_index = np.unique(self.obstime, return_index=True) self.ndat = len(self.obstime) for k in self.data_keys: # Then, unique according to the obstime self.data[k] = self.data[k][unique_index] # Thrid, sort by time. The sorted index is stored. sorted_index = np.argsort(self.obstime) self.obstime.sort() for k in self.data_keys: # Then, unique according to the obstime self.data[k] = self.data[k][sorted_index] self.obstime = self.obstime.tolist()
[docs] def trim(self, t0, t1): ''' You may trim the dataset to the specified time interval (inclusive). ''' if self.ndat == 0: return self obstime = np.array(self.obstime) indexvalid = np.where(np.logical_and(obstime >= t0, obstime <= t1))[0] self.obstime = obstime[indexvalid].tolist() self.ndat = len(self.obstime) for k in list(self.data.keys()): self.data[k] = self.data[k][indexvalid]
[docs]class ImaExtraCommon(ImaDdTimeSeries): ''' IMA extra data class. A common algorithm tha handles IMA extra data. It is used for both MEX and VEX. ''' def __init__(self, key_list): ImaDdTimeSeries.__init__(self, key_list)
[docs] def getHpSpec(self): """ Return the proton spectra. Counts. Order is AEPT (irfpy standard) :return: Proton spectra, in the shape of (A16, E96, P16, Time). Counts. """ spec = self.data['HpSpec'] # It is (T, P, E, A) order. spec = np.transpose(spec, (3, 2, 1, 0)) return spec
[docs] def getHeavySpec(self): """ Return the heavy ion spectra. Counts. Order is AEPT (irfpy standard) :return: Heavy ion spectra, in the shape of (A16, E96, P16, Time). Counts. """ spec = self.data['HeavySpec'] # It is (T, P, E, A) order. spec = np.transpose(spec, (3, 2, 1, 0)) return spec
[docs] def getRestRm(self): """ Return the rest matrix. Counts. Order is MAEPT (irfpy standard) :return: Rest matrix, in the shape of (M32, A16, E96, P16, Time). Counts. """ spec = self.data['RestRm'] # It is (T, P, E, A, M) order. spec = np.transpose(spec, (4, 3, 2, 1, 0)) return spec
[docs] def getobstime(self): """ Return the tuple of time. :return: Tuple of time. """ return self.obstime
[docs] def tabulate(self, fp=sys.stdout): print('ImaExtra dataset. Not implemented the tabulation.', file=fp)
[docs] def append(self, other): # First, concatenate the ImaParam self.ndat = self.ndat + other.ndat if self.ndat == 0: return self self.obstime = self.obstime + other.obstime # it is 1d for k in self.data_keys: if self.data[k] is None: self.data[k] = other.data[k] elif other.data[k] is None: pass # Do nothing on self.data[k] else: self.data[k] = np.concatenate((self.data[k], other.data[k]), axis=0) # Second, check the duplication. # Index for unique data according to the obstime. self.obstime, unique_index = np.unique(self.obstime, return_index=True) self.ndat = len(self.obstime) for k in self.data_keys: # Then, unique according to the obstime self.data[k] = self.data[k][unique_index, ...] # Thrid, sort by time. The sorted index is stored. sorted_index = np.argsort(self.obstime) self.obstime.sort() for k in self.data_keys: # Then, unique according to the obstime self.data[k] = self.data[k][sorted_index, ...] self.obstime = self.obstime.tolist()
[docs] def trim(self, t0, t1): if self.ndat == 0: return self obstime = np.array(self.obstime) indexvalid = np.where(np.logical_and(obstime >= t0, obstime <= t1))[0] self.obstime = obstime[indexvalid].tolist() self.ndat = len(self.obstime) for k in list(self.data.keys()): self.data[k] = self.data[k][indexvalid, ...]
[docs] def trim_none(self): if None not in self.obstime: return _logger.warning("Data contained corrupted time. Disregarding None time data.") idx = np.where(np.array(self.obstime) != None)[0] self.obstime = list(np.array(self.obstime)[idx]) for k in self.data_keys: self.data[k] = self.data[k][idx, ...] _logger.warning("Data number changes from {} to {}.".format(self.ndat, len(self.obstime))) _logger.warning("This error is seen between {:%FT%T} and {:%FT%T}".format( min(self.obstime), max(self.obstime))) self.ndat = len(self.obstime)
[docs]class ImaParamFileCommon(ImaDdNcFileCommon): def __init__(self, *args, **kwds): ImaDdNcFileCommon.__init__(self, *args, **kwds)
[docs]class ImaDdNcFileFactory: ''' A factory class for IMA extra dataset. Instead of creating :class:`ImaExtraFile` or :class:`ImaParamFile` instance directly, the class returns the instance with using a ``RingCache``. .. code-block:: py iexfact = ImaDdNcFileFactory.createFactory() iexfact.getImaExtraFile('imaextra20123640214.nc.gz') ''' def __init__(self, template_class, cachesize=5): self.cache = irfpy.util.ringcache.RingCache(cachesize) self.template_class = template_class
[docs] def getDdNcFile(self, filename, gunzip=True): #logger = logging.getLogger(__name__ + '.ImaDdNcFileFactory.getDdNcFile') if self.cache.hasKey(filename): #logger.debug('Filename in cache: {}'.format(filename)) return self.cache.get(filename) else: #logger.debug('Filename to be loaded: {}'.format(filename)) #logger.debug('Template class is {}'.format(self.template_class)) iex = self.template_class(filename, gunzip=gunzip) self.cache.add(filename, iex) return iex
[docs]class ImaDdNcDatabase: ''' IMA DD/NC database The IMA DD/NC data base stores the filename and the start time guessed from the file name. By using this database, one can easily get the file name with a key of the observation time. ''' logger = logging.getLogger(__name__ + '.ImaDdNcDatabase') def __init__(self, database, prefix_name, ddncfactory, verbose=False): ''' Create the database. Use :meth:`createDatabase` class method for creating database. :param database: Database directory path. If ``None``, the [vima]-imaextrabase entry from irfpyrc is read. :param prefix_name: The filename should be ``imaXXXXX20103091457.nc.gz``. The prefix, ``imaXXXXX`` should be specified to identify the start of the time expression. :param ddncfactory: Factory object that create the object of the file contents. It should be an object of the class that extends :class:`ImaDdNcFileFactory`. For example, :class:`ImaExtraFileFactory` can be given. :keyword verbose: The keyword is outdated. Do nothing. Use logging to display a log with the name of ``irfpy.imacommon.imaextra2.ImaDdNcDatabase``. ''' self.logger.debug('database=%s' % database) self.ddncfactory = ddncfactory s = len(prefix_name) self.filelist = irfpy.util.timeseriesdb.DB() for pp, dd, ff in os.walk(database): for f in ff: if f.startswith(prefix_name) and f.endswith('.nc.gz'): yr = int(f[s:s + 4]) dddoi = int(f[s + 4:s + 7]) hr = int(f[s + 7:s + 9]) mi = int(f[s + 9:s + 11]) tt = datetime.datetime(yr, 1, 1, hr, mi) + datetime.timedelta(days=dddoi) self.filelist.append(os.path.join(pp, f), tt) # self.logger.debug('Append -> %s %s ' % (os.path.join(pp, f), tt)) self.exact_time_cache = {} def __len__(self): return len(self.filelist)
[docs] def nextof(self, filename): ''' Return the next file ''' return self.filelist.nextof(filename)
[docs] def previousof(self, filename): ''' Return the previous file ''' return self.filelist.previousof(filename)
[docs] def get_filename_from_db(self, t): ''' Return the filename from database. ''' return self.filelist.get(t)
[docs] def get_timerange(self, filename): ''' Return the time range (exact) from filename ''' if filename in self.exact_time_cache: t0, t1 = self.exact_time_cache[filename] else: iex = self.ddncfactory.getDdNcFile(filename) t0 = iex.t0() t1 = iex.t1() # Should be inclusive! #self.logger.debug('%s: %s -- %s' % (filename, t0, t1)) self.exact_time_cache[filename] = [t0, t1] return t0, t1
[docs] def get_filename(self, t, nodata='exception'): ''' Return the filename from data file. Users are recommended to use this method. :param t: Time. :param nodata: Set behaviour if the specified time falls into the data gap. 'before' returns the file name that has the previous data to ``t``. 'after' return the file name that has the next data of ``t``. Otherwise, raise an :class:`irfpy.util.timeseriesdb.DataNotInDbError`. ''' # First, get the preliminary file name. try: filename = self.get_filename_from_db(t) except irfpy.util.timeseriesdb.DataNotInDbError as e: if nodata == 'after': return self.filelist.get(self.filelist.t0()) t0, t1 = self.get_timerange(filename) self.logger.debug('Examine: %s' % t) if t >= t0 and t <= t1: self.logger.debug('# %s # --- %s -o- %s ---' % (filename, t0, t1)) return filename elif t < t0: # If the given time is earlier than the expected file's start self.logger.debug('# %s # -o- %s --- %s ---' % (filename, t0, t1)) while True: try: filename = self.filelist.previousof(filename) # If the data is before the earliest, here exception raised. except irfpy.util.timeseriesdb.DataNotInDbError as e: if nodata == 'after': return filename else: raise e t0, t1 = self.get_timerange(filename) if t >= t0 and t <= t1: self.logger.debug('# %s # --- %s -o- %s ---' % (filename, t0, t1)) return filename elif t > t1: self.logger.debug('# %s # --- %s --- %s -o-' % (filename, t0, t1)) if nodata == 'before': return filename elif nodata == 'after': return self.filelist.nextof(filename) else: raise irfpy.util.timeseriesdb.DataNotInDbError('Data@%s not in the DB' % t) else: self.logger.debug('# %s # -o- %s --- %s ---' % (filename, t0, t1)) elif t > t1: # If the given time is later than the expected file's end self.logger.debug('# %s # --- %s --- %s -o-' % (filename, t0, t1)) while True: try: filename = self.filelist.nextof(filename) # If the data is after the last file, here exception is raised except irfpy.util.timeseriesdb.DataNotInDbError as e: if nodata == 'before': return filename else: raise e t0, t1 = self.get_timerange(filename) if t >= t0 and t <= t1: self.logger.debug('# %s # --- %s -o- %s ---' % (filename, t0, t1)) return filename elif t < t0: self.logger.debug('# %s # -o- %s --- %s ---' % (filename, t0, t1)) if nodata == 'before': return self.filelist.previousof(filename) elif nodata == 'after': return filename else: raise irfpy.util.timeseriesdb.DataNotInDbError('Data@%s not in the DB' % t) else: self.logger.debug('# %s # --- %s --- %s -o-' % (filename, t0, t1))
[docs] def get_filenames(self, t0, t1): ''' Return the list of filename ''' t0file = self.get_filename(t0, nodata='after') iex0 = self.ddncfactory.getDdNcFile(t0file) t0iex0 = iex0.t0() t1iex0 = iex0.t1() if t1 < t0iex0: # In this case, the given period is inside data gap. return [] t1file = self.get_filename(t1, nodata='before') self.logger.info('t0file=%s' % t0file) self.logger.info('t1file=%s' % t1file) files = [t0file] while files[-1] != t1file: files.append(self.nextof(files[-1])) return files