Source code for irfpy.vmag.scidata

'''MAG data module.

MAG is the magnetometer on board VEX.

There are two dataset supported. Graz-IRF collaborative dataset (4s time resolution)
and PSA dataset (1s time resolution). The 1s dataset can be accessed by the module
named :mod:`irfpy.vmag.scidata1s`.

*Graz-IRF collaborative dataset (4s data product)*

This MAG dataset was provided from Graz, and stored into a folder of user's preference.
The path should be specified in ``.irfpyrc`` with the entry of
``vmagbase`` under the section of ``[vmag]``.

See example below::

    [vmag]
    vmagbase = /venus/mag


*Getting data*

The simplest way of getting data is as follows:

>>> t0 = datetime.datetime(2006, 12, 5, 10, 0, 0)
>>> tobs, magvso = get_mag(t0)

You can get data in S/C frame using ``frame`` keyword.

>>> tobs, magsc = get_mag(t0, frame='sc')

Note that the ``t0`` specified is not exactly the same as ``tobs`` returned.
This is because the method just returns the data in the database tagged as
the previous time of the specified time.

Thus, it is sometimes better if you know the exact time in the dataset.
You can use get_obstime for this purpose.

>>> t0 = datetime.datetime(2006, 12, 5, 10, 0, 0)
>>> t1 = datetime.datetime(2006, 12, 5, 10, 10, 0)
>>> tlist = get_obstime(t0, t1)

This will give you the exact time of the data in the database.

If you want the data in the time series, you may also use the following syntax.

>>> t0 = datetime.datetime(2006, 12, 5, 10, 0, 0)
>>> t1 = datetime.datetime(2006, 12, 5, 10, 10, 0)
>>> tobs, magvso = get_magarray(t0, t1, frame='vso')

The data may contain untrusted data. In this case, 3-element array composed of ``nan`` is returned.

>>> t0 = datetime.datetime(2014, 11, 15, 0, 6, 1)
>>> tobs, magvso = get_mag(t0)
>>> print(magvso)      # doctest: +NORMALIZE_WHITESPACE
[nan nan nan]

**Note for developer**

The module structure follows the guideline of data base accessor.
See :ref:`recipe_vels_hierarchy` for reference.

**Note for logging**

A python logger with name 'irfpy.vmag.scidata' is used.
'''

import os
import datetime
import dateutil.parser
import logging
logger = logging.getLogger(__name__)

import bisect

import gzip

import numpy
import numpy as _np

import irfpy.util.timeseriesdb
import irfpy.util.ringcache
import irfpy.util.irfpyrc
import irfpy.util.filepairv as fpair


from irfpy.vmag import __version__

flag = '99999.999'


