''' Time sereis related classes.
.. codeauthor:: Yoshifumi Futaana
.. autosummary::
ScalarSeries
Vector3dSeries
dtrange
These classes stores a time series of scalar / vector values.
For example, these may be used to represent a time profile of
the plasma density, or a time series of magnetic field vector.
.. note::
There is similar class, :class:`irfpy.util.julday.JdSeries`.
The :class:`irfpy.util.julday.JdSeries` class is slow and not
easy to handle.
Thus, developers may consider using :class:`ScalarSeries`
and :class:`Vector3dSeries`.
.. note::
The :class:`ScalarSeries` only handles ``float`` data.
:class:`Vector3dSeries` only treats 3-D vector of ``float``.
Internally they use ``ndarray``, thus they are quicker and more flexible.
'''
import datetime
import numpy as np
from scipy import interpolate
import bisect
import abc
from irfpy.util import timeseriesdb as _tsdb
from irfpy.util import exception as _ex
from irfpy.util import utc as _utc
[docs]class DataNotFound(_ex.IrfpyException):
''' Exception raised when the data is not found.
'''
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class _TimeSeriesABC(metaclass=abc.ABCMeta):
@abc.abstractmethod
def t0(self):
""" Return the time of the first data
:returns: The earliest time.
:rtype: ``datetime.datetime``
"""
pass
@abc.abstractmethod
def t1(self):
""" Return the time of the last data.
:returns: The latest time.
:rtype: ``datetime.datetime``
"""
pass
@abc.abstractmethod
def getobstime(self):
""" Return the list of the observation time.
:return: List of the observation time.
"""
pass
@abc.abstractmethod
def get_data(self, t0, t1):
''' Return the registered data.
:keyword t0: Start time (inclusive).
:type t0: ``datetime.datetime``
:keyword t1: End time (inclusive).
:type t1: ``datetime.datetime``
:returns: (``tlist``, ``dlist``) where ``tlist`` is the
list of the time as in ``datetime`` instance, and
dlist is the data as in ``ndarray`` instance.
'''
pass
@abc.abstractmethod
def clipped(self, t0, t1):
''' Return clipped data.
:param t0: Start time (inclusive)
:param t1: Stop time (inclusive)
:return: Clipped object. It should be the same class as ``self``.
'''
pass
[docs]class ObjectSeries(_TimeSeriesABC):
''' Represents time series of any object.
:param time_array: Sequence of time in ``datetime.dattime`` objects
:param data_array: Sequecne of data in any format.
Represent time-tagged data.
If the data is floating point, :class:`ScalarSeries` can be used.
If the data is 3-D real floating, :class:`VectorSeries` can be used.
This class is for arbitrary object.
'''
def __init__(self, time_array, data_sequence):
""" Initialize the series.
:param time_array: Array of time.
:param data_sequence: Array of data.
>>> tlist = [datetime.datetime(2001, 1, 10, 3),
... datetime.datetime(2001, 1, 10, 1),
... datetime.datetime(2001, 1, 10, 7),
... datetime.datetime(2001, 1, 10, 18),
... datetime.datetime(2001, 1, 10, 2)]
>>> print(len(tlist))
5
>>> dlist = [(3,), (1,), (7,), (18,), (2,)]
>>> tser = ObjectSeries(tlist, dlist)
>>> tlist2 = tser.times
>>> from pprint import pprint
>>> pprint(tlist2)
[datetime.datetime(2001, 1, 10, 1, 0),
datetime.datetime(2001, 1, 10, 2, 0),
datetime.datetime(2001, 1, 10, 3, 0),
datetime.datetime(2001, 1, 10, 7, 0),
datetime.datetime(2001, 1, 10, 18, 0)]
>>> dlist2 = tser.data
>>> print(dlist2)
[(1,), (2,), (3,), (7,), (18,)]
>>> pprint(tser.get_data())
((datetime.datetime(2001, 1, 10, 1, 0),
datetime.datetime(2001, 1, 10, 2, 0),
datetime.datetime(2001, 1, 10, 3, 0),
datetime.datetime(2001, 1, 10, 7, 0),
datetime.datetime(2001, 1, 10, 18, 0)),
((1,), (2,), (3,), (7,), (18,)))
>>> pprint(tser.get_data(datetime.datetime(2001, 1, 10, 2), datetime.datetime(2001, 1, 10, 7)))
((datetime.datetime(2001, 1, 10, 2, 0),
datetime.datetime(2001, 1, 10, 3, 0),
datetime.datetime(2001, 1, 10, 7, 0)),
((2,), (3,), (7,)))
>>> pprint(tser.get_data(datetime.datetime(2030, 1, 1), datetime.datetime(2040, 1, 1)))
((), ())
>>> for t, d in tser.iter():
... print(t, d)
2001-01-10 01:00:00 (1,)
2001-01-10 02:00:00 (2,)
2001-01-10 03:00:00 (3,)
2001-01-10 07:00:00 (7,)
2001-01-10 18:00:00 (18,)
>>> tser2 = tser.clipped(datetime.datetime(2001, 1, 10, 2), datetime.datetime(2001, 1, 10, 7))
>>> print(len(tser2))
3
>>> pprint(tser2.get_data())
((datetime.datetime(2001, 1, 10, 2, 0),
datetime.datetime(2001, 1, 10, 3, 0),
datetime.datetime(2001, 1, 10, 7, 0)),
((2,), (3,), (7,)))
>>> tser3 = tser.clipped(datetime.datetime(2030, 10, 1), datetime.datetime(2010, 10, 1))
>>> print(len(tser3))
0
"""
if len(time_array) != len(data_sequence):
raise DataNotFound('Length of time (%d) != data (%d)')
datadict = {}
for t, d in zip(time_array, data_sequence):
datadict[t] = d
self.times = sorted(datadict.keys())
self.data = [datadict[t] for t in self.times]
[docs] def t0(self):
try:
t = self.times[0]
except IndexError:
raise _ex.IrfpyException('No data contained.')
return t
[docs] def t1(self):
try:
t = self.times[-1]
except IndexError:
raise _ex.IrfpyException("No data contained.")
return t
[docs] def getobstime(self):
return self.times[:]
[docs] def get_data(self, t0=None, t1=None):
if t0 is None:
t0 = self.t0()
if t1 is None:
t1 = self.t1()
tlist = np.array(self.times, copy=True)
t_later_t0 = tlist >= t0
t_earlier_t1 = tlist <= t1
t_match = (t_later_t0 & t_earlier_t1)
idx = np.where(t_match)[0]
t_new = tuple([self.times[_i] for _i in idx])
d_new = tuple([self.data[_i] for _i in idx])
return (t_new, d_new)
[docs] def clipped(self, t0, t1):
tlist, dlist = self.get_data(t0, t1)
return ObjectSeries(tlist, dlist)
def __len__(self):
return len(self.times)
[docs] def at(self, t):
""" Return the data at the time of ``t``.
The data at the time of ``t``. The time should be exact.
Usually you can get the time of the data by :meth:`ObjectSeries.getobstime` method.
If not data is available at ``t``, :class:`DataNotFound` error is raised.
"""
try:
idx = self.times.index(t)
return self.data[idx]
except ValueError:
raise DataNotFound('Data not found for time {}.'.format(t))
[docs] def nextof(self, t):
""" Return the data next of ``t``. The next data time should be ``tdata > t``.
:param t: Time
:return: A tuple, (tnext, dnext)
:exception: If the next data is not available, :class:`DataNotFound` error is raised.
>>> tlist = [datetime.datetime(2001, 1, 10, 3),
... datetime.datetime(2001, 1, 10, 1),
... datetime.datetime(2001, 1, 10, 7),
... datetime.datetime(2001, 1, 10, 18),
... datetime.datetime(2001, 1, 10, 2)]
>>> print(len(tlist))
5
>>> dlist = [(3,), (1,), (7,), (18,), (2,)]
>>> tser = ObjectSeries(tlist, dlist)
>>> t, d = tser.nextof(datetime.datetime(2001, 1, 10, 0))
>>> print(t, d)
2001-01-10 01:00:00 (1,)
>>> t, d = tser.nextof(datetime.datetime(2001, 1, 10, 3))
>>> print(t, d)
2001-01-10 07:00:00 (7,)
>>> t, d = tser.nextof(datetime.datetime(2001, 1, 10, 17))
>>> print(t, d)
2001-01-10 18:00:00 (18,)
>>> t, d = tser.nextof(datetime.datetime(2001, 1, 10, 18)) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
timeseries.DataNotFound:
>>> t, d = tser.nextof(datetime.datetime(2001, 1, 10, 20)) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
timeseries.DataNotFound:
"""
tlist = self.times
idx = bisect.bisect_right(tlist, t)
if idx >= len(tlist):
raise DataNotFound('Data next of {:%FT%T} is not in the ObjectSeries.'.format(t))
return tlist[idx], self.data[idx]
[docs] def previousof(self, t):
""" Return the data previous of ``t``. The previous data time should be ``tdata < t``.
:param t: Time
:return: A tuple, (tnext, dnext)
:exception: If the previous data is not available, :class:`DataNotFound` error is raised.
>>> tlist = [datetime.datetime(2001, 1, 10, 3),
... datetime.datetime(2001, 1, 10, 1),
... datetime.datetime(2001, 1, 10, 7),
... datetime.datetime(2001, 1, 10, 18),
... datetime.datetime(2001, 1, 10, 2)]
>>> print(len(tlist))
5
>>> dlist = [(3,), (1,), (7,), (18,), (2,)]
>>> tser = ObjectSeries(tlist, dlist)
>>> t, d = tser.previousof(datetime.datetime(2001, 1, 10, 20))
>>> print(t, d)
2001-01-10 18:00:00 (18,)
>>> t, d = tser.previousof(datetime.datetime(2001, 1, 10, 18))
>>> print(t, d)
2001-01-10 07:00:00 (7,)
>>> t, d = tser.previousof(datetime.datetime(2001, 1, 10, 2))
>>> print(t, d)
2001-01-10 01:00:00 (1,)
>>> t, d = tser.previousof(datetime.datetime(2001, 1, 10, 1)) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
timeseries.DataNotFound:
>>> t, d = tser.previousof(datetime.datetime(2001, 1, 10, 0)) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
timeseries.DataNotFound:
"""
tlist = self.times
idx = bisect.bisect_left(tlist, t)
if idx == 0:
raise DataNotFound('Data previous of {:%FT%T} is not in the ObjectSeries.'.format(t))
return tlist[idx - 1], self.data[idx - 1]
[docs] def nearest(self, t):
""" Return the nearest data.
:param t:
:return:
"""
try:
te, de = self.nearest_no_later(t) # Exception handling needed!
except DataNotFound:
te, de = datetime.datetime.min, None
try:
tl, dl = self.nearest_no_earlier(t)
except DataNotFound:
tl, dl = datetime.datetime.max, None
dte = (t - te).total_seconds()
dtl = (tl - t).total_seconds()
if dte <= dtl:
return te, de
else:
return tl, dl
[docs] def nearest_no_later(self, t):
""" Return the data point observed earlier than or equal to ``t``, but the latest one.
:param t:
:return:
"""
tlist = self.times
idx = bisect.bisect(tlist, t)
if idx == 0:
raise DataNotFound('Time {} is earlier than the dataset. t0={}'.format(t, self.t0()))
idx = idx - 1
return self.times[idx], self.data[idx]
nearest_later_or_at = nearest_no_later
[docs] def nearest_no_earlier(self, t):
""" Return the data point observed later than or equal to ``t``, but the earliest one.
:param t:
:return:
"""
tlist = self.times
idx = bisect.bisect_left(tlist, t)
if idx == len(tlist):
raise DataNotFound('Time {} is later than the dataset. t1={}'.format(t, self.t1()))
return self.times[idx], self.data[idx]
nearest_earlier_or_at = nearest_no_earlier
[docs] def concatenate(self, other, strategy='overwrite'):
''' Concatenate other series to return a new series.
:param other: The other :class:`ObjectSeries`
:keyword strategy: Strategy to concatenate.
Default is ``'overwrite'``, in which the same data points
are overwritten by the ``other`` data.
If ``'nooverwrite'`` is given, the same data points
are kept as ``self`` data.
:returns: New :class:`OtherSeries`
Concatenate two series. If duplication happens, it overwrites by the second one (default),
or kept original (``nooverwrite``).
>>> t0 = [datetime.datetime(2008, 12, 1),
... datetime.datetime(2008, 12, 3),
... datetime.datetime(2008, 12, 5)]
>>> d0 = [1.5, 2.5, 3.5]
>>> tser0 = ObjectSeries(t0, d0)
>>> print(len(tser0))
3
>>> t1 = [datetime.datetime(2008, 12, 5),
... datetime.datetime(2008, 12, 8),
... datetime.datetime(2008, 12, 9)]
>>> d1 = [4.5, 6.5, 9.5]
>>> tser1 = ObjectSeries(t1, d1)
>>> print(len(tser1))
3
Concatenate! Note that ther is one duplication. Latter (tser1) prioritized.
>>> tser2 = tser0.concatenate(tser1)
>>> print(len(tser2))
5
>>> t2, d2 = tser2.get_data()
>>> from pprint import pprint
>>> pprint(t2)
(datetime.datetime(2008, 12, 1, 0, 0),
datetime.datetime(2008, 12, 3, 0, 0),
datetime.datetime(2008, 12, 5, 0, 0),
datetime.datetime(2008, 12, 8, 0, 0),
datetime.datetime(2008, 12, 9, 0, 0))
>>> print(d2)
(1.5, 2.5, 4.5, 6.5, 9.5)
Or, tser0 could be prioritzed.
>>> tser3 = tser0.concatenate(tser1, strategy='nooverwrite')
>>> print(tser3.nearest(datetime.datetime(2008, 12, 5))[1])
3.5
'''
t0, d0 = self.get_data()
t1, d1 = other.get_data()
if strategy == 'overwrite':
tlist = t0 + t1
dlist = d0 + d1
elif strategy == 'nooverwrite':
tlist = t1 + t0
dlist = d1 + d0
else:
raise ValueError('No strategy %s defined.' % strategy)
return ObjectSeries(tlist, dlist)
[docs] def iter(self):
return _iter_objser(self)
class _iter_objser:
def __init__(self, iter_obj):
self._cur_idx = 0
self._iter_obj = iter_obj
def __iter__(self):
return self
def __next__(self):
try:
rett, retval = self._iter_obj.times[self._cur_idx], self._iter_obj.data[self._cur_idx]
self._cur_idx += 1
return rett, retval
except IndexError:
raise StopIteration()
raise StopIteration()
[docs]class ScalarSeries(_TimeSeriesABC):
''' Represents time series of floating point scalar data.
:param time_array: Sequence of time in ``datetime.datetime`` objects.
:param data_array: Sequence of data in ``float``.
Internally, the time is in ``matplotlib.dates`` number; however,
most of the methods use ``datetime.datetime`` object as interface.
Sample follows.
First, prepare the list of the time and corresponding data.
>>> tlist = [datetime.datetime(2009, 1, 10, 12),
... datetime.datetime(2009, 1, 10, 8),
... datetime.datetime(2009, 1, 10, 4),
... datetime.datetime(2009, 1, 10, 16),
... datetime.datetime(2009, 1, 10, 0),]
>>> dlist = [0, 1, 2, 3, 4]
Then, instance the ScalarSeries object.
>>> ts = ScalarSeries(tlist, dlist)
>>> ts.shape
(2, 5)
>>> len(ts)
5
>>> tlist1, dlist1 = ts.get_data()
>>> print(dlist1) # doctest: +NORMALIZE_WHITESPACE
[4. 2. 1. 0. 3.]
If the time array has duplication, the last data is validated.
In this case, the created ScalarSeries length will be 2, not 3.
>>> tlist2 = [datetime.datetime(2009, 2, 5, 12),
... datetime.datetime(2009, 2, 5, 13),
... datetime.datetime(2009, 2, 5, 13),]
>>> dlist2 = [100, 200, 500]
>>> ts2 = ScalarSeries(tlist2, dlist2)
>>> ts2.shape
(2, 2)
>>> avg = ts2.get_average(ts2.t0(), ts2.t1())
>>> print(avg)
300.0
The coverage of the data can be get via methods of
:meth:`t0` and :meth:`t1`.
>>> print(ts.t0())
2009-01-10 00:00:00
>>> print(ts.t1(fmt="mdates"))
733417.6666666666
To get the data, you can use :meth:`get_data` method.
:meth:`get_average` and :meth:`get_moment` methods will calculate
the average and variance of the given data.
To get the data with inter (and extra) polation, use
:meth:`spline_interpolate` method.
:meth:`concatenate` method will concatenate the other given
:class:`ScalarSeries` object.
.. todo::
Zero size ScalarSeries should be implemented.
However, to do this you may need to separate the data array?
TBC.
'''
def __init__(self, time_array, data_array):
if len(time_array) != len(data_array):
raise ValueError('Length of time (%d) != data (%d)'
% (len(time_array), len(data_array)))
# Sorting.
datadict = {}
for t, d in zip(time_array, data_array):
datadict[t] = d
data = []
for t in sorted(datadict.keys()):
data.append([_utc.date2num(t), datadict[t]])
self.data = np.array(data).T
self.shape = self.data.shape
self.cache = {}
[docs] def t0(self, fmt=None):
''' Returns the first data.
:keyword fmt: If ``"mdates"`` is specified,
the corresponding object is returned.
:returns: The earliest time.
:rtype: ``datetime.datetime`` or ``float``.
'''
if fmt == "mdates":
return self.data[0, 0]
else:
return _utc.num2date(self.data[0, 0]).replace(tzinfo=None)
[docs] def t1(self, fmt=None):
''' Returns the first data.
:keyword fmt: If ``"mdates"`` is specified,
the corresponding object is returned.
:returns: The latest time.
:rtype: ``datetime.datetime`` or ``float``.
'''
if fmt == "mdates":
return self.data[0, -1]
else:
return _utc.num2date(self.data[0, -1]).replace(tzinfo=None)
[docs] def getobstime(self):
""" Return the observation time.
:return: List of observation time.
>>> scalar_series = ScalarSeries.sample_data()
>>> print(len(scalar_series))
8
>>> tlist = scalar_series.getobstime()
>>> print(len(tlist))
8
>>> print(tlist[3])
2009-01-10 08:00:00
"""
return [_utc.num2date(self.data[0, i]).replace(tzinfo=None)
for i in range(len(self))]
[docs] def get_data(self, t0=None, t1=None):
''' Return the registered data.
:keyword t0: Start time (inclusive). Default is :meth:`t0`.
:type t0: ``datetime.datetime``
:keyword t1: End time (inclusive). Default is :meth:`t1`.
:type t1: ``datetime.datetime``
:returns: (``tlist``, ``dlist``) where ``tlist`` is the
list of the time as in ``datetime`` instance, and
dlist is the data as in ``ndarray`` instance.
>>> tlist = dtrange(datetime.datetime(2009, 1, 30),
... datetime.datetime(2009, 1, 31),
... datetime.timedelta(hours=1))
>>> data = [i ** 2 for i in range(len(tlist))]
>>> ts = ScalarSeries(tlist, data)
>>> tlist2, data2 = ts.get_data(
... datetime.datetime(2009, 1, 30, 11, 30),
... datetime.datetime(2009, 1, 30, 13, 0))
>>> print(len(tlist2), len(data2))
2 2
>>> print(tlist2[0], tlist2[1])
2009-01-30 12:00:00 2009-01-30 13:00:00
>>> print('%.0f %.0f' % (data2[0], data2[1]))
144 169
'''
if t0 is None:
t0 = self.t0()
if t1 is None:
t1 = self.t1()
tn0 = _utc.date2num(t0)
tn1 = _utc.date2num(t1)
tlist = self.data[0, :]
dlist = self.data[1, :]
tlist = np.ma.masked_outside(tlist, tn0, tn1, copy=True)
dlist = np.ma.masked_array(dlist, tlist.mask)
tnlist = np.ma.compressed(tlist)
tnlist = [_utc.num2date(t).replace(tzinfo=None) for t in tnlist]
return (tnlist, np.ma.compressed(dlist))
[docs] def get_average(self, t0=None, t1=None):
''' Return the average between t0 and t1.
:param t0: Start time (inclusive)
:type t0: ``datetime.datetime``
:param t1: End time (inclusive)
:type t1: ``datetime.datetime``
:returns: Average.
Use :meth:`get_moment` for more precise use.
>>> tlist = dtrange(datetime.datetime(2009, 1, 30),
... datetime.datetime(2009, 1, 31),
... datetime.timedelta(hours=1))
>>> data = [i ** 2 for i in range(len(tlist))]
>>> ts = ScalarSeries(tlist, data)
The average below should be **(144+169+196)/3 = 169.667**.
>>> print('%.3f' % ts.get_average(
... datetime.datetime(2009, 1, 30, 11, 30),
... datetime.datetime(2009, 1, 30, 14, 0)))
169.667
'''
if t0 is None:
t0 = self.t0()
if t1 is None:
t1 = self.t1()
tlist, dlist = self.get_data(t0, t1)
return dlist.mean()
[docs] def get_moment(self, t0=None, t1=None):
''' Return the number of data, average and variance.
:param t0: Start time (inclusive)
:type t0: ``datetime.datetime``
:param t1: End time (inclusive)
:type t1: ``datetime.datetime``
:returns: (number of data, average, variance)
>>> tlist = dtrange(datetime.datetime(2009, 1, 30),
... datetime.datetime(2009, 1, 31),
... datetime.timedelta(hours=1))
>>> data = [i ** 2 for i in range(len(tlist))]
>>> ts = ScalarSeries(tlist, data)
The average below should be (121+144+169+196)/4 = 157.5
Variance is (36.5^2 + 13.5^2 + 11.5^2 + 38.5^2)/4 = 782.25
>>> ndat, ave, var = ts.get_moment(
... datetime.datetime(2009, 1, 30, 11, 0),
... datetime.datetime(2009, 1, 30, 14, 0))
>>> print(ndat)
4
>>> print('%.3f' % ave)
157.500
>>> print('%.3f' % var)
782.250
'''
if t0 is None:
t0 = self.t0()
if t1 is None:
t1 = self.t1()
tlist, dlist = self.get_data(t0, t1)
return len(dlist), dlist.mean(), dlist.var()
def _nearest(self, t):
''' Return the nearest data. Single arg version.
:param t: Datetime.
'''
tn = _utc.date2num(t)
idx = bisect.bisect(self.data[0, :], tn)
if idx == len(self):
idx = idx - 1
if idx == 0:
return self.data[1, 0]
tn0 = np.abs(tn - self.data[0, idx - 1])
tn1 = np.abs(tn - self.data[0, idx])
if tn0 <= tn1:
return self.data[1, idx - 1]
else:
return self.data[1, idx]
[docs] def nearest(self, t):
''' Return the nearest data.
:param t: Scalar or list of time.
'''
tlist = np.array(t)
if tlist.shape == ():
return self._nearest(t)
else:
d = np.array([self._nearest(ti) for ti in tlist])
return d
[docs] def linear_interpolate(self, t):
""" Linear interpolation.
:param t: Time or a sequence of times.
:return: Interpolated data.
>>> tlist = [datetime.datetime(2009, 1, 10, 12),
... datetime.datetime(2009, 1, 10, 8),
... datetime.datetime(2009, 1, 10, 4),
... datetime.datetime(2009, 1, 10, 16),
... datetime.datetime(2009, 1, 10, 0),]
>>> dlist = [1, 2, 3, 2, 2]
>>> ts = ScalarSeries(tlist, dlist)
>>> print(ts.linear_interpolate(datetime.datetime(2009, 1, 10, 6)))
2.5
>>> print(ts.linear_interpolate([datetime.datetime(2009, 1, 10, 6), datetime.datetime(2009, 1, 10, 14)]))
[2.5 1.5]
"""
tn = _utc.date2num(t)
from scipy.interpolate import interp1d
f = interp1d(self.data[0, :], self.data[1, :])
return f(tn)
[docs] def spline_interpolate(self, t, extrapolate=False):
''' Interpolate using spline fitting.
Interpolation can be done using Spline fitting.
:param t: Time, sequence allowed.
:type t: ``datetime.datetime``
:keyword extrapolate: Extrapolation handling. If ``True`` is
set, extrapolation is allowed.
If ``False`` is set (default), extrapolation is not allowed.
You may also set ``datetime.timedelta`` object here to set
"glues" of the data.
In this case, the inter/extra-polation between
(:meth:`t0` - extrapolate) and (:meth:`t1` + extrapolate)
is allowed.
:raises ValueError: If non-allowed extrapolation was executed.
By default extrapolation is not allowed, because
it may produce unrealistic values.
>>> tlist = [datetime.datetime(2009, 1, 10, 12),
... datetime.datetime(2009, 1, 10, 8),
... datetime.datetime(2009, 1, 10, 4),
... datetime.datetime(2009, 1, 10, 16),
... datetime.datetime(2009, 1, 10, 0),]
>>> dlist = [1, 2, 3, 2, 2]
>>> ts = ScalarSeries(tlist, dlist)
>>> print('%.3f' % (
... ts.spline_interpolate(datetime.datetime(2009, 1, 10, 8))))
2.000
>>> print('%.3f' % (
... ts.spline_interpolate(datetime.datetime(2009, 1, 10, 6))))
2.625
You may give a list as argumnet, too.
>>> vals = ts.spline_interpolate([datetime.datetime(2009, 1, 10, 1),
... datetime.datetime(2009, 1, 10, 5),
... datetime.datetime(2009, 1, 10, 9),])
>>> print('%.3f %.3f %.3f' % (vals[0], vals[1], vals[2]))
2.547 2.859 1.672
Extrapolation is not allowed by default, but you can set
extrapolation keyword to do it.
>>> print('%.3f' % (
... ts.spline_interpolate(datetime.datetime(2009, 1, 11, 0),
... extrapolate=True)))
18.000
You may also allow the extrapolation-allowed range with
``datetime.timedelta`` object.
See ``scripts/timeseries_sample1`` also (TBD).
>>> print('%.3f' % (
... ts.spline_interpolate(datetime.datetime(2009, 1, 9, 0),
... extrapolate=datetime.timedelta(days=1))))
-158.000
'''
if extrapolate is True:
ex0 = datetime.datetime(datetime.MINYEAR, 1, 1)
ex1 = datetime.datetime(datetime.MAXYEAR, 12, 31)
elif extrapolate is False:
ex0 = self.t0()
ex1 = self.t1()
else:
ex0 = self.t0() - extrapolate
ex1 = self.t1() + extrapolate
ex0 = _utc.date2num(ex0)
ex1 = _utc.date2num(ex1)
tn = _utc.date2num(t)
if (np.array(tn) < ex0).any() | (np.array(tn) > ex1).any():
raise ValueError(
'Some of the given time is out of the range.\n' +
'Use extrapol keyword to avoid this exception,\n' +
'while large error may be introduced.')
try:
spl = self.cache['spl']
except KeyError:
spl = interpolate.InterpolatedUnivariateSpline(self.data[0, :], self.data[1, :])
self.cache['spl'] = spl
return spl(tn)
[docs] def clear_cache(self):
''' Clear cache (for spline)
'''
self.cache = {}
[docs] def concatenate(self, other, strategy='overwrite'):
''' Concatenate other series to return a new series.
:param other: The other :class:`ScalarSeries`
:keyword strategy: Strategy to concatenate.
Default is ``'overwrite'``, in which the same data points
are overwritten by the ``other`` data.
If ``'nooverwrite'`` is given, the same data points
are kept as ``self`` data.
:returns: New :class:`ScalarSeries`
Prepare the first ScalarSeries with 3 records.
>>> t0 = [datetime.datetime(2008, 12, 1),
... datetime.datetime(2008, 12, 3),
... datetime.datetime(2008, 12, 5)]
>>> d0 = [1.5, 2.5, 3.5]
>>> ts0 = ScalarSeries(t0, d0)
Then, create the second ScalarSeries with 3 records.
>>> t1 = [datetime.datetime(2008, 12, 5),
... datetime.datetime(2008, 12, 8),
... datetime.datetime(2008, 12, 9)]
>>> d1 = [4.5, 6.5, 9.5]
>>> ts1 = ScalarSeries(t1, d1)
Concatenate them. Remember 1 record is duplicated.
Therefore, only 5 records are in the new ScalarSeries.
>>> ts2 = ts0.concatenate(ts1)
>>> print(len(ts2))
5
The duplicated record is overwritten by ``ts1``.
>>> print(ts2.nearest(datetime.datetime(2008, 12, 5)))
4.5
If 'nooverwrite strategy is taken, ``ts0`` is used.
>>> ts2 = ts0.concatenate(ts1, strategy='nooverwrite')
>>> print(ts2.nearest(datetime.datetime(2008, 12, 5)))
3.5
'''
t0, d0 = self.get_data()
t1, d1 = other.get_data()
if strategy == 'overwrite':
t = np.concatenate([t0, t1])
d = np.concatenate([d0, d1])
elif strategy == 'nooverwrite':
t = np.concatenate([t1, t0])
d = np.concatenate([d1, d0])
else:
raise ValueError('No strategy %s defined.' % strategy)
return ScalarSeries(t, d)
def __len__(self):
return len(self.data[0, :])
[docs] @classmethod
def sample_data(cls):
''' Get a sample data.
'''
tlist = [datetime.datetime(2009, 1, 10, 0),
datetime.datetime(2009, 1, 10, 1),
datetime.datetime(2009, 1, 10, 3),
datetime.datetime(2009, 1, 10, 8),
datetime.datetime(2009, 1, 10, 10),
datetime.datetime(2009, 1, 10, 11),
datetime.datetime(2009, 1, 10, 15),
datetime.datetime(2009, 1, 10, 21), ]
data = [0.0, 1.5, 2.8, 9.8, 9.2, 9.9, 16.3, 22.5]
return ScalarSeries(tlist, data)
[docs] def clipped(self, t0, t1):
'''Return clipped :class:`ScalarSeries`.
:param t0: Start time
:param t1: Stop time
:return: clipped data between t0 and t1.
:rtype: ScalarSeries
>>> scalar_series = ScalarSeries.sample_data()
>>> print(len(scalar_series))
8
>>> clipped_scalar_series = scalar_series.clipped(
... datetime.datetime(2009, 1, 10, 7),
... datetime.datetime(2009, 1, 10, 15))
>>> print(len(clipped_scalar_series))
4
>>> print(clipped_scalar_series.t0())
2009-01-10 08:00:00
>>> print(clipped_scalar_series.t1())
2009-01-10 15:00:00
'''
tlist, dlist = self.get_data(t0, t1)
return ScalarSeries(tlist, dlist)
[docs]class Vector3dSeries(_TimeSeriesABC):
''' Time series of 3-D vector.
Internally each component uses :class:`ScalarSeries`.
This is a redundunt approch (time_array is repeated three times)
but simple implementation wins.
>>> tlist = [datetime.datetime(2009, 1, 10, 0),
... datetime.datetime(2009, 1, 10, 4),
... datetime.datetime(2009, 1, 10, 8),
... datetime.datetime(2009, 1, 10, 12),
... datetime.datetime(2009, 1, 10, 16),]
>>> xlist = [0, 0.5, 1, 0.5, 0]
>>> ylist = [1, 0.5, 0, -0.5, -1]
>>> zlist = [0, 1, 2, 3, 4]
>>> vs = Vector3dSeries(tlist, xlist, ylist, zlist)
To get the data, you can use :meth:`get_data` method.
>>> t0 = datetime.datetime(2009, 1, 10, 1)
>>> t1 = datetime.datetime(2009, 1, 10, 8)
>>> t, v = vs.get_data(t0, t1)
>>> print(len(t))
2
>>> print(t[0])
2009-01-10 04:00:00
>>> print(t[1])
2009-01-10 08:00:00
>>> print(v.shape)
(3, 2)
>>> print(v[:, 0]) # doctest: +NORMALIZE_WHITESPACE
[0.5 0.5 1. ]
>>> print(v[:, 1]) # doctest: +NORMALIZE_WHITESPACE
[1. 0. 2.]
You may also use :meth:`get_average` and :meth:`get_moment` methods
for the average or moment values.
>>> ave = vs.get_average(t0, t1)
>>> print('%.2f %.2f %.2f' % (ave[0], ave[1], ave[2]))
0.75 0.25 1.50
>>> n, ave, var = vs.get_moment(t0, t1)
>>> print(n)
2
>>> print(ave) # doctest: +NORMALIZE_WHITESPACE
[0.75 0.25 1.5 ]
>>> print(var) # doctest: +NORMALIZE_WHITESPACE
[0.0625 0.0625 0.25 ]
For interpolation, you can use :meth:`spline_interpolate`.
>>> t_interpol = [t0, t1]
>>> val_interpol = vs.spline_interpolate(t_interpol)
>>> from irfpy.util.with_context import printoptions
>>> with printoptions(precision=2, suppress=True):
... print(val_interpol.shape)
... print(val_interpol[:, 0]) # doctest: +NORMALIZE_WHITESPACE
... # print val_interpol[:, 1]
(3, 2)
[0.04 0.88 0.25]
# [ 1. 0. 2.]
.. todo::
The doctest may make non-unique solution (0. or -0.)
so that I disabled the test on 2013-09-03.
One has to make better test routine here.
'''
def __init__(self, time_array, x_array, y_array, z_array):
''' Instance the :class:`Vector3dSeries` object
:param time_array: Sequence of time in ``datetime.datetime`` objects.
:param x_array: Sequence of x-component in ``float``.
:param y_array: Sequence of y-component in ``float``.
:param z_array: Sequence of z-component in ``float``.
'''
self.x = ScalarSeries(time_array, x_array)
self.y = ScalarSeries(time_array, y_array)
self.z = ScalarSeries(time_array, z_array)
[docs] def spline_interpolate(self, t, extrapolate=False):
''' Spline interpolation.
Returns the spline interpolation. Interpolation is component-wise.
See also :meth:`ScalarSeries.spline_interpolate` for details.
:param t: ``datetime.datetime`` object or sequence of it.
:keyword extrapolate: Extrapolation handling.
See :meth:`ScalarSeries.spline_interpolate` for details.
:returns: Interpolated vector (component wise).
If ``t`` is a ``datetime.datetime`` object, ``np.array`` with
shape of (3, ) is returned.
If ``t`` is a sequence of ``datetime.datetime`` with *N* length,
``np.array`` with shape of (3, *N*) is returned.
'''
x = self.x.spline_interpolate(t, extrapolate=extrapolate)
y = self.y.spline_interpolate(t, extrapolate=extrapolate)
z = self.z.spline_interpolate(t, extrapolate=extrapolate)
return np.array([x, y, z])
[docs] def t0(self):
''' Return the first data time
'''
return self.x.t0()
[docs] def t1(self):
''' Return the last data time
'''
return self.x.t1()
[docs] def getobstime(self):
''' Return the observation time.
:return: List of observation time.
'''
return self.x.getobstime()
[docs] def get_data(self, t0=None, t1=None):
''' Return the data in between ``t0`` and ``t1``.
:param t0: Start time (inclusive)
:type t0: ``datetime.datetime``
:param t1: End time (inclusive)
:type t1: ``datetime.datetime``
:returns: The data in between ``t0`` and ``t1``.
``ndarray`` with the shape of (3, N) where N
is the number of data.
'''
t, x = self.x.get_data(t0, t1)
t, y = self.y.get_data(t0, t1)
t, z = self.z.get_data(t0, t1)
return (t, np.array([x, y, z]))
[docs] def get_timeseries_magnitude(self):
""" Return the time series of the magnitude data.
:return: Time series of magnitude data.
:rtype: :class:`ScalarSeries`.
>>> vecseries = Vector3dSeries.sample_data()
>>> print(len(vecseries))
5
>>> t, v = vecseries.get_data()
>>> magseries = vecseries.get_timeseries_magnitude()
>>> t, l = magseries.get_data()
>>> print(v)
[[ 0. 0.5 1. 0.5 0. ]
[ 1. 0.5 0. -0.5 -1. ]
[ 0. 1. 2. 3. 4. ]]
>>> print(l) # doctest: +NORMALIZE_WHITESPACE
[1. 1.22474487 2.23606798 3.082207 4.12310563]
"""
tlist, xyz = self.get_data()
mag = np.sqrt((xyz ** 2).sum(axis=0))
return ScalarSeries(tlist, mag)
[docs] def clipped(self, t0, t1):
""" Return clipped Vector3dSeries.
:param t0: Start time
:param t1: Stop time
:return: New Vector3dSeries
>>> tser3d = Vector3dSeries.sample_data()
>>> print(len(tser3d))
5
>>> tser3d = tser3d.clipped(datetime.datetime(2009, 1, 10, 6),
... datetime.datetime(2009, 1, 10, 15))
>>> print(len(tser3d))
2
>>> tlist, dlist = tser3d.get_data()
>>> print(dlist)
[[ 1. 0.5]
[ 0. -0.5]
[ 2. 3. ]]
"""
t, x = self.x.get_data(t0, t1)
t, y = self.y.get_data(t0, t1)
t, z = self.z.get_data(t0, t1)
return Vector3dSeries(t, x, y, z)
[docs] def get_average(self, t0=None, t1=None):
''' Return the average.
:param t0: Start time (inclusive)
:type t0: ``datetime.datetime``
:param t1: End time (inclusive)
:type t1: ``datetime.datetime``
:returns: Average
:rtype: ``ndarray`` with shape of (3,)
'''
if t0 is None:
t0 = self.t0()
if t1 is None:
t1 = self.t1()
x = self.x.get_average(t0, t1)
y = self.y.get_average(t0, t1)
z = self.z.get_average(t0, t1)
return np.array([x, y, z])
[docs] def get_moment(self, t0=None, t1=None):
''' Return the moment.
:param t0: Start time (inclusive)
:type t0: ``datetime.datetime``
:param t1: End time (inclusive)
:type t1: ``datetime.datetime``
:returns: A tuple.
(number of data,
average in (3,) ``ndarray``,
variance in (3,) ``ndarray``)
'''
if t0 is None:
t0 = self.t0()
if t1 is None:
t1 = self.t1()
xn, xa, xv = self.x.get_moment(t0, t1)
yn, ya, yv = self.y.get_moment(t0, t1)
zn, za, zv = self.z.get_moment(t0, t1)
return (xn, np.array([xa, ya, za]), np.array([xv, yv, zv]))
[docs] def clear_cache(self):
self.x.clear_cache()
self.y.clear_cache()
self.z.clear_cache()
[docs] def nearest(self, t):
x = self.x.nearest(t)
y = self.y.nearest(t)
z = self.z.nearest(t)
return np.array([x, y, z])
[docs] def concatenate(self, other, strategy='overwrite'):
''' Concatenate other series to return a new series.
:param other: The other :class:`Vector3dSeries`
:keyword strategy: Strategy to concatenate.
Default is ``'overwrite'``, in which the same data points
are overwritten by the ``other`` data.
If ``'nooverwrite'`` is given, the same data points
are kept as ``self`` data.
:returns: New :class:`VectorSeries`
>>> t0 = dtrange(datetime.datetime(2005, 1, 1),
... datetime.datetime(2005, 1, 2),
... datetime.timedelta(hours=6)) # 4 elements
>>> print(len(t0))
4
>>> x0 = [0, 1, 2, 3]
>>> y0 = [0, -1, -2, -3]
>>> z0 = [0, 0, 0, 0]
>>> v0 = Vector3dSeries(t0, x0, y0, z0)
>>> t1 = dtrange(datetime.datetime(2005, 1, 2),
... datetime.datetime(2005, 1, 2, 12),
... datetime.timedelta(hours=4)) # 3 elements
>>> x1 = [4, 5, 6]
>>> y1 = [-2, -1, 0]
>>> z1 = [1, 1, 1]
>>> v1 = Vector3dSeries(t1, x1, y1, z1)
>>> v2 = v0.concatenate(v1)
>>> print(len(v2))
7
>>> t2, d2 = v2.get_data()
>>> print(d2.shape)
(3, 7)
>>> print(d2[0, :]) # doctest: +NORMALIZE_WHITESPACE
[0. 1. 2. 3. 4. 5. 6.]
>>> print(d2[1, :])
[ 0. -1. -2. -3. -2. -1. 0.]
>>> print(d2[2, :]) # doctest: +NORMALIZE_WHITESPACE
[0. 0. 0. 0. 1. 1. 1.]
'''
x = self.x.concatenate(other.x, strategy=strategy)
y = self.y.concatenate(other.y, strategy=strategy)
z = self.z.concatenate(other.z, strategy=strategy)
t, x = x.get_data()
t, y = y.get_data()
t, z = z.get_data()
return Vector3dSeries(t, x, y, z)
def __len__(self):
return len(self.x)
[docs] @classmethod
def sample_data(cls):
''' Return a sample data.
'''
tlist = [datetime.datetime(2009, 1, 10, 0),
datetime.datetime(2009, 1, 10, 4),
datetime.datetime(2009, 1, 10, 8),
datetime.datetime(2009, 1, 10, 12),
datetime.datetime(2009, 1, 10, 16), ]
xlist = [0, 0.5, 1, 0.5, 0]
ylist = [1, 0.5, 0, -0.5, -1]
zlist = [0, 1, 2, 3, 4]
return Vector3dSeries(tlist, xlist, ylist, zlist)
def __str__(self):
s = '<irfpy.util.timeseries.Vector3dSeries: Len={l:d} T0={t0:%FT%T} T1={t1:%FT%T}>'.format(
l=len(self), t0=self.t0(), t1=self.t1())
return s
[docs]class TimeStepScalarField3D(_TimeSeriesABC):
r""" Time series of scalar field.
This class extends the functionality of
:class:`irfpy.util.fields.ScalarField3D`
class and their derivatives in order to support the time series of fields.
.. note::
This contains several :class:`ScalarField3D`, so that the system should sometimes
have a very large memory. No memory handling is supported.
This means that all the data is on memory.
*Sample*
Let's look at a scalar field, i.e. gravity potential,
produced by a moving point of mass: :math:`(x_m=vt, 0, 0)`.
The gravity potential is obtained from :class:`irfpy.util.fields.GravityPotential` class.
First, just create the potential field at ``t=0``, i.e. ``x_m=0``. The mass is taken
as :math:`M=\frac{1}{6.67408\times 10^{-11}}`, then, :math:`GM=1`.
Note that the time should be ``datetime.datetime`` object.
Thus, a dummy time (with respective to a given epoch).
>>> import datetime
>>> t0 = datetime.datetime(2010, 1, 1, 0, 0, 0) # An epoch. This case, it is a dummy time.
.. math::
\phi(r) = -\frac{GM}{r} = -\frac{1}{r}
>>> import irfpy.util.fields
>>> m = 1 / 6.67408e-11
>>> potential_t0 = irfpy.util.fields.GravityPotential(m, center=(0, 0, 0))
Just to confirm if it works, check the potential at ``(0, 4, 0)``.
>>> print(potential_t0([0, 4, 0]))
-0.25
Then, as time goes, the point of mass moves. Here I take ``v=2.5``.
>>> v=2.5
At time ``t=1``, the position of the mass is at ``x_m = v (=2.5)``.
>>> t=1
>>> xm_1 = (v * t, 0, 0)
>>> potential_t1 = irfpy.util.fields.GravityPotential(m, center=xm_1)
Now the observer at (0, 4, 0) see different potential, the distance is now
:math:`|(2.5, 4.0, 0)|=4.717` and the potential is :math:`\phi(r, t=1)=-1/r=-0.212`.
>>> print('{:.3f}'.format(potential_t1([0, 4, 0])))
-0.212
At time ``t=3``, the position moves to ``x_m = 3v = 7.5``.
>>> t=3
>>> xm_3 = (v * t, 0, 0)
>>> potential_t3 = irfpy.util.fields.GravityPotential(m, center=xm_3)
>>> print('{:.3f}'.format(potential_t3([0, 4, 0])))
-0.118
A bit more time steps are added as follows.
>>> si = (0, 1, 3, 5, 8)
>>> ti = [t0 + datetime.timedelta(seconds=_t) for _t in si]
>>> potential_list = []
>>> for _t in si:
... xm_t = (v * _t, 0, 0)
... potential_t = irfpy.util.fields.GravityPotential(m, center=xm_t)
... potential_list.append(potential_t)
Now it is time to create a :class:`TimeStepScalarField`.
>>> tsfield = TimeStepScalarField3D(ti, potential_list)
First, you can get the field directly. The following is the same as
>>> print('{:.3f}'.format(tsfield((0, 4, 0), t0 + datetime.timedelta(seconds=0)))) # At (0, 4, 0) at t=0
-0.250
>>> print('{:.3f}'.format(tsfield((0, 4, 0), t0 + datetime.timedelta(seconds=1)))) # At (0, 4, 0) at t=1
-0.212
>>> print('{:.3f}'.format(tsfield((0, 4, 0), t0 + datetime.timedelta(seconds=3)))) # At (0, 4, 0) at t=3
-0.118
Timewise, the linear interpolated value is returned.
Spacewise, the default interpolation of the given field (irfpy.util.fields.ScalarField3D)
is used.
>>> print('{:.3f}'.format(tsfield((0, 4, 0), t0 + datetime.timedelta(seconds=2)))) # At (0, 4, 0) at t=2 is not given, but the returned is linear interpolation.
-0.165
>>> print(tsfield.index_of_time(t0))
0
>>> print(tsfield.index_of_time(t0 + datetime.timedelta(seconds=4)))
2
>>> print(tsfield.index_of_time(t0 + datetime.timedelta(seconds=9)))
4
"""
def __init__(self, time_list, field_list):
if len(time_list) != len(field_list):
raise ValueError('The given time_list and field_list does not have same length.')
datadict = {}
for _t, _f in zip(time_list, field_list):
datadict[_t] = _f
self._tlist = [] # List is in the ascending order
self._flist = [] # Index is corresponding to tlist
for t in sorted(datadict.keys()):
self._tlist.append(t)
self._flist.append(datadict[t])
self._time2index = _tsdb.DB()
for _i, _t in enumerate(self._tlist):
self._time2index.append('{:d}'.format(_i), _t)
[docs] def t0(self):
return self._tlist[0]
[docs] def t1(self):
return self._tlist[1]
[docs] def getobstime(self):
return self._tlist.copy()
[docs] def get_data(self, t0=None, t1=None):
if t0 is None:
t0 = self.t0()
if t1 is None:
t1 = self.t1()
new_tlist = []
new_flist = []
for _t, _f in zip(self._tlist, self._flist):
if _t >= t0 and _t <= t1:
new_tlist.append(_t)
new_flist.append(_f)
return new_tlist, new_flist
[docs] def clipped(self, t0, t1):
tlist, dlist = self.get_data(t0, t1)
return TimeStepScalarField3D(tlist, dlist)
[docs] def index_of_time(self, t):
return int(self._time2index.get(t))
def __call__(self, xyz, tt):
# First, identify the corresponding field
i0 = self.index_of_time(tt)
t0 = self._tlist[i0]
f0 = self._flist[i0]
try:
t1 = self._tlist[i0 + 1]
except IndexError:
return ValueError('The given time is out of bounds.')
f1 = self._flist[i0 + 1]
val0 = f0(xyz)
val1 = f1(xyz)
dt0 = (tt - t0).total_seconds()
dt1 = (t1 - tt).total_seconds()
gamma = dt0 / (dt0 + dt1)
return val0 + (val1 - val0) * gamma
[docs]def dtrange(t0, t1, delta):
''' Return a regularly separated list of ``datatime`` objects.
:param t0: Start
:type t0: ``datetime.datetime``
:param t1: Stop (not inclusive)
:type t1: ``datetime.datetime``
:param delta: Delta
:type delta: ``datetime.timedelta``.
:returns: List of ``datetime.datetime``.
>>> r0 = dtrange(datetime.datetime(2009, 1, 1),
... datetime.datetime(2009, 1, 2),
... datetime.timedelta(hours=1))
>>> print(len(r0))
24
>>> print(r0[0])
2009-01-01 00:00:00
>>> print(r0[-1])
2009-01-01 23:00:00
'''
tlist = []
t = t0
while t < t1:
tlist.append(t)
t = t + delta
return tlist