''' Common functionality and data structures for IMA extra datasets.
IMA extra is a file that Andrei prepared.
This module provides classes of the netcdf formatted IMA extra datasets (gzipped).
This replaces :mod:`irfpy.imacommon.imaextra` module.
This module provides low-level functionality and data structures commonly
used for ASPERA-3 and -4 IMA extra. The users do not need to use this module directly.
*For developer*
This module contains several classes, but most of them do not need to be used by user.
The classes in this module are extended at :mod:`mima.imaextra` or :mod:`vima.imaextra` modules.
Nevertheless, it is worthwhile to create a list of class with a bit high-level explanation.
- :class:`ImaDdNcFileCommon` represents a single file.
- :class:`ImaExtraFileCommon` represents a single file of ``imaextra`` data, extending :class:`ImaDdNcFileCommon`
- :class:`ImaParamFileCommon` represents a single file of ``imaparam`` data, extending :class:`ImaDdNcFileCommon`
- :class:`ImaDdNcDatabase` represents a tree of dataset, both for ``imaextra`` and ``imaparam``
- :class:`ImaDdNcFileFactory` is a caching system, namely, from the data tree (:class:`ImaDdNcDatabase`),
a single file object (:class:`ImaExtraFileCommon` or :class:`ImaParamFileCommon`) is produced
- :class:`ImaParamCommon` represents a time series of ``imaparam`` data.
- :class:`ImaExtraCommon` represents a time series of ``imaextra`` data.
*(END for developer)*
'''
import os
import sys
import logging
_logger = logging.getLogger(__name__)
import datetime
import gzip
import tempfile
import numpy as np
from netCDF4 import MFDataset
import irfpy.asperacommon.dd
[docs]class ImaDdNcFileCommon:
''' A common functionality of IMA DD/NC file.
This will be extened to classes :class:`irfpy.mima.imaextra.ImaExtraFile`,
:class:`irfpy.vima.imaextra.ImaExtraFile`, etc.
It has a member variable ``dim`` and ``var``, which respectively have
informationon dimensions and variables.
'''
logger = logging.getLogger(__name__ + '.ImaDdNcFileCommon')
def __init__(self, filename, gunzip=True):
''' Open the file and read the data.
'''
self.filename = filename
self.gunzip = gunzip
if gunzip:
with gzip.open(filename) as unc:
s = unc.read()
else:
with open(filename) as unc:
s = unc.read()
with tempfile.NamedTemporaryFile(delete=False) as ncf:
ncfname = ncf.name
ncf.write(s)
(self.dim, self.var) = self.__read_ncdf(ncfname)
#self.logger.debug(self.var['StartTime'])
self.logger.debug('Removing temporary file %s' % ncfname)
try:
os.remove(ncfname)
except:
self.logger.warning('Removing file {} failed. Not critical. Continue.'.format(ncfname))
def __read_ncdf(self, fn):
self.logger.info(fn)
with MFDataset(fn) as f:
dimkey = list(f.dimensions.keys())
varkey = list(f.variables.keys())
dim = {}
var = {}
for d in dimkey:
self.logger.debug('dimemsions["%s"]' % d)
dim[d] = f.dimensions[d]
for v in varkey:
self.logger.debug('variables["%s"]' % v)
var[v] = f.variables[v][:]
return (dim, var)
def __get_ddtimes(self):
''' Return the DD time as an array of string.
'''
ddtime = self.var['Time'] # List of np.array (dtype=byte)
#self.logger.debug('Time Original:')
#self.logger.debug(ddtime)
ddtime_array = [(b''.join(dd)).decode('latin=1') for dd in ddtime]
#self.logger.debug('Time Converted:')
#self.logger.debug(ddtime_array)
return ddtime_array
[docs] def getobstime(self):
''' Return the array of the observation time as ``datetime.datetime`` instance.
'''
ts = self.__get_ddtimes()
td = [irfpy.asperacommon.dd.ddtime2datetime(t) for t in ts]
return td
[docs] def t0(self):
''' Return the start time.
'''
return irfpy.asperacommon.dd.ncdfdd2datetime(self.var['StartTime'])
[docs] def t1(self):
''' Return the stop time.
'''
return irfpy.asperacommon.dd.ncdfdd2datetime(self.var['StopTime'])
[docs]class ImaDdTimeSeries:
''' Represents a time series of DD data.
'''
def __init__(self, data_key_list):
self.data_keys = data_key_list
self.initialize()
[docs] def initialize(self):
self.ndat = 0
self.obstime = []
self.data = {}
for k in self.data_keys:
self.data[k] = None
[docs] def tabulate(self):
raise NotImplementedError()
[docs] def append(self, other):
raise NotImplementedError()
[docs] def trim(self, t0, t1):
raise NotImplementedError()
[docs]class ImaParamCommon(ImaDdTimeSeries):
''' IMA parameter data class.
A common algorithm tha handles IMA parameter data.
It is used for both MEX and VEX.
'''
def __init__(self, key_list):
ImaDdTimeSeries.__init__(self, key_list)
[docs] def tabulate(self, fp=sys.stdout):
tab = '\t'
header = 'serial\ttime\t'
header = header + '\t'.join(self.data_keys)
print('#' + header, file=fp)
for i in range(self.ndat):
print(i, tab, end=' ', file=fp)
print(self.obstime[i].strftime('%FT%T'), tab, end=' ', file=fp)
for k in self.data_keys:
if self.data[k][i].shape == (3,):
for j in range(3):
print(self.data[k][i][j], tab, end=' ', file=fp)
else:
print(self.data[k][i], tab, end=' ', file=fp)
print(file=fp)
[docs] def append(self, other):
# First, concatenate the ImaParam
self.ndat = self.ndat + other.ndat
if self.ndat == 0:
return self
self.obstime = self.obstime + other.obstime
for k in self.data_keys:
if self.data[k] is None:
self.data[k] = other.data[k]
elif other.data[k] is None:
pass # Do nothing on self.data[k]
else:
self.data[k] = np.concatenate((self.data[k], other.data[k]))
# Second, check the duplication.
# Index for unique data according to the obstime.
self.obstime, unique_index = np.unique(self.obstime, return_index=True)
self.ndat = len(self.obstime)
for k in self.data_keys:
# Then, unique according to the obstime
self.data[k] = self.data[k][unique_index]
# Thrid, sort by time. The sorted index is stored.
sorted_index = np.argsort(self.obstime)
self.obstime.sort()
for k in self.data_keys:
# Then, unique according to the obstime
self.data[k] = self.data[k][sorted_index]
self.obstime = self.obstime.tolist()
[docs] def trim(self, t0, t1):
''' You may trim the dataset to the specified time interval (inclusive).
'''
if self.ndat == 0:
return self
obstime = np.array(self.obstime)
indexvalid = np.where(np.logical_and(obstime >= t0, obstime <= t1))[0]
self.obstime = obstime[indexvalid].tolist()
self.ndat = len(self.obstime)
for k in list(self.data.keys()):
self.data[k] = self.data[k][indexvalid]
[docs]class ImaParamFileCommon(ImaDdNcFileCommon):
def __init__(self, *args, **kwds):
ImaDdNcFileCommon.__init__(self, *args, **kwds)
[docs]class ImaDdNcFileFactory:
''' A factory class for IMA extra dataset.
Instead of creating :class:`ImaExtraFile` or
:class:`ImaParamFile` instance directly,
the class returns the instance with using a ``RingCache``.
.. code-block:: py
iexfact = ImaDdNcFileFactory.createFactory()
iexfact.getImaExtraFile('imaextra20123640214.nc.gz')
'''
def __init__(self, template_class, cachesize=5):
self.cache = irfpy.util.ringcache.RingCache(cachesize)
self.template_class = template_class
[docs] def getDdNcFile(self, filename, gunzip=True):
#logger = logging.getLogger(__name__ + '.ImaDdNcFileFactory.getDdNcFile')
if self.cache.hasKey(filename):
#logger.debug('Filename in cache: {}'.format(filename))
return self.cache.get(filename)
else:
#logger.debug('Filename to be loaded: {}'.format(filename))
#logger.debug('Template class is {}'.format(self.template_class))
iex = self.template_class(filename, gunzip=gunzip)
self.cache.add(filename, iex)
return iex
[docs]class ImaDdNcDatabase:
''' IMA DD/NC database
The IMA DD/NC data base stores the filename and the start time
guessed from the file name.
By using this database, one can easily get the file name with a key
of the observation time.
'''
logger = logging.getLogger(__name__ + '.ImaDdNcDatabase')
def __init__(self, database, prefix_name, ddncfactory, verbose=False):
''' Create the database.
Use :meth:`createDatabase` class method for creating database.
:param database: Database directory path. If ``None``, the
[vima]-imaextrabase entry from irfpyrc is read.
:param prefix_name: The filename should be ``imaXXXXX20103091457.nc.gz``.
The prefix, ``imaXXXXX`` should be specified to identify
the start of the time expression.
:param ddncfactory: Factory object that create the object of the file contents.
It should be an object of the class that extends :class:`ImaDdNcFileFactory`.
For example, :class:`ImaExtraFileFactory` can be given.
:keyword verbose: The keyword is outdated. Do nothing. Use logging to display
a log with the name of ``irfpy.imacommon.imaextra2.ImaDdNcDatabase``.
'''
self.logger.debug('database=%s' % database)
self.ddncfactory = ddncfactory
s = len(prefix_name)
self.filelist = irfpy.util.timeseriesdb.DB()
for pp, dd, ff in os.walk(database):
for f in ff:
if f.startswith(prefix_name) and f.endswith('.nc.gz'):
yr = int(f[s:s + 4])
dddoi = int(f[s + 4:s + 7])
hr = int(f[s + 7:s + 9])
mi = int(f[s + 9:s + 11])
tt = datetime.datetime(yr, 1, 1, hr, mi) + datetime.timedelta(days=dddoi)
self.filelist.append(os.path.join(pp, f), tt)
# self.logger.debug('Append -> %s %s ' % (os.path.join(pp, f), tt))
self.exact_time_cache = {}
def __len__(self):
return len(self.filelist)
[docs] def nextof(self, filename):
''' Return the next file
'''
return self.filelist.nextof(filename)
[docs] def previousof(self, filename):
''' Return the previous file
'''
return self.filelist.previousof(filename)
[docs] def get_filename_from_db(self, t):
''' Return the filename from database.
'''
return self.filelist.get(t)
[docs] def get_timerange(self, filename):
''' Return the time range (exact) from filename
'''
if filename in self.exact_time_cache:
t0, t1 = self.exact_time_cache[filename]
else:
iex = self.ddncfactory.getDdNcFile(filename)
t0 = iex.t0()
t1 = iex.t1() # Should be inclusive!
#self.logger.debug('%s: %s -- %s' % (filename, t0, t1))
self.exact_time_cache[filename] = [t0, t1]
return t0, t1
[docs] def get_filename(self, t, nodata='exception'):
''' Return the filename from data file.
Users are recommended to use this method.
:param t: Time.
:param nodata: Set behaviour if the specified time
falls into the data gap. 'before' returns the
file name that has the previous data to ``t``.
'after' return the file name that has the next data of ``t``.
Otherwise, raise an :class:`irfpy.util.timeseriesdb.DataNotInDbError`.
'''
# First, get the preliminary file name.
try:
filename = self.get_filename_from_db(t)
except irfpy.util.timeseriesdb.DataNotInDbError as e:
if nodata == 'after':
return self.filelist.get(self.filelist.t0())
t0, t1 = self.get_timerange(filename)
self.logger.debug('Examine: %s' % t)
if t >= t0 and t <= t1:
self.logger.debug('# %s # --- %s -o- %s ---' % (filename, t0, t1))
return filename
elif t < t0: # If the given time is earlier than the expected file's start
self.logger.debug('# %s # -o- %s --- %s ---' % (filename, t0, t1))
while True:
try:
filename = self.filelist.previousof(filename) # If the data is before the earliest, here exception raised.
except irfpy.util.timeseriesdb.DataNotInDbError as e:
if nodata == 'after':
return filename
else:
raise e
t0, t1 = self.get_timerange(filename)
if t >= t0 and t <= t1:
self.logger.debug('# %s # --- %s -o- %s ---' % (filename, t0, t1))
return filename
elif t > t1:
self.logger.debug('# %s # --- %s --- %s -o-' % (filename, t0, t1))
if nodata == 'before':
return filename
elif nodata == 'after':
return self.filelist.nextof(filename)
else:
raise irfpy.util.timeseriesdb.DataNotInDbError('Data@%s not in the DB' % t)
else:
self.logger.debug('# %s # -o- %s --- %s ---' % (filename, t0, t1))
elif t > t1: # If the given time is later than the expected file's end
self.logger.debug('# %s # --- %s --- %s -o-' % (filename, t0, t1))
while True:
try:
filename = self.filelist.nextof(filename) # If the data is after the last file, here exception is raised
except irfpy.util.timeseriesdb.DataNotInDbError as e:
if nodata == 'before':
return filename
else:
raise e
t0, t1 = self.get_timerange(filename)
if t >= t0 and t <= t1:
self.logger.debug('# %s # --- %s -o- %s ---' % (filename, t0, t1))
return filename
elif t < t0:
self.logger.debug('# %s # -o- %s --- %s ---' % (filename, t0, t1))
if nodata == 'before':
return self.filelist.previousof(filename)
elif nodata == 'after':
return filename
else:
raise irfpy.util.timeseriesdb.DataNotInDbError('Data@%s not in the DB' % t)
else:
self.logger.debug('# %s # --- %s --- %s -o-' % (filename, t0, t1))
[docs] def get_filenames(self, t0, t1):
''' Return the list of filename
'''
t0file = self.get_filename(t0, nodata='after')
iex0 = self.ddncfactory.getDdNcFile(t0file)
t0iex0 = iex0.t0()
t1iex0 = iex0.t1()
if t1 < t0iex0:
# In this case, the given period is inside data gap.
return []
t1file = self.get_filename(t1, nodata='before')
self.logger.info('t0file=%s' % t0file)
self.logger.info('t1file=%s' % t1file)
files = [t0file]
while files[-1] != t1file:
files.append(self.nextof(files[-1]))
return files