[docs]class MagData: ''' MAG data at a specified time. (4-s data) MAG data contains the following information. *Observation time* (``starttime``) The start time of the observation. ``datetime.datetime`` instance. *B field in VSO frame* (``bvso``) The magnetic field in the VSO frame. Instance of ``numpy.array`` with the shape of (3,). *B field in VEX frame* (``bsc``) The magnetic field in the VEX frame. Instance of ``numpy.array`` with the shape of (3,). Accessor by index is also supported. ''' def __init__(self, t, bvso, bsc): if not isinstance(t, datetime.datetime): raise TypeError('Wrong argument (0), should be datetime instance') try: try: if bvso.shape != (3, ): raise ValueError('Wrong shape (1)') except AttributeError: raise TypeError('Wrong argument(1), should be numpy array') except ValueError: raise try: try: if bsc.shape != (3, ): raise ValueError('Wrong shape (2)') except AttributeError: raise TypeError('Wrong argument(2), should be numpy array') except ValueError: raise self.starttime = t self.b = {} self.b['vso'] = bvso.copy() self.b['sc'] = bsc.copy()
[docs] def get_starttime(self): return self.starttime
[docs] def get_data(self, frame=None): if frame is None: frame = 'vso' try: b = self.b[frame].copy() except KeyError as e: emsg = 'No coordinates %s defined.' % frame raise KeyError(emsg) return b
[docs]def parse_magfile(filename): """ The mag data file is read and parsed. :param filename: Filename :return: A dictionary , with a key of time and a value of data. """ logger.debug('Parsing the file: {}'.format(filename)) if filename.lower().endswith('.gz'): fp = gzip.open(filename, 'rt') else: fp = open(filename) datadict = {} # One can use numpy.genfromtxt() for below, but the performance is almost identical (180129). for line in fp: if line.startswith('#'): continue ltoken = line.split() if len(ltoken) != 15: continue t = dateutil.parser.parse(ltoken[0]) for i in range(2, 9): if ltoken[i] == flag: ltoken[i] = 'nan' bsc = numpy.array([float(ltoken[2]), float(ltoken[3]), float(ltoken[4])]) bvso = numpy.array([float(ltoken[6]), float(ltoken[7]), float(ltoken[8])]) datadict[t] = MagData(t, bvso, bsc) fp.close() logger.debug('Parsing finished') return datadict
[docs]class MagFile: ''' Class for a single MAG data file. :param filename: File name of MAG data. Normally this is to reading the file, the instance should be regarded as *immutable*. Contents of an instance is - Time array, sorted array of the observation time - Data dict, :class:`MagData` instance corresponding to the time ''' def __init__(self, filename): self.filename = filename self.parse_file()
[docs] def parse_file(self): ''' Parse the file. It takes 6 s for my Mac. Should be much shorter, though. ''' cache = self.filename + '.filepairv' self.datadict = fpair.filepair(self.filename, cache, converter=parse_magfile, version=__version__) # self.datadict = parse_magfile(self.filename) self.tlist = sorted(self.datadict.keys())
[docs] def get_data(self, t): ''' Pick the data of specified time :param t: Time. :type t: ``datetime.datetime`` :returns: The corresponding :class:`MagData`. :rtype: :class:`MagData` instance ''' magtimes = self.get_obstime()[:] magtimes.append(magtimes[-1] + datetime.timedelta(seconds=4)) idx = bisect.bisect(magtimes, t) if idx <= 0: raise ValueError('No data for %s in this datafile (too early)' % t.strftime('%FT%T')) if idx >= len(magtimes): raise ValueError('No data for %s in this datafile (too late)' % t.strftime('%FT%T')) idx = idx - 1 t0 = magtimes[idx] return self.datadict[t0]
[docs] def get_obstime(self): '''Return the observation time list. :returns: Observation time list of ``datetime.datetime`` instance :rtype: ``list`` of ``datetime.datetime`` ''' return self.tlist
[docs] def get_timerange(self, start_offset=0, stop_offset=0): ''' Return the time range of the observation. :returns: Range of data file time as a list of ``datetime.datetime`` instances. [*t0* + ``start_offset``, *t1* + ``stop_offset``] is returned, where *t0* is the start time (the time of the first packet) and *t1* is the end time (the time of the last packet). This means that the end time *t1* does not agree with the end of the observation time. The end of the observation time should be the end time plus data acquisition time (4 sec, typically). Specifying ``stop_offset`` will compensate this issue. :keyword start_offset: Offset in seconds to be added to the start time. :keyword stop_offset: Offset in seconds to be added to the stop time. ''' off0 = datetime.timedelta(seconds=start_offset) off1 = datetime.timedelta(seconds=stop_offset) return [self.tlist[0] + off0, self.tlist[-1] + off1]
[docs]class MagDB: ''' Class for MAG data base. :param rootfolder: Root folder. Data base is assumed to have a strucuture like :: <rootfolder> -+- MAG_2009-01-01T00-00-00_DOY_001_S004_3_VSO_SAPV.dat +- MAG_2009-01-02T00-00-00_DOY_002_S004_3_VSO_SAPV.dat +- MAG_2009-01-03T00-00-00_DOY_003_S004_3_VSO_SAPV.dat +- ... The data base is in the spaghetti.irf.se by manually mirroring from the Graz server thank to Tielong. The ``<rootfolder>`` in the spaghetti is ``/venus/mag/``. ''' def __init__(self, rootfolder, enable_cache=True, cachesize=10): '''Initialize MagDB :param rootfolder: The root folder. :keyword enable_cache: Enable cache. :keyword cachesize: Cache size ''' self.db = irfpy.util.timeseriesdb.DB() self.enable_cache = enable_cache if enable_cache: self.magfilecache = irfpy.util.ringcache.RingCache(cachesize) if os.path.isdir(rootfolder): list = os.listdir(rootfolder) # <rootfolder>/###### for l in list: if l.endswith('~'): continue if not (l.endswith('.dat') or l.endswith('.dat.gz')): continue ltoken = l.split('_') if len(ltoken) != 8: continue t0 = dateutil.parser.parse(ltoken[1]).replace(tzinfo=None) full_l = os.path.join(rootfolder, l) self.db.append(full_l, t0) logger.debug("# Match: ``%s`` @ %s" % (full_l, t0.strftime('%FT%T'))) else: raise IOError( '``%s`` does not exist or is not directory.' % rootfolder) logger.info("# Data base loaded. Total %d files." % len(self.db))
[docs] def get_magfile(self, filename): ''' Get the instance for the MAG data file. ''' if self.enable_cache: if self.magfilecache.hasKey(filename): mag = self.magfilecache.get(filename) else: mag = MagFile(filename) self.magfilecache.add(filename, mag) return mag else: return MagFile(filename)
[docs] def get_filename_fromdb(self, t): ''' Return the file name registered in the data base. No clever check will be done as :meth:`get_filename`. Just return the file name registered in the data base. Use :meth:`get_filename` for strict use. ''' return self.db.get(t)
[docs] def get_filename(self, t, offset=4): ''' Use :meth:`get_filename_fromdb`. ''' return self.get_filename_fromdb(t)
[docs] def previousof(self, filename): return self.db.previousof(filename)
[docs] def nextof(self, filename): return self.db.nextof(filename)
[docs] @classmethod def get_defaultdb(cls, enable_cache=True, cachesize=10): ''' Return the instance of MagDB from default rootfolder. The default rootfolder can be defined by ``[vmag]-vmagbase`` in ``.irfpyrc``. ''' rc = irfpy.util.irfpyrc.Rc() rootfolder = rc.get('vmag', 'vmagbase') logger.info('Default DB loading from ``%s``.' % rootfolder) if rootfolder is None: emesg = "No entry of [vmag]-vmagbase in .irfpyrc." raise RuntimeError(emesg) return MagDB(rootfolder, enable_cache=enable_cache, cachesize=cachesize)
[docs]def get_defaultdb(enable_cache=True): ''' This is an easy-accessor for :meth:`ElsCountFileDB.get_defaultdb`. ''' return MagDB.get_defaultdb(enable_cache=enable_cache)
# Below is for easy access of the dataset to return the data. class __DataFactory: ''' Factory of the MAG data. Used by package interface functions. This factory would consist only of class methods. The class should be considered as a singleton by the developer of the moduel :mod:`scidata`. The only class member is the default instance of :class:`MagDB`. The instance is generated by :meth:`get_defaultdb`. ''' __database = None # MagDB instance, this should not be referred explictily. # ``db = cls._mkdb()`` will return this database (and create it if necessary). @classmethod def isdb(cls): return cls.__database is not None @classmethod def _mkdb(cls): if cls.__database is None: cls.__database = get_defaultdb() return cls.__database @classmethod def get_mag(cls, t, frame=None): db = cls._mkdb() fn = db.get_filename_fromdb(t) dfile = db.get_magfile(fn) t0, t1 = dfile.get_timerange() # This while statement should not be fired, # because MAG data file name is exactly the same # time as the first data. while t < t0: try: fn = db.previousof(fn) except irfpy.util.timeseriesdb.DataNotInDbError: return ValueError('Specified time %s not found in DB.' % t) dfile = db.get_magfile(fn) t0, t1 = dfile.get_timerange() data_at_t = dfile.get_data(t) tobs = data_at_t.get_starttime() b = data_at_t.get_data(frame=frame) return tobs, b @classmethod def get_mags(cls, t0, t1, frame=None): tlist = cls.get_obstime(t0, t1) tobslist = [] blist = [] for t in tlist: tobs, b = cls.get_mag(t, frame=frame) tobslist.append(tobs) blist.append(b) return tobslist, blist @classmethod def get_magarray(cls, t0, t1, frame=None): tlist, blist = cls.get_mags(t0, t1, frame=frame) barray = _np.array(blist) return tlist, barray @classmethod def get_obstime(cls, t0, t1): ''' Implementation of :meth:`get_obstime`. ''' db = cls._mkdb() obstime = [] t_file = db.get_filename_fromdb(t0) logger.debug('First t_file name = %s' % t_file) # Read one file before for no loss of data, # probably redundunt. try: t_file = db.previousof(t_file) logger.debug('For safety one file back. %s.' % t_file) except irfpy.util.timeseriesdb.DataNotInDbError: t_file = t_file logger.debug('No file back. %s.' % t_file) mag_file = db.get_magfile(t_file) t_range = mag_file.get_timerange() while t_range[0] <= t1: logger.debug('Tfile = %s, T0 = %s, T1 = %s' % (t_file, t_range[0], t_range[1])) obstime = obstime + mag_file.get_obstime() logger.debug('-> Done. N(obstime) = %d' % len(obstime)) try: t_file = db.nextof(t_file) logger.debug('Try to proceed one file. %s' % t_file) except irfpy.util.timeseriesdb.DataNotInDbError: logger.info('No further file.') break mag_file = db.get_magfile(t_file) t_range = mag_file.get_timerange() otime = [] for t in obstime: if t > t0 - datetime.timedelta(seconds=4) and t <= t1: otime.append(t) otime.sort() return otime
[docs]def isdb(): ''' Return if the default database is available or not. If ``False`` is returned, please check the entry of :: [vmag] vmagbase = /venus/mag ''' if __DataFactory.isdb(): return True else: try: __DataFactory._mkdb() return True except: return False
[docs]def get_mag(t, frame=None): ''' Return the MAG data in the specific time. :param t: Time :type t: ``datetime.datetime``. :returns: An array of the observation time and corresponding magnetic field data in the shape of (3,). >>> t0 = datetime.datetime(2006, 12, 5, 7, 59, 59, 820996) >>> t, mag = get_mag(t0) >>> print(t) # This may fail if data base changed. 2006-12-05 07:59:56 >>> print(mag.shape) # This may fail if data base changed. (3,) >>> print(mag) # This may fail if data base changed. [ 4.078 1.431 -0.049] >>> t, magsc = get_mag(t0, frame='sc') >>> print(t) 2006-12-05 07:59:56 >>> print(magsc) # doctest: +NORMALIZE_WHITESPACE [3.433 0.031 2.625] ''' return __DataFactory.get_mag(t, frame=frame)
[docs]def get_magarray(t0, t1, frame=None): ''' Return the MAG data between t0 and t1. :param t0: Start time, inclusive. :param t1: End time, inclusive. :keyword frame: Frame. ``'vso'`` (default) or ``'sc'``. :type frame: ``string`` :returns: A tuple (``obstime``, ``magarray``). ``obstime`` is a list of the observation time. ``magarray`` is a numpy array with a shape of (N, 3), where N is the number of data. >>> t0 = datetime.datetime(2006, 12, 5, 7, 30) >>> t1 = datetime.datetime(2006, 12, 5, 7, 31) >>> tlist, magarr = get_magarray(t0, t1) >>> print(len(tlist)) 16 >>> print(magarr.shape) (16, 3) ''' return __DataFactory.get_magarray(t0, t1, frame=frame)
[docs]def get_mags(t0, t1, frame=None): ''' Return the MAG data between t0 and t1. :param t0: Start time, inclusive. :param t1: End time, inclusive. :keyword frame: Frame. ``'vso'`` or ``'sc'``. :type frame: ``string`` :returns: A tuple (``obstime``, ``list_of_vector``). ``obstime`` is a list of the observation time. ``list_of_vector`` is a list of magnetic field, namely, a list of (3,) shaped array. The function is very similar to :func:`get_magarray`, but the structure is different. This function is better used for iterating the results, while the time series analysis (e.g. taking averages, variations) or just plotting, you may use :func:`get_magarray`. >>> t0 = datetime.datetime(2006, 12, 5, 7, 30) >>> t1 = datetime.datetime(2006, 12, 5, 7, 30, 15) >>> tlist, maglist = get_mags(t0, t1) >>> print(len(tlist)) 4 >>> for t, b in zip(tlist, maglist): ... print(t, b[0], b[1], b[2]) 2006-12-05 07:30:00 -6.163 -3.536 3.544 2006-12-05 07:30:04 -4.923 -3.832 4.088 2006-12-05 07:30:08 -4.826 -3.636 3.875 2006-12-05 07:30:12 -5.358 -2.949 3.315 ''' return __DataFactory.get_mags(t0, t1, frame=frame)
[docs]def get_obstime(t0, t1): ''' Get the obseration times (4-s resolution). :keyword t0: Start time :type t0: ``datetime.datetime`` :keyword t1: Start time :type t1: ``datetime.datetime`` :returns: List of ``datetime.datetime``. The following tests shows simple samples of using :meth:`get_obstime`. This is based on database on 110704, so tests may be failed according to the change in the database. >>> if not isdb(): ... pass # doctest: +SKIP ... else: ... t0 = datetime.datetime(2006, 12, 5) ... t1 = datetime.datetime(2006, 12, 6) ... t = get_obstime(t0, t1) ... print(len(t)) # Database change may cause fails of the test. 21601 >>> if not isdb(): ... pass # doctest: +SKIP ... else: ... print(t[0]) # Database change may cause fails of the test. 2006-12-05 00:00:00 >>> if not isdb(): ... pass # doctest: +SKIP ... else: ... print(t[-1]) # Database change may cause fails of the test. 2006-12-06 00:00:00 ''' return __DataFactory.get_obstime(t0, t1)
[docs]def get_average_mag(t0, t1, frame=None): """ Return the average magnetic field over t0 and t1. Averaging has been done component-by-component. :param t0: Start time :param t1: Stop time :returns: ``(Bx, By, Bz)`` >>> t0 = datetime.datetime(2012, 3, 1, 0, 0) >>> t1 = datetime.datetime(2012, 3, 1, 0, 10) >>> bx, by, bz = get_average_mag(t0, t1) >>> print('{:.3f} {:.3f} {:.3f}'.format(bx, by, bz)) 4.685 0.487 -1.157 """ tlist, mag = get_magarray(t0, t1, frame=frame) if len(tlist) == 0: return _np.array([_np.nan, _np.nan, _np.nan]) return _np.nanmean(mag, axis=0)
[docs]def get_avgstd_mag(t0, t1, frame=None): """ Return the average and std of magnetic field over t0 and t1. Average/Std has been taken component-by-component. :param t0: Start time :param t1: Stop time :returns: ``(Bx, By, Bz), (dBx, dBy, dBz)`` >>> t0 = datetime.datetime(2012, 3, 1, 0, 0) >>> t1 = datetime.datetime(2012, 3, 1, 0, 10) >>> (bx, by, bz), (dbx, dby, dbz) = get_avgstd_mag(t0, t1) >>> print('{:.3f} {:.3f} {:.3f}'.format(bx, by, bz)) 4.685 0.487 -1.157 >>> print('{:.3f} {:.3f} {:.3f}'.format(dbx, dby, dbz)) 0.551 0.742 0.460 """ tlist, mag = get_magarray(t0, t1, frame=frame) if len(tlist) == 0: return _np.array([_np.nan, _np.nan, _np.nan]), _np.array([_np.nan, _np.nan, _np.nan]) return _np.nanmean(mag, axis=0), _np.nanstd(mag, axis=0)
[docs]def clockangle(y, z): """ Calculate the clock angle. This calculate the clock angle. No abberation is considered. The definition of clock angle is that * 0 degree at positive Bz only * 90 degrees at positive By only * No abberation considered (i.e. thus, Bx not used) * The range is from -180 (exclusive) to +180 (inclusive) :param by: By :param bz: Bz :returnc: Clock angle in degrees >>> print(clockangle(1, 0)) 90.0 >>> print(clockangle(0, 1)) 0.0 >>> print(clockangle(-1, 0)) -90.0 >>> print(clockangle(0, -1)) 180.0 """ return _np.rad2deg(_np.arctan2(y, z))