Source code for irfpy.ica.tools

r"""
Tools and helper functions for ICA data handling

.. codeauthor:: Martin Wieser

Module: irfpy.ica.tools

This library contains tools for ica data handling and functions
functions of interest when migrating ica processing from from matlab to python.

"""

import datetime as dt
import numpy as np
import os.path
from matplotlib.dates import date2num
import warnings
import logging
logging.basicConfig()
_logger = logging.getLogger('ica.tools')
_logger.setLevel(logging.DEBUG)

try:
    import deepdish  # noqa: F401
    _have_deepdish = True
except:
    _have_deepdish = False


old_epoch = '0000-12-31T00:00:00'  # old epoch (pre MPL 3.3)
_EPOCHOFFSET = date2num(np.datetime64(old_epoch))

# datetime2matlab(dt.datetime(2007,1,1,0,0,0))
# 733043.0


# %%


[docs]def get_data_path_info(level, datatype): r""" Return the subdirectory path below the dataroot where the datafiles are located and the fileextension used by the datafiles. level is one of: 'level0', 'level1', 'level2', 'level3', 'aux', 'mag', 'lap', 'cops', ... datatype is one of: 'mat', 'matlab', 'h5', hdf5' """ if level == 'special': # special directories do not use a yyy/mm/dd structure return level+'/','*' datafileextension = 'mat' datadirectory = level + '/matlab/' if datatype == 'h5' or datatype == 'hdf5': if _have_deepdish: datafileextension = 'h5' datadirectory = level + '/hdf5/' else: print("irfpy.ica.io: deepdish not installed. hdf5 file format " + "support is not available.") print(" Install using: 'pip install deepdish'") elif datatype == 'mat' or datatype == 'matlab': datafileextension = 'mat' datadirectory = level + '/matlab/' else: raise ValueError("Invalid datatype specified," + str(datatype) + ", only 'mat','matlab','h5' and 'hdf5' are supported") return datadirectory, datafileextension
# %% date handling routines
[docs]def datetimeNaN(size=-1): r""" Returns as type of a 'NaN' value for datetime objects. Note that the value is not really NaN but can be used in comparisions The value used is 01 JAN 0001. If this function is called with a length parameter >= 0, e.g. :: t= datetimeNaN(100) then a np.array of datetime objects is returned with all elements initialized with the 'NaN' value given by datetimeNaN() """ if size == -1: return dt.datetime.fromordinal(1) else: time_instances = np.empty(size, dtype=object) time_instances[:] = dt.datetime.fromordinal(1) return(time_instances)
[docs]def datetimeInf(size=-1): r""" Returns as type of a 'Inf' value for datetime objects. Note that the value is not really Inf but can be used in comparisions, the value is 01 JAN 3000. If this function is called with a length parameter > 0, e.g. :: t= datetimeInf(100) then a np.array of datetime objects is returned with all elements initialized with the 'Inf' value given by datetimeInf() """ if size == -1: return dt.datetime(3000, 1, 1, 0, 0) else: time_instances = np.empty(size, dtype=object) time_instances[:] = dt.datetime(3000, 1, 1, 0, 0) return(time_instances)
[docs]def matlab2datetime(rawtime, fixepoch=True): r""" Convert a matlab time vector 'rawtime' to a numpy array of datetime objects. it is assumed that the time vector is squeezed prior calling this function. fixepoch attempts to fix problems due to the matplotlib epoch change in matplotlib version > 3.3 Use like this:: matfile=scipy.io.loadmat('xyz.mat') time_instances = matlab2datetime(np.squeeze(matfile['time_instances'])) """ if type(rawtime)==np.ndarray: time_instances = datetimeNaN(rawtime.size) for ii in range(0, rawtime.size): try: rii = rawtime[ii] if fixepoch and rii < 365244.0: # looks like wrongly converted matplotlib epoch # date is < dt.datetime(1000, 1, 1, 0, 0) rii = rii + 719163.0 # assume epoch was wrongly used time_instances[ii] = dt.datetime.fromordinal(int(rii)) + \ dt.timedelta(days=rii % 1) - \ dt.timedelta(days=366) except: pass else: rii = rawtime if fixepoch and rii < 365244.0: rii = rii + 719163.0 time_instances = dt.datetime.fromordinal(int(rii)) + \ dt.timedelta(days=rii % 1) - \ dt.timedelta(days=366) return time_instances
[docs]def datetime2matlab(rawtime): r""" Convert a datetime time vector 'rawtime' to a numpy array of matlab times. it is assumed that the time vector is squeezed prior calling this function. Use like this:: matfile=scipy.io.loadmat('xyz.mat') time_instances = matlab2datetime(np.squeeze(matfile['time_instances']) matlabtime = datetime2matlab(time_instances) """ return date2num(rawtime) + 366.0 - _EPOCHOFFSET
[docs]def ordinal2datetime(rawtime): r""" Convert a python ordinal time vector with fractional days 'rawtime' to a numpy array of datetime objects. It is assumed that the time vector is squeezed prior calling this function. Use like this:: time_instances = ordinal2datetime(rawtime) """ time_instances = np.empty(rawtime.size, dtype=object) for ii in range(0, rawtime.size): try: time_instances[ii] = dt.datetime.fromordinal( int(rawtime[ii])) + dt.timedelta(days=rawtime[ii] % 1) except: time_instances[ii] = datetimeNaN() return time_instances
[docs]def last_day_of_month(any_day): """ Returns the last day of the a month for a given date. >>> import datetime >>> last_day_of_month(datetime.datetime(2015,9,18)) datetime.datetime(2015, 9, 30, 0, 0) """ # this will never fail: next_month = any_day.replace(day=28) + dt.timedelta(days=4) next_month = next_month.replace(hour=0, minute=0, second=0, microsecond=0) return next_month - dt.timedelta(days=next_month.day)
[docs]def end_of_last_day_of_month(any_day): r""" Returns the end of the last day of the a month for a given date. >>> import datetime >>> end_of_last_day_of_month(datetime.datetime(2015,9,18)) datetime.datetime(2015, 9, 30, 23, 59, 59, 999999) """ return last_day_of_month(any_day) + dt.timedelta(days=1, microseconds=-1)
[docs]def string2datetime(stime, default='20000101T000000.000000', fmt='%Y%m%dT%H%M%S.%f'): r""" Converts an ISO8601 string stime to a datetime.datetime object using the format string fmt. The stime string may be shorter than what is specified in the fmt string by leaving out elements at the end. Missing elements at the end of stime are replaced by the values from default. The default string must have the format given in fmt. """ if not isinstance(stime, str): raise ValueError( "irfpy.ica.tools.string2datetime: The parameter stime must " + "be of <class 'str'> (a string) but is now " + str(type(stime))) t = stime + default[len(stime):] try: return dt.datetime.strptime(t, fmt) except: warnings.warn( 'irfpy.ica.tools.string2datetime: Attempted to convert ' + 'an invalid date string: ' + stime) # try to fix it empirically t = stime + default[len(stime):] # 20150931T235959.9999 # check that the day of month is not illegal lastday = last_day_of_month(dt.date(int(t[0:4]), int(t[4:6]), 1)).day # get the length of the month and replace if needed if int(t[6:8]) > lastday and int(t[6:8]) <= 31: tt = t[:6] + str(lastday) + t[8:] return dt.datetime.strptime(tt, fmt) raise
[docs]def datetime2string(dtime, fmt='%Y%m%dT%H%M%S.%f'): r""" Convertes the datetime.datetime object dtime to an ISO8601 string without timezone. Missing elements in the string are replaced by the values in default. """ if not isinstance(dtime, dt.datetime): raise ValueError( "irfpy.ica.tools.datetime2string: The parameter dtime must be " + "of <class 'datetime.datetime'> (a datetime bject) but is now " + str(type(dt))) return dt.datetime.strftime(dtime, fmt)
[docs]def modification_date(filename, not_exist_is_future=False): r""" Returns the modification date of 'filename' as datetime object. If the file does not exist the file is assumed to be very old. """ try: t = os.path.getmtime(filename) return dt.datetime.fromtimestamp(t) except: if not_exist_is_future: return datetimeInf() else: return datetimeNaN()
# def clear_all(): # """ # Removes all variables analog to matlabs clear all command. HOWEVER # this works only when declared in the module where it is used. Howto FIX? # # """ # all = [var for var in globals() if (var[:2], var[-2:]) != ("__", "__") # and var != "clearall"] # for var in all: # del globals()[var] # # %% interval processing related functions
[docs]def interval(start, stop, step=1): r""" An iterator for closed intervalls similar to range(..) but different to range() the last element is included. """ i = start while i <= stop: yield i i += step
[docs]def rle(src): r""" Takes a 1D array numpy array of integers and returns a run-length encoded ordered array of tuples. e.g.:: a = np.array([0, 0, 1, 2, 2, 2, 2, 1, 1, 1, 7]) rle(a) returns:: [(2, 0), (1, 1), (4, 2), (3, 1), (1, 7)] Each tuple (n,v) means that v is repreted n times in the original array. By expanding each tuple this way in the order given, the original array is recovered. """ result = [] lsrc = src.tolist() if len(lsrc) > 0: current = lsrc.pop(0) counter = 1 for e in lsrc: if e == current: counter += 1 else: result.append((counter, current)) current = e counter = 1 result.append((counter, current)) return result
[docs]def irle(src): r""" Iterator for traversing an 1D array in run-length encoded fashion. Same functionality as rle() but as iterator. Returns a tuple (n,v) meaning that the value v is repreted n times. e.g.:: a = np.array([0, 0, 1, 2, 2, 2, 2, 1, 1, 1, 7]) for (n,v) in irle(a): print(n,v) produces this output: :: 2 0 1 1 4 2 3 1 1 7 Read as: 2x '0', followed by 1x '1', followed by 4x '2', followed ... etc. """ iterator = iter(src) current = next(iterator) counter = 1 for e in iterator: if e == current: counter += 1 else: yield counter, current current = e counter = 1 yield counter, current
[docs]def timeslices(src): r""" Iterator for traversing an 1D array in run-length encoded fashion. Functionality is similar to irle() iterator. Returns a tuple sta, sto, v meaning that the value v is repeated from offsets sta to sto-1 in src. e.g.:: a = np.array([0, 0, 1, 2, 2, 2, 2, 1, 1, 1, 7]) for sta,sto,value in timeslices(a): print(sta,sto, a[sta:sto],' = ', value) produces this output: :: 0 2 : [0 0] = 0 2 3 : [1] = 1 3 7 : [2 2 2 2] = 2 7 10 : [1 1 1] = 1 10 11 : [7] = 7 sta and sto are made to be directly used in a slice. If there are several variables that should be searched in parallel for intervals with identical elementns use zip() and the value from the above example becomes a tuple:: a = np.array([0, 0, 1, 2, 2, 2, 2, 1, 1, 1, 7]) b = np.array([9, 9, 9, 9, 9, 3 ,3, 3, 3, 3, 3]) for sta,sto,(aval,bval) in timeslices(zip(a,b): print(sta,sto, ':', a[sta:sto],b[sta:sto],' = ', aval, bval) produces this output: :: 0 2 : [0 0] [9 9] = 0 9 2 3 : [1] [9] = 1 9 3 5 : [2 2] [9 9] = 2 9 5 7 : [2 2] [3 3] = 2 3 7 10 : [1 1 1] [3 3 3] = 1 3 10 11 : [7] [3] = 7 3 """ iterator = iter(src) current = next(iterator) sta = 0 sto = 1 for e in iterator: if e == current: sto += 1 else: yield sta, sto, current sta = sto current = e sto += 1 yield sta, sto, current
[docs]def walk(src,stepsize): r""" Iterator for traversing an 1D array in steps of step length (the last step may be shorter). Returns a sta,sto index that can be used on src:: x = np.arange(30) for sta,sto in walk(x,9): print(sta,sto,' : ', x[sta:sto]) gives the output: :: 0 9 : [0 1 2 3 4 5 6 7 8] 9 18 : [ 9 10 11 12 13 14 15 16 17] 18 27 : [18 19 20 21 22 23 24 25 26] 27 30 : [27 28 29] """ iterator = iter(src) currlen=0 sta = 0 sto = 0 for e in iterator: currlen += 1 sto +=1 if currlen >= stepsize: yield sta, sto sta = sto currlen = 0 yield sta, sto
def _find_first_larger_equal(item, vec): """return the index of the first occurence of item in vec""" for i in range(len(vec)): if vec[i] >= item: return i return -1 def _find_last_smaller(item, vec): """return the index of the last occurence of item in vec""" for i in range(len(vec), 0, -1): if vec[i - 1] < item: return i return -1
[docs]def selecttime(time_instances, start, stop, mod_boundary=1): r""" Returns the indieces from the numpy array of datetime objects "time_instances" that fullfill the following criteria:: t >= start && t < stop where t is an element of time_instances. Note that the stop time itself is not included, analogous to the way python array sliceing works. Start and stop are either datetime objects or strings of the format "YYYYMMDDTHHMMSS". Complete timestamps are preferred, but the HH, MM or SS parts can be omitted if needed, 00 is substituted for the missing elements for the start time. For the stop time, if HH is missing 23 is substituted, missing MM ans SS are replaced by 59. If mod_boundary is set to a value different from 1, then the only every mod_boundary'th element in time_instances is considered in the evaluation and the 15 elements following an evaluated element are treated in the same way as the evaluated element. This is used to e.g. make sure that the boundaries are aligned to elevation boundaries (mod_boundary=16). Example: start = "2000101T11" becomes "2000101T110000" stop = "2000101T11" becomes "2000101T115959" The return value is numpy array of booleans of the same shape as time_instances:: t = selecttime(time_instances,"20150115T12","20150115T13") ti = time_instances[t] # ti contains all elements from "20150115T120000" to, # but not including, "20150115T135959.999999" """ if isinstance(start, str): sta = string2datetime(start, default="20000101T000000.000000") else: sta = start if isinstance(stop, str): sto = string2datetime(stop, default="20201231T235959.999999") else: sto = stop staindex = _find_first_larger_equal(sta, time_instances) stoindex = _find_last_smaller(sto, time_instances) # this is a mask with all elements equal false: mask = np.zeros_like(time_instances, dtype=bool) if staindex >= 0 and stoindex >= 0: # align to scan boundary by searching right for later # times until a boundary is found staindex = (staindex + mod_boundary - 1) // mod_boundary * mod_boundary # align to scan boundary by searching right for later # times until a boundary is found stoindex = (stoindex + mod_boundary - 1) // mod_boundary * mod_boundary mask[staindex:stoindex] = True return mask
# %% Energy vector related functions
[docs]def getE(tables, version): r""" getE extracts the correct energy related 96 element long vector from tables. Usage:: mat = icatools.loadlevel1(somepath,'proc',somedate, variables=['time_interval','E','dE','sw_version']) for tt in mat['time_instances']: E = icatools.getE(mat['E'],sw_version[tt]) dE = icatools.getE(mat['dE'],sw_version[tt]) # E and dE now contain data that is applicable for the time tt by # considering the software version that was active at that time. ...do stuff with E and dE... Similarly, dEE or deltaE can be passed to this function in place of E or dE. """ t = [] if isinstance(version, np.ndarray): if len(version.shape) > 1: raise ValueError( "icatools.getE: The parameter version can be a scalar " + "or a 1D-numpy array, but is now a numpy array of the shape " + str(version.shape)) else: version = np.array((version,)).flat for v in version: theshape = np.shape(tables) if len(theshape) == 1: raise ValueError( "icatools.getE: The parameter table must be 2 or 3 " + "dimensional with the 1st dimension the software version") elif len(theshape) == 2: t.append(tables[v - 1, :]) elif len(theshape) == 3: t.append(tables[v - 1, :, :]) else: raise ValueError( "icatools.getE: The parameter table must be 2 or 3 " + "dimensional with the 1st dimension the software version") return np.squeeze(np.asarray(t).T)
[docs]def numEfromE(E): r''' Returns the number of valid energy steps in the energy vector E that are not NaN. ''' return np.where(np.isnan(E) == False)[0][-1] + 1
[docs]def numEfromversion(version): r''' Returns the number of valid energy steps for a given software version. ''' Emax = 96 if version == 7: Emax = 32 if version == 8: Emax = 8 return Emax
[docs]def ESAvoltageshiftEdE(E, dE, shift=0.0, oldshift=None, analyzerconstant=9.65, rstar = 0.07): r""" Given an E and a dE vector, this calcualted a new E and dE including an energy shift by 'shift' eV. The default shift is 0eV. The shift value is applicable for proc data version >= 4.1. Alternatively to 'shift', 'oldshift' can be used: oldshift = shift + 13.7. """ theshift = shift if oldshift is not None: theshift = oldshift - 13.7 # first reconstruct (ESC_H+ESC_L): Equ 17 ESC_HL = 2*(dE*(1+ 1/(2*analyzerconstant))/rstar - E) # shift ESC_H by shift*analyzerconstant (this formula may need some refinement!) ESC_HL_shift = ESC_HL + (theshift / analyzerconstant) # recalculate E and dE Eshift = E + theshift dEshift = (Eshift+ESC_HL_shift/2)*rstar/(1+ 1/(2*analyzerconstant)) return Eshift, dEshift
[docs]def deltaTfromversion(version): r''' Returns length of one energy sweep in seconds for a given software version . ''' Tmax = 12 if version == 7: Tmax = 4 if version == 8: Tmax = 1 return Tmax
[docs]def paccgroupfrompacc(pacc): r''' Returns the pacc_group for a certain pacc. 0-2 -> 0 3-5 -> 1 6-7 -> 2 ''' return pacc // 3
[docs]def energyintervals(E, linear=False): r''' Returns an energy vector representing energy bands centered around E. E may be filled with NaN above a certain index up to index 95. Values of E from index 0 to the first NaN value or index 95 (whichever is smaller) are used. E may be of the shape (nE,) or (nE, nT) with nE the number of energy steps and nT the number of time_instances. The returned value has the same number of dimensions as the parameter E. linear=False : If set to True, the interval boundaries are constructed using arithmetic averages of neighboring energy centers, otherwise geometric intervall boundaries are used. Example:: E = icatools.getE(mat['E'],version) Einterval = icatools.energyintervals(E,linear=True) ''' # if E is one dimensional: if len(E.shape) == 1: # find largest element in E that is not nan Emax = np.where(np.isnan(E) == False)[0][-1] + 1 Einterval = np.zeros((Emax + 1, 1)) elif len(E.shape) == 2: Emax = -1 tlen = len(E[0, :]) for t in range(tlen): newEmax = np.where(np.isnan(E[:, t]) == False)[0][-1] + 1 if Emax == -1: Emax = newEmax if Emax != newEmax: Emax = newEmax # print(Emax, newEmax) raise ValueError( "energyintervals: the energy vectors for each time " + "instance must have the same length after removing nan " + "elements (e.g. 96, 32, 8)") Einterval = np.zeros((Emax + 1, tlen)) else: raise ValueError("energyintervals: The energy parameter must be one " + "or two dimensional but a parameter with shape " + str(E.shape) + " was passed instead.") # Make energy intervalls centred around E if len(E.shape) == 1: nnE = E[:, np.newaxis].copy() else: nnE = E.copy() if linear: Einterval[1:-1, :] = 0.5 * (nnE[1:Emax, :] + nnE[:Emax - 1, :]) Einterval[0, :] = nnE[0, :] - (Einterval[1, :] - nnE[0, :]) Einterval[-1, :] = nnE[Emax - 1, :] + (nnE[Emax - 1, :] - Einterval[-2, :]) else: for t in range(len(nnE[0, :])): if nnE[0, t] <= 0.0: for k in range(Emax): if nnE[k, t] <= 0.0: # make sure there are only positive numbers nnE[k, t] = (k + 2) * 1e-9 Einterval[1:-1, :] = np.sqrt(nnE[1:Emax, :] * nnE[:Emax - 1, :]) Einterval[0, :] = nnE[0, :] / (Einterval[1, :] / nnE[0, :]) Einterval[-1, :] = nnE[Emax - 1, :] * (nnE[Emax - 1, :] / Einterval[-2, :]) return np.squeeze(Einterval)
[docs]def energytablesfromoffset(ESC_H_volt, ESC_L_volt, sw_version=None, EnergyShift = None, OldOffset = 34.742, NewOffset=34.742, ICAanalyzerconstant = 9.65): r""" Recalculates the ICA energy tables based on the high voltage or energy offset specified. This function accepts two types if input: 1) Energy x time matrixes returned by icatools.getE: ESC_H_volt = icatools.getE(spec['ESC_H_volt'],sw_version)) 2) sw_version sorted high voltage values obtained from specialxxxxxxTyyyy.mat: spec['ESC_H_volt'] sw_version: If a sw_version is given then the resulting tables will be masked such that for sw_version==7 all esteps >=32 are set to nan and for sw_version==8 all esteps >= 8 are set to nan. If sw_version is not None, then input of type 1) is assumed, otherwise type 2 is assumed. EnergyShift: Alternatively specified instead of NewOffset and specifies the shift in eV. Preferred way of shifting the energy scale. OldOffset: Original offset voltage used to calculate ESC_H_volt. Default is 34.742V. This value corresponds to the internal value used in the proc data pipeline. Do not change. NewOffset: New offset voltage to be applied in V. Default is 34.742V. Change this value if a different offset should be used. Returns: E : Energy vector in eV with the same shape as the ESC_xxx inputs deltaE : Energy width (FWHM) in eV with the same shape as the ESC_xxx inputs. fromdate='20160101' todate = '20160123' spec = readspecial(datarootpath, fromdate, todate, variables=['ICAanalyzerconstant', 'ICAhighvoltageoffset', 'ESC_H_volt', 'ESC_L_volt', 'ICAsoftwareversion'], verbose=True) sw_version = 6 # this would be loaded from proc in a real case ESC_H_volt = icatools.getE(spec['ESC_H_volt'],sw_version) ESC_L_volt = icatools.getE(spec['ESC_L_volt'],sw_version) E, deltaE = icatools.energytablesfromoffset(ESC_H_volt, ESC_L_volt) """ # ICA instrument constants dEE0 = 0.07 if EnergyShift is not None: NewOffset = OldOffset - EnergyShift/ICAanalyzerconstant # calculate actual voltages: ESC_H_volt_corr = ESC_H_volt - OldOffset + NewOffset # delta_V is normally positive as ESC_H_volt is the larger value and is negative delta_V = ESC_L_volt - ESC_H_volt_corr # mean_V is usually negative mean_V = (ESC_L_volt + ESC_H_volt_corr)/2 # mean_E is then normally positive mean_E = delta_V * (ICAanalyzerconstant+0.5) # E is then normally less than mean_E # because particles are accelerated into ESA # (mean_V is normally negative and we have positive ions). E = mean_E + mean_V deltaE = mean_E *(dEE0/(ICAanalyzerconstant+0.5)*ICAanalyzerconstant) if sw_version is not None: # fix high time resolution tables in case E this is a sw_version sorted table E[32:,sw_version==7] = np.nan deltaE[32:,sw_version==7] = np.nan E[8:,sw_version==8] = np.nan deltaE[8:,sw_version==8] = np.nan return E, deltaE
# %% Elevation related functions
[docs]def expandElevation(orig_ionspectra): r""" expandElevation returns a easy to handle numpy 5D matrix with dimensions Azimuth x Energy x Mass x Elevation x Time based on orig_ionspectra . Usage:: for (start,stop,version) in icatools.timeslices(sw_version): fiveD = expandElevation(orig_ionspectra[:,:,:,start:stop]) """ sh = orig_ionspectra.shape if len(sh) == 4: spectrum = orig_ionspectra.copy() return spectrum.reshape(sh[0], sh[1], sh[2], -1, 16).swapaxes(3, 4) else: raise ValueError( "expandElevation: orig_ionspectra must have 4 dimensions and the " + "shape of the last one must be divisible by 16")
[docs]def collapseElevation(fiveD): r""" collapseElevation returns a easy to handle numpy 4D matrix with dimensions Azimuth x Energy x Mass x Time based on fiveD. Usage:: for (start,stop,version) in icatools.timeslices(mode): fiveD = expandElevation(orig_ionspectra[:,:,:,start:stop]) fourD = collapseElevation(fiveD) """ sh = fiveD.shape if len(sh) == 5: spectrum = fiveD.copy() return spectrum.swapaxes(3, 4).reshape(sh[0], sh[1], sh[2], -1) else: raise ValueError("collapseElevation: fiveD must have 5 dimensions")
[docs]def get5Dmatrix(orig_ionspectra): r""" get5Dmatrix returns a easy to handle numpy 5D matrix with dimensions Azimuth x Energy x Mass x Elevation x Time based on orig_ionspectra. Usage:: for (start,stop,version) in icatools.timeslices(mode): fiveD = get5Dmatrix(orig_ionspectra[:,:,:,start:stop]) DO NOT USE FOR NEW CODE Use expandElevation() instead. """ return expandElevation(orig_ionspectra)
[docs]def getDictElement(mdict, key): r""" returns mdict[key] if key exists in mdict or [] otherwise """ if key in mdict: return mdict[key] else: return []
#%%
[docs]def procDataTree(dataPath): r'''Returns a dictonnary with all proc data found bellow dataPath:: dTree={'20141128':['fullPathT1','fullPathT12', ...], ... }. Etienne Behar. ''' dTree = {} for root, dirNames, fileNames in os.walk(dataPath): for fileName in fileNames: if 'proc' not in fileName: continue day = fileName[4:12] try: # If this day already exists in the dictionnary. dTree[day].append(os.path.join(root, fileName)) except KeyError: # If not, the day is created: dTree[day] = [os.path.join(root, fileName)] return dTree
[docs]def model67P(additionnalPath='', resolution=1, modelfile='CSHP_DV_130_01_______00200.obj'): r''' 67P/Churyumov-Gerasimenko navCam model. `http://imagearchives.esac.esa.int/index.php?/page/navcam_3d_models` Etienne Behar. ''' vf = np.loadtxt(additionnalPath + modelfile, usecols=[1, 2, 3]) tag = np.loadtxt(additionnalPath + modelfile, usecols=[0], dtype=str) v = vf[tag == "b'v'"] # f = vf[tag == "b'f'"] return np.array(v[::resolution])
[docs]def norm(v): r''' Returns the length of an array of vector, along its dimension 0. ''' return np.sqrt(np.sum(v**2, axis=0))
[docs]def vec2ConeClock(v): r''' Returns the cone (angle from -x axis) and clock (angle of projected vector in (y,z), from y) angles. cone is returned in [0, pi] clock is returned in [0, 2pi] v.shape = 3xnbTime ''' cone = np.arccos(-v[0] / norm(v)) clock = np.arccos(v[1] / np.sqrt(v[1]**2 + v[2]**2)) clock[v[2] < 0] = 2 * np.pi - clock[v[2] < 0] # Clock in [0,2pi]e return cone, clock