Source code for irfpy.util.datacenter

""" The data center: easy implementation of, and access to, time series datasets stored in multiple files.

This document is mainly for developers of a DataCenter.
Users of an already-implemented DataCenter should refer to the :ref:`data_center` document.

In space plasma data analysis, most datasets have the following characteristics.

- Data are time-tagged (time series data)
- Data are spread over multiple files
- Each data file contains data with time-tags
- Data contents are arbitrary (vector, matrix, or multi-dimensional values)

To make data handling easy, this module provides a base class
(:class:`BaseDataCenter`) that holds multiple files as data,
with coherent accessors. The accessors support the following.

- The user specifies an instantaneous time and gets a single data point.
    - :meth:`BaseDataCenter.nearest`
    - :meth:`BaseDataCenter.nearest_no_earlier`
    - :meth:`BaseDataCenter.nearest_no_later`
    - :meth:`BaseDataCenter.previousof`
    - :meth:`BaseDataCenter.nextof`
- The user specifies a time range and gets an array of data.
    - :meth:`BaseDataCenter.get_array`
    - :meth:`BaseDataCenter.get_array_strict`
    - :meth:`BaseDataCenter.get_array_wide`
    - :meth:`BaseDataCenter.get_array_wide_start`
    - :meth:`BaseDataCenter.get_array_wide_stop`
- The user specifies a time range and gets an iterator.
    - :meth:`BaseDataCenter.iter`
    - :meth:`BaseDataCenter.iter_strict`
    - :meth:`BaseDataCenter.iter_wide`
    - :meth:`BaseDataCenter.iter_wide_start`
    - :meth:`BaseDataCenter.iter_wide_stop`

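All the methods above return the observed time paired with the data:
a single ``(time, data)`` pair for the point accessors and for each step of the iterators,
and a pair of lists ``(tlist, dlist)`` for the array accessors.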
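
As a rough, library-independent illustration of the point accessors, the sketch below
works on a plain sorted list of time tags. The three helper functions only borrow the
accessor names; they are hypothetical stand-ins and not the irfpy implementation.
The 35-minute cadence is the one used by the sample data of this module.

```python
import bisect
import datetime


# Hypothetical stand-ins for the point accessors, acting on a sorted list of tags.
def nearest_no_later(tags, t):
    # Latest tag Ti with Ti <= t.
    i = bisect.bisect_right(tags, t)
    if i == 0:
        raise LookupError('no data at or before the given time')
    return tags[i - 1]


def nearest_no_earlier(tags, t):
    # Earliest tag Ti with Ti >= t.
    i = bisect.bisect_left(tags, t)
    if i == len(tags):
        raise LookupError('no data at or after the given time')
    return tags[i]


def nearest(tags, t):
    # Closest tag on either side; a tie goes to the earlier tag.
    try:
        earlier = nearest_no_later(tags, t)
    except LookupError:
        return nearest_no_earlier(tags, t)
    try:
        later = nearest_no_earlier(tags, t)
    except LookupError:
        return earlier
    return earlier if (t - earlier) <= (later - t) else later


# One day of time tags, one every 35 minutes.
start = datetime.datetime(2017, 3, 30)
tags = [start + datetime.timedelta(minutes=35 * i) for i in range(42)]

t = datetime.datetime(2017, 3, 30, 17, 15)
print(nearest_no_later(tags, t))     # 2017-03-30 16:55:00
print(nearest_no_earlier(tags, t))   # 2017-03-30 17:30:00
print(nearest(tags, t))              # 2017-03-30 17:30:00
```

The real accessors additionally return the data attached to the time tag and look
across file boundaries, which this sketch leaves out.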

*What is the tagged-time?*

Time specification in space plasma data is not always straightforward.
The time-tag of a datum can have at least three meanings:

- Tagged time is for the start of observation
- Tagged time is for the middle (or the exact instant) of observation
- Tagged time is for the end of observation

To cope with this ambiguity, two definitions of a time range are available: "strict" and "wide".
Assume you have time series data with a series of tagged times ``(Ti)``.

- "strict": The strict time range ``(t0, t1)`` will give you a subset of the original ``(Ti)`` with
  - ``Ti`` included if ``t0 <= Ti <= t1``
  - ``Ti`` excluded if ``Ti < t0``
  - ``Ti`` excluded if ``Ti > t1``
- "wide": The wide time range ``(t0, t1)`` will give you the strict range plus at most one extra point at each end, with
  - ``Ti`` also included if ``Ti <= t0 < T(i+1)`` or ``T(i-1) < t1 <= Ti``.

If both ``t0`` and ``t1`` fall exactly on time-tags, "wide" behaves the same as "strict".
If a boundary is not exactly on a time-tag, "wide" returns one more data point at that boundary than "strict" does.
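
The difference between the two definitions can be sketched on a plain sorted list of
time tags. The functions ``select_strict`` and ``select_wide`` are hypothetical helpers
written for this illustration only; they follow the inequalities above but are not how
the data center iterators are implemented, and edge cases (for example a range starting
before the first tag) may behave differently in the real classes.

```python
import bisect
import datetime


def select_strict(tags, t0, t1):
    # Keep every tag Ti with t0 <= Ti <= t1.
    lo = bisect.bisect_left(tags, t0)
    hi = bisect.bisect_right(tags, t1)
    return tags[lo:hi]


def select_wide(tags, t0, t1):
    # Start from the latest tag at or before t0 and
    # stop at the earliest tag at or after t1 (both clamped to the list).
    lo = max(bisect.bisect_right(tags, t0) - 1, 0)
    hi = min(bisect.bisect_left(tags, t1) + 1, len(tags))
    return tags[lo:hi]


# Time tags every 35 minutes, as in the sample data of this module.
start = datetime.datetime(2017, 3, 30)
tags = [start + datetime.timedelta(minutes=35 * i) for i in range(60)]

t0 = datetime.datetime(2017, 3, 30, 23, 45)
t1 = datetime.datetime(2017, 3, 31, 0, 45)

for tag in select_strict(tags, t0, t1):
    print('strict', tag)   # 23:55 and 00:30
for tag in select_wide(tags, t0, t1):
    print('wide  ', tag)   # 23:20, 23:55, 00:30 and 01:05
```

When both boundaries sit exactly on time tags, the two helpers return the same subset.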

See the examples in :class:`SampleDataCenter` for more details.

*For users*

This module provides a skeleton (abstract base class) for a data center.
Implementations for individual instruments are provided by each project.

The simplest use cases are described in the sample class, :class:`SampleDataCenter`.

*For developers*

The :class:`BaseDataCenter` gives a quick way to implement a dataset accessor.
The developer only needs to implement a few abstract methods.

See the description under :class:`BaseDataCenter` for details.
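
The shape of those methods can be sketched without irfpy. In the standalone example
below, the three plain functions mirror what ``search_files``, ``approximate_starttime``
and ``read_file`` are expected to do. The file name pattern ``DATA_YYYY_MM_DD.dat`` and
the file contents are assumptions made for this illustration; in a real data center the
functions would be methods of a :class:`BaseDataCenter` subclass.

```python
import datetime
import os
import tempfile


def search_files(base_folder):
    # Return the data files under base_folder, sorted from earlier to later.
    found = []
    for path, _dirs, files in os.walk(base_folder):
        for name in files:
            if name.startswith('DATA_') and name.endswith('.dat'):
                found.append(os.path.join(path, name))
    return sorted(found)


def approximate_starttime(filename):
    # Guess the start time from the file name only, without opening the file.
    stem = os.path.splitext(os.path.basename(filename))[0]
    _prefix, year, month, day = stem.split('_')
    return datetime.datetime(int(year), int(month), int(day))


def read_file(filename):
    # Return (tlist, dlist): time tags and data, with the same length.
    tlist, dlist = [], []
    with open(filename) as fp:
        for line in fp:
            fields = line.split()
            if not fields:
                continue
            tlist.append(datetime.datetime.strptime(fields[0], '%Y-%m-%dT%H:%M:%S'))
            dlist.append([float(x) for x in fields[1:]])
    return tlist, dlist


# Build a tiny (hypothetical) data file and run the three steps on it.
with tempfile.TemporaryDirectory() as base:
    with open(os.path.join(base, 'DATA_2017_03_31.dat'), 'w') as fp:
        fp.write('2017-03-31T00:30:00 1.0 2.0\n')
        fp.write('2017-03-31T01:05:00 3.0 4.0\n')
    files = search_files(base)
    approx = approximate_starttime(files[0])
    tlist, dlist = read_file(files[0])

print(approx, tlist[0], dlist)
```

The guessed start time only has to order the files correctly; the exact start time
is taken from the first time tag actually read from the file.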
"""
import os as _os
import datetime as _datetime
from dateutil import parser as _parser
import abc as _abc
import logging as _logging
import copy as _copy

import numpy as _np

from irfpy.util import timeseriesdb as _tdb
from irfpy.util import timeseries as _ts
from irfpy.util import exception as _ex
from irfpy.util import ringcache as _ringcache

_logger = _logging.getLogger(__name__)


class DataNotInDC(_ex.IrfpyException):
    ''' Exception raised when the data is not found.
    '''
    def __init__(self, value):
        self.value = value

    def __str__(self):
        return repr(self.value)


class IrfpyWarningNoFileInDataCenter(UserWarning):
    ''' Warning issued when no data file is found for a data center.
    '''
    def __init__(self, *args, **kwds):
        # Initialize the parent instead of creating (and discarding) a new warning.
        UserWarning.__init__(self, *args, **kwds)


class BaseDataCenter(metaclass=_abc.ABCMeta):
    """ Base class of the data center.

    The :class:`BaseDataCenter` provides a quick implementation of
    time series data stored in multiple files of various formats.

    The usage for users (interface) is described in :class:`SampleDataCenter`.
    Here, how to extend the :class:`BaseDataCenter` is described.

    The following three methods must be implemented.

    - :meth:`search_files`: Search the data files to store in the data center.
    - :meth:`approximate_starttime`: Give an extremely quick estimate of the start time of each data file.
    - :meth:`read_file`: Read a single data file and return the data as a
      :class:`irfpy.util.timeseries.ObjectSeries` object
      (a ``(tlist, dlist)`` tuple is also accepted).

    .. versionchanged:: v4.5

        The data to be returned is, by default, a copy of the original data.
        At the cost of a slight overhead in processing time and memory,
        the returned data is safe to be manipulated by users.
        Users can override this default behavior with the ``copy`` keyword at run time.

    .. versionchanged:: v4.4.8a2

        It is *not* recommended to re-implement :meth:`exact_starttime` any more.
        This is because :meth:`exact_starttime` should also judge whether the data file
        is properly formatted and can be interpreted.
        If the data file is not proper, the file is dropped from the database
        at the time of the time check.
        Note that the total loss of performance should be minimal,
        since the read data is stored in a cache, so that the data center
        does not read the file again.
        The cache size can be changed at the time of data center creation,
        or via the :meth:`set_cachesize` method.
    """
    def __init__(self, cache_size=25, name="Unnamed DataCenter", copy=True):
        """ Initializer.

        :keyword cache_size: Size of the ring cache.
        :keyword name: The name of the data center.
        :keyword copy: Boolean selecting whether the returned data is deep-copied (True) or a reference (False).
            Returning a copy is safer, since the data held by the data center always stays original.
            Returning a reference is possibly faster, but has the side effect that
            post-processing by the user can destroy the original data.
            Therefore, it is recommended to always set *True*.
            The copy value can be overridden in each method as necessary.
        """
        self._cache = _ringcache.RingCache(cache_size)
        self.__name = name
        self.__fazdb = self._create_db()
        self._copy = copy

    def set_cachesize(self, cache_size):
        """ Set the size of the cache.

        You can change the size of the data cache (used for :meth:`read_file`).
        The existing cache is discarded.

        :param cache_size: The size of the data cache
        """
        self._cache = _ringcache.RingCache(cache_size)

    def print_status(self):
        """ Print a short summary of the data center (time range, database and cache sizes).
        """
        print('# DataCenterStatus:')
        print('# - Time range:')
        print('# - Start: {}'.format(self.t0()))
        print('# - Stop: {}'.format(self.t1()))
        print('# - TimeSeriesDataBase:')
        print('# - Size: {}'.format(len(self.__fazdb)))
        print('# - DataCache:')
        print('# - Size: {}'.format(len(self._cache)))

    def _create_db(self):
        """ Create a data base
        """
        db = _tdb.DB()
        for _f in self.search_files():
            db.append(_f, self.approximate_starttime(_f))

        fazdb = _tdb.FazzyDB(db, self.exact_starttime)

        if len(fazdb) == 0:
            import warnings as _warnings
            _warnings.warn("No file in the database.\n" +
                           "\tCheck if the data exists, or your setting is correct.\n" +
                           "\tContinue processing, but this may cause additional errors later.",
                           IrfpyWarningNoFileInDataCenter, stacklevel=4)
            # Stacklevel=4 is the level where the DataBase instance is created.

        return fazdb

    @_abc.abstractmethod
    def search_files(self):
        """ Search the data files, returning a list of data files.

        This method searches the data files under the :attr:`base_folder`.
        This method should return a list / tuple of the data file names
        (usually full paths).

        This method is called only once, when :meth:`__init__` is called.

        :return: A list / tuple of the data files. Each should be a full path
            (or a path relative to the current path), and the list should be
            sorted from earlier data to later data.
        """
        raise NotImplementedError('This must be implemented by a datacenter developer.')

    @_abc.abstractmethod
    def approximate_starttime(self, filename):
        """ Start time should be guessed for each file.

        A guessed start time should be returned.
        It is OK if it is very approximate, but the order of the
        guessed start times and the order of the exact start times should be identical.

        This method must be very fast, because it is called for all the files
        in the data base (i.e. all the files returned by the :meth:`search_files` method).

        A practical suggestion for implementation is to guess the time
        from the filename.

        :param filename: A string, filename.
        :return: An approximate, guessed start time of the file
        :rtype: ``datetime.datetime``
        """
        raise NotImplementedError('This must be implemented by a datacenter developer.')

    def exact_starttime(self, filename):
        """ From a file name, the precise start time should be obtained.

        The exact start time should be returned.

        .. versionchanged:: v4.4.8a2

            It is *not* recommended to re-implement :meth:`exact_starttime` any more.
            This is because :meth:`exact_starttime` should also judge whether the data file
            is properly formatted and can be interpreted.
            If the data file is not proper, the file is dropped from the database
            at the time of the time check.

        *For developers*

        Here, the exact start time is the start time of the data contents.
        For example, consider a data file "2013-10-18-12-00-00.dat",
        and assume that the first data in the data file is for ``2013-10-18T12:00:02``.
        In this case, the returned time should be the latter, i.e., ``2013-10-18T12:00:02``.

        Therefore, the start time can be obtained from the contents loaded by
        the :meth:`read_file` method, with some specific error handling
        (zero-size data, or a corrupted data file).

        Re-implementation of this method is not recommended.

        :param filename: A string, filename.
        :return: The ``datetime`` expression of the exact start time of the file.
            ``None`` is allowed if the data file cannot define the exact start time.
            In the ``None`` case, the file is dropped from the data center.
        """
        try:
            contents = self._get_contents(filename)
            start = contents.t0()
        except (_ex.IrfpyException, AttributeError):
            start = None
        return start

    def _get_contents(self, filename):
        """ From the file name, the contents are obtained as :class:`irfpy.timeseries.ObjectSeries` object.

        The contents are cached, so that the same file is not read repeatedly.

        :param filename: A filename
        :return: Contents of the specified file.
        :rtype: :class:`irfpy.timeseries.ObjectSeries`

        Note that this method is *not* intended to be overridden.
        """
        if not self._cache.hasKey(filename):
            contents = self.read_file(filename)
            if not isinstance(contents, _ts.ObjectSeries):
                # read_file() returned a (tlist, dlist) pair; convert it to an ObjectSeries.
                tcontent, dcontent = contents
                if len(tcontent) != len(dcontent):
                    _logger.warning('The given file ({}) has contents with inconsistent lengths.'.format(filename))
                    _logger.warning('\tTime={}/Data={}'.format(len(tcontent), len(dcontent)))
                    _logger.warning('\tThe file {} is ignored, and processing continues.'.format(filename))
                    raise _ex.IrfpyException('Data inconsistency')
                contents = _ts.ObjectSeries(tcontent, dcontent)
            self._cache.add(filename, contents)

        contents = self._cache.get(filename)
        return contents

    @_abc.abstractmethod
    def read_file(self, filename):
        """ The file is read, and the contents are returned as a tuple of size 2, (tlist, dlist).

        This method is an abstract method, meaning that the developer
        of the data center should implement it.
        See :class:`SampleDataCenter` for more details.

        The implementation of this method should follow:

        - The returned value is a tuple with a size of 2.
        - The first element is a tuple/list specifying the time (with each element a ``datetime.datetime`` object).
        - The second element is a tuple/list specifying the data, in any format.
        - The lengths of the two elements should be the same.

        If the given file is corrupted or empty, two empty tuples should be returned
        (i.e., ``return (), ()``).
        In this case, the :meth:`exact_starttime` method returns ``None``
        and the file is dropped from the data center.

        :param filename: File name
        :return: The contents of the data file
        :rtype: ``tuple``
        """
        raise NotImplementedError('This must be implemented by a datacenter developer.')

    def t0(self):
        """ Return the time-tag of the first data.

        :return: The time-tag of the first data.
        """
        t, dat = self.nearest_no_earlier(_datetime.datetime.min)
        return t

    def t1(self):
        """ Return the time-tag of the last data.

        :return: The time tag of the last data
        """
        t, dat = self.nearest_no_later(_datetime.datetime.max)
        return t

    def __copy_or_ref(self, data, copy):
        iscopy = (self._copy if copy is None else copy)
        return (_copy.deepcopy(data) if iscopy else data)

    def nearest(self, t, copy=None):
        """ Return the nearest data in the data center.

        :param t: Time in ``datetime`` object
        :keyword copy: If True, returned data is a new copy. If False, returned data is a reference.
            If None, the default value (see :meth:`__init__`) is used.
            If unsure, keep it as *None* (default).
        :return: A pair, (``t_obs``, ``data``)
        """
        try:
            te, de = self.nearest_no_later(t, copy=False)
        except DataNotInDC:
            te, de = None, None

        try:
            tl, dl = self.nearest_no_earlier(t, copy=False)
        except DataNotInDC:
            tl, dl = None, None

        if te is None:
            return tl, self.__copy_or_ref(dl, copy)
        elif tl is None:
            return te, self.__copy_or_ref(de, copy)

        delta_e = (t - te).total_seconds()
        delta_l = (tl - t).total_seconds()

        # A tie goes to the earlier data.
        if delta_e <= delta_l:
            return te, self.__copy_or_ref(de, copy)
        else:
            return tl, self.__copy_or_ref(dl, copy)

    def nearest_no_later(self, t, copy=None):
        """ Return the nearest data at or before the given time.

        :param t: Time in ``datetime`` object
        :return: A pair, (``t_obs``, ``data``)
        """
        # First, get the file name corresponding to the time t.
        try:
            fn = self.__fazdb.get(t)
        except _tdb.DataNotInDbError:
            raise DataNotInDC('The previous data at time {:%FT%T} not in the data center.'.format(t))

        # Then, read the contents.
        tserdat = self._get_contents(fn)

        try:    # Reading the earlier data
            dat = tserdat.nearest_no_later(t)
        except _ts.DataNotFound:    # If the given time is earlier than the loaded file...
            try:
                fn = self.__fazdb.previousof(fn)
            except _tdb.DataNotInDbError:
                raise DataNotInDC('The previous data at time {:%FT%T} not in the data center.'.format(t))
            tserdat = self._get_contents(fn)
            dat = tserdat.nearest_no_later(t)

        return dat[0], self.__copy_or_ref(dat[1], copy)

    nearest_earlier_or_at = nearest_no_later

    def previousof(self, t, copy=None):
        """ Return the data preceding the given time.

        :param t: Time in ``datetime`` object
        :return: A pair, (``t_obs``, ``data``)
        """
        try:
            fn = self.__fazdb.get(t)
        except _tdb.DataNotInDbError:    # Too early time is specified
            raise DataNotInDC('The previous data at time {:%FT%T} not in the data center.'.format(t))

        tserdat = self._get_contents(fn)

        try:
            dat = tserdat.previousof(t)    # Get the previous data
        except _ts.DataNotFound:    # If the given time is out of the database
            try:
                fn = self.__fazdb.previousof(fn)
            except _tdb.DataNotInDbError:
                raise DataNotInDC('The previous data at time {:%FT%T} not in the data center.'.format(t))
            tserdat = self._get_contents(fn)    # Load one file before
            dat = tserdat.nearest_no_later(t)

        return dat[0], self.__copy_or_ref(dat[1], copy)

    def nearest_no_earlier(self, t, copy=None):
        """ Return the nearest data at or after the given time.

        :param t: Time in ``datetime`` object
        :return: A pair, (``t_obs``, ``data``)
        """
        # First, get the file name corresponding to the time t.
        try:
            fn = self.__fazdb.get(t)
        except _tdb.DataNotInDbError:    # Time given is earlier than the data base
            t0 = self.__fazdb.t0()
            fn = self.__fazdb.get(t0)    # The first file
            tserdat = self._get_contents(fn)
            while len(tserdat) == 0:
                fn = self.__fazdb.nextof(fn)
                tserdat = self._get_contents(fn)
            _t, _d = tserdat.nearest_no_earlier(t0)
            return _t, self.__copy_or_ref(_d, copy)

        # Then, read the contents.
        tserdat = self._get_contents(fn)

        try:
            dat = tserdat.nearest_no_earlier(t)    # If the given time is later than the dataset, an exception is thrown.
        except _ts.DataNotFound:
            try:
                fn = self.__fazdb.nextof(fn)    # Check next file.
            except _tdb.DataNotInDbError:    # The next file does not exist
                raise DataNotInDC('The next data at time {:%FT%T} not in the data center'.format(t))
            tserdat = self._get_contents(fn)
            dat = tserdat.nearest_no_earlier(t)

        return dat[0], self.__copy_or_ref(dat[1], copy)

    nearest_later_or_at = nearest_no_earlier

    def nextof(self, t, copy=None):
        """ Return the data following the given time.

        :param t: Time in ``datetime`` object
        :return: A pair, (``t_obs``, ``data``)
        """
        try:
            fn = self.__fazdb.get(t)
        except _tdb.DataNotInDbError:
            t0 = self.__fazdb.t0()
            fn = self.__fazdb.get(t0)
            tserdat = self._get_contents(fn)
            _t, _d = tserdat.nearest_no_earlier(t0)    # The data at t0 should be returned.
            return _t, self.__copy_or_ref(_d, copy)

        tserdat = self._get_contents(fn)

        try:
            dat = tserdat.nextof(t)
        except _ts.DataNotFound:
            try:
                fn = self.__fazdb.nextof(fn)    # Check the next file
            except _tdb.DataNotInDbError:
                raise DataNotInDC('The next data at time {:%FT%T} not in the data center'.format(t))
            tserdat = self._get_contents(fn)
            while len(tserdat) == 0:
                try:
                    fn = self.__fazdb.nextof(fn)
                except _tdb.DataNotInDbError:
                    raise DataNotInDC('The next data at time {:%FT%T} not in the data center'.format(t))
                tserdat = self._get_contents(fn)
            dat = tserdat.nearest_no_earlier(t)

        return dat[0], self.__copy_or_ref(dat[1], copy)

    def get_array(self, t0, t1, wide_start=False, wide_stop=False, copy=None):
        """ Return the array of data in the data center.

        :param t0: Start time in ``datetime`` object
        :param t1: Stop time in ``datetime`` object
        :keyword wide_start: Set *True* if the start time should be interpreted as "wide". Default, *False* ("strict")
        :keyword wide_stop: Set *True* if the stop time should be interpreted as "wide". Default, *False* ("strict")
        :return: A pair of lists, (``tlist_obs``, ``data_list``)
        """
        if wide_start and wide_stop:
            return self.get_array_wide(t0, t1, copy=copy)
        elif wide_start and (not wide_stop):
            return self.get_array_wide_start(t0, t1, copy=copy)
        elif (not wide_start) and wide_stop:
            return self.get_array_wide_stop(t0, t1, copy=copy)
        else:
            return self.get_array_strict(t0, t1, copy=copy)

    def get_array_strict(self, t0, t1, copy=None):
        """ Return the array of data in the data center.

        :param t0: Start time in ``datetime`` object
        :param t1: Stop time in ``datetime`` object
        :return: A pair of lists, (``tlist_obs``, ``data_list``)

        The time of the observation (``tlist_obs``) should be strictly between
        ``t0`` and ``t1`` (edge inclusive).
        """
        tlist = []
        dlist = []
        for t, d in self.iter_strict(t0, t1, copy=copy):
            tlist.append(t)
            dlist.append(d)
        return (tlist, dlist)

    def get_array_wide(self, t0, t1, copy=None):
        """ Return the array of data in the data center.

        :param t0: Start time in ``datetime`` object
        :param t1: Stop time in ``datetime`` object
        :return: A pair of lists, (``tlist_obs``, ``data_list``)

        The time of the observation (``tlist_obs``) contains data outside of the given
        range (a single data point added in both directions) in order to account for
        the interpretations of the time tag.
        """
        tlist = []
        dlist = []
        for t, d in self.iter_wide(t0, t1, copy=copy):
            tlist.append(t)
            dlist.append(d)
        return (tlist, dlist)

    def get_array_wide_start(self, t0, t1, copy=None):
        """ Return the array of data in the data center.

        :param t0: Start time in ``datetime`` object. This is interpreted as "wide".
        :param t1: Stop time in ``datetime`` object. This is interpreted as "strict".
        :return: A pair of lists, (``tlist_obs``, ``data_list``)

        The time of the observation (``tlist_obs``) contains data outside of the given
        range at the start side only (a single data point added before ``t0``)
        in order to account for the interpretations of the time tag.
        """
        tlist = []
        dlist = []
        for t, d in self.iter_wide_start(t0, t1, copy=copy):
            tlist.append(t)
            dlist.append(d)
        return (tlist, dlist)

    def get_array_wide_stop(self, t0, t1, copy=None):
        """ Return the array of data in the data center.

        :param t0: Start time in ``datetime`` object. This is interpreted as "strict".
        :param t1: Stop time in ``datetime`` object. This is interpreted as "wide".
        :return: A pair of lists, (``tlist_obs``, ``data_list``)

        The time of the observation (``tlist_obs``) contains data outside of the given
        range at the stop side only (a single data point added after ``t1``)
        in order to account for the interpretations of the time tag.
        """
        tlist = []
        dlist = []
        for t, d in self.iter_wide_stop(t0, t1, copy=copy):
            tlist.append(t)
            dlist.append(d)
        return (tlist, dlist)

    def iter(self, t0=_datetime.datetime.min, t1=_datetime.datetime.max, wide_start=False, wide_stop=False, copy=None):
        """ Return the iterator of data in the data center.

        :param t0: Start time in ``datetime`` object.
        :param t1: Stop time in ``datetime`` object.
        :keyword wide_start: Set *True* if the start time should be interpreted as "wide". Default, *False* ("strict")
        :keyword wide_stop: Set *True* if the stop time should be interpreted as "wide". Default, *False* ("strict")
        :return: An iterator.
        """
        if wide_start and wide_stop:
            return self.iter_wide(t0, t1, copy=copy)
        elif wide_start and (not wide_stop):
            return self.iter_wide_start(t0, t1, copy=copy)
        elif (not wide_start) and wide_stop:
            return self.iter_wide_stop(t0, t1, copy=copy)
        else:
            return self.iter_strict(t0, t1, copy=copy)

    def iter_strict(self, t0=_datetime.datetime.min, t1=_datetime.datetime.max, copy=None):
        """ Return the iterator of data in the data center.

        :param t0: Start time in ``datetime`` object. This is defined as "strict"
        :param t1: Stop time in ``datetime`` object. This is defined as "strict"
        :return: An iterator.

        The time of the observation should be strictly between
        ``t0`` and ``t1`` (edge inclusive).
        """
        return _IterStrict(self, t0, t1, copy=copy)

    def iter_wide(self, t0=_datetime.datetime.min, t1=_datetime.datetime.max, copy=None):
        """ Return the iterator of data in the data center.

        :param t0: Start time in ``datetime`` object. This is defined as "wide".
        :param t1: Stop time in ``datetime`` object. This is defined as "wide".
        :return: An iterator.

        The time of the observation contains data outside of the given
        range (a single data point added in both directions) in order to account for
        the interpretations of the time tag.
        """
        return _IterWide(self, t0, t1, copy=copy)

    def iter_wide_start(self, t0=_datetime.datetime.min, t1=_datetime.datetime.max, copy=None):
        """ Return the iterator of data in the data center

        :param t0: Start time. This is defined as "wide"
        :param t1: Stop time. This is defined as "strict"
        :return: An iterator.
        """
        return _IterWideStart(self, t0, t1, copy=copy)

    def iter_wide_stop(self, t0=_datetime.datetime.min, t1=_datetime.datetime.max, copy=None):
        """ Return the iterator of data in the data center

        :param t0: Start time. This is defined as "strict"
        :param t1: Stop time. This is defined as "wide"
        :return: An iterator.
        """
        return _IterWideStop(self, t0, t1, copy=copy)

    def __str__(self):
        """ String expression to show the status of the datacenter

        :return: String expression
        """
        s = "irfpy DataCenter (Name: '{}'): {} data files combined".format(self.__name, len(self))
        return s

    def __repr__(self):
        s = "<{}: {}>".format(self.__class__.__name__, str(self))
        return s

    def __len__(self):
        """ Return the number of data files in the data center.

        :return: Number of data files.
        """
        return len(self.__fazdb)


class _IterStrict:
    _logger = _logging.getLogger(__name__ + '._IterStrict')

    def __init__(self, datacenter, t0, t1, copy=None):
        """ Iterator

        :param datacenter: Data center object
        :type datacenter: :class:`BaseDataCenter`
        :param t0: Start time ("strict")
        :param t1: Stop time ("strict")
        """
        self.datacenter = datacenter
        self.t0 = t0
        self.t1 = t1
        self._current_time = t0
        self._copy = (self.datacenter._copy if copy is None else copy)
        self._getnextfunction = self.datacenter.nearest_no_earlier
        # This function is the key of the iteration.
        # For the first item, it should be "no_earlier", but from the second, the nextof() function should be used.

    def __iter__(self):
        return self

    def __next__(self):
        try:
            tnew, dnew = self._getnextfunction(self._current_time, copy=self._copy)
            self._getnextfunction = self.datacenter.nextof
        except DataNotInDC:
            raise StopIteration()

        if tnew > self.t1:
            raise StopIteration()

        self._current_time = tnew
        return (tnew, dnew)


class _IterWide:
    _logger = _logging.getLogger(__name__ + '._IterWide')

    def __init__(self, datacenter, t0, t1, copy=None):
        """ Iterator

        :param datacenter: Data center object
        :type datacenter: :class:`BaseDataCenter`
        :param t0: Start time ("wide")
        :param t1: Stop time ("wide")
        """
        self.datacenter = datacenter
        self.t0 = t0
        self.t1 = t1
        self._current_time = t0
        self._copy = (self.datacenter._copy if copy is None else copy)
        self._getnextfunction = self.datacenter.nearest_no_later
        # This function is the key of the iteration.
        # For the first item, it should be "no_later", but from the second, the nextof() function should be used.

    def __iter__(self):
        return self

    def __next__(self):
        if self._current_time >= self.t1:
            raise StopIteration()

        try:
            tnew, dnew = self._getnextfunction(self._current_time, copy=self._copy)
            self._getnextfunction = self.datacenter.nextof
        except DataNotInDC:
            raise StopIteration()

        self._current_time = tnew
        return (tnew, dnew)


class _IterWideStart:
    _logger = _logging.getLogger(__name__ + '._IterWideStart')

    def __init__(self, datacenter, t0, t1, copy=None):
        """ Iterator

        :param datacenter: Data center object
        :type datacenter: :class:`BaseDataCenter`
        :param t0: Start time ("wide")
        :param t1: Stop time ("strict")
        """
        self.datacenter = datacenter
        self.t0 = t0
        self.t1 = t1
        self._current_time = t0
        self._copy = (self.datacenter._copy if copy is None else copy)
        self._getnextfunction = self.datacenter.nearest_no_later
        # This function is the key of the iteration.
        # For the first item, it should be "no_later", but from the second, the nextof() function should be used.

    def __iter__(self):
        return self

    def __next__(self):
        try:
            tnew, dnew = self._getnextfunction(self._current_time, copy=self._copy)
            self._getnextfunction = self.datacenter.nextof
        except DataNotInDC:
            raise StopIteration()

        if tnew > self.t1:
            raise StopIteration()

        self._current_time = tnew
        return (tnew, dnew)


class _IterWideStop:
    _logger = _logging.getLogger(__name__ + '._IterWideStop')

    def __init__(self, datacenter, t0, t1, copy=None):
        """ Iterator

        :param datacenter: Data center object
        :type datacenter: :class:`BaseDataCenter`
        :param t0: Start time ("strict")
        :param t1: Stop time ("wide")
        """
        self.datacenter = datacenter
        self.t0 = t0
        self.t1 = t1
        self._current_time = t0
        self._copy = (self.datacenter._copy if copy is None else copy)
        self._getnextfunction = self.datacenter.nearest_no_earlier
        # This function is the key of the iteration.
        # For the first item, it should be "no_earlier", but from the second, the nextof() function should be used.

    def __iter__(self):
        return self

    def __next__(self):
        if self._current_time >= self.t1:
            raise StopIteration()

        try:
            tnew, dnew = self._getnextfunction(self._current_time, copy=self._copy)
            self._getnextfunction = self.datacenter.nextof
        except DataNotInDC:
            raise StopIteration()

        self._current_time = tnew
        return (tnew, dnew)


def create_sample_folder(sample_basedir=None):
    """ Create a sample folder structure.

    It is for development and testing.

    It creates the following folder structure ::

        <sample_basedir>/y2017/
        <sample_basedir>/y2017/m03/
        <sample_basedir>/y2017/m03/DATACENTER_SAMPLE_DATA_2017_03_30.dat
        <sample_basedir>/y2017/m03/DATACENTER_SAMPLE_DATA_2017_03_31.dat
        <sample_basedir>/y2017/m04/
        <sample_basedir>/y2017/m04/DATACENTER_SAMPLE_DATA_2017_04_01.dat
        <sample_basedir>/y2017/m04/DATACENTER_SAMPLE_DATA_2017_04_02.dat

    :keyword sample_basedir: The folder under which the sample files are created.
        Default is None, which means that the folder is created by ``tempfile.mkdtemp()``
    :returns: The name of the sample directory. The user should remove the directory manually.
    """
    import os
    import datetime
    import tempfile

    if sample_basedir is None:
        sample_basedir = tempfile.mkdtemp()

    _logger.info('Sample file tree is created on {}'.format(sample_basedir))

    t = t0 = datetime.datetime(2017, 3, 30)
    t1 = datetime.datetime(2017, 4, 3)
    dt = datetime.timedelta(minutes=35)

    # One line of data every 35 minutes, appended to the file of the corresponding day.
    while t <= t1:
        y = t.year
        m = t.month
        d = t.day
        folder = os.path.join(sample_basedir, 'y%04d' % y, 'm%02d' % m)
        _logger.debug('Folder is %s', folder)
        os.makedirs(folder, exist_ok=True)
        filename = 'DATACENTER_SAMPLE_DATA_%04d_%02d_%02d.dat' % (y, m, d)
        _logger.debug('File is %s', filename)
        with open(os.path.join(folder, filename), 'a') as fp:
            print(t.strftime('%FT%T'), t.year, t.month, t.day, t.hour, t.minute, file=fp)
        t += dt

    return sample_basedir


def remove_sample_folder():
    _logger.warning('remove_sample_folder() does nothing now')


class SampleDataCenter(BaseDataCenter):
    """ A sample of a data center implementation.

    **How to implement a data center**

    Only three methods are to be implemented.

    - :meth:`search_files`: Search the data files to store in the data center.
    - :meth:`approximate_starttime`: Give an extremely quick estimate of the start time of each data file.
    - :meth:`read_file`: Read a single data file and return the data as a ``(tlist, dlist)`` tuple
      (or as a :class:`irfpy.util.timeseries.ObjectSeries` object).

    **How to read the data from a data center**

    1. Preparation. The sample folder is created by :func:`create_sample_folder`.

    >>> sample_folder = create_sample_folder()
    >>> print(sample_folder)    # doctest: +SKIP
    /tmp/tmpirg9wgix

    2. Create the sample data center

    >>> dc = SampleDataCenter(sample_folder)

    Check if the data center is correctly created.

    >>> print(dc.t0())   # The time of the first data
    2017-03-30 00:00:00
    >>> print(dc.t1())   # The time of the last data
    2017-04-02 23:40:00
    >>> print(len(dc))   # The size of data center, namely, the number of files contained.
    4

    3. Get the data via ``nearest`` methods.

    >>> import datetime
    >>> t0 = datetime.datetime(2017, 3, 30, 17, 15)
    >>> tobs, dat = dc.nearest_no_later(t0)
    >>> print(tobs, dat)
    2017-03-30 16:55:00 ['2017' '3' '30' '16' '55']

    >>> tobs, dat = dc.nearest_no_earlier(t0)
    >>> print(tobs, dat)
    2017-03-30 17:30:00 ['2017' '3' '30' '17' '30']

    >>> tobs, dat = dc.nearest(t0)
    >>> print(tobs, dat)
    2017-03-30 17:30:00 ['2017' '3' '30' '17' '30']

    4. Iterate the data

    >>> for t, d in dc.iter_strict():     # doctest: +ELLIPSIS
    ...     print(t, d)
    2017-03-30 00:00:00 ['2017' '3' '30' '0' '0']
    2017-03-30 00:35:00 ['2017' '3' '30' '0' '35']
    ...
    2017-03-30 23:55:00 ['2017' '3' '30' '23' '55']
    2017-03-31 00:30:00 ['2017' '3' '31' '0' '30']
    ...
    2017-04-01 23:45:00 ['2017' '4' '1' '23' '45']
    2017-04-02 00:20:00 ['2017' '4' '2' '0' '20']
    ...
    2017-04-02 23:05:00 ['2017' '4' '2' '23' '5']
    2017-04-02 23:40:00 ['2017' '4' '2' '23' '40']

    >>> for t, d in dc.iter_strict(datetime.datetime(2017, 3, 30, 23, 45), datetime.datetime(2017, 3, 31, 0, 45)):
    ...     print(t, d)
    2017-03-30 23:55:00 ['2017' '3' '30' '23' '55']
    2017-03-31 00:30:00 ['2017' '3' '31' '0' '30']

    >>> for t, d in dc.iter_wide(datetime.datetime(2017, 3, 30, 23, 45), datetime.datetime(2017, 3, 31, 0, 45)):
    ...     print(t, d)
    2017-03-30 23:20:00 ['2017' '3' '30' '23' '20']
    2017-03-30 23:55:00 ['2017' '3' '30' '23' '55']
    2017-03-31 00:30:00 ['2017' '3' '31' '0' '30']
    2017-03-31 01:05:00 ['2017' '3' '31' '1' '5']

    >>> for t, d in dc.iter_wide(datetime.datetime(2017, 3, 30, 23, 55), datetime.datetime(2017, 3, 31, 0, 30)):
    ...     print(t, d)
    2017-03-30 23:55:00 ['2017' '3' '30' '23' '55']
    2017-03-31 00:30:00 ['2017' '3' '31' '0' '30']

    5. Getting the data as array

    >>> tlist, dlist = dc.get_array_strict(datetime.datetime(2017, 3, 30, 23, 45), datetime.datetime(2017, 3, 31, 0, 45))
    >>> from pprint import pprint
    >>> pprint(tlist)
    [datetime.datetime(2017, 3, 30, 23, 55), datetime.datetime(2017, 3, 31, 0, 30)]

    >>> tlist, dlist = dc.get_array_wide(datetime.datetime(2017, 3, 30, 23, 45), datetime.datetime(2017, 3, 31, 0, 45))
    >>> pprint(tlist)
    [datetime.datetime(2017, 3, 30, 23, 20),
     datetime.datetime(2017, 3, 30, 23, 55),
     datetime.datetime(2017, 3, 31, 0, 30),
     datetime.datetime(2017, 3, 31, 1, 5)]

    6. Remove the sample folder.

    >>> import shutil, os
    >>> if os.path.isdir(sample_folder):
    ...     shutil.rmtree(sample_folder)
    """
    def __init__(self, sample_basedir, *args, **kwds):
        """ Initialize the data center.
        """
        self.base_folder = sample_basedir

        if not _os.path.isdir(sample_basedir):
            raise ValueError('Sample folder structure not found. (Run a function ``create_sample_folder``)')

        BaseDataCenter.__init__(self, *args, **kwds)

    def search_files(self):
        """ Search the files, returning a list of them

        The method returns a list of the file names.
        Usually, a full path name is used.

        :return: A list of the file names.
        """
        filelist = []
        for _p, _d, _f in _os.walk(self.base_folder):
            for _ff in _f:
                if _ff.startswith('DATACENTER_SAMPLE_DATA'):
                    filelist.append(_os.path.join(_p, _ff))
        return filelist

    def approximate_starttime(self, filename):
        """ Get the approximate start time.

        :param filename: The name of the file
        :return: Approximate start time

        For the sample data, the filename corresponds to the start time of the file.
        The name is "DATACENTER_SAMPLE_DATA_2017_03_31.dat" for example.
        """
        basename = _os.path.basename(filename)
        basename = basename.split('_')
        y = int(basename[3])
        m = int(basename[4])
        d = int(basename[5][:2])
        return _datetime.datetime(y, m, d)

    def read_file(self, filename):
        """ Read the file, returning the contents as a (``tlist``, ``dlist``) tuple.

        :param filename: File name
        :return: The contents in the format of (``tlist``, ``dlist``).

        For the sample data, each data element is a (5,) shaped array of strings
        (year, month, day, hour, minute).
        """
        tlist = []
        objlist = []

        with open(filename) as fp:
            for l in fp:
                lsplit = l.split()
                t = _parser.parse(lsplit[0])    # The first column is the time tag
                obj = _np.array(lsplit[1:])     # The remaining columns are the data
                tlist.append(t)
                objlist.append(obj)

        return (tlist, objlist)