''' Time sereis related classes.
.. codeauthor:: Yoshifumi Futaana
.. autosummary::
ScalarSeries
Vector3dSeries
dtrange
These classes stores a time series of scalar / vector values.
For example, these may be used to represent a time profile of
the plasma density, or a time series of magnetic field vector.
.. note::
There is similar class, :class:`irfpy.util.julday.JdSeries`.
The :class:`irfpy.util.julday.JdSeries` class is slow and not
easy to handle.
Thus, developers may consider using :class:`ScalarSeries`
and :class:`Vector3dSeries`.
.. note::
The :class:`ScalarSeries` only handles ``float`` data.
:class:`Vector3dSeries` only treats 3-D vector of ``float``.
Internally they use ``ndarray``, thus they are quicker and more flexible.
'''
import datetime
import numpy as np
from scipy import interpolate
import matplotlib.dates as mdates
import bisect
import abc
from irfpy.util import timeseriesdb as _tsdb
from irfpy.util import exception as _ex
[docs]class DataNotFound(_ex.IrfpyException):
''' Exception raised when the data is not found.
'''
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class _TimeSeriesABC(metaclass=abc.ABCMeta):
@abc.abstractmethod
def t0(self):
""" Return the time of the first data
:returns: The earliest time.
:rtype: ``datetime.datetime``
"""
pass
@abc.abstractmethod
def t1(self):
""" Return the time of the last data.
:returns: The latest time.
:rtype: ``datetime.datetime``
"""
pass
@abc.abstractmethod
def getobstime(self):
""" Return the list of the observation time.
:return: List of the observation time.
"""
pass
@abc.abstractmethod
def get_data(self, t0, t1):
''' Return the registered data.
:keyword t0: Start time (inclusive).
:type t0: ``datetime.datetime``
:keyword t1: End time (inclusive).
:type t1: ``datetime.datetime``
:returns: (``tlist``, ``dlist``) where ``tlist`` is the
list of the time as in ``datetime`` instance, and
dlist is the data as in ``ndarray`` instance.
'''
pass
@abc.abstractmethod
def clipped(self, t0, t1):
''' Return clipped data.
:param t0: Start time (inclusive)
:param t1: Stop time (inclusive)
:return: Clipped object. It should be the same class as ``self``.
'''
pass
[docs]class ObjectSeries(_TimeSeriesABC):
''' Represents time series of any object.
:param time_array: Sequence of time in ``datetime.dattime`` objects
:param data_array: Sequecne of data in any format.
Represent time-tagged data.
If the data is floating point, :class:`ScalarSeries` can be used.
If the data is 3-D real floating, :class:`VectorSeries` can be used.
This class is for arbitrary object.
'''
def __init__(self, time_array, data_sequence):
""" Initialize the series.
:param time_array: Array of time.
:param data_sequence: Array of data.
>>> tlist = [datetime.datetime(2001, 1, 10, 3),
... datetime.datetime(2001, 1, 10, 1),
... datetime.datetime(2001, 1, 10, 7),
... datetime.datetime(2001, 1, 10, 18),
... datetime.datetime(2001, 1, 10, 2)]
>>> print(len(tlist))
5
>>> dlist = [(3,), (1,), (7,), (18,), (2,)]
>>> tser = ObjectSeries(tlist, dlist)
>>> tlist2 = tser.times
>>> from pprint import pprint
>>> pprint(tlist2)
[datetime.datetime(2001, 1, 10, 1, 0),
datetime.datetime(2001, 1, 10, 2, 0),
datetime.datetime(2001, 1, 10, 3, 0),
datetime.datetime(2001, 1, 10, 7, 0),
datetime.datetime(2001, 1, 10, 18, 0)]
>>> dlist2 = tser.data
>>> print(dlist2)
[(1,), (2,), (3,), (7,), (18,)]
>>> pprint(tser.get_data())
((datetime.datetime(2001, 1, 10, 1, 0),
datetime.datetime(2001, 1, 10, 2, 0),
datetime.datetime(2001, 1, 10, 3, 0),
datetime.datetime(2001, 1, 10, 7, 0),
datetime.datetime(2001, 1, 10, 18, 0)),
((1,), (2,), (3,), (7,), (18,)))
>>> pprint(tser.get_data(datetime.datetime(2001, 1, 10, 2), datetime.datetime(2001, 1, 10, 7)))
((datetime.datetime(2001, 1, 10, 2, 0),
datetime.datetime(2001, 1, 10, 3, 0),
datetime.datetime(2001, 1, 10, 7, 0)),
((2,), (3,), (7,)))
>>> pprint(tser.get_data(datetime.datetime(2030, 1, 1), datetime.datetime(2040, 1, 1)))
((), ())
>>> for t, d in tser.iter():
... print(t, d)
2001-01-10 01:00:00 (1,)
2001-01-10 02:00:00 (2,)
2001-01-10 03:00:00 (3,)
2001-01-10 07:00:00 (7,)
2001-01-10 18:00:00 (18,)
>>> tser2 = tser.clipped(datetime.datetime(2001, 1, 10, 2), datetime.datetime(2001, 1, 10, 7))
>>> print(len(tser2))
3
>>> pprint(tser2.get_data())
((datetime.datetime(2001, 1, 10, 2, 0),
datetime.datetime(2001, 1, 10, 3, 0),
datetime.datetime(2001, 1, 10, 7, 0)),
((2,), (3,), (7,)))
>>> tser3 = tser.clipped(datetime.datetime(2030, 10, 1), datetime.datetime(2010, 10, 1))
>>> print(len(tser3))
0
"""
if len(time_array) != len(data_sequence):
raise DataNotFound('Length of time (%d) != data (%d)')
datadict = {}
for t, d in zip(time_array, data_sequence):
datadict[t] = d
self.times = sorted(datadict.keys())
self.data = [datadict[t] for t in self.times]
[docs] def t0(self):
try:
t = self.times[0]
except IndexError:
raise _ex.IrfpyException('No data contained.')
return t
[docs] def t1(self):
try:
t = self.times[-1]
except IndexError:
raise _ex.IrfpyException("No data contained.")
return t
[docs] def getobstime(self):
return self.times[:]
[docs] def get_data(self, t0=None, t1=None):
if t0 is None:
t0 = self.t0()
if t1 is None:
t1 = self.t1()
tlist = np.array(self.times, copy=True)
t_later_t0 = tlist >= t0
t_earlier_t1 = tlist <= t1
t_match = (t_later_t0 & t_earlier_t1)
idx = np.where(t_match)[0]
t_new = tuple([self.times[_i] for _i in idx])
d_new = tuple([self.data[_i] for _i in idx])
return (t_new, d_new)
[docs] def clipped(self, t0, t1):
tlist, dlist = self.get_data(t0, t1)
return ObjectSeries(tlist, dlist)
def __len__(self):
return len(self.times)
[docs] def at(self, t):
""" Return the data at the time of ``t``.
The data at the time of ``t``. The time should be exact.
Usually you can get the time of the data by :meth:`ObjectSeries.getobstime` method.
If not data is available at ``t``, :class:`DataNotFound` error is raised.
"""
try:
idx = self.times.index(t)
return self.data[idx]
except ValueError:
raise DataNotFound('Data not found for time {}.'.format(t))
[docs] def nextof(self, t):
""" Return the data next of ``t``. The next data time should be ``tdata > t``.
:param t: Time
:return: A tuple, (tnext, dnext)
:exception: If the next data is not available, :class:`DataNotFound` error is raised.
>>> tlist = [datetime.datetime(2001, 1, 10, 3),
... datetime.datetime(2001, 1, 10, 1),
... datetime.datetime(2001, 1, 10, 7),
... datetime.datetime(2001, 1, 10, 18),
... datetime.datetime(2001, 1, 10, 2)]
>>> print(len(tlist))
5
>>> dlist = [(3,), (1,), (7,), (18,), (2,)]
>>> tser = ObjectSeries(tlist, dlist)
>>> t, d = tser.nextof(datetime.datetime(2001, 1, 10, 0))
>>> print(t, d)
2001-01-10 01:00:00 (1,)
>>> t, d = tser.nextof(datetime.datetime(2001, 1, 10, 3))
>>> print(t, d)
2001-01-10 07:00:00 (7,)
>>> t, d = tser.nextof(datetime.datetime(2001, 1, 10, 17))
>>> print(t, d)
2001-01-10 18:00:00 (18,)
>>> t, d = tser.nextof(datetime.datetime(2001, 1, 10, 18)) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
timeseries.DataNotFound:
>>> t, d = tser.nextof(datetime.datetime(2001, 1, 10, 20)) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
timeseries.DataNotFound:
"""
tlist = self.times
idx = bisect.bisect_right(tlist, t)
if idx >= len(tlist):
raise DataNotFound('Data next of {:%FT%T} is not in the ObjectSeries.'.format(t))
return tlist[idx], self.data[idx]
[docs] def previousof(self, t):
""" Return the data previous of ``t``. The previous data time should be ``tdata < t``.
:param t: Time
:return: A tuple, (tnext, dnext)
:exception: If the previous data is not available, :class:`DataNotFound` error is raised.
>>> tlist = [datetime.datetime(2001, 1, 10, 3),
... datetime.datetime(2001, 1, 10, 1),
... datetime.datetime(2001, 1, 10, 7),
... datetime.datetime(2001, 1, 10, 18),
... datetime.datetime(2001, 1, 10, 2)]
>>> print(len(tlist))
5
>>> dlist = [(3,), (1,), (7,), (18,), (2,)]
>>> tser = ObjectSeries(tlist, dlist)
>>> t, d = tser.previousof(datetime.datetime(2001, 1, 10, 20))
>>> print(t, d)
2001-01-10 18:00:00 (18,)
>>> t, d = tser.previousof(datetime.datetime(2001, 1, 10, 18))
>>> print(t, d)
2001-01-10 07:00:00 (7,)
>>> t, d = tser.previousof(datetime.datetime(2001, 1, 10, 2))
>>> print(t, d)
2001-01-10 01:00:00 (1,)
>>> t, d = tser.previousof(datetime.datetime(2001, 1, 10, 1)) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
timeseries.DataNotFound:
>>> t, d = tser.previousof(datetime.datetime(2001, 1, 10, 0)) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
timeseries.DataNotFound:
"""
tlist = self.times
idx = bisect.bisect_left(tlist, t)
if idx == 0:
raise DataNotFound('Data previous of {:%FT%T} is not in the ObjectSeries.'.format(t))
return tlist[idx - 1], self.data[idx - 1]
[docs] def nearest(self, t):
""" Return the nearest data.
:param t:
:return:
"""
try:
te, de = self.nearest_no_later(t) # Exception handling needed!
except DataNotFound:
te, de = datetime.datetime.min, None
try:
tl, dl = self.nearest_no_earlier(t)
except DataNotFound:
tl, dl = datetime.datetime.max, None
dte = (t - te).total_seconds()
dtl = (tl - t).total_seconds()
if dte <= dtl:
return te, de
else:
return tl, dl
[docs] def nearest_no_later(self, t):
""" Return the data point observed earlier than or equal to ``t``, but the latest one.
:param t:
:return:
"""
tlist = self.times
idx = bisect.bisect(tlist, t)
if idx == 0:
raise DataNotFound('Time {} is earlier than the dataset. t0={}'.format(t, self.t0()))
idx = idx - 1
return self.times[idx], self.data[idx]
nearest_later_or_at = nearest_no_later
[docs] def nearest_no_earlier(self, t):
""" Return the data point observed later than or equal to ``t``, but the earliest one.
:param t:
:return:
"""
tlist = self.times
idx = bisect.bisect_left(tlist, t)
if idx == len(tlist):
raise DataNotFound('Time {} is later than the dataset. t1={}'.format(t, self.t1()))
return self.times[idx], self.data[idx]
nearest_earlier_or_at = nearest_no_earlier
[docs] def concatenate(self, other, strategy='overwrite'):
''' Concatenate other series to return a new series.
:param other: The other :class:`ObjectSeries`
:keyword strategy: Strategy to concatenate.
Default is ``'overwrite'``, in which the same data points
are overwritten by the ``other`` data.
If ``'nooverwrite'`` is given, the same data points
are kept as ``self`` data.
:returns: New :class:`OtherSeries`
Concatenate two series. If duplication happens, it overwrites by the second one (default),
or kept original (``nooverwrite``).
>>> t0 = [datetime.datetime(2008, 12, 1),
... datetime.datetime(2008, 12, 3),
... datetime.datetime(2008, 12, 5)]
>>> d0 = [1.5, 2.5, 3.5]
>>> tser0 = ObjectSeries(t0, d0)
>>> print(len(tser0))
3
>>> t1 = [datetime.datetime(2008, 12, 5),
... datetime.datetime(2008, 12, 8),
... datetime.datetime(2008, 12, 9)]
>>> d1 = [4.5, 6.5, 9.5]
>>> tser1 = ObjectSeries(t1, d1)
>>> print(len(tser1))
3
Concatenate! Note that ther is one duplication. Latter (tser1) prioritized.
>>> tser2 = tser0.concatenate(tser1)
>>> print(len(tser2))
5
>>> t2, d2 = tser2.get_data()
>>> from pprint import pprint
>>> pprint(t2)
(datetime.datetime(2008, 12, 1, 0, 0),
datetime.datetime(2008, 12, 3, 0, 0),
datetime.datetime(2008, 12, 5, 0, 0),
datetime.datetime(2008, 12, 8, 0, 0),
datetime.datetime(2008, 12, 9, 0, 0))
>>> print(d2)
(1.5, 2.5, 4.5, 6.5, 9.5)
Or, tser0 could be prioritzed.
>>> tser3 = tser0.concatenate(tser1, strategy='nooverwrite')
>>> print(tser3.nearest(datetime.datetime(2008, 12, 5))[1])
3.5
'''
t0, d0 = self.get_data()
t1, d1 = other.get_data()
if strategy == 'overwrite':
tlist = t0 + t1
dlist = d0 + d1
elif strategy == 'nooverwrite':
tlist = t1 + t0
dlist = d1 + d0
else:
raise ValueError('No strategy %s defined.' % strategy)
return ObjectSeries(tlist, dlist)
[docs] def iter(self):
return _iter_objser(self)
class _iter_objser:
def __init__(self, iter_obj):
self._cur_idx = 0
self._iter_obj = iter_obj
def __iter__(self):
return self
def __next__(self):
try:
rett, retval = self._iter_obj.times[self._cur_idx], self._iter_obj.data[self._cur_idx]
self._cur_idx += 1
return rett, retval
except IndexError:
raise StopIteration()
raise StopIteration()
[docs]class ScalarSeries(_TimeSeriesABC):
''' Represents time series of floating point scalar data.
:param time_array: Sequence of time in ``datetime.datetime`` objects.
:param data_array: Sequence of data in ``float``.
Internally, the time is in ``matplotlib.dates`` number; however,
most of the methods use ``datetime.datetime`` object as interface.
Sample follows.
First, prepare the list of the time and corresponding data.
>>> tlist = [datetime.datetime(2009, 1, 10, 12),
... datetime.datetime(2009, 1, 10, 8),
... datetime.datetime(2009, 1, 10, 4),
... datetime.datetime(2009, 1, 10, 16),
... datetime.datetime(2009, 1, 10, 0),]
>>> dlist = [0, 1, 2, 3, 4]
Then, instance the ScalarSeries object.
>>> ts = ScalarSeries(tlist, dlist)
>>> ts.shape
(2, 5)
>>> len(ts)
5
>>> tlist1, dlist1 = ts.get_data()
>>> print(dlist1) # doctest: +NORMALIZE_WHITESPACE
[4. 2. 1. 0. 3.]
If the time array has duplication, the last data is validated.
In this case, the created ScalarSeries length will be 2, not 3.
>>> tlist2 = [datetime.datetime(2009, 2, 5, 12),
... datetime.datetime(2009, 2, 5, 13),
... datetime.datetime(2009, 2, 5, 13),]
>>> dlist2 = [100, 200, 500]
>>> ts2 = ScalarSeries(tlist2, dlist2)
>>> ts2.shape
(2, 2)
>>> avg = ts2.get_average(ts2.t0(), ts2.t1())
>>> print(avg)
300.0
The coverage of the data can be get via methods of
:meth:`t0` and :meth:`t1`.
>>> print(ts.t0())
2009-01-10 00:00:00
>>> print(ts.t1(fmt="mdates"))
733417.6666666666
To get the data, you can use :meth:`get_data` method.
:meth:`get_average` and :meth:`get_moment` methods will calculate
the average and variance of the given data.
To get the data with inter (and extra) polation, use
:meth:`spline_interpolate` method.
:meth:`concatenate` method will concatenate the other given
:class:`ScalarSeries` object.
.. todo::
Zero size ScalarSeries should be implemented.
However, to do this you may need to separate the data array?
TBC.
'''
def __init__(self, time_array, data_array):
if len(time_array) != len(data_array):
raise ValueError('Length of time (%d) != data (%d)'
% (len(time_array), len(data_array)))
# Sorting.
datadict = {}
for t, d in zip(time_array, data_array):
datadict[t] = d
data = []
for t in sorted(datadict.keys()):
data.append([mdates.date2num(t), datadict[t]])
self.data = np.array(data).T
self.shape = self.data.shape
self.cache = {}
[docs] def t0(self, fmt=None):
''' Returns the first data.
:keyword fmt: If ``"mdates"`` is specified,
the corresponding object is returned.
:returns: The earliest time.
:rtype: ``datetime.datetime`` or ``float``.
'''
if fmt == "mdates":
return self.data[0, 0]
else:
return mdates.num2date(self.data[0, 0]).replace(tzinfo=None)
[docs] def t1(self, fmt=None):
''' Returns the first data.
:keyword fmt: If ``"mdates"`` is specified,
the corresponding object is returned.
:returns: The latest time.
:rtype: ``datetime.datetime`` or ``float``.
'''
if fmt == "mdates":
return self.data[0, -1]
else:
return mdates.num2date(self.data[0, -1]).replace(tzinfo=None)
[docs] def getobstime(self):
""" Return the observation time.
:return: List of observation time.
>>> scalar_series = ScalarSeries.sample_data()
>>> print(len(scalar_series))
8
>>> tlist = scalar_series.getobstime()
>>> print(len(tlist))
8
>>> print(tlist[3])
2009-01-10 08:00:00
"""
return [mdates.num2date(self.data[0, i]).replace(tzinfo=None)
for i in range(len(self))]
[docs] def get_data(self, t0=None, t1=None):
''' Return the registered data.
:keyword t0: Start time (inclusive). Default is :meth:`t0`.
:type t0: ``datetime.datetime``
:keyword t1: End time (inclusive). Default is :meth:`t1`.
:type t1: ``datetime.datetime``
:returns: (``tlist``, ``dlist``) where ``tlist`` is the
list of the time as in ``datetime`` instance, and
dlist is the data as in ``ndarray`` instance.
>>> tlist = dtrange(datetime.datetime(2009, 1, 30),
... datetime.datetime(2009, 1, 31),
... datetime.timedelta(hours=1))
>>> data = [i ** 2 for i in range(len(tlist))]
>>> ts = ScalarSeries(tlist, data)
>>> tlist2, data2 = ts.get_data(
... datetime.datetime(2009, 1, 30, 11, 30),
... datetime.datetime(2009, 1, 30, 13, 0))
>>> print(len(tlist2), len(data2))
2 2
>>> print(tlist2[0], tlist2[1])
2009-01-30 12:00:00 2009-01-30 13:00:00
>>> print('%.0f %.0f' % (data2[0], data2[1]))
144 169
'''
if t0 is None:
t0 = self.t0()
if t1 is None:
t1 = self.t1()
tn0 = mdates.date2num(t0)
tn1 = mdates.date2num(t1)
tlist = self.data[0, :]
dlist = self.data[1, :]
tlist = np.ma.masked_outside(tlist, tn0, tn1, copy=True)
dlist = np.ma.masked_array(dlist, tlist.mask)
tnlist = np.ma.compressed(tlist)
tnlist = [mdates.num2date(t).replace(tzinfo=None) for t in tnlist]
return (tnlist, np.ma.compressed(dlist))
[docs] def get_average(self, t0=None, t1=None):
''' Return the average between t0 and t1.
:param t0: Start time (inclusive)
:type t0: ``datetime.datetime``
:param t1: End time (inclusive)
:type t1: ``datetime.datetime``
:returns: Average.
Use :meth:`get_moment` for more precise use.
>>> tlist = dtrange(datetime.datetime(2009, 1, 30),
... datetime.datetime(2009, 1, 31),
... datetime.timedelta(hours=1))
>>> data = [i ** 2 for i in range(len(tlist))]
>>> ts = ScalarSeries(tlist, data)
The average below should be **(144+169+196)/3 = 169.667**.
>>> print('%.3f' % ts.get_average(
... datetime.datetime(2009, 1, 30, 11, 30),
... datetime.datetime(2009, 1, 30, 14, 0)))
169.667
'''
if t0 is None:
t0 = self.t0()
if t1 is None:
t1 = self.t1()
tlist, dlist = self.get_data(t0, t1)
return dlist.mean()
[docs] def get_moment(self, t0=None, t1=None):
''' Return the number of data, average and variance.
:param t0: Start time (inclusive)
:type t0: ``datetime.datetime``
:param t1: End time (inclusive)
:type t1: ``datetime.datetime``
:returns: (number of data, average, variance)
>>> tlist = dtrange(datetime.datetime(2009, 1, 30),
... datetime.datetime(2009, 1, 31),
... datetime.timedelta(hours=1))
>>> data = [i ** 2 for i in range(len(tlist))]
>>> ts = ScalarSeries(tlist, data)
The average below should be (121+144+169+196)/4 = 157.5
Variance is (36.5^2 + 13.5^2 + 11.5^2 + 38.5^2)/4 = 782.25
>>> ndat, ave, var = ts.get_moment(
... datetime.datetime(2009, 1, 30, 11, 0),
... datetime.datetime(2009, 1, 30, 14, 0))
>>> print(ndat)
4
>>> print('%.3f' % ave)
157.500
>>> print('%.3f' % var)
782.250
'''
if t0 is None:
t0 = self.t0()
if t1 is None:
t1 = self.t1()
tlist, dlist = self.get_data(t0, t1)
return len(dlist), dlist.mean(), dlist.var()
def _nearest(self, t):
''' Return the nearest data. Single arg version.
:param t: Datetime.
'''
tn = mdates.date2num(t)
idx = bisect.bisect(self.data[0, :], tn)
if idx == len(self):
idx = idx - 1
if idx == 0:
return self.data[1, 0]
tn0 = np.abs(tn - self.data[0, idx - 1])
tn1 = np.abs(tn - self.data[0, idx])
if tn0 <= tn1:
return self.data[1, idx - 1]
else:
return self.data[1, idx]
[docs] def nearest(self, t):
''' Return the nearest data.
:param t: Scalar or list of time.
'''
tlist = np.array(t)
if tlist.shape == ():
return self._nearest(t)
else:
d = np.array([self._nearest(ti) for ti in tlist])
return d
[docs] def linear_interpolate(self, t):
""" Linear interpolation.
:param t: Time or a sequence of times.
:return: Interpolated data.
>>> tlist = [datetime.datetime(2009, 1, 10, 12),
... datetime.datetime(2009, 1, 10, 8),
... datetime.datetime(2009, 1, 10, 4),
... datetime.datetime(2009, 1, 10, 16),
... datetime.datetime(2009, 1, 10, 0),]
>>> dlist = [1, 2, 3, 2, 2]
>>> ts = ScalarSeries(tlist, dlist)
>>> print(ts.linear_interpolate(datetime.datetime(2009, 1, 10, 6)))
2.5
>>> print(ts.linear_interpolate([datetime.datetime(2009, 1, 10, 6), datetime.datetime(2009, 1, 10, 14)]))
[2.5 1.5]
"""
tn = mdates.date2num(t)
from scipy.interpolate import interp1d
f = interp1d(self.data[0, :], self.data[1, :])
return f(tn)
[docs] def spline_interpolate(self, t, extrapolate=False):
''' Interpolate using spline fitting.
Interpolation can be done using Spline fitting.
:param t: Time, sequence allowed.
:type t: ``datetime.datetime``
:keyword extrapolate: Extrapolation handling. If ``True`` is
set, extrapolation is allowed.
If ``False`` is set (default), extrapolation is not allowed.
You may also set ``datetime.timedelta`` object here to set
"glues" of the data.
In this case, the inter/extra-polation between
(:meth:`t0` - extrapolate) and (:meth:`t1` + extrapolate)
is allowed.
:raises ValueError: If non-allowed extrapolation was executed.
By default extrapolation is not allowed, because
it may produce unrealistic values.
>>> tlist = [datetime.datetime(2009, 1, 10, 12),
... datetime.datetime(2009, 1, 10, 8),
... datetime.datetime(2009, 1, 10, 4),
... datetime.datetime(2009, 1, 10, 16),
... datetime.datetime(2009, 1, 10, 0),]
>>> dlist = [1, 2, 3, 2, 2]
>>> ts = ScalarSeries(tlist, dlist)
>>> print('%.3f' % (
... ts.spline_interpolate(datetime.datetime(2009, 1, 10, 8))))
2.000
>>> print('%.3f' % (
... ts.spline_interpolate(datetime.datetime(2009, 1, 10, 6))))
2.625
You may give a list as argumnet, too.
>>> vals = ts.spline_interpolate([datetime.datetime(2009, 1, 10, 1),
... datetime.datetime(2009, 1, 10, 5),
... datetime.datetime(2009, 1, 10, 9),])
>>> print('%.3f %.3f %.3f' % (vals[0], vals[1], vals[2]))
2.547 2.859 1.672
Extrapolation is not allowed by default, but you can set
extrapolation keyword to do it.
>>> print('%.3f' % (
... ts.spline_interpolate(datetime.datetime(2009, 1, 11, 0),
... extrapolate=True)))
18.000
You may also allow the extrapolation-allowed range with
``datetime.timedelta`` object.
See ``scripts/timeseries_sample1`` also (TBD).
>>> print('%.3f' % (
... ts.spline_interpolate(datetime.datetime(2009, 1, 9, 0),
... extrapolate=datetime.timedelta(days=1))))
-158.000
'''
if extrapolate is True:
ex0 = datetime.datetime(datetime.MINYEAR, 1, 1)
ex1 = datetime.datetime(datetime.MAXYEAR, 12, 31)
elif extrapolate is False:
ex0 = self.t0()
ex1 = self.t1()
else:
ex0 = self.t0() - extrapolate
ex1 = self.t1() + extrapolate
ex0 = mdates.date2num(ex0)
ex1 = mdates.date2num(ex1)
tn = mdates.date2num(t)
if (np.array(tn) < ex0).any() | (np.array(tn) > ex1).any():
raise ValueError(
'Some of the given time is out of the range.\n' +
'Use extrapol keyword to avoid this exception,\n' +
'while large error may be introduced.')
try:
spl = self.cache['spl']
except KeyError:
spl = interpolate.InterpolatedUnivariateSpline(self.data[0, :], self.data[1, :])
self.cache['spl'] = spl
return spl(tn)
[docs] def clear_cache(self):
''' Clear cache (for spline)
'''
self.cache = {}
[docs] def concatenate(self, other, strategy='overwrite'):
''' Concatenate other series to return a new series.
:param other: The other :class:`ScalarSeries`
:keyword strategy: Strategy to concatenate.
Default is ``'overwrite'``, in which the same data points
are overwritten by the ``other`` data.
If ``'nooverwrite'`` is given, the same data points
are kept as ``self`` data.
:returns: New :class:`ScalarSeries`
Prepare the first ScalarSeries with 3 records.
>>> t0 = [datetime.datetime(2008, 12, 1),
... datetime.datetime(2008, 12, 3),
... datetime.datetime(2008, 12, 5)]
>>> d0 = [1.5, 2.5, 3.5]
>>> ts0 = ScalarSeries(t0, d0)
Then, create the second ScalarSeries with 3 records.
>>> t1 = [datetime.datetime(2008, 12, 5),
... datetime.datetime(2008, 12, 8),
... datetime.datetime(2008, 12, 9)]
>>> d1 = [4.5, 6.5, 9.5]
>>> ts1 = ScalarSeries(t1, d1)
Concatenate them. Remember 1 record is duplicated.
Therefore, only 5 records are in the new ScalarSeries.
>>> ts2 = ts0.concatenate(ts1)
>>> print(len(ts2))
5
The duplicated record is overwritten by ``ts1``.
>>> print(ts2.nearest(datetime.datetime(2008, 12, 5)))
4.5
If 'nooverwrite strategy is taken, ``ts0`` is used.
>>> ts2 = ts0.concatenate(ts1, strategy='nooverwrite')
>>> print(ts2.nearest(datetime.datetime(2008, 12, 5)))
3.5
'''
t0, d0 = self.get_data()
t1, d1 = other.get_data()
if strategy == 'overwrite':
t = np.concatenate([t0, t1])
d = np.concatenate([d0, d1])
elif strategy == 'nooverwrite':
t = np.concatenate([t1, t0])
d = np.concatenate([d1, d0])
else:
raise ValueError('No strategy %s defined.' % strategy)
return ScalarSeries(t, d)
def __len__(self):
return len(self.data[0, :])
[docs] @classmethod
def sample_data(cls):
''' Get a sample data.
'''
tlist = [datetime.datetime(2009, 1, 10, 0),
datetime.datetime(2009, 1, 10, 1),
datetime.datetime(2009, 1, 10, 3),
datetime.datetime(2009, 1, 10, 8),
datetime.datetime(2009, 1, 10, 10),
datetime.datetime(2009, 1, 10, 11),
datetime.datetime(2009, 1, 10, 15),
datetime.datetime(2009, 1, 10, 21), ]
data = [0.0, 1.5, 2.8, 9.8, 9.2, 9.9, 16.3, 22.5]
return ScalarSeries(tlist, data)
[docs] def clipped(self, t0, t1):
'''Return clipped :class:`ScalarSeries`.
:param t0: Start time
:param t1: Stop time
:return: clipped data between t0 and t1.
:rtype: ScalarSeries
>>> scalar_series = ScalarSeries.sample_data()
>>> print(len(scalar_series))
8
>>> clipped_scalar_series = scalar_series.clipped(
... datetime.datetime(2009, 1, 10, 7),
... datetime.datetime(2009, 1, 10, 15))
>>> print(len(clipped_scalar_series))
4
>>> print(clipped_scalar_series.t0())
2009-01-10 08:00:00
>>> print(clipped_scalar_series.t1())
2009-01-10 15:00:00
'''
tlist, dlist = self.get_data(t0, t1)
return ScalarSeries(tlist, dlist)
[docs]class Vector3dSeries(_TimeSeriesABC):
''' Time series of 3-D vector.
Internally each component uses :class:`ScalarSeries`.
This is a redundunt approch (time_array is repeated three times)
but simple implementation wins.
>>> tlist = [datetime.datetime(2009, 1, 10, 0),
... datetime.datetime(2009, 1, 10, 4),
... datetime.datetime(2009, 1, 10, 8),
... datetime.datetime(2009, 1, 10, 12),
... datetime.datetime(2009, 1, 10, 16),]
>>> xlist = [0, 0.5, 1, 0.5, 0]
>>> ylist = [1, 0.5, 0, -0.5, -1]
>>> zlist = [0, 1, 2, 3, 4]
>>> vs = Vector3dSeries(tlist, xlist, ylist, zlist)
To get the data, you can use :meth:`get_data` method.
>>> t0 = datetime.datetime(2009, 1, 10, 1)
>>> t1 = datetime.datetime(2009, 1, 10, 8)
>>> t, v = vs.get_data(t0, t1)
>>> print(len(t))
2
>>> print(t[0])
2009-01-10 04:00:00
>>> print(t[1])
2009-01-10 08:00:00
>>> print(v.shape)
(3, 2)
>>> print(v[:, 0]) # doctest: +NORMALIZE_WHITESPACE
[0.5 0.5 1. ]
>>> print(v[:, 1]) # doctest: +NORMALIZE_WHITESPACE
[1. 0. 2.]
You may also use :meth:`get_average` and :meth:`get_moment` methods
for the average or moment values.
>>> ave = vs.get_average(t0, t1)
>>> print('%.2f %.2f %.2f' % (ave[0], ave[1], ave[2]))
0.75 0.25 1.50
>>> n, ave, var = vs.get_moment(t0, t1)
>>> print(n)
2
>>> print(ave) # doctest: +NORMALIZE_WHITESPACE
[0.75 0.25 1.5 ]
>>> print(var) # doctest: +NORMALIZE_WHITESPACE
[0.0625 0.0625 0.25 ]
For interpolation, you can use :meth:`spline_interpolate`.
>>> t_interpol = [t0, t1]
>>> val_interpol = vs.spline_interpolate(t_interpol)
>>> from irfpy.util.with_context import printoptions
>>> with printoptions(precision=2, suppress=True):
... print(val_interpol.shape)
... print(val_interpol[:, 0]) # doctest: +NORMALIZE_WHITESPACE
... # print val_interpol[:, 1]
(3, 2)
[0.04 0.88 0.25]
# [ 1. 0. 2.]
.. todo::
The doctest may make non-unique solution (0. or -0.)
so that I disabled the test on 2013-09-03.
One has to make better test routine here.
'''
def __init__(self, time_array, x_array, y_array, z_array):
''' Instance the :class:`Vector3dSeries` object
:param time_array: Sequence of time in ``datetime.datetime`` objects.
:param x_array: Sequence of x-component in ``float``.
:param y_array: Sequence of y-component in ``float``.
:param z_array: Sequence of z-component in ``float``.
'''
self.x = ScalarSeries(time_array, x_array)
self.y = ScalarSeries(time_array, y_array)
self.z = ScalarSeries(time_array, z_array)
[docs] def spline_interpolate(self, t, extrapolate=False):
''' Spline interpolation.
Returns the spline interpolation. Interpolation is component-wise.
See also :meth:`ScalarSeries.spline_interpolate` for details.
:param t: ``datetime.datetime`` object or sequence of it.
:keyword extrapolate: Extrapolation handling.
See :meth:`ScalarSeries.spline_interpolate` for details.
:returns: Interpolated vector (component wise).
If ``t`` is a ``datetime.datetime`` object, ``np.array`` with
shape of (3, ) is returned.
If ``t`` is a sequence of ``datetime.datetime`` with *N* length,
``np.array`` with shape of (3, *N*) is returned.
'''
x = self.x.spline_interpolate(t, extrapolate=extrapolate)
y = self.y.spline_interpolate(t, extrapolate=extrapolate)
z = self.z.spline_interpolate(t, extrapolate=extrapolate)
return np.array([x, y, z])
[docs] def t0(self):
''' Return the first data time
'''
return self.x.t0()
[docs] def t1(self):
''' Return the last data time
'''
return self.x.t1()
[docs] def getobstime(self):
''' Return the observation time.
:return: List of observation time.
'''
return self.x.getobstime()
[docs] def get_data(self, t0=None, t1=None):
''' Return the data in between ``t0`` and ``t1``.
:param t0: Start time (inclusive)
:type t0: ``datetime.datetime``
:param t1: End time (inclusive)
:type t1: ``datetime.datetime``
:returns: The data in between ``t0`` and ``t1``.
``ndarray`` with the shape of (3, N) where N
is the number of data.
'''
t, x = self.x.get_data(t0, t1)
t, y = self.y.get_data(t0, t1)
t, z = self.z.get_data(t0, t1)
return (t, np.array([x, y, z]))
[docs] def get_timeseries_magnitude(self):
""" Return the time series of the magnitude data.
:return: Time series of magnitude data.
:rtype: :class:`ScalarSeries`.
>>> vecseries = Vector3dSeries.sample_data()
>>> print(len(vecseries))
5
>>> t, v = vecseries.get_data()
>>> magseries = vecseries.get_timeseries_magnitude()
>>> t, l = magseries.get_data()
>>> print(v)
[[ 0. 0.5 1. 0.5 0. ]
[ 1. 0.5 0. -0.5 -1. ]
[ 0. 1. 2. 3. 4. ]]
>>> print(l) # doctest: +NORMALIZE_WHITESPACE
[1. 1.22474487 2.23606798 3.082207 4.12310563]
"""
tlist, xyz = self.get_data()
mag = np.sqrt((xyz ** 2).sum(axis=0))
return ScalarSeries(tlist, mag)
[docs] def clipped(self, t0, t1):
""" Return clipped Vector3dSeries.
:param t0: Start time
:param t1: Stop time
:return: New Vector3dSeries
>>> tser3d = Vector3dSeries.sample_data()
>>> print(len(tser3d))
5
>>> tser3d = tser3d.clipped(datetime.datetime(2009, 1, 10, 6),
... datetime.datetime(2009, 1, 10, 15))
>>> print(len(tser3d))
2
>>> tlist, dlist = tser3d.get_data()
>>> print(dlist)
[[ 1. 0.5]
[ 0. -0.5]
[ 2. 3. ]]
"""
t, x = self.x.get_data(t0, t1)
t, y = self.y.get_data(t0, t1)
t, z = self.z.get_data(t0, t1)
return Vector3dSeries(t, x, y, z)
[docs] def get_average(self, t0=None, t1=None):
''' Return the average.
:param t0: Start time (inclusive)
:type t0: ``datetime.datetime``
:param t1: End time (inclusive)
:type t1: ``datetime.datetime``
:returns: Average
:rtype: ``ndarray`` with shape of (3,)
'''
if t0 is None:
t0 = self.t0()
if t1 is None:
t1 = self.t1()
x = self.x.get_average(t0, t1)
y = self.y.get_average(t0, t1)
z = self.z.get_average(t0, t1)
return np.array([x, y, z])
[docs] def get_moment(self, t0=None, t1=None):
''' Return the moment.
:param t0: Start time (inclusive)
:type t0: ``datetime.datetime``
:param t1: End time (inclusive)
:type t1: ``datetime.datetime``
:returns: A tuple.
(number of data,
average in (3,) ``ndarray``,
variance in (3,) ``ndarray``)
'''
if t0 is None:
t0 = self.t0()
if t1 is None:
t1 = self.t1()
xn, xa, xv = self.x.get_moment(t0, t1)
yn, ya, yv = self.y.get_moment(t0, t1)
zn, za, zv = self.z.get_moment(t0, t1)
return (xn, np.array([xa, ya, za]), np.array([xv, yv, zv]))
[docs] def clear_cache(self):
self.x.clear_cache()
self.y.clear_cache()
self.z.clear_cache()
[docs] def nearest(self, t):
x = self.x.nearest(t)
y = self.y.nearest(t)
z = self.z.nearest(t)
return np.array([x, y, z])
[docs] def concatenate(self, other, strategy='overwrite'):
''' Concatenate other series to return a new series.
:param other: The other :class:`Vector3dSeries`
:keyword strategy: Strategy to concatenate.
Default is ``'overwrite'``, in which the same data points
are overwritten by the ``other`` data.
If ``'nooverwrite'`` is given, the same data points
are kept as ``self`` data.
:returns: New :class:`VectorSeries`
>>> t0 = dtrange(datetime.datetime(2005, 1, 1),
... datetime.datetime(2005, 1, 2),
... datetime.timedelta(hours=6)) # 4 elements
>>> print(len(t0))
4
>>> x0 = [0, 1, 2, 3]
>>> y0 = [0, -1, -2, -3]
>>> z0 = [0, 0, 0, 0]
>>> v0 = Vector3dSeries(t0, x0, y0, z0)
>>> t1 = dtrange(datetime.datetime(2005, 1, 2),
... datetime.datetime(2005, 1, 2, 12),
... datetime.timedelta(hours=4)) # 3 elements
>>> x1 = [4, 5, 6]
>>> y1 = [-2, -1, 0]
>>> z1 = [1, 1, 1]
>>> v1 = Vector3dSeries(t1, x1, y1, z1)
>>> v2 = v0.concatenate(v1)
>>> print(len(v2))
7
>>> t2, d2 = v2.get_data()
>>> print(d2.shape)
(3, 7)
>>> print(d2[0, :]) # doctest: +NORMALIZE_WHITESPACE
[0. 1. 2. 3. 4. 5. 6.]
>>> print(d2[1, :])
[ 0. -1. -2. -3. -2. -1. 0.]
>>> print(d2[2, :]) # doctest: +NORMALIZE_WHITESPACE
[0. 0. 0. 0. 1. 1. 1.]
'''
x = self.x.concatenate(other.x, strategy=strategy)
y = self.y.concatenate(other.y, strategy=strategy)
z = self.z.concatenate(other.z, strategy=strategy)
t, x = x.get_data()
t, y = y.get_data()
t, z = z.get_data()
return Vector3dSeries(t, x, y, z)
def __len__(self):
return len(self.x)
[docs] @classmethod
def sample_data(cls):
''' Return a sample data.
'''
tlist = [datetime.datetime(2009, 1, 10, 0),
datetime.datetime(2009, 1, 10, 4),
datetime.datetime(2009, 1, 10, 8),
datetime.datetime(2009, 1, 10, 12),
datetime.datetime(2009, 1, 10, 16), ]
xlist = [0, 0.5, 1, 0.5, 0]
ylist = [1, 0.5, 0, -0.5, -1]
zlist = [0, 1, 2, 3, 4]
return Vector3dSeries(tlist, xlist, ylist, zlist)
def __str__(self):
s = '<irfpy.util.timeseries.Vector3dSeries: Len={l:d} T0={t0:%FT%T} T1={t1:%FT%T}>'.format(
l=len(self), t0=self.t0(), t1=self.t1())
return s
[docs]class TimeStepScalarField3D(_TimeSeriesABC):
r""" Time series of scalar field.
This class extends the functionality of
:class:`irfpy.util.fields.ScalarField3D`
class and their derivatives in order to support the time series of fields.
.. note::
This contains several :class:`ScalarField3D`, so that the system should sometimes
have a very large memory. No memory handling is supported.
This means that all the data is on memory.
*Sample*
Let's look at a scalar field, i.e. gravity potential,
produced by a moving point of mass: :math:`(x_m=vt, 0, 0)`.
The gravity potential is obtained from :class:`irfpy.util.fields.GravityPotential` class.
First, just create the potential field at ``t=0``, i.e. ``x_m=0``. The mass is taken
as :math:`M=\frac{1}{6.67408\times 10^{-11}}`, then, :math:`GM=1`.
Note that the time should be ``datetime.datetime`` object.
Thus, a dummy time (with respective to a given epoch).
>>> import datetime
>>> t0 = datetime.datetime(2010, 1, 1, 0, 0, 0) # An epoch. This case, it is a dummy time.
.. math::
\phi(r) = -\frac{GM}{r} = -\frac{1}{r}
>>> import irfpy.util.fields
>>> m = 1 / 6.67408e-11
>>> potential_t0 = irfpy.util.fields.GravityPotential(m, center=(0, 0, 0))
Just to confirm if it works, check the potential at ``(0, 4, 0)``.
>>> print(potential_t0([0, 4, 0]))
-0.25
Then, as time goes, the point of mass moves. Here I take ``v=2.5``.
>>> v=2.5
At time ``t=1``, the position of the mass is at ``x_m = v (=2.5)``.
>>> t=1
>>> xm_1 = (v * t, 0, 0)
>>> potential_t1 = irfpy.util.fields.GravityPotential(m, center=xm_1)
Now the observer at (0, 4, 0) see different potential, the distance is now
:math:`|(2.5, 4.0, 0)|=4.717` and the potential is :math:`\phi(r, t=1)=-1/r=-0.212`.
>>> print('{:.3f}'.format(potential_t1([0, 4, 0])))
-0.212
At time ``t=3``, the position moves to ``x_m = 3v = 7.5``.
>>> t=3
>>> xm_3 = (v * t, 0, 0)
>>> potential_t3 = irfpy.util.fields.GravityPotential(m, center=xm_3)
>>> print('{:.3f}'.format(potential_t3([0, 4, 0])))
-0.118
A bit more time steps are added as follows.
>>> si = (0, 1, 3, 5, 8)
>>> ti = [t0 + datetime.timedelta(seconds=_t) for _t in si]
>>> potential_list = []
>>> for _t in si:
... xm_t = (v * _t, 0, 0)
... potential_t = irfpy.util.fields.GravityPotential(m, center=xm_t)
... potential_list.append(potential_t)
Now it is time to create a :class:`TimeStepScalarField`.
>>> tsfield = TimeStepScalarField3D(ti, potential_list)
First, you can get the field directly. The following is the same as
>>> print('{:.3f}'.format(tsfield((0, 4, 0), t0 + datetime.timedelta(seconds=0)))) # At (0, 4, 0) at t=0
-0.250
>>> print('{:.3f}'.format(tsfield((0, 4, 0), t0 + datetime.timedelta(seconds=1)))) # At (0, 4, 0) at t=1
-0.212
>>> print('{:.3f}'.format(tsfield((0, 4, 0), t0 + datetime.timedelta(seconds=3)))) # At (0, 4, 0) at t=3
-0.118
Timewise, the linear interpolated value is returned.
Spacewise, the default interpolation of the given field (irfpy.util.fields.ScalarField3D)
is used.
>>> print('{:.3f}'.format(tsfield((0, 4, 0), t0 + datetime.timedelta(seconds=2)))) # At (0, 4, 0) at t=2 is not given, but the returned is linear interpolation.
-0.165
>>> print(tsfield.index_of_time(t0))
0
>>> print(tsfield.index_of_time(t0 + datetime.timedelta(seconds=4)))
2
>>> print(tsfield.index_of_time(t0 + datetime.timedelta(seconds=9)))
4
"""
def __init__(self, time_list, field_list):
if len(time_list) != len(field_list):
raise ValueError('The given time_list and field_list does not have same length.')
datadict = {}
for _t, _f in zip(time_list, field_list):
datadict[_t] = _f
self._tlist = [] # List is in the ascending order
self._flist = [] # Index is corresponding to tlist
for t in sorted(datadict.keys()):
self._tlist.append(t)
self._flist.append(datadict[t])
self._time2index = _tsdb.DB()
for _i, _t in enumerate(self._tlist):
self._time2index.append('{:d}'.format(_i), _t)
[docs] def t0(self):
return self._tlist[0]
[docs] def t1(self):
return self._tlist[1]
[docs] def getobstime(self):
return self._tlist.copy()
[docs] def get_data(self, t0=None, t1=None):
if t0 is None:
t0 = self.t0()
if t1 is None:
t1 = self.t1()
new_tlist = []
new_flist = []
for _t, _f in zip(self._tlist, self._flist):
if _t >= t0 and _t <= t1:
new_tlist.append(_t)
new_flist.append(_f)
return new_tlist, new_flist
[docs] def clipped(self, t0, t1):
tlist, dlist = self.get_data(t0, t1)
return TimeStepScalarField3D(tlist, dlist)
[docs] def index_of_time(self, t):
return int(self._time2index.get(t))
def __call__(self, xyz, tt):
# First, identify the corresponding field
i0 = self.index_of_time(tt)
t0 = self._tlist[i0]
f0 = self._flist[i0]
try:
t1 = self._tlist[i0 + 1]
except IndexError:
return ValueError('The given time is out of bounds.')
f1 = self._flist[i0 + 1]
val0 = f0(xyz)
val1 = f1(xyz)
dt0 = (tt - t0).total_seconds()
dt1 = (t1 - tt).total_seconds()
gamma = dt0 / (dt0 + dt1)
return val0 + (val1 - val0) * gamma
[docs]def dtrange(t0, t1, delta):
''' Return a regularly separated list of ``datatime`` objects.
:param t0: Start
:type t0: ``datetime.datetime``
:param t1: Stop (not inclusive)
:type t1: ``datetime.datetime``
:param delta: Delta
:type delta: ``datetime.timedelta``.
:returns: List of ``datetime.datetime``.
>>> r0 = dtrange(datetime.datetime(2009, 1, 1),
... datetime.datetime(2009, 1, 2),
... datetime.timedelta(hours=1))
>>> print(len(r0))
24
>>> print(r0[0])
2009-01-01 00:00:00
>>> print(r0[-1])
2009-01-01 23:00:00
'''
tlist = []
t = t0
while t < t1:
tlist.append(t)
t = t + delta
return tlist