Source code for irfpy.vima.scidata

''' VEX/IMA science data, providing the count rate data file access

A file-based VEX/IMA science data accessor.
The MATLAB formatted file is loaded.

.. note::

    From v4.0.6, the default dataset became the "aurora" database (aka full database).

For high-level use, you may use
:func:`get_ima_count_file`.
It will read the corresponding file.

The returned is :class:`ImaCountFile` object.
Basically you may use

- :attr:`ImaCountFile.mat` to access the data matrix.
- :meth:`ImaCountFile.getobstime` to access the observed time.

A simple example follows

>>> t = datetime.datetime(2006, 12, 5, 7)
>>> imafile = get_ima_count_file(t)
>>> if imafile is not None:
...     mat = imafile.mat
... else:
...     pass

*How to handle the instance ``mat``?*

See an independent document :ref:`tutorial_vima_scidata`.

The data url should be specified by
[vima] imamatbase in ``.irfpyrc``.

::

    [vima]
    imamatbase = /path/to/datatree/

.. todo::

    Disable the functionality of URI.
    Only limit to local access!
'''
import os

import datetime

import logging

#import urllib.request, urllib.parse, urllib.error

import warnings

import numpy as np
import scipy.io

from irfpy.util.irfpyrc import Rc
from irfpy.util.utc import num2date as _num2date

from irfpy.asperacommon.excepts import DbPathNotFoundError
from irfpy.asperacommon.excepts import DatafileNotFoundError

import irfpy.imacommon.scidata as imascidata

