Source code for irfpy.util.timeseries

''' Time sereis related classes.

.. codeauthor:: Yoshifumi Futaana

.. autosummary::

    ScalarSeries
    Vector3dSeries
    dtrange

These classes stores a time series of scalar / vector values.
For example, these may be used to represent a time profile of
the plasma density, or a time series of magnetic field vector.

.. note::

    There is similar class, :class:`irfpy.util.julday.JdSeries`.
    The :class:`irfpy.util.julday.JdSeries` class is slow and not
    easy to handle.
    Thus, developers may consider using :class:`ScalarSeries`
    and :class:`Vector3dSeries`.

.. note::

    The :class:`ScalarSeries` only handles ``float`` data.
    :class:`Vector3dSeries` only treats 3-D vector of ``float``.
    Internally they use ``ndarray``, thus they are quicker and more flexible.
'''
import datetime

import numpy as np
from scipy import interpolate
import matplotlib.dates as mdates

import bisect
import abc

from irfpy.util import timeseriesdb as _tsdb

from irfpy.util import exception as _ex


[docs]class DataNotFound(_ex.IrfpyException): ''' Exception raised when the data is not found. ''' def __init__(self, value): self.value = value def __str__(self): return repr(self.value)
class _TimeSeriesABC(metaclass=abc.ABCMeta): @abc.abstractmethod def t0(self): """ Return the time of the first data :returns: The earliest time. :rtype: ``datetime.datetime`` """ pass @abc.abstractmethod def t1(self): """ Return the time of the last data. :returns: The latest time. :rtype: ``datetime.datetime`` """ pass @abc.abstractmethod def getobstime(self): """ Return the list of the observation time. :return: List of the observation time. """ pass @abc.abstractmethod def get_data(self, t0, t1): ''' Return the registered data. :keyword t0: Start time (inclusive). :type t0: ``datetime.datetime`` :keyword t1: End time (inclusive). :type t1: ``datetime.datetime`` :returns: (``tlist``, ``dlist``) where ``tlist`` is the list of the time as in ``datetime`` instance, and dlist is the data as in ``ndarray`` instance. ''' pass @abc.abstractmethod def clipped(self, t0, t1): ''' Return clipped data. :param t0: Start time (inclusive) :param t1: Stop time (inclusive) :return: Clipped object. It should be the same class as ``self``. ''' pass
[docs]class ObjectSeries(_TimeSeriesABC): ''' Represents time series of any object. :param time_array: Sequence of time in ``datetime.dattime`` objects :param data_array: Sequecne of data in any format. Represent time-tagged data. If the data is floating point, :class:`ScalarSeries` can be used. If the data is 3-D real floating, :class:`VectorSeries` can be used. This class is for arbitrary object. ''' def __init__(self, time_array, data_sequence): """ Initialize the series. :param time_array: Array of time. :param data_sequence: Array of data. >>> tlist = [datetime.datetime(2001, 1, 10, 3), ... datetime.datetime(2001, 1, 10, 1), ... datetime.datetime(2001, 1, 10, 7), ... datetime.datetime(2001, 1, 10, 18), ... datetime.datetime(2001, 1, 10, 2)] >>> print(len(tlist)) 5 >>> dlist = [(3,), (1,), (7,), (18,), (2,)] >>> tser = ObjectSeries(tlist, dlist) >>> tlist2 = tser.times >>> from pprint import pprint >>> pprint(tlist2) [datetime.datetime(2001, 1, 10, 1, 0), datetime.datetime(2001, 1, 10, 2, 0), datetime.datetime(2001, 1, 10, 3, 0), datetime.datetime(2001, 1, 10, 7, 0), datetime.datetime(2001, 1, 10, 18, 0)] >>> dlist2 = tser.data >>> print(dlist2) [(1,), (2,), (3,), (7,), (18,)] >>> pprint(tser.get_data()) ((datetime.datetime(2001, 1, 10, 1, 0), datetime.datetime(2001, 1, 10, 2, 0), datetime.datetime(2001, 1, 10, 3, 0), datetime.datetime(2001, 1, 10, 7, 0), datetime.datetime(2001, 1, 10, 18, 0)), ((1,), (2,), (3,), (7,), (18,))) >>> pprint(tser.get_data(datetime.datetime(2001, 1, 10, 2), datetime.datetime(2001, 1, 10, 7))) ((datetime.datetime(2001, 1, 10, 2, 0), datetime.datetime(2001, 1, 10, 3, 0), datetime.datetime(2001, 1, 10, 7, 0)), ((2,), (3,), (7,))) >>> pprint(tser.get_data(datetime.datetime(2030, 1, 1), datetime.datetime(2040, 1, 1))) ((), ()) >>> for t, d in tser.iter(): ... print(t, d) 2001-01-10 01:00:00 (1,) 2001-01-10 02:00:00 (2,) 2001-01-10 03:00:00 (3,) 2001-01-10 07:00:00 (7,) 2001-01-10 18:00:00 (18,) >>> tser2 = tser.clipped(datetime.datetime(2001, 1, 10, 2), datetime.datetime(2001, 1, 10, 7)) >>> print(len(tser2)) 3 >>> pprint(tser2.get_data()) ((datetime.datetime(2001, 1, 10, 2, 0), datetime.datetime(2001, 1, 10, 3, 0), datetime.datetime(2001, 1, 10, 7, 0)), ((2,), (3,), (7,))) >>> tser3 = tser.clipped(datetime.datetime(2030, 10, 1), datetime.datetime(2010, 10, 1)) >>> print(len(tser3)) 0 """ if len(time_array) != len(data_sequence): raise DataNotFound('Length of time (%d) != data (%d)') datadict = {} for t, d in zip(time_array, data_sequence): datadict[t] = d self.times = sorted(datadict.keys()) self.data = [datadict[t] for t in self.times]
[docs] def t0(self): try: t = self.times[0] except IndexError: raise _ex.IrfpyException('No data contained.') return t
[docs] def t1(self): try: t = self.times[-1] except IndexError: raise _ex.IrfpyException("No data contained.") return t
[docs] def getobstime(self): return self.times[:]
[docs] def get_data(self, t0=None, t1=None): if t0 is None: t0 = self.t0() if t1 is None: t1 = self.t1() tlist = np.array(self.times, copy=True) t_later_t0 = tlist >= t0 t_earlier_t1 = tlist <= t1 t_match = (t_later_t0 & t_earlier_t1) idx = np.where(t_match)[0] t_new = tuple([self.times[_i] for _i in idx]) d_new = tuple([self.data[_i] for _i in idx]) return (t_new, d_new)
[docs] def clipped(self, t0, t1): tlist, dlist = self.get_data(t0, t1) return ObjectSeries(tlist, dlist)
def __len__(self): return len(self.times)
[docs] def at(self, t): """ Return the data at the time of ``t``. The data at the time of ``t``. The time should be exact. Usually you can get the time of the data by :meth:`ObjectSeries.getobstime` method. If not data is available at ``t``, :class:`DataNotFound` error is raised. """ try: idx = self.times.index(t) return self.data[idx] except ValueError: raise DataNotFound('Data not found for time {}.'.format(t))
[docs] def nextof(self, t): """ Return the data next of ``t``. The next data time should be ``tdata > t``. :param t: Time :return: A tuple, (tnext, dnext) :exception: If the next data is not available, :class:`DataNotFound` error is raised. >>> tlist = [datetime.datetime(2001, 1, 10, 3), ... datetime.datetime(2001, 1, 10, 1), ... datetime.datetime(2001, 1, 10, 7), ... datetime.datetime(2001, 1, 10, 18), ... datetime.datetime(2001, 1, 10, 2)] >>> print(len(tlist)) 5 >>> dlist = [(3,), (1,), (7,), (18,), (2,)] >>> tser = ObjectSeries(tlist, dlist) >>> t, d = tser.nextof(datetime.datetime(2001, 1, 10, 0)) >>> print(t, d) 2001-01-10 01:00:00 (1,) >>> t, d = tser.nextof(datetime.datetime(2001, 1, 10, 3)) >>> print(t, d) 2001-01-10 07:00:00 (7,) >>> t, d = tser.nextof(datetime.datetime(2001, 1, 10, 17)) >>> print(t, d) 2001-01-10 18:00:00 (18,) >>> t, d = tser.nextof(datetime.datetime(2001, 1, 10, 18)) # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... timeseries.DataNotFound: >>> t, d = tser.nextof(datetime.datetime(2001, 1, 10, 20)) # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... timeseries.DataNotFound: """ tlist = self.times idx = bisect.bisect_right(tlist, t) if idx >= len(tlist): raise DataNotFound('Data next of {:%FT%T} is not in the ObjectSeries.'.format(t)) return tlist[idx], self.data[idx]
[docs] def previousof(self, t): """ Return the data previous of ``t``. The previous data time should be ``tdata < t``. :param t: Time :return: A tuple, (tnext, dnext) :exception: If the previous data is not available, :class:`DataNotFound` error is raised. >>> tlist = [datetime.datetime(2001, 1, 10, 3), ... datetime.datetime(2001, 1, 10, 1), ... datetime.datetime(2001, 1, 10, 7), ... datetime.datetime(2001, 1, 10, 18), ... datetime.datetime(2001, 1, 10, 2)] >>> print(len(tlist)) 5 >>> dlist = [(3,), (1,), (7,), (18,), (2,)] >>> tser = ObjectSeries(tlist, dlist) >>> t, d = tser.previousof(datetime.datetime(2001, 1, 10, 20)) >>> print(t, d) 2001-01-10 18:00:00 (18,) >>> t, d = tser.previousof(datetime.datetime(2001, 1, 10, 18)) >>> print(t, d) 2001-01-10 07:00:00 (7,) >>> t, d = tser.previousof(datetime.datetime(2001, 1, 10, 2)) >>> print(t, d) 2001-01-10 01:00:00 (1,) >>> t, d = tser.previousof(datetime.datetime(2001, 1, 10, 1)) # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... timeseries.DataNotFound: >>> t, d = tser.previousof(datetime.datetime(2001, 1, 10, 0)) # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... timeseries.DataNotFound: """ tlist = self.times idx = bisect.bisect_left(tlist, t) if idx == 0: raise DataNotFound('Data previous of {:%FT%T} is not in the ObjectSeries.'.format(t)) return tlist[idx - 1], self.data[idx - 1]
[docs] def nearest(self, t): """ Return the nearest data. :param t: :return: """ try: te, de = self.nearest_no_later(t) # Exception handling needed! except DataNotFound: te, de = datetime.datetime.min, None try: tl, dl = self.nearest_no_earlier(t) except DataNotFound: tl, dl = datetime.datetime.max, None dte = (t - te).total_seconds() dtl = (tl - t).total_seconds() if dte <= dtl: return te, de else: return tl, dl
[docs] def nearest_no_later(self, t): """ Return the data point observed earlier than or equal to ``t``, but the latest one. :param t: :return: """ tlist = self.times idx = bisect.bisect(tlist, t) if idx == 0: raise DataNotFound('Time {} is earlier than the dataset. t0={}'.format(t, self.t0())) idx = idx - 1 return self.times[idx], self.data[idx]
nearest_later_or_at = nearest_no_later
[docs] def nearest_no_earlier(self, t): """ Return the data point observed later than or equal to ``t``, but the earliest one. :param t: :return: """ tlist = self.times idx = bisect.bisect_left(tlist, t) if idx == len(tlist): raise DataNotFound('Time {} is later than the dataset. t1={}'.format(t, self.t1())) return self.times[idx], self.data[idx]
nearest_earlier_or_at = nearest_no_earlier
[docs] def concatenate(self, other, strategy='overwrite'): ''' Concatenate other series to return a new series. :param other: The other :class:`ObjectSeries` :keyword strategy: Strategy to concatenate. Default is ``'overwrite'``, in which the same data points are overwritten by the ``other`` data. If ``'nooverwrite'`` is given, the same data points are kept as ``self`` data. :returns: New :class:`OtherSeries` Concatenate two series. If duplication happens, it overwrites by the second one (default), or kept original (``nooverwrite``). >>> t0 = [datetime.datetime(2008, 12, 1), ... datetime.datetime(2008, 12, 3), ... datetime.datetime(2008, 12, 5)] >>> d0 = [1.5, 2.5, 3.5] >>> tser0 = ObjectSeries(t0, d0) >>> print(len(tser0)) 3 >>> t1 = [datetime.datetime(2008, 12, 5), ... datetime.datetime(2008, 12, 8), ... datetime.datetime(2008, 12, 9)] >>> d1 = [4.5, 6.5, 9.5] >>> tser1 = ObjectSeries(t1, d1) >>> print(len(tser1)) 3 Concatenate! Note that ther is one duplication. Latter (tser1) prioritized. >>> tser2 = tser0.concatenate(tser1) >>> print(len(tser2)) 5 >>> t2, d2 = tser2.get_data() >>> from pprint import pprint >>> pprint(t2) (datetime.datetime(2008, 12, 1, 0, 0), datetime.datetime(2008, 12, 3, 0, 0), datetime.datetime(2008, 12, 5, 0, 0), datetime.datetime(2008, 12, 8, 0, 0), datetime.datetime(2008, 12, 9, 0, 0)) >>> print(d2) (1.5, 2.5, 4.5, 6.5, 9.5) Or, tser0 could be prioritzed. >>> tser3 = tser0.concatenate(tser1, strategy='nooverwrite') >>> print(tser3.nearest(datetime.datetime(2008, 12, 5))[1]) 3.5 ''' t0, d0 = self.get_data() t1, d1 = other.get_data() if strategy == 'overwrite': tlist = t0 + t1 dlist = d0 + d1 elif strategy == 'nooverwrite': tlist = t1 + t0 dlist = d1 + d0 else: raise ValueError('No strategy %s defined.' % strategy) return ObjectSeries(tlist, dlist)
[docs] def iter(self): return _iter_objser(self)
class _iter_objser: def __init__(self, iter_obj): self._cur_idx = 0 self._iter_obj = iter_obj def __iter__(self): return self def __next__(self): try: rett, retval = self._iter_obj.times[self._cur_idx], self._iter_obj.data[self._cur_idx] self._cur_idx += 1 return rett, retval except IndexError: raise StopIteration() raise StopIteration()
[docs]class ScalarSeries(_TimeSeriesABC): ''' Represents time series of floating point scalar data. :param time_array: Sequence of time in ``datetime.datetime`` objects. :param data_array: Sequence of data in ``float``. Internally, the time is in ``matplotlib.dates`` number; however, most of the methods use ``datetime.datetime`` object as interface. Sample follows. First, prepare the list of the time and corresponding data. >>> tlist = [datetime.datetime(2009, 1, 10, 12), ... datetime.datetime(2009, 1, 10, 8), ... datetime.datetime(2009, 1, 10, 4), ... datetime.datetime(2009, 1, 10, 16), ... datetime.datetime(2009, 1, 10, 0),] >>> dlist = [0, 1, 2, 3, 4] Then, instance the ScalarSeries object. >>> ts = ScalarSeries(tlist, dlist) >>> ts.shape (2, 5) >>> len(ts) 5 >>> tlist1, dlist1 = ts.get_data() >>> print(dlist1) # doctest: +NORMALIZE_WHITESPACE [4. 2. 1. 0. 3.] If the time array has duplication, the last data is validated. In this case, the created ScalarSeries length will be 2, not 3. >>> tlist2 = [datetime.datetime(2009, 2, 5, 12), ... datetime.datetime(2009, 2, 5, 13), ... datetime.datetime(2009, 2, 5, 13),] >>> dlist2 = [100, 200, 500] >>> ts2 = ScalarSeries(tlist2, dlist2) >>> ts2.shape (2, 2) >>> avg = ts2.get_average(ts2.t0(), ts2.t1()) >>> print(avg) 300.0 The coverage of the data can be get via methods of :meth:`t0` and :meth:`t1`. >>> print(ts.t0()) 2009-01-10 00:00:00 >>> print(ts.t1(fmt="mdates")) 733417.6666666666 To get the data, you can use :meth:`get_data` method. :meth:`get_average` and :meth:`get_moment` methods will calculate the average and variance of the given data. To get the data with inter (and extra) polation, use :meth:`spline_interpolate` method. :meth:`concatenate` method will concatenate the other given :class:`ScalarSeries` object. .. todo:: Zero size ScalarSeries should be implemented. However, to do this you may need to separate the data array? TBC. ''' def __init__(self, time_array, data_array): if len(time_array) != len(data_array): raise ValueError('Length of time (%d) != data (%d)' % (len(time_array), len(data_array))) # Sorting. datadict = {} for t, d in zip(time_array, data_array): datadict[t] = d data = [] for t in sorted(datadict.keys()): data.append([mdates.date2num(t), datadict[t]]) self.data = np.array(data).T self.shape = self.data.shape self.cache = {}
[docs] def t0(self, fmt=None): ''' Returns the first data. :keyword fmt: If ``"mdates"`` is specified, the corresponding object is returned. :returns: The earliest time. :rtype: ``datetime.datetime`` or ``float``. ''' if fmt == "mdates": return self.data[0, 0] else: return mdates.num2date(self.data[0, 0]).replace(tzinfo=None)
[docs] def t1(self, fmt=None): ''' Returns the first data. :keyword fmt: If ``"mdates"`` is specified, the corresponding object is returned. :returns: The latest time. :rtype: ``datetime.datetime`` or ``float``. ''' if fmt == "mdates": return self.data[0, -1] else: return mdates.num2date(self.data[0, -1]).replace(tzinfo=None)
[docs] def getobstime(self): """ Return the observation time. :return: List of observation time. >>> scalar_series = ScalarSeries.sample_data() >>> print(len(scalar_series)) 8 >>> tlist = scalar_series.getobstime() >>> print(len(tlist)) 8 >>> print(tlist[3]) 2009-01-10 08:00:00 """ return [mdates.num2date(self.data[0, i]).replace(tzinfo=None) for i in range(len(self))]
[docs] def get_data(self, t0=None, t1=None): ''' Return the registered data. :keyword t0: Start time (inclusive). Default is :meth:`t0`. :type t0: ``datetime.datetime`` :keyword t1: End time (inclusive). Default is :meth:`t1`. :type t1: ``datetime.datetime`` :returns: (``tlist``, ``dlist``) where ``tlist`` is the list of the time as in ``datetime`` instance, and dlist is the data as in ``ndarray`` instance. >>> tlist = dtrange(datetime.datetime(2009, 1, 30), ... datetime.datetime(2009, 1, 31), ... datetime.timedelta(hours=1)) >>> data = [i ** 2 for i in range(len(tlist))] >>> ts = ScalarSeries(tlist, data) >>> tlist2, data2 = ts.get_data( ... datetime.datetime(2009, 1, 30, 11, 30), ... datetime.datetime(2009, 1, 30, 13, 0)) >>> print(len(tlist2), len(data2)) 2 2 >>> print(tlist2[0], tlist2[1]) 2009-01-30 12:00:00 2009-01-30 13:00:00 >>> print('%.0f %.0f' % (data2[0], data2[1])) 144 169 ''' if t0 is None: t0 = self.t0() if t1 is None: t1 = self.t1() tn0 = mdates.date2num(t0) tn1 = mdates.date2num(t1) tlist = self.data[0, :] dlist = self.data[1, :] tlist = np.ma.masked_outside(tlist, tn0, tn1, copy=True) dlist = np.ma.masked_array(dlist, tlist.mask) tnlist = np.ma.compressed(tlist) tnlist = [mdates.num2date(t).replace(tzinfo=None) for t in tnlist] return (tnlist, np.ma.compressed(dlist))
[docs] def get_average(self, t0=None, t1=None): ''' Return the average between t0 and t1. :param t0: Start time (inclusive) :type t0: ``datetime.datetime`` :param t1: End time (inclusive) :type t1: ``datetime.datetime`` :returns: Average. Use :meth:`get_moment` for more precise use. >>> tlist = dtrange(datetime.datetime(2009, 1, 30), ... datetime.datetime(2009, 1, 31), ... datetime.timedelta(hours=1)) >>> data = [i ** 2 for i in range(len(tlist))] >>> ts = ScalarSeries(tlist, data) The average below should be **(144+169+196)/3 = 169.667**. >>> print('%.3f' % ts.get_average( ... datetime.datetime(2009, 1, 30, 11, 30), ... datetime.datetime(2009, 1, 30, 14, 0))) 169.667 ''' if t0 is None: t0 = self.t0() if t1 is None: t1 = self.t1() tlist, dlist = self.get_data(t0, t1) return dlist.mean()
[docs] def get_moment(self, t0=None, t1=None): ''' Return the number of data, average and variance. :param t0: Start time (inclusive) :type t0: ``datetime.datetime`` :param t1: End time (inclusive) :type t1: ``datetime.datetime`` :returns: (number of data, average, variance) >>> tlist = dtrange(datetime.datetime(2009, 1, 30), ... datetime.datetime(2009, 1, 31), ... datetime.timedelta(hours=1)) >>> data = [i ** 2 for i in range(len(tlist))] >>> ts = ScalarSeries(tlist, data) The average below should be (121+144+169+196)/4 = 157.5 Variance is (36.5^2 + 13.5^2 + 11.5^2 + 38.5^2)/4 = 782.25 >>> ndat, ave, var = ts.get_moment( ... datetime.datetime(2009, 1, 30, 11, 0), ... datetime.datetime(2009, 1, 30, 14, 0)) >>> print(ndat) 4 >>> print('%.3f' % ave) 157.500 >>> print('%.3f' % var) 782.250 ''' if t0 is None: t0 = self.t0() if t1 is None: t1 = self.t1() tlist, dlist = self.get_data(t0, t1) return len(dlist), dlist.mean(), dlist.var()
def _nearest(self, t): ''' Return the nearest data. Single arg version. :param t: Datetime. ''' tn = mdates.date2num(t) idx = bisect.bisect(self.data[0, :], tn) if idx == len(self): idx = idx - 1 if idx == 0: return self.data[1, 0] tn0 = np.abs(tn - self.data[0, idx - 1]) tn1 = np.abs(tn - self.data[0, idx]) if tn0 <= tn1: return self.data[1, idx - 1] else: return self.data[1, idx]
[docs] def nearest(self, t): ''' Return the nearest data. :param t: Scalar or list of time. ''' tlist = np.array(t) if tlist.shape == (): return self._nearest(t) else: d = np.array([self._nearest(ti) for ti in tlist]) return d
[docs] def linear_interpolate(self, t): """ Linear interpolation. :param t: Time or a sequence of times. :return: Interpolated data. >>> tlist = [datetime.datetime(2009, 1, 10, 12), ... datetime.datetime(2009, 1, 10, 8), ... datetime.datetime(2009, 1, 10, 4), ... datetime.datetime(2009, 1, 10, 16), ... datetime.datetime(2009, 1, 10, 0),] >>> dlist = [1, 2, 3, 2, 2] >>> ts = ScalarSeries(tlist, dlist) >>> print(ts.linear_interpolate(datetime.datetime(2009, 1, 10, 6))) 2.5 >>> print(ts.linear_interpolate([datetime.datetime(2009, 1, 10, 6), datetime.datetime(2009, 1, 10, 14)])) [2.5 1.5] """ tn = mdates.date2num(t) from scipy.interpolate import interp1d f = interp1d(self.data[0, :], self.data[1, :]) return f(tn)
[docs] def spline_interpolate(self, t, extrapolate=False): ''' Interpolate using spline fitting. Interpolation can be done using Spline fitting. :param t: Time, sequence allowed. :type t: ``datetime.datetime`` :keyword extrapolate: Extrapolation handling. If ``True`` is set, extrapolation is allowed. If ``False`` is set (default), extrapolation is not allowed. You may also set ``datetime.timedelta`` object here to set "glues" of the data. In this case, the inter/extra-polation between (:meth:`t0` - extrapolate) and (:meth:`t1` + extrapolate) is allowed. :raises ValueError: If non-allowed extrapolation was executed. By default extrapolation is not allowed, because it may produce unrealistic values. >>> tlist = [datetime.datetime(2009, 1, 10, 12), ... datetime.datetime(2009, 1, 10, 8), ... datetime.datetime(2009, 1, 10, 4), ... datetime.datetime(2009, 1, 10, 16), ... datetime.datetime(2009, 1, 10, 0),] >>> dlist = [1, 2, 3, 2, 2] >>> ts = ScalarSeries(tlist, dlist) >>> print('%.3f' % ( ... ts.spline_interpolate(datetime.datetime(2009, 1, 10, 8)))) 2.000 >>> print('%.3f' % ( ... ts.spline_interpolate(datetime.datetime(2009, 1, 10, 6)))) 2.625 You may give a list as argumnet, too. >>> vals = ts.spline_interpolate([datetime.datetime(2009, 1, 10, 1), ... datetime.datetime(2009, 1, 10, 5), ... datetime.datetime(2009, 1, 10, 9),]) >>> print('%.3f %.3f %.3f' % (vals[0], vals[1], vals[2])) 2.547 2.859 1.672 Extrapolation is not allowed by default, but you can set extrapolation keyword to do it. >>> print('%.3f' % ( ... ts.spline_interpolate(datetime.datetime(2009, 1, 11, 0), ... extrapolate=True))) 18.000 You may also allow the extrapolation-allowed range with ``datetime.timedelta`` object. See ``scripts/timeseries_sample1`` also (TBD). >>> print('%.3f' % ( ... ts.spline_interpolate(datetime.datetime(2009, 1, 9, 0), ... extrapolate=datetime.timedelta(days=1)))) -158.000 ''' if extrapolate is True: ex0 = datetime.datetime(datetime.MINYEAR, 1, 1) ex1 = datetime.datetime(datetime.MAXYEAR, 12, 31) elif extrapolate is False: ex0 = self.t0() ex1 = self.t1() else: ex0 = self.t0() - extrapolate ex1 = self.t1() + extrapolate ex0 = mdates.date2num(ex0) ex1 = mdates.date2num(ex1) tn = mdates.date2num(t) if (np.array(tn) < ex0).any() | (np.array(tn) > ex1).any(): raise ValueError( 'Some of the given time is out of the range.\n' + 'Use extrapol keyword to avoid this exception,\n' + 'while large error may be introduced.') try: spl = self.cache['spl'] except KeyError: spl = interpolate.InterpolatedUnivariateSpline(self.data[0, :], self.data[1, :]) self.cache['spl'] = spl return spl(tn)
[docs] def clear_cache(self): ''' Clear cache (for spline) ''' self.cache = {}
[docs] def concatenate(self, other, strategy='overwrite'): ''' Concatenate other series to return a new series. :param other: The other :class:`ScalarSeries` :keyword strategy: Strategy to concatenate. Default is ``'overwrite'``, in which the same data points are overwritten by the ``other`` data. If ``'nooverwrite'`` is given, the same data points are kept as ``self`` data. :returns: New :class:`ScalarSeries` Prepare the first ScalarSeries with 3 records. >>> t0 = [datetime.datetime(2008, 12, 1), ... datetime.datetime(2008, 12, 3), ... datetime.datetime(2008, 12, 5)] >>> d0 = [1.5, 2.5, 3.5] >>> ts0 = ScalarSeries(t0, d0) Then, create the second ScalarSeries with 3 records. >>> t1 = [datetime.datetime(2008, 12, 5), ... datetime.datetime(2008, 12, 8), ... datetime.datetime(2008, 12, 9)] >>> d1 = [4.5, 6.5, 9.5] >>> ts1 = ScalarSeries(t1, d1) Concatenate them. Remember 1 record is duplicated. Therefore, only 5 records are in the new ScalarSeries. >>> ts2 = ts0.concatenate(ts1) >>> print(len(ts2)) 5 The duplicated record is overwritten by ``ts1``. >>> print(ts2.nearest(datetime.datetime(2008, 12, 5))) 4.5 If 'nooverwrite strategy is taken, ``ts0`` is used. >>> ts2 = ts0.concatenate(ts1, strategy='nooverwrite') >>> print(ts2.nearest(datetime.datetime(2008, 12, 5))) 3.5 ''' t0, d0 = self.get_data() t1, d1 = other.get_data() if strategy == 'overwrite': t = np.concatenate([t0, t1]) d = np.concatenate([d0, d1]) elif strategy == 'nooverwrite': t = np.concatenate([t1, t0]) d = np.concatenate([d1, d0]) else: raise ValueError('No strategy %s defined.' % strategy) return ScalarSeries(t, d)
def __len__(self): return len(self.data[0, :])
[docs] @classmethod def sample_data(cls): ''' Get a sample data. ''' tlist = [datetime.datetime(2009, 1, 10, 0), datetime.datetime(2009, 1, 10, 1), datetime.datetime(2009, 1, 10, 3), datetime.datetime(2009, 1, 10, 8), datetime.datetime(2009, 1, 10, 10), datetime.datetime(2009, 1, 10, 11), datetime.datetime(2009, 1, 10, 15), datetime.datetime(2009, 1, 10, 21), ] data = [0.0, 1.5, 2.8, 9.8, 9.2, 9.9, 16.3, 22.5] return ScalarSeries(tlist, data)
[docs] def clipped(self, t0, t1): '''Return clipped :class:`ScalarSeries`. :param t0: Start time :param t1: Stop time :return: clipped data between t0 and t1. :rtype: ScalarSeries >>> scalar_series = ScalarSeries.sample_data() >>> print(len(scalar_series)) 8 >>> clipped_scalar_series = scalar_series.clipped( ... datetime.datetime(2009, 1, 10, 7), ... datetime.datetime(2009, 1, 10, 15)) >>> print(len(clipped_scalar_series)) 4 >>> print(clipped_scalar_series.t0()) 2009-01-10 08:00:00 >>> print(clipped_scalar_series.t1()) 2009-01-10 15:00:00 ''' tlist, dlist = self.get_data(t0, t1) return ScalarSeries(tlist, dlist)
[docs]class Vector3dSeries(_TimeSeriesABC): ''' Time series of 3-D vector. Internally each component uses :class:`ScalarSeries`. This is a redundunt approch (time_array is repeated three times) but simple implementation wins. >>> tlist = [datetime.datetime(2009, 1, 10, 0), ... datetime.datetime(2009, 1, 10, 4), ... datetime.datetime(2009, 1, 10, 8), ... datetime.datetime(2009, 1, 10, 12), ... datetime.datetime(2009, 1, 10, 16),] >>> xlist = [0, 0.5, 1, 0.5, 0] >>> ylist = [1, 0.5, 0, -0.5, -1] >>> zlist = [0, 1, 2, 3, 4] >>> vs = Vector3dSeries(tlist, xlist, ylist, zlist) To get the data, you can use :meth:`get_data` method. >>> t0 = datetime.datetime(2009, 1, 10, 1) >>> t1 = datetime.datetime(2009, 1, 10, 8) >>> t, v = vs.get_data(t0, t1) >>> print(len(t)) 2 >>> print(t[0]) 2009-01-10 04:00:00 >>> print(t[1]) 2009-01-10 08:00:00 >>> print(v.shape) (3, 2) >>> print(v[:, 0]) # doctest: +NORMALIZE_WHITESPACE [0.5 0.5 1. ] >>> print(v[:, 1]) # doctest: +NORMALIZE_WHITESPACE [1. 0. 2.] You may also use :meth:`get_average` and :meth:`get_moment` methods for the average or moment values. >>> ave = vs.get_average(t0, t1) >>> print('%.2f %.2f %.2f' % (ave[0], ave[1], ave[2])) 0.75 0.25 1.50 >>> n, ave, var = vs.get_moment(t0, t1) >>> print(n) 2 >>> print(ave) # doctest: +NORMALIZE_WHITESPACE [0.75 0.25 1.5 ] >>> print(var) # doctest: +NORMALIZE_WHITESPACE [0.0625 0.0625 0.25 ] For interpolation, you can use :meth:`spline_interpolate`. >>> t_interpol = [t0, t1] >>> val_interpol = vs.spline_interpolate(t_interpol) >>> from irfpy.util.with_context import printoptions >>> with printoptions(precision=2, suppress=True): ... print(val_interpol.shape) ... print(val_interpol[:, 0]) # doctest: +NORMALIZE_WHITESPACE ... # print val_interpol[:, 1] (3, 2) [0.04 0.88 0.25] # [ 1. 0. 2.] .. todo:: The doctest may make non-unique solution (0. or -0.) so that I disabled the test on 2013-09-03. One has to make better test routine here. ''' def __init__(self, time_array, x_array, y_array, z_array): ''' Instance the :class:`Vector3dSeries` object :param time_array: Sequence of time in ``datetime.datetime`` objects. :param x_array: Sequence of x-component in ``float``. :param y_array: Sequence of y-component in ``float``. :param z_array: Sequence of z-component in ``float``. ''' self.x = ScalarSeries(time_array, x_array) self.y = ScalarSeries(time_array, y_array) self.z = ScalarSeries(time_array, z_array)
[docs] def spline_interpolate(self, t, extrapolate=False): ''' Spline interpolation. Returns the spline interpolation. Interpolation is component-wise. See also :meth:`ScalarSeries.spline_interpolate` for details. :param t: ``datetime.datetime`` object or sequence of it. :keyword extrapolate: Extrapolation handling. See :meth:`ScalarSeries.spline_interpolate` for details. :returns: Interpolated vector (component wise). If ``t`` is a ``datetime.datetime`` object, ``np.array`` with shape of (3, ) is returned. If ``t`` is a sequence of ``datetime.datetime`` with *N* length, ``np.array`` with shape of (3, *N*) is returned. ''' x = self.x.spline_interpolate(t, extrapolate=extrapolate) y = self.y.spline_interpolate(t, extrapolate=extrapolate) z = self.z.spline_interpolate(t, extrapolate=extrapolate) return np.array([x, y, z])
[docs] def t0(self): ''' Return the first data time ''' return self.x.t0()
[docs] def t1(self): ''' Return the last data time ''' return self.x.t1()
[docs] def getobstime(self): ''' Return the observation time. :return: List of observation time. ''' return self.x.getobstime()
[docs] def get_data(self, t0=None, t1=None): ''' Return the data in between ``t0`` and ``t1``. :param t0: Start time (inclusive) :type t0: ``datetime.datetime`` :param t1: End time (inclusive) :type t1: ``datetime.datetime`` :returns: The data in between ``t0`` and ``t1``. ``ndarray`` with the shape of (3, N) where N is the number of data. ''' t, x = self.x.get_data(t0, t1) t, y = self.y.get_data(t0, t1) t, z = self.z.get_data(t0, t1) return (t, np.array([x, y, z]))
[docs] def get_timeseries_magnitude(self): """ Return the time series of the magnitude data. :return: Time series of magnitude data. :rtype: :class:`ScalarSeries`. >>> vecseries = Vector3dSeries.sample_data() >>> print(len(vecseries)) 5 >>> t, v = vecseries.get_data() >>> magseries = vecseries.get_timeseries_magnitude() >>> t, l = magseries.get_data() >>> print(v) [[ 0. 0.5 1. 0.5 0. ] [ 1. 0.5 0. -0.5 -1. ] [ 0. 1. 2. 3. 4. ]] >>> print(l) # doctest: +NORMALIZE_WHITESPACE [1. 1.22474487 2.23606798 3.082207 4.12310563] """ tlist, xyz = self.get_data() mag = np.sqrt((xyz ** 2).sum(axis=0)) return ScalarSeries(tlist, mag)
[docs] def clipped(self, t0, t1): """ Return clipped Vector3dSeries. :param t0: Start time :param t1: Stop time :return: New Vector3dSeries >>> tser3d = Vector3dSeries.sample_data() >>> print(len(tser3d)) 5 >>> tser3d = tser3d.clipped(datetime.datetime(2009, 1, 10, 6), ... datetime.datetime(2009, 1, 10, 15)) >>> print(len(tser3d)) 2 >>> tlist, dlist = tser3d.get_data() >>> print(dlist) [[ 1. 0.5] [ 0. -0.5] [ 2. 3. ]] """ t, x = self.x.get_data(t0, t1) t, y = self.y.get_data(t0, t1) t, z = self.z.get_data(t0, t1) return Vector3dSeries(t, x, y, z)
[docs] def get_average(self, t0=None, t1=None): ''' Return the average. :param t0: Start time (inclusive) :type t0: ``datetime.datetime`` :param t1: End time (inclusive) :type t1: ``datetime.datetime`` :returns: Average :rtype: ``ndarray`` with shape of (3,) ''' if t0 is None: t0 = self.t0() if t1 is None: t1 = self.t1() x = self.x.get_average(t0, t1) y = self.y.get_average(t0, t1) z = self.z.get_average(t0, t1) return np.array([x, y, z])
[docs] def get_moment(self, t0=None, t1=None): ''' Return the moment. :param t0: Start time (inclusive) :type t0: ``datetime.datetime`` :param t1: End time (inclusive) :type t1: ``datetime.datetime`` :returns: A tuple. (number of data, average in (3,) ``ndarray``, variance in (3,) ``ndarray``) ''' if t0 is None: t0 = self.t0() if t1 is None: t1 = self.t1() xn, xa, xv = self.x.get_moment(t0, t1) yn, ya, yv = self.y.get_moment(t0, t1) zn, za, zv = self.z.get_moment(t0, t1) return (xn, np.array([xa, ya, za]), np.array([xv, yv, zv]))
[docs] def clear_cache(self): self.x.clear_cache() self.y.clear_cache() self.z.clear_cache()
[docs] def nearest(self, t): x = self.x.nearest(t) y = self.y.nearest(t) z = self.z.nearest(t) return np.array([x, y, z])
[docs] def concatenate(self, other, strategy='overwrite'): ''' Concatenate other series to return a new series. :param other: The other :class:`Vector3dSeries` :keyword strategy: Strategy to concatenate. Default is ``'overwrite'``, in which the same data points are overwritten by the ``other`` data. If ``'nooverwrite'`` is given, the same data points are kept as ``self`` data. :returns: New :class:`VectorSeries` >>> t0 = dtrange(datetime.datetime(2005, 1, 1), ... datetime.datetime(2005, 1, 2), ... datetime.timedelta(hours=6)) # 4 elements >>> print(len(t0)) 4 >>> x0 = [0, 1, 2, 3] >>> y0 = [0, -1, -2, -3] >>> z0 = [0, 0, 0, 0] >>> v0 = Vector3dSeries(t0, x0, y0, z0) >>> t1 = dtrange(datetime.datetime(2005, 1, 2), ... datetime.datetime(2005, 1, 2, 12), ... datetime.timedelta(hours=4)) # 3 elements >>> x1 = [4, 5, 6] >>> y1 = [-2, -1, 0] >>> z1 = [1, 1, 1] >>> v1 = Vector3dSeries(t1, x1, y1, z1) >>> v2 = v0.concatenate(v1) >>> print(len(v2)) 7 >>> t2, d2 = v2.get_data() >>> print(d2.shape) (3, 7) >>> print(d2[0, :]) # doctest: +NORMALIZE_WHITESPACE [0. 1. 2. 3. 4. 5. 6.] >>> print(d2[1, :]) [ 0. -1. -2. -3. -2. -1. 0.] >>> print(d2[2, :]) # doctest: +NORMALIZE_WHITESPACE [0. 0. 0. 0. 1. 1. 1.] ''' x = self.x.concatenate(other.x, strategy=strategy) y = self.y.concatenate(other.y, strategy=strategy) z = self.z.concatenate(other.z, strategy=strategy) t, x = x.get_data() t, y = y.get_data() t, z = z.get_data() return Vector3dSeries(t, x, y, z)
def __len__(self): return len(self.x)
[docs] @classmethod def sample_data(cls): ''' Return a sample data. ''' tlist = [datetime.datetime(2009, 1, 10, 0), datetime.datetime(2009, 1, 10, 4), datetime.datetime(2009, 1, 10, 8), datetime.datetime(2009, 1, 10, 12), datetime.datetime(2009, 1, 10, 16), ] xlist = [0, 0.5, 1, 0.5, 0] ylist = [1, 0.5, 0, -0.5, -1] zlist = [0, 1, 2, 3, 4] return Vector3dSeries(tlist, xlist, ylist, zlist)
def __str__(self): s = '<irfpy.util.timeseries.Vector3dSeries: Len={l:d} T0={t0:%FT%T} T1={t1:%FT%T}>'.format( l=len(self), t0=self.t0(), t1=self.t1()) return s
[docs]class TimeStepScalarField3D(_TimeSeriesABC): r""" Time series of scalar field. This class extends the functionality of :class:`irfpy.util.fields.ScalarField3D` class and their derivatives in order to support the time series of fields. .. note:: This contains several :class:`ScalarField3D`, so that the system should sometimes have a very large memory. No memory handling is supported. This means that all the data is on memory. *Sample* Let's look at a scalar field, i.e. gravity potential, produced by a moving point of mass: :math:`(x_m=vt, 0, 0)`. The gravity potential is obtained from :class:`irfpy.util.fields.GravityPotential` class. First, just create the potential field at ``t=0``, i.e. ``x_m=0``. The mass is taken as :math:`M=\frac{1}{6.67408\times 10^{-11}}`, then, :math:`GM=1`. Note that the time should be ``datetime.datetime`` object. Thus, a dummy time (with respective to a given epoch). >>> import datetime >>> t0 = datetime.datetime(2010, 1, 1, 0, 0, 0) # An epoch. This case, it is a dummy time. .. math:: \phi(r) = -\frac{GM}{r} = -\frac{1}{r} >>> import irfpy.util.fields >>> m = 1 / 6.67408e-11 >>> potential_t0 = irfpy.util.fields.GravityPotential(m, center=(0, 0, 0)) Just to confirm if it works, check the potential at ``(0, 4, 0)``. >>> print(potential_t0([0, 4, 0])) -0.25 Then, as time goes, the point of mass moves. Here I take ``v=2.5``. >>> v=2.5 At time ``t=1``, the position of the mass is at ``x_m = v (=2.5)``. >>> t=1 >>> xm_1 = (v * t, 0, 0) >>> potential_t1 = irfpy.util.fields.GravityPotential(m, center=xm_1) Now the observer at (0, 4, 0) see different potential, the distance is now :math:`|(2.5, 4.0, 0)|=4.717` and the potential is :math:`\phi(r, t=1)=-1/r=-0.212`. >>> print('{:.3f}'.format(potential_t1([0, 4, 0]))) -0.212 At time ``t=3``, the position moves to ``x_m = 3v = 7.5``. >>> t=3 >>> xm_3 = (v * t, 0, 0) >>> potential_t3 = irfpy.util.fields.GravityPotential(m, center=xm_3) >>> print('{:.3f}'.format(potential_t3([0, 4, 0]))) -0.118 A bit more time steps are added as follows. >>> si = (0, 1, 3, 5, 8) >>> ti = [t0 + datetime.timedelta(seconds=_t) for _t in si] >>> potential_list = [] >>> for _t in si: ... xm_t = (v * _t, 0, 0) ... potential_t = irfpy.util.fields.GravityPotential(m, center=xm_t) ... potential_list.append(potential_t) Now it is time to create a :class:`TimeStepScalarField`. >>> tsfield = TimeStepScalarField3D(ti, potential_list) First, you can get the field directly. The following is the same as >>> print('{:.3f}'.format(tsfield((0, 4, 0), t0 + datetime.timedelta(seconds=0)))) # At (0, 4, 0) at t=0 -0.250 >>> print('{:.3f}'.format(tsfield((0, 4, 0), t0 + datetime.timedelta(seconds=1)))) # At (0, 4, 0) at t=1 -0.212 >>> print('{:.3f}'.format(tsfield((0, 4, 0), t0 + datetime.timedelta(seconds=3)))) # At (0, 4, 0) at t=3 -0.118 Timewise, the linear interpolated value is returned. Spacewise, the default interpolation of the given field (irfpy.util.fields.ScalarField3D) is used. >>> print('{:.3f}'.format(tsfield((0, 4, 0), t0 + datetime.timedelta(seconds=2)))) # At (0, 4, 0) at t=2 is not given, but the returned is linear interpolation. -0.165 >>> print(tsfield.index_of_time(t0)) 0 >>> print(tsfield.index_of_time(t0 + datetime.timedelta(seconds=4))) 2 >>> print(tsfield.index_of_time(t0 + datetime.timedelta(seconds=9))) 4 """ def __init__(self, time_list, field_list): if len(time_list) != len(field_list): raise ValueError('The given time_list and field_list does not have same length.') datadict = {} for _t, _f in zip(time_list, field_list): datadict[_t] = _f self._tlist = [] # List is in the ascending order self._flist = [] # Index is corresponding to tlist for t in sorted(datadict.keys()): self._tlist.append(t) self._flist.append(datadict[t]) self._time2index = _tsdb.DB() for _i, _t in enumerate(self._tlist): self._time2index.append('{:d}'.format(_i), _t)
[docs] def t0(self): return self._tlist[0]
[docs] def t1(self): return self._tlist[1]
[docs] def getobstime(self): return self._tlist.copy()
[docs] def get_data(self, t0=None, t1=None): if t0 is None: t0 = self.t0() if t1 is None: t1 = self.t1() new_tlist = [] new_flist = [] for _t, _f in zip(self._tlist, self._flist): if _t >= t0 and _t <= t1: new_tlist.append(_t) new_flist.append(_f) return new_tlist, new_flist
[docs] def clipped(self, t0, t1): tlist, dlist = self.get_data(t0, t1) return TimeStepScalarField3D(tlist, dlist)
[docs] def index_of_time(self, t): return int(self._time2index.get(t))
def __call__(self, xyz, tt): # First, identify the corresponding field i0 = self.index_of_time(tt) t0 = self._tlist[i0] f0 = self._flist[i0] try: t1 = self._tlist[i0 + 1] except IndexError: return ValueError('The given time is out of bounds.') f1 = self._flist[i0 + 1] val0 = f0(xyz) val1 = f1(xyz) dt0 = (tt - t0).total_seconds() dt1 = (t1 - tt).total_seconds() gamma = dt0 / (dt0 + dt1) return val0 + (val1 - val0) * gamma
[docs]def dtrange(t0, t1, delta): ''' Return a regularly separated list of ``datatime`` objects. :param t0: Start :type t0: ``datetime.datetime`` :param t1: Stop (not inclusive) :type t1: ``datetime.datetime`` :param delta: Delta :type delta: ``datetime.timedelta``. :returns: List of ``datetime.datetime``. >>> r0 = dtrange(datetime.datetime(2009, 1, 1), ... datetime.datetime(2009, 1, 2), ... datetime.timedelta(hours=1)) >>> print(len(r0)) 24 >>> print(r0[0]) 2009-01-01 00:00:00 >>> print(r0[-1]) 2009-01-01 23:00:00 ''' tlist = [] t = t0 while t < t1: tlist.append(t) t = t + delta return tlist