Source code for irfpy.util.datafile

''' irfpy-standard data file.

.. codeauthor:: Yoshifumi Futaana

See the tutorial on the data file :ref:`tutorial_datafile`.
'''
import os
import datetime
from io import StringIO
from collections import OrderedDict

class Datafile:
    r''' A template class for the irfpy-standard data file.

    All irfpy-compatible data files share a very simple structure:
    they consist of a header, data, and a trailer, each of which can
    contain zero or more entries.

    The header and trailer hold general information or metadata on
    the data file.  The version number and the creation time are
    added automatically at the top.  Header and trailer lines start
    with "#".  You can add header and trailer entries with the
    :meth:`add_header` and :meth:`add_trail` methods.

    The data part holds the data itself.  You may add an entry of
    data with :meth:`add_data`.  Multiple data blocks can be stored,
    each identified by a ``datakey``.  If :meth:`add_data` is called
    again with the same ``datakey``, the data is appended at the end
    of that block.  You may also use :meth:`continue_data` without
    specifying the data key; the most recently used key is taken in
    that case.  To append data, using ``str`` is highly recommended.

    >>> df = Datafile(version="1.0")    # Create data file on memory.
    >>> print(df.dumps())   # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
    ### HEADER : 2
    # VERSION : 1.0
    # CREATION_DATE : ...
    ### DATA : 0
    ### TRAIL : 0

    You may add header/trail.

    >>> df.add_header('SPACECRAFT', 'MEX')
    >>> df.add_trail('LICENSE', 'BSD')
    >>> print(df.dumps())   # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
    ### HEADER : 3
    ...
    # SPACECRAFT : MEX
    ...
    ### TRAIL : 1
    # LICENSE : BSD

    Adding data is as follows.

    >>> df.add_data('FLUX', '0 2009-01-03T18:05:33 2.8 1.9 4.1 7.3')
    >>> df.add_data('FLUX', '1 2009-01-03T18:08:45 2.7 2.3 3.7 6.8')
    >>> df.add_data('FLUX', '2 2009-01-03T18:11:57 2.6 1.9 2.8 9.0')
    >>> print(df.dumps())   # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
    ### HEADER : 3
    ...
    ### DATA : 1
    ## DATA : FLUX : 3
    0 2009-01-03T18:05:33 2.8 1.9 4.1 7.3
    1 2009-01-03T18:08:45 2.7 2.3 3.7 6.8
    2 2009-01-03T18:11:57 2.6 1.9 2.8 9.0
    ...

    Reading data is as follows.

    >>> from io import StringIO    # Prepare data file.
    >>> fp = StringIO()
    >>> b = fp.write('### HEADER : 2\n')
    >>> b = fp.write('# VERSION : 1.0\n')
    >>> b = fp.write('# CREATION_DATE : 2013-03-08T09:56:22\n')
    >>> b = fp.write('### DATA : 1\n')
    >>> b = fp.write('## DATA : FLUX : 3\n')
    >>> b = fp.write('0 2009-01-03T18:05:33 2.8 1.9 4.1 7.3\n')
    >>> b = fp.write('1 2009-01-03T18:08:45 2.7 2.3 3.7 6.8\n')
    >>> b = fp.write('2 2009-01-03T18:11:57 2.6 1.9 2.8 9.0\n')
    >>> b = fp.write('### TRAIL : 0\n')
    >>> b = fp.seek(0)    # Faked data file ready.

    Usually, ``fp = open('fname.dat')`` is enough.  Here comes the way
    of reading.

    >>> fp = open('fname.dat')    # doctest: +SKIP
    >>> df = Datafile()
    >>> df.readfile(fp)
    >>> dfreader = DatafileReader(df)
    >>> print(dfreader.get_header('VERSION'))
    1.0
    >>> print(dfreader.get_header('CREATION_DATE'))
    2013-03-08T09:56:22
    >>> print(dfreader.get_header('SOMETHING_NOT_THERE'))
    None
    >>> print(dfreader.get_data('FLUX')[2])
    2 2009-01-03T18:11:57 2.6 1.9 2.8 9.0
    '''

    def __init__(self, version="1.0"):
        ''' Instantiate a data file class with the given version. '''
        self.header = OrderedDict()
        self.data = OrderedDict()
        self.__current_data_key = None
        self.trail = OrderedDict()

        self.add_header('VERSION', version)
        self.add_header('CREATION_DATE',
                        datetime.datetime.now().strftime('%FT%T'))
    def add_header(self, head_key, head_data):
        self.header[head_key] = str(head_data)
    def add_creation_path(self):
        ''' Add the creation path (the current working directory) to the header. '''
        self.add_header('CREATION_PATH', os.getcwd())
    def add_data(self, data_key, data):
        ''' Add a line of data under ``data_key``, creating the block if needed. '''
        if data_key not in self.data:
            self.data[data_key] = []
        self.data[data_key].append(str(data))
        self.__current_data_key = data_key
    def set_cdkey(self, data_key):
        ''' Set the current data key. '''
        if data_key not in self.data:
            self.data[data_key] = []
        self.__current_data_key = data_key
    def continue_data(self, data):
        ''' Append a line of data to the most recently used data key. '''
        if self.__current_data_key is None:
            raise KeyError('No data added before. Use add_data')
        self.data[self.__current_data_key].append(str(data))
    def add_trail(self, trail_key, trail_data):
        self.trail[trail_key] = str(trail_data)
    def dumps(self):
        ''' Return the formatted contents as a string. '''
        out = StringIO()

        print("### HEADER : %d" % len(self.header), file=out)
        for key in self.header:
            print('# %s : %s' % (key, self.header[key]), file=out)

        print("### DATA : %d" % len(self.data), file=out)
        for key in self.data:
            print('## DATA : %s : %d' % (key, len(self.data[key])), file=out)
            if len(self.data[key]) != 0:
                print('\n'.join(self.data[key]), file=out)
            else:
                pass   # No output for an empty data block.

        print("### TRAIL : %d" % len(self.trail), file=out)
        for key in self.trail:
            print('# %s : %s' % (key, self.trail[key]), file=out)

        return out.getvalue()
    def dump(self, fp):
        ''' Print to a file-like, ``fp``. '''
        fp.write(self.dumps())
    def __str__(self):
        return '<Datafile.V%s instance. H,D,T entries=%d,%d,%d>' % (
            self.header['VERSION'], len(self.header),
            len(self.data), len(self.trail))
    def readfile(self, fp):
        ''' Read the file from the file-like, ``fp``. '''
        self.readheader(fp)
        self.readdata(fp)
        self.readtrail(fp)
    def readheader(self, fp):
        # The first line gives the number of header entries.
        nh = int(fp.readline().split()[-1])
        for ih in range(nh):
            hl = fp.readline().split(":")
            self.add_header(hl[0][2:].strip(), ':'.join(hl[1:]).strip())
    def readtrail(self, fp):
        nt = int(fp.readline().split()[-1])
        for it in range(nt):
            tl = fp.readline().split(":")
            self.add_trail(tl[0][2:].strip(), tl[-1].strip())
    def readdata(self, fp):
        nd = int(fp.readline().split()[-1])
        for idata in range(nd):
            datahead = fp.readline().split(":")
            dataname = ' '.join(datahead[1:-1]).strip()
            datalen = int(datahead[-1])
            for iline in range(datalen):
                line = fp.readline().strip()
                self.add_data(dataname, line)
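
# --- Illustrative sketch (not part of the original module) -------------
# A minimal round-trip of the Datafile API above: build a file in
# memory, dump it to a buffer, then parse it back with readfile().
# The helper name ``_example_roundtrip`` is hypothetical and serves
# only to show how dumps() and readfile() pair up.
def _example_roundtrip():
    df = Datafile(version="1.0")
    df.add_header('SPACECRAFT', 'MEX')
    df.add_data('FLUX', '0 2009-01-03T18:05:33 2.8 1.9 4.1 7.3')
    df.add_trail('LICENSE', 'BSD')

    buf = StringIO(df.dumps())    # Stands in for a real file on disk.

    df2 = Datafile()
    df2.readfile(buf)             # Header, data and trailer are parsed back.
    return df2
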
class DatafileReader:
    ''' A simple accessor for the contents of a :class:`Datafile`.

    Keys that are not present return ``missing_return`` (``None`` by
    default) instead of raising an exception.
    '''

    def __init__(self, data_file, missing_return=None):
        self.datafile = data_file
        self.missing_return = missing_return

    def get_header(self, key):
        if key in self.datafile.header:
            return self.datafile.header[key]
        else:
            return self.missing_return

    def get_data(self, key):
        if key in self.datafile.data:
            return self.datafile.data[key]
        else:
            return self.missing_return

    def get_trail(self, key):
        if key in self.datafile.trail:
            return self.datafile.trail[key]
        else:
            return self.missing_return
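
# --- Illustrative sketch (not part of the original module) -------------
# How DatafileReader's ``missing_return`` fallback behaves: a key that
# is absent yields the fallback value instead of a KeyError.  The
# helper name ``_example_reader`` is hypothetical.
def _example_reader():
    df = Datafile(version="1.0")
    df.add_data('FLUX', '0 2009-01-03T18:05:33 2.8 1.9 4.1 7.3')

    reader = DatafileReader(df, missing_return='N/A')
    version = reader.get_header('VERSION')    # -> '1.0'
    flux = reader.get_data('FLUX')            # -> list with one line of data
    absent = reader.get_trail('LICENSE')      # -> 'N/A' (fallback value)
    return version, flux, absent
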
import unittest
import doctest

def doctests():
    return unittest.TestSuite((
        doctest.DocTestSuite(),
    ))

if __name__ == '__main__':
    unittest.main(defaultTest='doctests')