[docs]class ImaCountFile: ''' VEX/IMA data file. >>> imacnt = ImaCountFile(ImaCountFile.get_sample_filename()) # doctest: +SKIP ''' rc = Rc()
[docs] @classmethod def get_sample_filename(cls): # from pkg_resources import resource_filename # fn = resource_filename(__name__, # os.path.join('sample', '20070101050000.mat')) p = cls.get_base_path() fn = os.path.join(p, '2007', '01', '01', 'DDS_VIA_20070101T050000.mat') logging.debug(fn) return fn
[docs] @classmethod def get_uribase_path(cls): import warnings warnings.warn("get_uribase_path no longer supported. Use get_base_path().") return cls.get_base_path()
[docs] @classmethod def get_uri(cls, yr, mo, dy, hr): import warnings warnings.warn("get_uri no longer supported. Use get_filename().") return cls.get_filename(yr, mo, dy, hr)
[docs] @classmethod def get_base_path(cls): path = cls.rc.get('vima', 'imamatbase') if path is None: raise DbPathNotFoundError( 'No entry of [vima]/imamatbase in irfpyrc.') return path
[docs] @classmethod def get_filename(cls, yr, mo, dy, hr): ttry = datetime.datetime(yr, mo, dy, hr, 0, 0) subpath = os.path.join(cls.get_base_path, ttry.strftime('%Y%m'), ttry.strftime('%Y%m%d%H0000.mat')) return subpath
def __init__(self, filename): ''' Load the matlab count rate file :param filename: The filename of the file. URI is no more supported (at least maintained.) ''' if filename.startswith('http://') or filename.startswith('https://') or filename.startswith('file:///'): warnings.warn('No URI support. Use local file.') self.logger = logging.getLogger(self.__class__.__name__) # try: # self.file, self.uriheader = urllib.request.urlretrieve(filename) # except IOError as e: # emsg = 'Failed to open the matlab file at %s' % filename # self.logger.error(emsg) # raise DatafileNotFoundError(emsg) self.uri = self.file = filename # self.mat contains all the matlab data. try: self.mat = scipy.io.loadmat(self.file) ''' Matlab formatted data ''' except (IOError, FileNotFoundError, TypeError) as e: emsg = 'Failed to open the matlab file: %s' % self.file # self.logger.error(emsg) raise DatafileNotFoundError(emsg)
[docs] def getobstime(self): ''' Return the observation time list. :returns: Observation time list (component is in ``datetime.datetime`` instance) is returned. >>> imacnt = ImaCountFile(ImaCountFile.get_sample_filename()) >>> print(len(imacnt.getobstime())) 104 ''' obstime_mat = list(set(self.mat.get('iontime')[0])) obstime_mat.sort() obstime_mpl = [_num2date(i - 366) for i in obstime_mat] obstime_mpl = [i.replace(tzinfo=None) for i in obstime_mpl] return obstime_mpl
[docs] def gettimerange(self, start_offset=0, stop_offset=0): ''' Return the time range of the observation. :returns: Range of data file time as a list of ``datetime.datetime`` instances. [*t0* + ``start_offset``, *t1* + ``stop_offset``] is returned, where *t0* is the start time (the time of the first packet) and *t1* is the end time (the time of the last packet). This means that the end time *t1* does not agree with the end of the observation time. The end of the observation time is the end time plus data acquisition time (12 sec or 24 sec, typically). Specifying ``stop_offset`` will compensate this issue. :keyword start_offset: Offset in seconds to be added to the start time. :keyword stop_offset: Offset in seconds to be added to the stop time. >>> imacnt = ImaCountFile(ImaCountFile.get_sample_filename()) >>> print(imacnt.gettimerange()) [datetime.datetime(2007, 1, 1, 5, 19, 11, 880840), datetime.datetime(2007, 1, 1, 6, 0, 24, 256140)] >>> print(imacnt.gettimerange(stop_offset=12)) [datetime.datetime(2007, 1, 1, 5, 19, 11, 880840), datetime.datetime(2007, 1, 1, 6, 0, 36, 256140)] ''' tlist = self.getobstime() off0 = datetime.timedelta(seconds=start_offset) off1 = datetime.timedelta(seconds=stop_offset) return [min(tlist) + off0, max(tlist) + off1]
[docs]def isdb(): try: dbpath = ImaCountFile.get_base_path() except DbPathNotFoundError: return False return os.path.isdir(dbpath)
[docs]def get_ima_count_file_reduced(time): ''' Return the :class:`ImaCountFile` instance covering the specified time, from reduced-HD embedded dataset. :param time: Time of the data :type time: ``datetime.datetime`` instance. Note: This is not yet fully debugged. ''' ttry = time path = os.path.join( ImaCountFile.get_base_path(), ttry.strftime('%Y%m'), ttry.strftime('%Y%m%d%H0000.mat')) try: imacnt = ImaCountFile(path) t0, t1 = imacnt.gettimerange() if time < t0: ttry = time - datetime.timedelta(seconds=3600) except DatafileNotFoundError as e: ttry = time - datetime.timedelta(seconds=3600) # Rewind 1 hour if ttry != time: ttry = time - datetime.timedelta(seconds=3600) # Data 1 hour ago should be fetched. path = os.path.join( ImaCountFile.get_base_path(), ttry.strftime('%Y%m'), ttry.strftime('%Y%m%d%H0000.mat')) try: imacnt = ImaCountFile(path) except DatafileNotFoundError as e: return None return imacnt
[docs]def get_ima_count_file_full(time): ''' Return the :class:`ImaCountFile` instance covering the specified time, from full-HK embedded data. :param time: Time of the data :type time: ``datetime.datetime`` instance. Note: This is not yet fully debugged. ''' ttry = time path = os.path.join( ImaCountFile.get_base_path(), ttry.strftime('%Y'), ttry.strftime('%m'), ttry.strftime('%d'), ttry.strftime('DDS_VIA_%Y%m%dT%H0000.mat')) try: imacnt = ImaCountFile(path) t0, t1 = imacnt.gettimerange() if time < t0: ttry = time - datetime.timedelta(seconds=3600) except DatafileNotFoundError as e: ttry = time - datetime.timedelta(seconds=3600) # Rewind 1 hour if ttry != time: ttry = time - datetime.timedelta(seconds=3600) # Data 1 hour ago should be fetched. path = os.path.join( ImaCountFile.get_base_path(), ttry.strftime('%Y'), ttry.strftime('%m'), ttry.strftime('%d'), ttry.strftime('DDS_VIA_%Y%m%dT%H0000.mat')) try: imacnt = ImaCountFile(path) except DatafileNotFoundError as e: return None return imacnt
[docs]def get_ima_count_file(time): ''' Return the :class:`ImaCountFile` instance covering the specified time. The dataset is selected either from - Full-HK embedded data (aka Leif-matfile, or aurora-matfile). :func:`get_ima_count_file_full` - Reduced-HK embedded data (aka Herbert-matfile, or spaghetti-matfile). :func:`get_ima_count_file_reduced` Because of slightly different name convention, this function judges which dataset user is referring automatically. Full-HK is prioritized, and definitively recommended to be used. :param time: Time of the data :type time: ``datetime.datetime`` instance. :returns: Ima count file. :rtype: :class:`ImaCountFile` instance. If there is no data file at the given time, None is returned. ''' icf = get_ima_count_file_full(time) if icf is None: icf = get_ima_count_file_reduced(time) return icf
[docs]def reshape_mat3d(mat, npol=16): ''' Reshape the matlab contents into MAEPT order The function reshapes the matrix to MAEPT order (mass-azimuth-energy-polar-time). Polar (elevaion) number should be specified, if you know it is not 16 elevation mode. .. todo:: Not polar number, but mode should be specified in the future. :returns: (spectra, masslist, azimlist, polarlist, timelist). spectra has (N, 32, 16, 96, 16) element, where N is the number of 3-D data. masslist, azimlist, polarlist has (N, 2, 16, 96, 16) shape. timelist has (N, 16, 96, 16) shape. >>> f = get_ima_count_file(datetime.datetime(2013, 1, 17, 7)) >>> if f is not None: ... s, m, a, e, t = reshape_mat3d(f.mat) ... else: ... s = m = a = e = t = None Spectra matrix, s, should have (mass, azim, ene, polar, time) shape >>> print(s.shape) # doctest: +SKIP (32, 16, 96, 16, 19) Mass matrix, m, should have (2 = [stardidx, stopidx], mass, azim, polar, time) shape >>> print(m.shape) # doctest: +SKIP (2, 32, 16, 16, 19) Time matrix, t, must have (mass, azim, polar, time) shape >>> print(t.shape) # doctest: +SKIP (32, 16, 16, 19) You may (forcely) change the number of elevation scan >>> if f is not None: ... s, m, a, e, t = reshape_mat3d(f.mat, npol=8) ... else: ... s = m = a = e = t = None >>> print(s.shape) # doctest: +SKIP (32, 16, 96, 8, 38) ''' return imascidata.reshape_mat3d(mat, npol=npol)
[docs]def reshape_mat(mat): ''' Reshape the matlab contents into MAET order. The function reshape the matrix to MAET order. :param mat: Matlab contents. :returns: (spectra, masslist, azimlist, elevlist, timelist) Each output has the shape of (32, 16, 96, T) for spectra; (32, 16, T) for timelist; and (2, 32, 16, T) for others. T is the number of time. :param mat: Matlab format data file :returns: Tuple of array. (spectra, mass, azimuth, polar, time). >>> f = get_ima_count_file(datetime.datetime(2013, 1, 17, 7)) >>> if f is not None: ... s, m, a, e, t = reshape_mat(f.mat) ... else: ... s = m = a = e = t = None spectrum, s, has the (mass, azimuth, energy, time (withpolar), ) shape. >>> print(s.shape) # doctest: +SKIP (32, 16, 96, 304) mass, m, has the (2 = [startindex, stopindex], mass, azimuth, time (polar), ) shape >>> print(m.shape) # doctest: +SKIP (2, 32, 16, 304) Time, t, has the (mass, azimuth, time (polar), ) shape >>> print(t.shape) # doctest: +SKIP (32, 16, 304) ''' return imascidata.reshape_mat(mat)
[docs]def reshape_mat_24(mat): ''' (Deprecated!) Reshape the matlab contents into MAET order. Use :func:`reshape_mat`. ''' import warnings warnings.warn('Use reshape_mat function.', category=DeprecationWarning) return reshape_mat(mat)