Source code for irfpy.util.timeseries

''' Time sereis related classes.

.. codeauthor:: Yoshifumi Futaana

.. autosummary::

    ScalarSeries
    Vector3dSeries
    dtrange

These classes stores a time series of scalar / vector values.
For example, these may be used to represent a time profile of
the plasma density, or a time series of magnetic field vector.

.. note::

    There is similar class, :class:`irfpy.util.julday.JdSeries`.
    The :class:`irfpy.util.julday.JdSeries` class is slow and not
    easy to handle.
    Thus, developers may consider using :class:`ScalarSeries`
    and :class:`Vector3dSeries`.

.. note::

    The :class:`ScalarSeries` only handles ``float`` data.
    :class:`Vector3dSeries` only treats 3-D vector of ``float``.
    Internally they use ``ndarray``, thus they are quicker and more flexible.
'''
import datetime

import numpy as np
from scipy import interpolate

import bisect
import abc

from irfpy.util import timeseriesdb as _tsdb
from irfpy.util import exception as _ex
from irfpy.util import utc as _utc


[docs]class DataNotFound(_ex.IrfpyException): ''' Exception raised when the data is not found. ''' def __init__(self, value): self.value = value def __str__(self): return repr(self.value)
class _TimeSeriesABC(metaclass=abc.ABCMeta): @abc.abstractmethod def t0(self): """ Return the time of the first data :returns: The earliest time. :rtype: ``datetime.datetime`` """ pass @abc.abstractmethod def t1(self): """ Return the time of the last data. :returns: The latest time. :rtype: ``datetime.datetime`` """ pass @abc.abstractmethod def getobstime(self): """ Return the list of the observation time. :return: List of the observation time. """ pass @abc.abstractmethod def get_data(self, t0, t1): ''' Return the registered data. :keyword t0: Start time (inclusive). :type t0: ``datetime.datetime`` :keyword t1: End time (inclusive). :type t1: ``datetime.datetime`` :returns: (``tlist``, ``dlist``) where ``tlist`` is the list of the time as in ``datetime`` instance, and dlist is the data as in ``ndarray`` instance. ''' pass @abc.abstractmethod def clipped(self, t0, t1): ''' Return clipped data. :param t0: Start time (inclusive) :param t1: Stop time (inclusive) :return: Clipped object. It should be the same class as ``self``. ''' pass
[docs]class ObjectSeries(_TimeSeriesABC): ''' Represents time series of any object. :param time_array: Sequence of time in ``datetime.dattime`` objects :param data_array: Sequecne of data in any format. Represent time-tagged data. If the data is floating point, :class:`ScalarSeries` can be used. If the data is 3-D real floating, :class:`VectorSeries` can be used. This class is for arbitrary object. ''' def __init__(self, time_array, data_sequence): """ Initialize the series. :param time_array: Array of time. :param data_sequence: Array of data. >>> tlist = [datetime.datetime(2001, 1, 10, 3), ... datetime.datetime(2001, 1, 10, 1), ... datetime.datetime(2001, 1, 10, 7), ... datetime.datetime(2001, 1, 10, 18), ... datetime.datetime(2001, 1, 10, 2)] >>> print(len(tlist)) 5 >>> dlist = [(3,), (1,), (7,), (18,), (2,)] >>> tser = ObjectSeries(tlist, dlist) >>> tlist2 = tser.times >>> from pprint import pprint >>> pprint(tlist2) [datetime.datetime(2001, 1, 10, 1, 0), datetime.datetime(2001, 1, 10, 2, 0), datetime.datetime(2001, 1, 10, 3, 0), datetime.datetime(2001, 1, 10, 7, 0), datetime.datetime(2001, 1, 10, 18, 0)] >>> dlist2 = tser.data >>> print(dlist2) [(1,), (2,), (3,), (7,), (18,)] >>> pprint(tser.get_data()) ((datetime.datetime(2001, 1, 10, 1, 0), datetime.datetime(2001, 1, 10, 2, 0), datetime.datetime(2001, 1, 10, 3, 0), datetime.datetime(2001, 1, 10, 7, 0), datetime.datetime(2001, 1, 10, 18, 0)), ((1,), (2,), (3,), (7,), (18,))) >>> pprint(tser.get_data(datetime.datetime(2001, 1, 10, 2), datetime.datetime(2001, 1, 10, 7))) ((datetime.datetime(2001, 1, 10, 2, 0), datetime.datetime(2001, 1, 10, 3, 0), datetime.datetime(2001, 1, 10, 7, 0)), ((2,), (3,), (7,))) >>> pprint(tser.get_data(datetime.datetime(2030, 1, 1), datetime.datetime(2040, 1, 1))) ((), ()) >>> for t, d in tser.iter(): ... print(t, d) 2001-01-10 01:00:00 (1,) 2001-01-10 02:00:00 (2,) 2001-01-10 03:00:00 (3,) 2001-01-10 07:00:00 (7,) 2001-01-10 18:00:00 (18,) >>> tser2 = tser.clipped(datetime.datetime(2001, 1, 10, 2), datetime.datetime(2001, 1, 10, 7)) >>> print(len(tser2)) 3 >>> pprint(tser2.get_data()) ((datetime.datetime(2001, 1, 10, 2, 0), datetime.datetime(2001, 1, 10, 3, 0), datetime.datetime(2001, 1, 10, 7, 0)), ((2,), (3,), (7,))) >>> tser3 = tser.clipped(datetime.datetime(2030, 10, 1), datetime.datetime(2010, 10, 1)) >>> print(len(tser3)) 0 """ if len(time_array) != len(data_sequence): raise DataNotFound('Length of time (%d) != data (%d)') datadict = {} for t, d in zip(time_array, data_sequence): datadict[t] = d self.times = sorted(datadict.keys()) self.data = [datadict[t] for t in self.times]
[docs] def t0(self): try: t = self.times[0] except IndexError: raise _ex.IrfpyException('No data contained.') return t
[docs] def t1(self): try: t = self.times[-1] except IndexError: raise _ex.IrfpyException("No data contained.") return t
[docs] def getobstime(self): return self.times[:]
[docs] def get_data(self, t0=None, t1=None): if t0 is None: t0 = self.t0() if t1 is None: t1 = self.t1() tlist = np.array(self.times, copy=True) t_later_t0 = tlist >= t0 t_earlier_t1 = tlist <= t1 t_match = (t_later_t0 & t_earlier_t1) idx = np.where(t_match)[0] t_new = tuple([self.times[_i] for _i in idx]) d_new = tuple([self.data[_i] for _i in idx]) return (t_new, d_new)
[docs] def clipped(self, t0, t1): tlist, dlist = self.get_data(t0, t1) return ObjectSeries(tlist, dlist)
def __len__(self): return len(self.times)
[docs] def at(self, t): """ Return the data at the time of ``t``. The data at the time of ``t``. The time should be exact. Usually you can get the time of the data by :meth:`ObjectSeries.getobstime` method. If not data is available at ``t``, :class:`DataNotFound` error is raised. """ try: idx = self.times.index(t) return self.data[idx] except ValueError: raise DataNotFound('Data not found for time {}.'.format(t))
[docs] def nextof(self, t): """ Return the data next of ``t``. The next data time should be ``tdata > t``. :param t: Time :return: A tuple, (tnext, dnext) :exception: If the next data is not available, :class:`DataNotFound` error is raised. >>> tlist = [datetime.datetime(2001, 1, 10, 3), ... datetime.datetime(2001, 1, 10, 1), ... datetime.datetime(2001, 1, 10, 7), ... datetime.datetime(2001, 1, 10, 18), ... datetime.datetime(2001, 1, 10, 2)] >>> print(len(tlist)) 5 >>> dlist = [(3,), (1,), (7,), (18,), (2,)] >>> tser = ObjectSeries(tlist, dlist) >>> t, d = tser.nextof(datetime.datetime(2001, 1, 10, 0)) >>> print(t, d) 2001-01-10 01:00:00 (1,) >>> t, d = tser.nextof(datetime.datetime(2001, 1, 10, 3)) >>> print(t, d) 2001-01-10 07:00:00 (7,) >>> t, d = tser.nextof(datetime.datetime(2001, 1, 10, 17)) >>> print(t, d) 2001-01-10 18:00:00 (18,) >>> t, d = tser.nextof(datetime.datetime(2001, 1, 10, 18)) # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... timeseries.DataNotFound: >>> t, d = tser.nextof(datetime.datetime(2001, 1, 10, 20)) # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... timeseries.DataNotFound: """ tlist = self.times idx = bisect.bisect_right(tlist, t) if idx >= len(tlist): raise DataNotFound('Data next of {:%FT%T} is not in the ObjectSeries.'.format(t)) return tlist[idx], self.data[idx]
[docs] def previousof(self, t): """ Return the data previous of ``t``. The previous data time should be ``tdata < t``. :param t: Time :return: A tuple, (tnext, dnext) :exception: If the previous data is not available, :class:`DataNotFound` error is raised. >>> tlist = [datetime.datetime(2001, 1, 10, 3), ... datetime.datetime(2001, 1, 10, 1), ... datetime.datetime(2001, 1, 10, 7), ... datetime.datetime(2001, 1, 10, 18), ... datetime.datetime(2001, 1, 10, 2)] >>> print(len(tlist)) 5 >>> dlist = [(3,), (1,), (7,), (18,), (2,)] >>> tser = ObjectSeries(tlist, dlist) >>> t, d = tser.previousof(datetime.datetime(2001, 1, 10, 20)) >>> print(t, d) 2001-01-10 18:00:00 (18,) >>> t, d = tser.previousof(datetime.datetime(2001, 1, 10, 18)) >>> print(t, d) 2001-01-10 07:00:00 (7,) >>> t, d = tser.previousof(datetime.datetime(2001, 1, 10, 2)) >>> print(t, d) 2001-01-10 01:00:00 (1,) >>> t, d = tser.previousof(datetime.datetime(2001, 1, 10, 1)) # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... timeseries.DataNotFound: >>> t, d = tser.previousof(datetime.datetime(2001, 1, 10, 0)) # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... timeseries.DataNotFound: """ tlist = self.times idx = bisect.bisect_left(tlist, t) if idx == 0: raise DataNotFound('Data previous of {:%FT%T} is not in the ObjectSeries.'.format(t)) return tlist[idx - 1], self.data[idx - 1]
[docs] def nearest(self, t): """ Return the nearest data. :param t: :return: """ try: te, de = self.nearest_no_later(t) # Exception handling needed! except DataNotFound: te, de = datetime.datetime.min, None try: tl, dl = self.nearest_no_earlier(t) except DataNotFound: tl, dl = datetime.datetime.max, None dte = (t - te).total_seconds() dtl = (tl - t).total_seconds() if dte <= dtl: return te, de else: return tl, dl
[docs] def nearest_no_later(self, t): """ Return the data point observed earlier than or equal to ``t``, but the latest one. :param t: :return: """ tlist = self.times idx = bisect.bisect(tlist, t) if idx == 0: raise DataNotFound('Time {} is earlier than the dataset. t0={}'.format(t, self.t0())) idx = idx - 1 return self.times[idx], self.data[idx]
nearest_later_or_at = nearest_no_later
[docs] def nearest_no_earlier(self, t): """ Return the data point observed later than or equal to ``t``, but the earliest one. :param t: :return: """ tlist = self.times idx = bisect.bisect_left(tlist, t) if idx == len(tlist): raise DataNotFound('Time {} is later than the dataset. t1={}'.format(t, self.t1())) return self.times[idx], self.data[idx]
nearest_earlier_or_at = nearest_no_earlier
[docs] def concatenate(self, other, strategy='overwrite'): ''' Concatenate other series to return a new series. :param other: The other :class:`ObjectSeries` :keyword strategy: Strategy to concatenate. Default is ``'overwrite'``, in which the same data points are overwritten by the ``other`` data. If ``'nooverwrite'`` is given, the same data points are kept as ``self`` data. :returns: New :class:`OtherSeries` Concatenate two series. If duplication happens, it overwrites by the second one (default), or kept original (``nooverwrite``). >>> t0 = [datetime.datetime(2008, 12, 1), ... datetime.datetime(2008, 12, 3), ... datetime.datetime(2008, 12, 5)] >>> d0 = [1.5, 2.5, 3.5] >>> tser0 = ObjectSeries(t0, d0) >>> print(len(tser0)) 3 >>> t1 = [datetime.datetime(2008, 12, 5), ... datetime.datetime(2008, 12, 8), ... datetime.datetime(2008, 12, 9)] >>> d1 = [4.5, 6.5, 9.5] >>> tser1 = ObjectSeries(t1, d1) >>> print(len(tser1)) 3 Concatenate! Note that ther is one duplication. Latter (tser1) prioritized. >>> tser2 = tser0.concatenate(tser1) >>> print(len(tser2)) 5 >>> t2, d2 = tser2.get_data() >>> from pprint import pprint >>> pprint(t2) (datetime.datetime(2008, 12, 1, 0, 0), datetime.datetime(2008, 12, 3, 0, 0), datetime.datetime(2008, 12, 5, 0, 0), datetime.datetime(2008, 12, 8, 0, 0), datetime.datetime(2008, 12, 9, 0, 0)) >>> print(d2) (1.5, 2.5, 4.5, 6.5, 9.5) Or, tser0 could be prioritzed. >>> tser3 = tser0.concatenate(tser1, strategy='nooverwrite') >>> print(tser3.nearest(datetime.datetime(2008, 12, 5))[1]) 3.5 ''' t0, d0 = self.get_data() t1, d1 = other.get_data() if strategy == 'overwrite': tlist = t0 + t1 dlist = d0 + d1 elif strategy == 'nooverwrite': tlist = t1 + t0 dlist = d1 + d0 else: raise ValueError('No strategy %s defined.' % strategy) return ObjectSeries(tlist, dlist)
[docs] def iter(self): return _iter_objser(self)
class _iter_objser: def __init__(self, iter_obj): self._cur_idx = 0 self._iter_obj = iter_obj def __iter__(self): return self def __next__(self): try: rett, retval = self._iter_obj.times[self._cur_idx], self._iter_obj.data[self._cur_idx] self._cur_idx += 1 return rett, retval except IndexError: raise StopIteration() raise StopIteration()
[docs]class ScalarSeries(_TimeSeriesABC): ''' Represents time series of floating point scalar data. :param time_array: Sequence of time in ``datetime.datetime`` objects. :param data_array: Sequence of data in ``float``. Internally, the time is in ``matplotlib.dates`` number; however, most of the methods use ``datetime.datetime`` object as interface. Sample follows. First, prepare the list of the time and corresponding data. >>> tlist = [datetime.datetime(2009, 1, 10, 12), ... datetime.datetime(2009, 1, 10, 8), ... datetime.datetime(2009, 1, 10, 4), ... datetime.datetime(2009, 1, 10, 16), ... datetime.datetime(2009, 1, 10, 0),] >>> dlist = [0, 1, 2, 3, 4] Then, instance the ScalarSeries object. >>> ts = ScalarSeries(tlist, dlist) >>> ts.shape (2, 5) >>> len(ts) 5 >>> tlist1, dlist1 = ts.get_data() >>> print(dlist1) # doctest: +NORMALIZE_WHITESPACE [4. 2. 1. 0. 3.] If the time array has duplication, the last data is validated. In this case, the created ScalarSeries length will be 2, not 3. >>> tlist2 = [datetime.datetime(2009, 2, 5, 12), ... datetime.datetime(2009, 2, 5, 13), ... datetime.datetime(2009, 2, 5, 13),] >>> dlist2 = [100, 200, 500] >>> ts2 = ScalarSeries(tlist2, dlist2) >>> ts2.shape (2, 2) >>> avg = ts2.get_average(ts2.t0(), ts2.t1()) >>> print(avg) 300.0 The coverage of the data can be get via methods of :meth:`t0` and :meth:`t1`. >>> print(ts.t0()) 2009-01-10 00:00:00 >>> print(ts.t1(fmt="mdates")) 733417.6666666666 To get the data, you can use :meth:`get_data` method. :meth:`get_average` and :meth:`get_moment` methods will calculate the average and variance of the given data. To get the data with inter (and extra) polation, use :meth:`spline_interpolate` method. :meth:`concatenate` method will concatenate the other given :class:`ScalarSeries` object. .. todo:: Zero size ScalarSeries should be implemented. However, to do this you may need to separate the data array? TBC. ''' def __init__(self, time_array, data_array): if len(time_array) != len(data_array): raise ValueError('Length of time (%d) != data (%d)' % (len(time_array), len(data_array))) # Sorting. datadict = {} for t, d in zip(time_array, data_array): datadict[t] = d data = [] for t in sorted(datadict.keys()): data.append([_utc.date2num(t), datadict[t]]) self.data = np.array(data).T self.shape = self.data.shape self.cache = {}
[docs] def t0(self, fmt=None): ''' Returns the first data. :keyword fmt: If ``"mdates"`` is specified, the corresponding object is returned. :returns: The earliest time. :rtype: ``datetime.datetime`` or ``float``. ''' if fmt == "mdates": return self.data[0, 0] else: return _utc.num2date(self.data[0, 0]).replace(tzinfo=None)
[docs] def t1(self, fmt=None): ''' Returns the first data. :keyword fmt: If ``"mdates"`` is specified, the corresponding object is returned. :returns: The latest time. :rtype: ``datetime.datetime`` or ``float``. ''' if fmt == "mdates": return self.data[0, -1] else: return _utc.num2date(self.data[0, -1]).replace(tzinfo=None)
[docs] def getobstime(self): """ Return the observation time. :return: List of observation time. >>> scalar_series = ScalarSeries.sample_data() >>> print(len(scalar_series)) 8 >>> tlist = scalar_series.getobstime() >>> print(len(tlist)) 8 >>> print(tlist[3]) 2009-01-10 08:00:00 """ return [_utc.num2date(self.data[0, i]).replace(tzinfo=None) for i in range(len(self))]
[docs] def get_data(self, t0=None, t1=None): ''' Return the registered data. :keyword t0: Start time (inclusive). Default is :meth:`t0`. :type t0: ``datetime.datetime`` :keyword t1: End time (inclusive). Default is :meth:`t1`. :type t1: ``datetime.datetime`` :returns: (``tlist``, ``dlist``) where ``tlist`` is the list of the time as in ``datetime`` instance, and dlist is the data as in ``ndarray`` instance. >>> tlist = dtrange(datetime.datetime(2009, 1, 30), ... datetime.datetime(2009, 1, 31), ... datetime.timedelta(hours=1)) >>> data = [i ** 2 for i in range(len(tlist))] >>> ts = ScalarSeries(tlist, data) >>> tlist2, data2 = ts.get_data( ... datetime.datetime(2009, 1, 30, 11, 30), ... datetime.datetime(2009, 1, 30, 13, 0)) >>> print(len(tlist2), len(data2)) 2 2 >>> print(tlist2[0], tlist2[1]) 2009-01-30 12:00:00 2009-01-30 13:00:00 >>> print('%.0f %.0f' % (data2[0], data2[1])) 144 169 ''' if t0 is None: t0 = self.t0() if t1 is None: t1 = self.t1() tn0 = _utc.date2num(t0) tn1 = _utc.date2num(t1) tlist = self.data[0, :] dlist = self.data[1, :] tlist = np.ma.masked_outside(tlist, tn0, tn1, copy=True) dlist = np.ma.masked_array(dlist, tlist.mask) tnlist = np.ma.compressed(tlist) tnlist = [_utc.num2date(t).replace(tzinfo=None) for t in tnlist] return (tnlist, np.ma.compressed(dlist))
[docs] def get_average(self, t0=None, t1=None): ''' Return the average between t0 and t1. :param t0: Start time (inclusive) :type t0: ``datetime.datetime`` :param t1: End time (inclusive) :type t1: ``datetime.datetime`` :returns: Average. Use :meth:`get_moment` for more precise use. >>> tlist = dtrange(datetime.datetime(2009, 1, 30), ... datetime.datetime(2009, 1, 31), ... datetime.timedelta(hours=1)) >>> data = [i ** 2 for i in range(len(tlist))] >>> ts = ScalarSeries(tlist, data) The average below should be **(144+169+196)/3 = 169.667**. >>> print('%.3f' % ts.get_average( ... datetime.datetime(2009, 1, 30, 11, 30), ... datetime.datetime(2009, 1, 30, 14, 0))) 169.667 ''' if t0 is None: t0 = self.t0() if t1 is None: t1 = self.t1() tlist, dlist = self.get_data(t0, t1) return dlist.mean()
[docs] def get_moment(self, t0=None, t1=None): ''' Return the number of data, average and variance. :param t0: Start time (inclusive) :type t0: ``datetime.datetime`` :param t1: End time (inclusive) :type t1: ``datetime.datetime`` :returns: (number of data, average, variance) >>> tlist = dtrange(datetime.datetime(2009, 1, 30), ... datetime.datetime(2009, 1, 31), ... datetime.timedelta(hours=1)) >>> data = [i ** 2 for i in range(len(tlist))] >>> ts = ScalarSeries(tlist, data) The average below should be (121+144+169+196)/4 = 157.5 Variance is (36.5^2 + 13.5^2 + 11.5^2 + 38.5^2)/4 = 782.25 >>> ndat, ave, var = ts.get_moment( ... datetime.datetime(2009, 1, 30, 11, 0), ... datetime.datetime(2009, 1, 30, 14, 0)) >>> print(ndat) 4 >>> print('%.3f' % ave) 157.500 >>> print('%.3f' % var) 782.250 ''' if t0 is None: t0 = self.t0() if t1 is None: t1 = self.t1() tlist, dlist = self.get_data(t0, t1) return len(dlist), dlist.mean(), dlist.var()
def _nearest(self, t): ''' Return the nearest data. Single arg version. :param t: Datetime. ''' tn = _utc.date2num(t) idx = bisect.bisect(self.data[0, :], tn) if idx == len(self): idx = idx - 1 if idx == 0: return self.data[1, 0] tn0 = np.abs(tn - self.data[0, idx - 1]) tn1 = np.abs(tn - self.data[0, idx]) if tn0 <= tn1: return self.data[1, idx - 1] else: return self.data[1, idx]
[docs] def nearest(self, t): ''' Return the nearest data. :param t: Scalar or list of time. ''' tlist = np.array(t) if tlist.shape == (): return self._nearest(t) else: d = np.array([self._nearest(ti) for ti in tlist]) return d
[docs] def linear_interpolate(self, t): """ Linear interpolation. :param t: Time or a sequence of times. :return: Interpolated data. >>> tlist = [datetime.datetime(2009, 1, 10, 12), ... datetime.datetime(2009, 1, 10, 8), ... datetime.datetime(2009, 1, 10, 4), ... datetime.datetime(2009, 1, 10, 16), ... datetime.datetime(2009, 1, 10, 0),] >>> dlist = [1, 2, 3, 2, 2] >>> ts = ScalarSeries(tlist, dlist) >>> print(ts.linear_interpolate(datetime.datetime(2009, 1, 10, 6))) 2.5 >>> print(ts.linear_interpolate([datetime.datetime(2009, 1, 10, 6), datetime.datetime(2009, 1, 10, 14)])) [2.5 1.5] """ tn = _utc.date2num(t) from scipy.interpolate import interp1d f = interp1d(self.data[0, :], self.data[1, :]) return f(tn)
[docs] def spline_interpolate(self, t, extrapolate=False): ''' Interpolate using spline fitting. Interpolation can be done using Spline fitting. :param t: Time, sequence allowed. :type t: ``datetime.datetime`` :keyword extrapolate: Extrapolation handling. If ``True`` is set, extrapolation is allowed. If ``False`` is set (default), extrapolation is not allowed. You may also set ``datetime.timedelta`` object here to set "glues" of the data. In this case, the inter/extra-polation between (:meth:`t0` - extrapolate) and (:meth:`t1` + extrapolate) is allowed. :raises ValueError: If non-allowed extrapolation was executed. By default extrapolation is not allowed, because it may produce unrealistic values. >>> tlist = [datetime.datetime(2009, 1, 10, 12), ... datetime.datetime(2009, 1, 10, 8), ... datetime.datetime(2009, 1, 10, 4), ... datetime.datetime(2009, 1, 10, 16), ... datetime.datetime(2009, 1, 10, 0),] >>> dlist = [1, 2, 3, 2, 2] >>> ts = ScalarSeries(tlist, dlist) >>> print('%.3f' % ( ... ts.spline_interpolate(datetime.datetime(2009, 1, 10, 8)))) 2.000 >>> print('%.3f' % ( ... ts.spline_interpolate(datetime.datetime(2009, 1, 10, 6)))) 2.625 You may give a list as argumnet, too. >>> vals = ts.spline_interpolate([datetime.datetime(2009, 1, 10, 1), ... datetime.datetime(2009, 1, 10, 5), ... datetime.datetime(2009, 1, 10, 9),]) >>> print('%.3f %.3f %.3f' % (vals[0], vals[1], vals[2])) 2.547 2.859 1.672 Extrapolation is not allowed by default, but you can set extrapolation keyword to do it. >>> print('%.3f' % ( ... ts.spline_interpolate(datetime.datetime(2009, 1, 11, 0), ... extrapolate=True))) 18.000 You may also allow the extrapolation-allowed range with ``datetime.timedelta`` object. See ``scripts/timeseries_sample1`` also (TBD). >>> print('%.3f' % ( ... ts.spline_interpolate(datetime.datetime(2009, 1, 9, 0), ... extrapolate=datetime.timedelta(days=1)))) -158.000 ''' if extrapolate is True: ex0 = datetime.datetime(datetime.MINYEAR, 1, 1) ex1 = datetime.datetime(datetime.MAXYEAR, 12, 31) elif extrapolate is False: ex0 = self.t0() ex1 = self.t1() else: ex0 = self.t0() - extrapolate ex1 = self.t1() + extrapolate ex0 = _utc.date2num(ex0) ex1 = _utc.date2num(ex1) tn = _utc.date2num(t) if (np.array(tn) < ex0).any() | (np.array(tn) > ex1).any(): raise ValueError( 'Some of the given time is out of the range.\n' + 'Use extrapol keyword to avoid this exception,\n' + 'while large error may be introduced.') try: spl = self.cache['spl'] except KeyError: spl = interpolate.InterpolatedUnivariateSpline(self.data[0, :], self.data[1, :]) self.cache['spl'] = spl return spl(tn)
[docs] def clear_cache(self): ''' Clear cache (for spline) ''' self.cache = {}
[docs] def concatenate(self, other, strategy='overwrite'): ''' Concatenate other series to return a new series. :param other: The other :class:`ScalarSeries` :keyword strategy: Strategy to concatenate. Default is ``'overwrite'``, in which the same data points are overwritten by the ``other`` data. If ``'nooverwrite'`` is given, the same data points are kept as ``self`` data. :returns: New :class:`ScalarSeries` Prepare the first ScalarSeries with 3 records. >>> t0 = [datetime.datetime(2008, 12, 1), ... datetime.datetime(2008, 12, 3), ... datetime.datetime(2008, 12, 5)] >>> d0 = [1.5, 2.5, 3.5] >>> ts0 = ScalarSeries(t0, d0) Then, create the second ScalarSeries with 3 records. >>> t1 = [datetime.datetime(2008, 12, 5), ... datetime.datetime(2008, 12, 8), ... datetime.datetime(2008, 12, 9)] >>> d1 = [4.5, 6.5, 9.5] >>> ts1 = ScalarSeries(t1, d1) Concatenate them. Remember 1 record is duplicated. Therefore, only 5 records are in the new ScalarSeries. >>> ts2 = ts0.concatenate(ts1) >>> print(len(ts2)) 5 The duplicated record is overwritten by ``ts1``. >>> print(ts2.nearest(datetime.datetime(2008, 12, 5))) 4.5 If 'nooverwrite strategy is taken, ``ts0`` is used. >>> ts2 = ts0.concatenate(ts1, strategy='nooverwrite') >>> print(ts2.nearest(datetime.datetime(2008, 12, 5))) 3.5 ''' t0, d0 = self.get_data() t1, d1 = other.get_data() if strategy == 'overwrite': t = np.concatenate([t0, t1]) d = np.concatenate([d0, d1]) elif strategy == 'nooverwrite': t = np.concatenate([t1, t0]) d = np.concatenate([d1, d0]) else: raise ValueError('No strategy %s defined.' % strategy) return ScalarSeries(t, d)
def __len__(self): return len(self.data[0, :])
[docs] @classmethod def sample_data(cls): ''' Get a sample data. ''' tlist = [datetime.datetime(2009, 1, 10, 0), datetime.datetime(2009, 1, 10, 1), datetime.datetime(2009, 1, 10, 3), datetime.datetime(2009, 1, 10, 8), datetime.datetime(2009, 1, 10, 10), datetime.datetime(2009, 1, 10, 11), datetime.datetime(2009, 1, 10, 15), datetime.datetime(2009, 1, 10, 21), ] data = [0.0, 1.5, 2.8, 9.8, 9.2, 9.9, 16.3, 22.5] return ScalarSeries(tlist, data)
[docs] def clipped(self, t0, t1): '''Return clipped :class:`ScalarSeries`. :param t0: Start time :param t1: Stop time :return: clipped data between t0 and t1. :rtype: ScalarSeries >>> scalar_series = ScalarSeries.sample_data() >>> print(len(scalar_series)) 8 >>> clipped_scalar_series = scalar_series.clipped( ... datetime.datetime(2009, 1, 10, 7), ... datetime.datetime(2009, 1, 10, 15)) >>> print(len(clipped_scalar_series)) 4 >>> print(clipped_scalar_series.t0()) 2009-01-10 08:00:00 >>> print(clipped_scalar_series.t1()) 2009-01-10 15:00:00 ''' tlist, dlist = self.get_data(t0, t1) return ScalarSeries(tlist, dlist)
[docs]class Vector3dSeries(_TimeSeriesABC): ''' Time series of 3-D vector. Internally each component uses :class:`ScalarSeries`. This is a redundunt approch (time_array is repeated three times) but simple implementation wins. >>> tlist = [datetime.datetime(2009, 1, 10, 0), ... datetime.datetime(2009, 1, 10, 4), ... datetime.datetime(2009, 1, 10, 8), ... datetime.datetime(2009, 1, 10, 12), ... datetime.datetime(2009, 1, 10, 16),] >>> xlist = [0, 0.5, 1, 0.5, 0] >>> ylist = [1, 0.5, 0, -0.5, -1] >>> zlist = [0, 1, 2, 3, 4] >>> vs = Vector3dSeries(tlist, xlist, ylist, zlist) To get the data, you can use :meth:`get_data` method. >>> t0 = datetime.datetime(2009, 1, 10, 1) >>> t1 = datetime.datetime(2009, 1, 10, 8) >>> t, v = vs.get_data(t0, t1) >>> print(len(t)) 2 >>> print(t[0]) 2009-01-10 04:00:00 >>> print(t[1]) 2009-01-10 08:00:00 >>> print(v.shape) (3, 2) >>> print(v[:, 0]) # doctest: +NORMALIZE_WHITESPACE [0.5 0.5 1. ] >>> print(v[:, 1]) # doctest: +NORMALIZE_WHITESPACE [1. 0. 2.] You may also use :meth:`get_average` and :meth:`get_moment` methods for the average or moment values. >>> ave = vs.get_average(t0, t1) >>> print('%.2f %.2f %.2f' % (ave[0], ave[1], ave[2])) 0.75 0.25 1.50 >>> n, ave, var = vs.get_moment(t0, t1) >>> print(n) 2 >>> print(ave) # doctest: +NORMALIZE_WHITESPACE [0.75 0.25 1.5 ] >>> print(var) # doctest: +NORMALIZE_WHITESPACE [0.0625 0.0625 0.25 ] For interpolation, you can use :meth:`spline_interpolate`. >>> t_interpol = [t0, t1] >>> val_interpol = vs.spline_interpolate(t_interpol) >>> from irfpy.util.with_context import printoptions >>> with printoptions(precision=2, suppress=True): ... print(val_interpol.shape) ... print(val_interpol[:, 0]) # doctest: +NORMALIZE_WHITESPACE ... # print val_interpol[:, 1] (3, 2) [0.04 0.88 0.25] # [ 1. 0. 2.] .. todo:: The doctest may make non-unique solution (0. or -0.) so that I disabled the test on 2013-09-03. One has to make better test routine here. ''' def __init__(self, time_array, x_array, y_array, z_array): ''' Instance the :class:`Vector3dSeries` object :param time_array: Sequence of time in ``datetime.datetime`` objects. :param x_array: Sequence of x-component in ``float``. :param y_array: Sequence of y-component in ``float``. :param z_array: Sequence of z-component in ``float``. ''' self.x = ScalarSeries(time_array, x_array) self.y = ScalarSeries(time_array, y_array) self.z = ScalarSeries(time_array, z_array)
[docs] def spline_interpolate(self, t, extrapolate=False): ''' Spline interpolation. Returns the spline interpolation. Interpolation is component-wise. See also :meth:`ScalarSeries.spline_interpolate` for details. :param t: ``datetime.datetime`` object or sequence of it. :keyword extrapolate: Extrapolation handling. See :meth:`ScalarSeries.spline_interpolate` for details. :returns: Interpolated vector (component wise). If ``t`` is a ``datetime.datetime`` object, ``np.array`` with shape of (3, ) is returned. If ``t`` is a sequence of ``datetime.datetime`` with *N* length, ``np.array`` with shape of (3, *N*) is returned. ''' x = self.x.spline_interpolate(t, extrapolate=extrapolate) y = self.y.spline_interpolate(t, extrapolate=extrapolate) z = self.z.spline_interpolate(t, extrapolate=extrapolate) return np.array([x, y, z])
[docs] def t0(self): ''' Return the first data time ''' return self.x.t0()
[docs] def t1(self): ''' Return the last data time ''' return self.x.t1()
[docs] def getobstime(self): ''' Return the observation time. :return: List of observation time. ''' return self.x.getobstime()
[docs] def get_data(self, t0=None, t1=None): ''' Return the data in between ``t0`` and ``t1``. :param t0: Start time (inclusive) :type t0: ``datetime.datetime`` :param t1: End time (inclusive) :type t1: ``datetime.datetime`` :returns: The data in between ``t0`` and ``t1``. ``ndarray`` with the shape of (3, N) where N is the number of data. ''' t, x = self.x.get_data(t0, t1) t, y = self.y.get_data(t0, t1) t, z = self.z.get_data(t0, t1) return (t, np.array([x, y, z]))
[docs] def get_timeseries_magnitude(self): """ Return the time series of the magnitude data. :return: Time series of magnitude data. :rtype: :class:`ScalarSeries`. >>> vecseries = Vector3dSeries.sample_data() >>> print(len(vecseries)) 5 >>> t, v = vecseries.get_data() >>> magseries = vecseries.get_timeseries_magnitude() >>> t, l = magseries.get_data() >>> print(v) [[ 0. 0.5 1. 0.5 0. ] [ 1. 0.5 0. -0.5 -1. ] [ 0. 1. 2. 3. 4. ]] >>> print(l) # doctest: +NORMALIZE_WHITESPACE [1. 1.22474487 2.23606798 3.082207 4.12310563] """ tlist, xyz = self.get_data() mag = np.sqrt((xyz ** 2).sum(axis=0)) return ScalarSeries(tlist, mag)
[docs] def clipped(self, t0, t1): """ Return clipped Vector3dSeries. :param t0: Start time :param t1: Stop time :return: New Vector3dSeries >>> tser3d = Vector3dSeries.sample_data() >>> print(len(tser3d)) 5 >>> tser3d = tser3d.clipped(datetime.datetime(2009, 1, 10, 6), ... datetime.datetime(2009, 1, 10, 15)) >>> print(len(tser3d)) 2 >>> tlist, dlist = tser3d.get_data() >>> print(dlist) [[ 1. 0.5] [ 0. -0.5] [ 2. 3. ]] """ t, x = self.x.get_data(t0, t1) t, y = self.y.get_data(t0, t1) t, z = self.z.get_data(t0, t1) return Vector3dSeries(t, x, y, z)
[docs] def get_average(self, t0=None, t1=None): ''' Return the average. :param t0: Start time (inclusive) :type t0: ``datetime.datetime`` :param t1: End time (inclusive) :type t1: ``datetime.datetime`` :returns: Average :rtype: ``ndarray`` with shape of (3,) ''' if t0 is None: t0 = self.t0() if t1 is None: t1 = self.t1() x = self.x.get_average(t0, t1) y = self.y.get_average(t0, t1) z = self.z.get_average(t0, t1) return np.array([x, y, z])
[docs] def get_moment(self, t0=None, t1=None): ''' Return the moment. :param t0: Start time (inclusive) :type t0: ``datetime.datetime`` :param t1: End time (inclusive) :type t1: ``datetime.datetime`` :returns: A tuple. (number of data, average in (3,) ``ndarray``, variance in (3,) ``ndarray``) ''' if t0 is None: t0 = self.t0() if t1 is None: t1 = self.t1() xn, xa, xv = self.x.get_moment(t0, t1) yn, ya, yv = self.y.get_moment(t0, t1) zn, za, zv = self.z.get_moment(t0, t1) return (xn, np.array([xa, ya, za]), np.array([xv, yv, zv]))
[docs] def clear_cache(self): self.x.clear_cache() self.y.clear_cache() self.z.clear_cache()
[docs] def nearest(self, t): x = self.x.nearest(t) y = self.y.nearest(t) z = self.z.nearest(t) return np.array([x, y, z])
[docs] def concatenate(self, other, strategy='overwrite'): ''' Concatenate other series to return a new series. :param other: The other :class:`Vector3dSeries` :keyword strategy: Strategy to concatenate. Default is ``'overwrite'``, in which the same data points are overwritten by the ``other`` data. If ``'nooverwrite'`` is given, the same data points are kept as ``self`` data. :returns: New :class:`VectorSeries` >>> t0 = dtrange(datetime.datetime(2005, 1, 1), ... datetime.datetime(2005, 1, 2), ... datetime.timedelta(hours=6)) # 4 elements >>> print(len(t0)) 4 >>> x0 = [0, 1, 2, 3] >>> y0 = [0, -1, -2, -3] >>> z0 = [0, 0, 0, 0] >>> v0 = Vector3dSeries(t0, x0, y0, z0) >>> t1 = dtrange(datetime.datetime(2005, 1, 2), ... datetime.datetime(2005, 1, 2, 12), ... datetime.timedelta(hours=4)) # 3 elements >>> x1 = [4, 5, 6] >>> y1 = [-2, -1, 0] >>> z1 = [1, 1, 1] >>> v1 = Vector3dSeries(t1, x1, y1, z1) >>> v2 = v0.concatenate(v1) >>> print(len(v2)) 7 >>> t2, d2 = v2.get_data() >>> print(d2.shape) (3, 7) >>> print(d2[0, :]) # doctest: +NORMALIZE_WHITESPACE [0. 1. 2. 3. 4. 5. 6.] >>> print(d2[1, :]) [ 0. -1. -2. -3. -2. -1. 0.] >>> print(d2[2, :]) # doctest: +NORMALIZE_WHITESPACE [0. 0. 0. 0. 1. 1. 1.] ''' x = self.x.concatenate(other.x, strategy=strategy) y = self.y.concatenate(other.y, strategy=strategy) z = self.z.concatenate(other.z, strategy=strategy) t, x = x.get_data() t, y = y.get_data() t, z = z.get_data() return Vector3dSeries(t, x, y, z)
def __len__(self): return len(self.x)
[docs] @classmethod def sample_data(cls): ''' Return a sample data. ''' tlist = [datetime.datetime(2009, 1, 10, 0), datetime.datetime(2009, 1, 10, 4), datetime.datetime(2009, 1, 10, 8), datetime.datetime(2009, 1, 10, 12), datetime.datetime(2009, 1, 10, 16), ] xlist = [0, 0.5, 1, 0.5, 0] ylist = [1, 0.5, 0, -0.5, -1] zlist = [0, 1, 2, 3, 4] return Vector3dSeries(tlist, xlist, ylist, zlist)
def __str__(self): s = '<irfpy.util.timeseries.Vector3dSeries: Len={l:d} T0={t0:%FT%T} T1={t1:%FT%T}>'.format( l=len(self), t0=self.t0(), t1=self.t1()) return s
[docs]class TimeStepScalarField3D(_TimeSeriesABC): r""" Time series of scalar field. This class extends the functionality of :class:`irfpy.util.fields.ScalarField3D` class and their derivatives in order to support the time series of fields. .. note:: This contains several :class:`ScalarField3D`, so that the system should sometimes have a very large memory. No memory handling is supported. This means that all the data is on memory. *Sample* Let's look at a scalar field, i.e. gravity potential, produced by a moving point of mass: :math:`(x_m=vt, 0, 0)`. The gravity potential is obtained from :class:`irfpy.util.fields.GravityPotential` class. First, just create the potential field at ``t=0``, i.e. ``x_m=0``. The mass is taken as :math:`M=\frac{1}{6.67408\times 10^{-11}}`, then, :math:`GM=1`. Note that the time should be ``datetime.datetime`` object. Thus, a dummy time (with respective to a given epoch). >>> import datetime >>> t0 = datetime.datetime(2010, 1, 1, 0, 0, 0) # An epoch. This case, it is a dummy time. .. math:: \phi(r) = -\frac{GM}{r} = -\frac{1}{r} >>> import irfpy.util.fields >>> m = 1 / 6.67408e-11 >>> potential_t0 = irfpy.util.fields.GravityPotential(m, center=(0, 0, 0)) Just to confirm if it works, check the potential at ``(0, 4, 0)``. >>> print(potential_t0([0, 4, 0])) -0.25 Then, as time goes, the point of mass moves. Here I take ``v=2.5``. >>> v=2.5 At time ``t=1``, the position of the mass is at ``x_m = v (=2.5)``. >>> t=1 >>> xm_1 = (v * t, 0, 0) >>> potential_t1 = irfpy.util.fields.GravityPotential(m, center=xm_1) Now the observer at (0, 4, 0) see different potential, the distance is now :math:`|(2.5, 4.0, 0)|=4.717` and the potential is :math:`\phi(r, t=1)=-1/r=-0.212`. >>> print('{:.3f}'.format(potential_t1([0, 4, 0]))) -0.212 At time ``t=3``, the position moves to ``x_m = 3v = 7.5``. >>> t=3 >>> xm_3 = (v * t, 0, 0) >>> potential_t3 = irfpy.util.fields.GravityPotential(m, center=xm_3) >>> print('{:.3f}'.format(potential_t3([0, 4, 0]))) -0.118 A bit more time steps are added as follows. >>> si = (0, 1, 3, 5, 8) >>> ti = [t0 + datetime.timedelta(seconds=_t) for _t in si] >>> potential_list = [] >>> for _t in si: ... xm_t = (v * _t, 0, 0) ... potential_t = irfpy.util.fields.GravityPotential(m, center=xm_t) ... potential_list.append(potential_t) Now it is time to create a :class:`TimeStepScalarField`. >>> tsfield = TimeStepScalarField3D(ti, potential_list) First, you can get the field directly. The following is the same as >>> print('{:.3f}'.format(tsfield((0, 4, 0), t0 + datetime.timedelta(seconds=0)))) # At (0, 4, 0) at t=0 -0.250 >>> print('{:.3f}'.format(tsfield((0, 4, 0), t0 + datetime.timedelta(seconds=1)))) # At (0, 4, 0) at t=1 -0.212 >>> print('{:.3f}'.format(tsfield((0, 4, 0), t0 + datetime.timedelta(seconds=3)))) # At (0, 4, 0) at t=3 -0.118 Timewise, the linear interpolated value is returned. Spacewise, the default interpolation of the given field (irfpy.util.fields.ScalarField3D) is used. >>> print('{:.3f}'.format(tsfield((0, 4, 0), t0 + datetime.timedelta(seconds=2)))) # At (0, 4, 0) at t=2 is not given, but the returned is linear interpolation. -0.165 >>> print(tsfield.index_of_time(t0)) 0 >>> print(tsfield.index_of_time(t0 + datetime.timedelta(seconds=4))) 2 >>> print(tsfield.index_of_time(t0 + datetime.timedelta(seconds=9))) 4 """ def __init__(self, time_list, field_list): if len(time_list) != len(field_list): raise ValueError('The given time_list and field_list does not have same length.') datadict = {} for _t, _f in zip(time_list, field_list): datadict[_t] = _f self._tlist = [] # List is in the ascending order self._flist = [] # Index is corresponding to tlist for t in sorted(datadict.keys()): self._tlist.append(t) self._flist.append(datadict[t]) self._time2index = _tsdb.DB() for _i, _t in enumerate(self._tlist): self._time2index.append('{:d}'.format(_i), _t)
[docs] def t0(self): return self._tlist[0]
[docs] def t1(self): return self._tlist[1]
[docs] def getobstime(self): return self._tlist.copy()
[docs] def get_data(self, t0=None, t1=None): if t0 is None: t0 = self.t0() if t1 is None: t1 = self.t1() new_tlist = [] new_flist = [] for _t, _f in zip(self._tlist, self._flist): if _t >= t0 and _t <= t1: new_tlist.append(_t) new_flist.append(_f) return new_tlist, new_flist
[docs] def clipped(self, t0, t1): tlist, dlist = self.get_data(t0, t1) return TimeStepScalarField3D(tlist, dlist)
[docs] def index_of_time(self, t): return int(self._time2index.get(t))
def __call__(self, xyz, tt): # First, identify the corresponding field i0 = self.index_of_time(tt) t0 = self._tlist[i0] f0 = self._flist[i0] try: t1 = self._tlist[i0 + 1] except IndexError: return ValueError('The given time is out of bounds.') f1 = self._flist[i0 + 1] val0 = f0(xyz) val1 = f1(xyz) dt0 = (tt - t0).total_seconds() dt1 = (t1 - tt).total_seconds() gamma = dt0 / (dt0 + dt1) return val0 + (val1 - val0) * gamma
[docs]def dtrange(t0, t1, delta): ''' Return a regularly separated list of ``datatime`` objects. :param t0: Start :type t0: ``datetime.datetime`` :param t1: Stop (not inclusive) :type t1: ``datetime.datetime`` :param delta: Delta :type delta: ``datetime.timedelta``. :returns: List of ``datetime.datetime``. >>> r0 = dtrange(datetime.datetime(2009, 1, 1), ... datetime.datetime(2009, 1, 2), ... datetime.timedelta(hours=1)) >>> print(len(r0)) 24 >>> print(r0[0]) 2009-01-01 00:00:00 >>> print(r0[-1]) 2009-01-01 23:00:00 ''' tlist = [] t = t0 while t < t1: tlist.append(t) t = t + delta return tlist