r'''MAG data (1s data from PSA) module.
MAG is the magnetometer on board VEX.
There are two dataset supported. Graz-IRF collaborative dataset (4s time resolution)
and PSA dataset (1s time resolution). The 4s dataset can be accessed by the module
named :mod:`irfpy.vmag.scidata`.
*PSA 1s dataset*
This MAG dataset was provided from PSA, produced by Graz.
Follow the PSA regulation for the use of this dataset.
PSA dataset should be downloaded prior.
It is avialable at ftp://psa.esac.esa.int/pub/mirror/VENUS-EXPRESS/MAG.
The 1s dataset is for the directories ``VEX-V-Y-MAG-3*``.
.. tip::
To save the local disk volume, the dataset (table; ``*.TAB``) can be gzipped.
The folder stroing the dataset should be specified in
``${HOME}/.irfpyrc`` file as follows::
[vmag]
vmagbase_1s = /venus/mag.1s/MAG
*Use of data*
The `data center approach <https://irfpy.irf.se/projects/util/tutorial/tutorial_datacenter.html>`_
can be used.
First, create the data center.
>>> from irfpy.vmag import scidata1s
>>> dc = scidata1s.DataCenterMag1s()
You can access the data via ``dc.nearest()``, ``dc.iter()``, or ``dc.get_array()``.
To get the nearest data to the given time:
>>> import datetime
>>> obstime, magdata = dc.nearest(datetime.datetime(2009, 12, 24, 23, 48, 50))
>>> print(obstime)
2009-12-24 23:48:49.563000
>>> print(magdata) # doctest: +SKIP
[2.954 -3.415 4.391]
To iterate the data
>>> t0 = datetime.datetime(2009, 12, 24, 23, 48, 50)
>>> t1 = datetime.datetime(2009, 12, 24, 23, 49, 0)
>>> for obstime, magdata in dc.iter(t0, t1):
... print(obstime)
... print(magdata) # doctest: +SKIP
2009-12-24 23:48:50.563000
[2.943 -3.441 4.366]
2009-12-24 23:48:51.563000
[2.931 -3.464 4.406]
2009-12-24 23:48:52.563000
[2.887 -3.465 4.393]
2009-12-24 23:48:53.563000
[2.904 -3.438 4.399]
2009-12-24 23:48:54.563000
[2.912 -3.424 4.362]
2009-12-24 23:48:55.563000
[-- -- --]
2009-12-24 23:48:56.563000
[-- -- --]
2009-12-24 23:48:57.563000
[-- -- --]
2009-12-24 23:48:58.563000
[-- -- --]
2009-12-24 23:48:59.563000
[-- -- --]
The returned data is an object of ``numpy masked array``.
>>> import numpy as np
>>> time_list, mag_list = dc.get_array(t0, t1)
>>> mag_ma_array = np.ma.masked_array(mag_list) # Masked array not to see the invalid values.
>>> print(mag_ma_array) # doctest: +SKIP
[[2.943 -3.441 4.366]
[2.931 -3.464 4.406]
[2.887 -3.465 4.393]
[2.904 -3.438 4.399]
[2.912 -3.424 4.362]
[-- -- --]
[-- -- --]
[-- -- --]
[-- -- --]
[-- -- --]]
It is sometimes useful to use ``.filled(np.nan)`` functionality to make the data into the normal numpy array.
>>> mag_array = mag_ma_array.filled(np.nan) # Fill the masked value to NaN
>>> print(mag_array) # doctest: +SKIP
[[ 2.943 -3.441 4.366]
[ 2.931 -3.464 4.406]
[ 2.887 -3.465 4.393]
[ 2.904 -3.438 4.399]
[ 2.912 -3.424 4.362]
[ nan nan nan]
[ nan nan nan]
[ nan nan nan]
[ nan nan nan]
[ nan nan nan]]
'''
import os as _os
import datetime as _datetime
import dateutil.parser as _parser
import logging as _logging
import gzip as _gzip
import numpy as _np
from irfpy.util import exception as _ex
import irfpy.util.timeseries as _timeseries
import irfpy.util.irfpyrc as _irfpyrc
from irfpy.util import filepairv as _filepair
from irfpy.vmag import __version__
from irfpy.util import datacenter as _datacenter
_logger = _logging.getLogger(__name__)
bad_data_flag = '99999.999'
[docs]class DataCenterMag1s(_datacenter.BaseDataCenter):
""" Magnetic field data center.
>>> magdc = DataCenterMag1s()
"""
def __init__(self):
rc = _irfpyrc.Rc()
self.basepath = rc.get('vmag', 'vmagbase_1s')
if not _os.path.isdir(self.basepath):
msg = 'The path {} not found for VMAG1s data center.'.format(self.basepath)
_logger.error(msg)
raise ValueError(msg)
_datacenter.BaseDataCenter.__init__(self)
[docs] def search_files(self):
""" Search the file from the base tree.
:return:
"""
filelist = []
for _p, _d, _f in _os.walk(self.basepath):
if _p.find('VEX-V-Y-MAG-3-') == -1: # This is to check for Level-3 data
continue
for _ff in _f:
if _ff.startswith('MAG_'):
if _ff.endswith('.TAB'):
filelist.append(_os.path.join(_p, _ff))
elif _ff.endswith('.TAB.gz'):
filelist.append(_os.path.join(_p, _ff))
_logger.info("File list: Length={}".format(len(filelist)))
return filelist
[docs] def read_file(self, filename):
return MagFile.get_contents(filename)
[docs] def approximate_starttime(self, filename):
base = _os.path.basename(filename)
datesplit = base.split('_')
y = int(datesplit[1][:4])
m = int(datesplit[1][4:6])
d = int(datesplit[1][6:])
t = _datetime.datetime(y, m, d)
return t
[docs]class MagFile:
''' Class for a single MAG data file.
:param filename: File name of MAG data.
Normally this is to reading the file, the instance should be
regarded as *immutable*.
Contents of an instance is
- Time array, sorted array of the observation time
- Data dict, :class:`MagData` instance corresponding to the time
'''
flag = '99999.999'
[docs] @classmethod
def open_psa_file(self, filename):
""" Open a PSA file.
:param filename: File name
:return: An opened file object.
Note that you should close the file object explicitly after the use.
"""
if filename.lower().endswith('.gz'):
fp = _gzip.open(filename, 'rt', errors='surrogateescape')
else:
fp = open(filename, encoding='ascii', errors='surrogateescape')
return fp
[docs] @classmethod
def parse_a_file(self, filename):
import time
__t0 = time.time()
_logger.info('Parsing {}'.format(filename))
fp = self.open_psa_file(filename)
header = self.count_header(filename)
for i in range(header - 1):
fp.readline()
tlist = []
maglist = []
for line in fp:
dat = line.split()
try:
t = _parser.parse(dat[0])
mag = _np.array([float(dat[1]), float(dat[2]), float(dat[3])])
mag = _np.ma.masked_equal(mag, float(bad_data_flag))
tlist.append(t)
maglist.append(mag)
except Exception:
_logger.warning('Parsing error. Ignoring the line.')
_logger.warning(' File: {}'.format(filename))
_logger.warning(' Line: {}'.format(line))
fp.close()
objSer = _timeseries.ObjectSeries(tlist, maglist)
_logger.info('Parsing finished. {} sec'.format(time.time() - __t0))
return objSer
[docs] @classmethod
def get_contents(self, filename):
import time
__t0 = time.time()
_logger.info('Parsing start using filepair.')
objSer = _filepair.filepair(filename,
filename + '.filepair.gz',
converter=self.parse_a_file, version=__version__)
_logger.info('Parsing finished using filepair. {} sec'.format(time.time() - __t0))
return objSer
#def get_mag(t, frame=None):
# ''' Return the MAG data in the specific time.
#
# :param t: Time
# :type t: ``datetime.datetime``.
# :returns: An array of the observation time and
# corresponding magnetic field data in the shape of (3,).
#
# >>> t0 = datetime.datetime(2006, 12, 5, 7, 59, 59, 820996)
# >>> t, mag = get_mag(t0)
# >>> print(t) # This may fail if data base changed.
# 2006-12-05 07:59:56
# >>> print(mag.shape) # This may fail if data base changed.
# (3,)
# >>> print(mag) # This may fail if data base changed.
# [ 4.078 1.431 -0.049]
#
# >>> t, magsc = get_mag(t0, frame='sc')
# >>> print(t)
# 2006-12-05 07:59:56
# >>> print(magsc) # doctest: +NORMALIZE_WHITESPACE
# [3.433 0.031 2.625]
# '''
# raise NotImplementedError('')
#
# return __DataFactory.get_mag(t, frame=frame)
#
#
#def get_magarray(t0, t1, frame=None):
# ''' Return the MAG data between t0 and t1.
#
# :param t0: Start time, inclusive.
# :param t1: End time, inclusive.
# :keyword frame: Frame. ``'vso'`` (default) or ``'sc'``.
# :type frame: ``string``
# :returns: A tuple (``obstime``, ``magarray``).
# ``obstime`` is a list of the observation time.
# ``magarray`` is a numpy array with a shape of (N, 3), where N is the number of data.
#
# >>> t0 = datetime.datetime(2006, 12, 5, 7, 30)
# >>> t1 = datetime.datetime(2006, 12, 5, 7, 31)
# >>> tlist, magarr = get_magarray(t0, t1)
# >>> print(len(tlist))
# 16
# >>> print(magarr.shape)
# (16, 3)
# '''
# raise NotImplementedError('')
#
# return __DataFactory.get_magarray(t0, t1, frame=frame)
#
#
#def get_mags(t0, t1, frame=None):
# ''' Return the MAG data between t0 and t1.
#
# :param t0: Start time, inclusive.
# :param t1: End time, inclusive.
# :keyword frame: Frame. ``'vso'`` or ``'sc'``.
# :type frame: ``string``
# :returns: A tuple (``obstime``, ``list_of_vector``).
# ``obstime`` is a list of the observation time.
# ``list_of_vector`` is a list of magnetic field, namely, a list of (3,) shaped array.
#
# The function is very similar to :func:`get_magarray`, but the structure is different.
#
# This function is better used for iterating the results, while the time series analysis
# (e.g. taking averages, variations) or just plotting, you may use :func:`get_magarray`.
#
# >>> t0 = datetime.datetime(2006, 12, 5, 7, 30)
# >>> t1 = datetime.datetime(2006, 12, 5, 7, 30, 15)
# >>> tlist, maglist = get_mags(t0, t1)
# >>> print(len(tlist))
# 4
# >>> for t, b in zip(tlist, maglist):
# ... print(t, b[0], b[1], b[2])
# 2006-12-05 07:30:00 -6.163 -3.536 3.544
# 2006-12-05 07:30:04 -4.923 -3.832 4.088
# 2006-12-05 07:30:08 -4.826 -3.636 3.875
# 2006-12-05 07:30:12 -5.358 -2.949 3.315
# '''
# raise NotImplementedError('')
#
# return __DataFactory.get_mags(t0, t1, frame=frame)
#
#
#def get_obstime(t0, t1):
# ''' Get the obseration times (4-s resolution).
#
# :keyword t0: Start time
# :type t0: ``datetime.datetime``
# :keyword t1: Start time
# :type t1: ``datetime.datetime``
# :returns: List of ``datetime.datetime``.
#
# The following tests shows simple samples of using
# :meth:`get_obstime`.
# This is based on database on 110704, so tests may be failed
# according to the change in the database.
#
# >>> if not isdb():
# ... pass # doctest: +SKIP
# ... else:
# ... t0 = datetime.datetime(2006, 12, 5)
# ... t1 = datetime.datetime(2006, 12, 6)
# ... t = get_obstime(t0, t1)
# ... print(len(t)) # Database change may cause fails of the test.
# 21601
#
# >>> if not isdb():
# ... pass # doctest: +SKIP
# ... else:
# ... print(t[0]) # Database change may cause fails of the test.
# 2006-12-05 00:00:00
#
# >>> if not isdb():
# ... pass # doctest: +SKIP
# ... else:
# ... print(t[-1]) # Database change may cause fails of the test.
# 2006-12-06 00:00:00
# '''
# raise NotImplementedError('')
#
# return __DataFactory.get_obstime(t0, t1)
#
#
#def get_average_mag(t0, t1, frame=None):
# """ Return the average magnetic field over t0 and t1.
#
# Averaging has been done component-by-component.
#
# :param t0: Start time
# :param t1: Stop time
# :returns: ``(Bx, By, Bz)``
#
# >>> t0 = datetime.datetime(2012, 3, 1, 0, 0)
# >>> t1 = datetime.datetime(2012, 3, 1, 0, 10)
# >>> bx, by, bz = get_average_mag(t0, t1)
# >>> print('{:.3f} {:.3f} {:.3f}'.format(bx, by, bz))
# 4.685 0.487 -1.157
# """
# raise NotImplementedError('')
#
# tlist, mag = get_magarray(t0, t1, frame=frame)
# if len(tlist) == 0:
# return _np.array([_np.nan, _np.nan, _np.nan])
# return _np.nanmean(mag, axis=0)
#
#
#def get_avgstd_mag(t0, t1, frame=None):
# """ Return the average and std of magnetic field over t0 and t1.
#
# Average/Std has been taken component-by-component.
#
# :param t0: Start time
# :param t1: Stop time
# :returns: ``(Bx, By, Bz), (dBx, dBy, dBz)``
#
# >>> t0 = datetime.datetime(2012, 3, 1, 0, 0)
# >>> t1 = datetime.datetime(2012, 3, 1, 0, 10)
# >>> (bx, by, bz), (dbx, dby, dbz) = get_avgstd_mag(t0, t1)
# >>> print('{:.3f} {:.3f} {:.3f}'.format(bx, by, bz))
# 4.685 0.487 -1.157
# >>> print('{:.3f} {:.3f} {:.3f}'.format(dbx, dby, dbz))
# 0.551 0.742 0.460
# """
# raise NotImplementedError('')
# tlist, mag = get_magarray(t0, t1, frame=frame)
# if len(tlist) == 0:
# return _np.array([_np.nan, _np.nan, _np.nan]), _np.array([_np.nan, _np.nan, _np.nan])
# return _np.nanmean(mag, axis=0), _np.nanstd(mag, axis=0)