Source code for irfpy.jupsci.mhddata

''' MHD simulation result provided from Jia.

The data files are provided from Jia and named as

``Ganymede_MHD_Bfield_Jia.mat``
    A magnetic field data.  :class:`MagneticField1201`.

``MHD_PrecipitationFlux_Jia.dat``
    A precipitation flux calculated from Jia.  :class:`Flux1205`.
    The flux is calculated as a multiple of the density and velocity along B.

``MHD_PlasmaParameters_3Rg_XianzheJia.mat``.
    Plasma parameters obtained from MHD data. :class:`PlasmaParameter1205`.

The Bfield data provides the L-value, the magnetic field line at the equator.
(Indeed, for simplicity I assumed the L-value is the distance that is fartherest from the
position of interest traced along magnetic field line.)
This is implemented in the :class:`LValue1201`.

In addition to the provision by Jia, I made two more precipitation files.

``MHD_PrecipitationFlux_YF_footprint.dat``
    Provides the precipitation flux at the footpoint of surface, by mapping the downgoing
    flux along B field.

``MHD_PrecipitationFlux_YF_nadir.dat``
    Provides the precipitation flux at the surface (nadir mapping), for the
    plasma with pitchangle <90.

'''

import os
import logging
import bisect
import gzip
import pickle

from pkg_resources import resource_filename

import numpy as np
import scipy.io

import irfpy.util.irfpyrc

[docs]class PlasmaParameter1205: ''' Plasma parameter provided from Jia in mid 2012. >>> plaprm = PlasmaParameter1205() The data file is read and stored to the attribute of :attr:`dataarray`. Implementation of quantity getter, e.g.:meth:`getDensity` or whatever, returns (200, 200, 200)-shape array, which represent the 3-D data. For example, getDensity()[30, 70, 128] represent the density at the coordinate of (x, y, z) = (xlist()[30, 70, 128], ylist()[30, 70, 128], zlist()[30, 70, 128]) See, if you want to know the details of the file format, see :ref:`plasmaparameterjia`. ''' logger = logging.getLogger('PlasmaParameter1205') logger.setLevel(logging.DEBUG) def __init__(self): rc = irfpy.util.irfpyrc.Rc() self.datapath = rc.get('pep', 'mhddatapath') if self.datapath == None: raise IOError('No key of [pep] mhddatapath in irfpyrc file(s).') self.filename = os.path.join(self.datapath, 'MHD_PlasmaParameters_3Rg_XianzheJia.mat') self.logger.debug('Filename = %s' % self.filename) self.dataarray = scipy.io.loadmat(self.filename) ''' Loaded data from the file. '''
[docs] def xlist(self): ''' Return the np.array of x coordinates. :returns: The array of x coordinate system. Shape is (200, 200, 200) ''' return self.dataarray['X'].reshape((200, 200, 200))
[docs] def ylist(self): return self.dataarray['Y'].reshape((200, 200, 200))
[docs] def zlist(self): return self.dataarray['Z'].reshape((200, 200, 200))
[docs] def nlist(self): return self.dataarray['N'].reshape((200, 200, 200))
[docs] def vxlist(self): return self.dataarray['Vx'].reshape((200, 200, 200))
[docs] def vylist(self): return self.dataarray['Vy'].reshape((200, 200, 200))
[docs] def vzlist(self): return self.dataarray['Vz'].reshape((200, 200, 200))
[docs] def pthlist(self): return self.dataarray['Pth'].reshape((200, 200, 200))
[docs] def tlist(self): return self.dataarray['T'].reshape((200, 200, 200))
[docs] def lonlist(self): return np.rad2deg(np.arctan2(self.ylist(), self.xlist()))
[docs] def rlist(self): x = self.xlist(); y=self.ylist(); z=self.zlist() return np.sqrt(x * x + y * y + z * z)
[docs] def latlist(self): return np.rad2deg(np.arcsin(self.zlist() / self.rlist()))
[docs] def bin_indexes(self, x, y, z): ''' Return the indexes of the given position :returns: Corresponding indexes. The returned is a tuple (ix, iy, iz). The given position (x, y, z) is inside (ix, iy, iz) and (ix+1, iy+1, iz+1). ''' xc = self.xlist()[:, 0, 0] yc = self.ylist()[0, :, 0] zc = self.zlist()[0, 0, :] ix = bisect.bisect(xc, x) - 1 iy = bisect.bisect(yc, y) - 1 iz = bisect.bisect(zc, z) - 1 if ix < 0 or ix >= 199 or iy < 0 or iy >= 199 or iz < 0 or iz >= 199: raise IndexError("Out of range.") return (ix, iy, iz)
[docs] def interpolate3d(self, x, y, z): ''' >>> pp = PlasmaParameter1205() Interpolate the data. This case is indeed on the grid. >>> n, vx, vy, vz, t, pth = pp.interpolate3d(1.8, 1.2, 0.0) >>> print((' %.3f' * 6) % (n, vx, vy, vz, t, pth)) 0.259 -124.000 28.365 -14.754 18756.000 0.777 >>> n, vx, vy, vz, t, pth = pp.interpolate3d(1.81, 1.19, 0.02) >>> print((' %.3f' * 6) % (n, vx, vy, vz, t, pth)) 0.244 -125.355 26.277 -2.472 19178.578 0.750 ''' ix, iy, iz = self.bin_indexes(x, y, z) parameters = [] xc = self.xlist() yc = self.ylist() zc = self.zlist() n = self.nlist() vx = self.vxlist() vy = self.vylist() vz = self.vzlist() t = self.tlist() pth = self.pthlist() # L2 distance d000 = (x - xc[ix, iy, iz]) ** 2 + (y - yc[ix, iy, iz]) ** 2 + (z - zc[ix, iy, iz]) ** 2 d001 = (x - xc[ix, iy, iz + 1]) ** 2 + (y - yc[ix, iy, iz + 1]) ** 2 + (z - zc[ix, iy, iz + 1]) ** 2 d010 = (x - xc[ix, iy + 1, iz]) ** 2 + (y - yc[ix, iy + 1, iz]) ** 2 + (z - zc[ix, iy + 1, iz]) ** 2 d011 = (x - xc[ix, iy + 1, iz + 1]) ** 2 + (y - yc[ix, iy + 1, iz + 1]) ** 2 + (z - zc[ix, iy + 1, iz + 1]) ** 2 d100 = (x - xc[ix + 1, iy, iz]) ** 2 + (y - yc[ix + 1, iy, iz]) ** 2 + (z - zc[ix + 1, iy, iz]) ** 2 d101 = (x - xc[ix + 1, iy, iz + 1]) ** 2 + (y - yc[ix + 1, iy, iz + 1]) ** 2 + (z - zc[ix + 1, iy, iz + 1]) ** 2 d110 = (x - xc[ix + 1, iy + 1, iz]) ** 2 + (y - yc[ix + 1, iy + 1, iz]) ** 2 + (z - zc[ix + 1, iy + 1, iz]) ** 2 d111 = (x - xc[ix + 1, iy + 1, iz + 1]) ** 2 + (y - yc[ix + 1, iy + 1, iz + 1]) ** 2 + (z - zc[ix + 1, iy + 1, iz + 1]) ** 2 dists = np.array([d000, d001, d010, d011, d100, d101, d110, d111]) if dists.min() == 0: self.logger.debug('ZERO distance found') zero_len_index = dists.argmin() ix += ((zero_len_index >> 2) & 1) iy += ((zero_len_index >> 1) & 1) iz += (zero_len_index & 1) return (n[ix, iy, iz], vx[ix, iy, iz], vy[ix, iy, iz], vz[ix, iy, iz], t[ix, iy, iz], pth[ix, iy, iz]) for dat in (n, vx, vy, vz, t, pth): dat000 = dat[ix, iy, iz] dat001 = dat[ix, iy, iz + 1] dat010 = dat[ix, iy + 1, iz] dat011 = dat[ix, iy + 1, iz + 1] dat100 = dat[ix + 1, iy, iz] dat101 = dat[ix + 1, iy, iz + 1] dat110 = dat[ix + 1, iy + 1, iz] dat111 = dat[ix + 1, iy + 1, iz + 1] datintp = ((dat000 / d000) + (dat001 / d001) + (dat010 / d010) + (dat011 / d011) + (dat100 / d100) + (dat101 / d101) + (dat110 / d110) + (dat111 / d111)) / ( (1 / d000) + (1 / d001) + (1 / d010) + (1 / d011) + (1 / d100) + (1 / d101) + (1 / d110) + (1 / d111)) parameters.append(datintp) return parameters
[docs] def nearest_neighbor(self, x, y, z): ''' Return the nearest neighbor index >>> pp = PlasmaParameter1205() >>> idx = pp.nearest_neighbor(0., 0., 0.) >>> print(idx) (100, 100, 100) :param x: X coordinate in Rg. :param y: Y coordinate in Rg. :param z: Z coordinate in Rg. ''' xc = self.xlist()[:, 0, 0] yc = self.ylist()[0, :, 0] zc = self.zlist()[0, 0, :] ix, iy, iz = self.bin_indexes(x, y, z) if xc[ix + 1] - x < x - xc[ix]: ix = ix + 1 if yc[iy + 1] - y < y - yc[iy]: iy = iy + 1 if zc[iz + 1] - z < z - zc[iz]: iz = iz + 1 return (ix, iy, iz)
[docs]class MagneticField1201: ''' Magnetic field data provided from Jia in January 2012. >>> mf = MagneticField1201() The data file, ``Ganymede_MHD_Bfield_Jia.mat`` is read. Note that the data include (0, 0, 0) where the calculation is not done (i.e. inside the inner boundary). ''' logger = logging.getLogger('MagneticField1201') logger.setLevel(logging.DEBUG) def __init__(self, interpolate='l2'): rc = irfpy.util.irfpyrc.Rc() self.datapath = rc.get('pep', 'mhddatapath') if self.datapath == None: raise IOError('No key of [pep] mhddatapath in irfpyrc file(s).') self.filename = os.path.join(self.datapath, 'Ganymede_MHD_Bfield_Jia.mat') self.logger.debug('File = %s' % self.filename) self.dataarray = scipy.io.loadmat(self.filename) self.interpolate3d = None ''' Method to return the B vector at the arbitrary position. According to `interpolate` in the initializer, algorithm is chosen. See also :meth:`interpolate3d_l2`, :meth:`interpolate3d_trilin` and :meth:`interpolate3d_4p`. ''' if interpolate == 'l2': self.interpolate3d = self.interpolate3d_l2 elif interpolate == 'trilin': self.interpolate3d = self.interpolate3d_trilin elif interpolate == '4p': import warnings warnings.warn('''!!! NOT YET RECOMMENDED !!! There are still unwanted features for this algorithm. Use "l2" for the interpolation. ''') self.interpolate3d = self.interpolate3d_4p else: raise ValueError('Unknown algorithm for "interpolate", %s' % interpolate) self.strategy = "none"
[docs] def xlist(self): ''' Return the np.array of x coordinates. :returns: The array of x coordinate system. Shape is (200, 200, 200) ''' return np.transpose(self.dataarray['X'].reshape((200, 200, 200)), axes=(1, 0, 2))
[docs] def ylist(self): return np.transpose(self.dataarray['Y'].reshape((200, 200, 200)), axes=(1, 0, 2))
[docs] def zlist(self): return np.transpose(self.dataarray['Z'].reshape((200, 200, 200)), axes=(1, 0, 2))
[docs] def bxlist(self): return np.transpose(self.dataarray['Bx_MHD'].reshape((200, 200, 200)), axes=(1, 0, 2))
[docs] def bylist(self): return np.transpose(self.dataarray['By_MHD'].reshape((200, 200, 200)), axes=(1, 0, 2))
[docs] def bzlist(self): return np.transpose(self.dataarray['Bz_MHD'].reshape((200, 200, 200)), axes=(1, 0, 2))
[docs] def lonlist(self): return np.rad2deg(np.arctan2(self.ylist(), self.xlist()))
[docs] def rlist(self): x = self.xlist(); y=self.ylist(); z=self.zlist() return np.sqrt(x * x + y * y + z * z)
[docs] def latlist(self): return np.rad2deg(np.arcsin(self.zlist() / self.rlist()))
[docs] def bin_indexes(self, x, y, z): ''' Return the indexes of the given position :returns: Corresponding indexes. The returned is a tuple (ix, iy, iz). The given position (x, y, z) is inside (ix, iy, iz) and (ix+1, iy+1, iz+1). ''' xc = self.xlist()[:, 0, 0] yc = self.ylist()[0, :, 0] zc = self.zlist()[0, 0, :] ix = bisect.bisect(xc, x) - 1 iy = bisect.bisect(yc, y) - 1 iz = bisect.bisect(zc, z) - 1 if ix < 0 or ix >= 199 or iy < 0 or iy >= 199 or iz < 0 or iz >= 199: raise IndexError("Out of range.") return (ix, iy, iz)
[docs] def set_interpolate_strategy(self, strategy="none"): ''' Set strategy for R<1.1. As R<1.1, no data is available. Indeed, zero is filled. Here I try to make several strategy. "none" Do nothing. Just return 0 as the neighboring grid. "project" Project the data at 1.15 Rj of the same latitude and longitude. Projection valid if r<1.15 ''' self.strategy = strategy
[docs] def interpolate3d_4p(self, x, y, z): ''' Return the B vector at the arbitorary position. >>> mf = MagneticField1201(interpolate='4p') >>> print(mf.interpolate3d(1, 3, 5)) [ -8.7307 9.42083 -79.092 ] >>> print(mf.interpolate3d(1.01, 3.01, 5.01)) [ -8.583675 9.400121 -79.05206 ] ''' if self.strategy == 'project': if x * x + y * y + z * z < 1.149 ** 2: # Slightly less than 1.15 r2 = x * x + y * y + z * z self.logger.debug('PROJECT used (r=%f)' % np.sqrt(r2)) p = np.array([x, y, z]) pnew = p / np.sqrt(r2) * 1.15 return self.interpolate3d(pnew[0], pnew[1], pnew[2]) * ((1.15 ** 2 / r2) ** 1.5) ix, iy, iz = self.bin_indexes(x, y, z) parameters = [] xc = self.xlist() yc = self.ylist() zc = self.zlist() bx = self.bxlist() by = self.bylist() bz = self.bzlist() # L2 distance d000 = (x - xc[ix, iy, iz]) ** 2 + (y - yc[ix, iy, iz]) ** 2 + (z - zc[ix, iy, iz]) ** 2 d001 = (x - xc[ix, iy, iz + 1]) ** 2 + (y - yc[ix, iy, iz + 1]) ** 2 + (z - zc[ix, iy, iz + 1]) ** 2 d010 = (x - xc[ix, iy + 1, iz]) ** 2 + (y - yc[ix, iy + 1, iz]) ** 2 + (z - zc[ix, iy + 1, iz]) ** 2 d011 = (x - xc[ix, iy + 1, iz + 1]) ** 2 + (y - yc[ix, iy + 1, iz + 1]) ** 2 + (z - zc[ix, iy + 1, iz + 1]) ** 2 d100 = (x - xc[ix + 1, iy, iz]) ** 2 + (y - yc[ix + 1, iy, iz]) ** 2 + (z - zc[ix + 1, iy, iz]) ** 2 d101 = (x - xc[ix + 1, iy, iz + 1]) ** 2 + (y - yc[ix + 1, iy, iz + 1]) ** 2 + (z - zc[ix + 1, iy, iz + 1]) ** 2 d110 = (x - xc[ix + 1, iy + 1, iz]) ** 2 + (y - yc[ix + 1, iy + 1, iz]) ** 2 + (z - zc[ix + 1, iy + 1, iz]) ** 2 d111 = (x - xc[ix + 1, iy + 1, iz + 1]) ** 2 + (y - yc[ix + 1, iy + 1, iz + 1]) ** 2 + (z - zc[ix + 1, iy + 1, iz + 1]) ** 2 dists = np.array([d000, d001, d010, d011, d100, d101, d110, d111]) if dists.min() == 0: self.logger.debug('ZERO distance found') zero_len_index = dists.argmin() ix += ((zero_len_index >> 2) & 1) iy += ((zero_len_index >> 1) & 1) iz += (zero_len_index & 1) return np.array([bx[ix, iy, iz], by[ix, iy, iz], bz[ix, iy, iz]]) # Otherwise, choose 4 nearest points. idx = np.argsort(dists)[:4] idx_x = ((idx >> 2) & 1) + ix idx_y = ((idx >> 1) & 1) + iy idx_z = (idx & 1) + iz xlist = xc[idx_x, idx_y, idx_z] ylist = yc[idx_x, idx_y, idx_z] zlist = zc[idx_x, idx_y, idx_z] bxlist = bx[idx_x, idx_y, idx_z] bylist = by[idx_x, idx_y, idx_z] bzlist = bz[idx_x, idx_y, idx_z] for dat in (bxlist, bylist, bzlist): matx = np.array([[xlist[0], ylist[0], zlist[0], 1], [xlist[1], ylist[1], zlist[1], 1], [xlist[2], ylist[2], zlist[2], 1], [xlist[3], ylist[3], zlist[3], 1]]) try: matx = np.linalg.inv(matx) except np.linalg.LinAlgError as e: print('SINGULAR MATRIX') print(matx) return np.zeros([3]) + np.nan coeffs = matx.dot(np.array([dat[0], dat[1], dat[2], dat[3]])) datintp = coeffs.dot(np.array([x, y, z, 1])) parameters.append(datintp) return np.array(parameters)
[docs] def interpolate3d_trilin(self, x, y, z): ''' Return the B vector at the arbitorary position. The trilinear interpolation is used. >>> mf = MagneticField1201(interpolate='trilin') >>> print(mf.interpolate3d(1, 3, 5)) [ -8.7307 9.42083 -79.092 ] >>> print(mf.interpolate3d(1.01, 3.01, 5.01)) [ -8.58011575 9.40287633 -79.053923 ] ''' if self.strategy == "project": if x * x + y * y + z * z < 1.149 ** 2: # Slightly less than 1.15 r2 = x * x + y * y + z * z self.logger.debug('PROJECT used (r=%f)' % np.sqrt(r2)) p = np.array([x, y, z]) pnew = p / np.sqrt(r2) * 1.15 return self.interpolate3d(pnew[0], pnew[1], pnew[2]) * ((1.15 ** 2 / r2) ** 1.5) ix, iy, iz = self.bin_indexes(x, y, z) parameters = [] xc = self.xlist() yc = self.ylist() zc = self.zlist() bx = self.bxlist() by = self.bylist() bz = self.bzlist() # First, obtain the fraction xd, yd and zd. xd = (x - xc[ix, iy, iz]) / (xc[ix + 1, iy + 1, iz + 1] - xc[ix, iy, iz]) yd = (y - yc[ix, iy, iz]) / (yc[ix + 1, iy + 1, iz + 1] - yc[ix, iy, iz]) zd = (z - zc[ix, iy, iz]) / (zc[ix + 1, iy + 1, iz + 1] - zc[ix, iy, iz]) for dat in (bx, by, bz): # First, obtain 4 point by interpolation along x. c00 = dat[ix, iy, iz] * (1 - xd) + dat[ix + 1, iy, iz] * xd c01 = dat[ix, iy, iz + 1] * (1 - xd) + dat[ix + 1, iy, iz + 1] * xd c10 = dat[ix, iy + 1, iz] * (1 - xd) + dat[ix + 1, iy + 1, iz] * xd c11 = dat[ix, iy + 1, iz + 1] * (1 - xd) + dat[ix + 1, iy + 1, iz + 1] * xd # Then, obtain 2 point by interpolation along y. c0 = c00 * (1 - yd) + c10 * yd c1 = c01 * (1 - yd) + c11 * yd # Finally obtain the point by interpolation along z. c = c0 * (1 - zd) + c1 * zd parameters.append(c) return np.array(parameters)
[docs] def interpolate3d_l2(self, x, y, z): ''' Return the B vector at the arbitorary position. >>> mf = MagneticField1201() >>> print(mf.interpolate3d(1, 3, 5)) [ -8.7307 9.42083 -79.092 ] >>> print(mf.interpolate3d(1.01, 3.01, 5.01)) [ -8.61484666 9.41288854 -79.06814561] ''' if self.strategy == "project": if x * x + y * y + z * z < 1.149 ** 2: # Slightly less than 1.15 r2 = x * x + y * y + z * z self.logger.debug('PROJECT used (r=%f)' % np.sqrt(r2)) p = np.array([x, y, z]) pnew = p / np.sqrt(r2) * 1.15 return self.interpolate3d(pnew[0], pnew[1], pnew[2]) * ((1.15 ** 2 / r2) ** 1.5) ix, iy, iz = self.bin_indexes(x, y, z) parameters = [] xc = self.xlist() yc = self.ylist() zc = self.zlist() bx = self.bxlist() by = self.bylist() bz = self.bzlist() # L2 distance d000 = (x - xc[ix, iy, iz]) ** 2 + (y - yc[ix, iy, iz]) ** 2 + (z - zc[ix, iy, iz]) ** 2 d001 = (x - xc[ix, iy, iz + 1]) ** 2 + (y - yc[ix, iy, iz + 1]) ** 2 + (z - zc[ix, iy, iz + 1]) ** 2 d010 = (x - xc[ix, iy + 1, iz]) ** 2 + (y - yc[ix, iy + 1, iz]) ** 2 + (z - zc[ix, iy + 1, iz]) ** 2 d011 = (x - xc[ix, iy + 1, iz + 1]) ** 2 + (y - yc[ix, iy + 1, iz + 1]) ** 2 + (z - zc[ix, iy + 1, iz + 1]) ** 2 d100 = (x - xc[ix + 1, iy, iz]) ** 2 + (y - yc[ix + 1, iy, iz]) ** 2 + (z - zc[ix + 1, iy, iz]) ** 2 d101 = (x - xc[ix + 1, iy, iz + 1]) ** 2 + (y - yc[ix + 1, iy, iz + 1]) ** 2 + (z - zc[ix + 1, iy, iz + 1]) ** 2 d110 = (x - xc[ix + 1, iy + 1, iz]) ** 2 + (y - yc[ix + 1, iy + 1, iz]) ** 2 + (z - zc[ix + 1, iy + 1, iz]) ** 2 d111 = (x - xc[ix + 1, iy + 1, iz + 1]) ** 2 + (y - yc[ix + 1, iy + 1, iz + 1]) ** 2 + (z - zc[ix + 1, iy + 1, iz + 1]) ** 2 dists = np.array([d000, d001, d010, d011, d100, d101, d110, d111]) if dists.min() == 0: self.logger.debug('ZERO distance found') zero_len_index = dists.argmin() ix += ((zero_len_index >> 2) & 1) iy += ((zero_len_index >> 1) & 1) iz += (zero_len_index & 1) return np.array([bx[ix, iy, iz], by[ix, iy, iz], bz[ix, iy, iz]]) for dat in (bx, by, bz): dat000 = dat[ix, iy, iz] dat001 = dat[ix, iy, iz + 1] dat010 = dat[ix, iy + 1, iz] dat011 = dat[ix, iy + 1, iz + 1] dat100 = dat[ix + 1, iy, iz] dat101 = dat[ix + 1, iy, iz + 1] dat110 = dat[ix + 1, iy + 1, iz] dat111 = dat[ix + 1, iy + 1, iz + 1] datintp = ((dat000 / d000) + (dat001 / d001) + (dat010 / d010) + (dat011 / d011) + (dat100 / d100) + (dat101 / d101) + (dat110 / d110) + (dat111 / d111)) / ( (1 / d000) + (1 / d001) + (1 / d010) + (1 / d011) + (1 / d100) + (1 / d101) + (1 / d110) + (1 / d111)) parameters.append(datintp) return np.array(parameters)
[docs]class LValue1201: ''' L-value class. L-value calculated from :class:`MagneticField1201`. >>> lval = LValue1201() >>> l = lval.lvalue() >>> print(lval.lonlist()[10], lval.latlist()[70], lval.lvalue()[10, 70]) 20.0 -20.0 1.202 The data file is, as the file size is very small, in the ``irfpy.pep`` package distribution. Updating the data file, run the script in ``src/scripts/apps120811_flux_recalc/mhd_lvalue.py``. ''' def __init__(self): # fname = resource_filename(__name__, os.path.join('data', 'ganymede_lvalue.dat.gz')) rc = irfpy.util.irfpyrc.Rc() fname = rc.get('pep', 'mhddata_ganymede_lvalue') f = gzip.open(fname) self._lonlist = pickle.load(f) self._latlist = pickle.load(f) self._lvalue = pickle.load(f)
[docs] def lonlist(self): ''' List of longitude. (N,) shape ''' return self._lonlist
[docs] def latlist(self): ''' List of latitude. (M,) shape ''' return self._latlist
[docs] def lvalue(self): ''' List of L-value. (N, M) shape ''' return np.ma.masked_less(self._lvalue, 0)
[docs] def opencloseboundaries(self): ''' Return the open-close boundaries ''' lonlist = self.lonlist() latlist = np.array(self.latlist()) lvalue = self.lvalue() # ma.array shape of (LON, LAT) lat_north = np.zeros_like(lonlist, dtype=np.int) lat_south = np.zeros_like(lonlist, dtype=np.int) for ilon in range(len(lonlist)): lon = lonlist[ilon] l = lvalue[ilon, :] closelat = np.where(l > 0)[0] lat_south[ilon] = closelat.min() lat_north[ilon] = closelat.max() return latlist[lat_north] + 0.5, latlist[lat_south] - 0.5
[docs]def t2vth(temperature_eV, mass=1.0, charge=1.0): r''' Convert the temperature to the thermal velocity. :param mass: Mass in the unit of amu. :param charge: Charge in the unit of electricity (1.6e-19) The definition of the thermal velocity is .. math:: v_\mathrm{th}^2 = \frac{kT}{m} Here temperature, T, is in Kelvins. If one converts the temperature to the unit of electron volts, one can get .. math:: v_\mathrm{th}^2 = \frac{eT'}{m} >>> print('%.2e' % t2vth(1)) # T=1eV for proton, vth=9.79 km/s (not 13.8 km/s!!) 9.79e+03 >>> print('%.2e' % t2vth(0.5, mass=16)) # T=0.5eV for oxygen, vth=1.73 km/s. 1.73e+03 ''' e = charge * 1.6e-19 m = mass * 1.67e-27 return np.sqrt(e * temperature_eV / m)
[docs]class PrecipitationFlux: ''' Precipitation flux calculated. Data files should be placed on the path pointed by [pep] mhddatapath. The file name should be ``MHD_PrecipitationFlux_YF_footprint`` and ``MHD_PrecipitationFlux_YF_nadir``. When contructing you can specify which you use. >>> flux_fp = PrecipitationFlux(type='footprint') >>> flux_na = PrecipitationFlux(type='nadir') ''' logger = logging.getLogger('PrecipitationFlux') logger.setLevel(logging.DEBUG) def __init__(self, type="footprint"): ''' Initialize the PrecipitationFlux :keyword type: "footprint" for the footprint mapping data, "nadir" for the nadir pointing data, ''' rc = irfpy.util.irfpyrc.Rc() self.datapath = rc.get('pep', 'mhddatapath') if self.datapath == None: raise IOError('No key of [pep] mhddatapath in irfpyrc files.') self.filename = os.path.join(self.datapath, "MHD_PrecipitationFlux_YF_%s.dat" % type) self.logger.debug('Filename=%s' % self.filename) self.dataarray = np.loadtxt(self.filename, skiprows=1)
[docs] def reshape(self, array): return array.reshape((361, 181))
[docs] def lonlist(self): ''' Return the longitude list, ranging from 0 to 360. ''' return self.dataarray[:, 0]
[docs] def latlist(self): return self.dataarray[:, 1]
[docs] def flist(self): return np.ma.masked_invalid(self.dataarray[:, 2])
[docs] def bin_indexes(self, lon, lat): r''' >>> f = PrecipitationFlux() >>> print(f.bin_indexes(30, 55)) (30, 145) >>> print(f.bin_indexes(-30, -90)) (330, 0) ''' lons = self.reshape(self.lonlist())[:, 0] lats = self.reshape(self.latlist())[0, :] while lon < 0: lon += 360 lon = lon % 360 ilon = bisect.bisect(lons, lon) - 1 ilat = bisect.bisect(lats, lat) - 1 if ilon < 0 or ilon >= len(lons) or ilat < 0 or ilat >= len(lats): raise IndexError('The given lon,lat is out of range') return (ilon, ilat)
[docs] def nearest_neighbor(self, lon, lat): ''' Return the nearest neighbor grid's index ''' lons = self.reshape(self.lonlist())[:, 0] lats = self.reshape(self.latlist())[0, :] while lon < 0: lon += 360 lon = lon % 360 ilon, ilat = self.bin_indexes(lon, lat) # Assume increasing longitude and latitude. if lon - lons[ilon] > lons[ilon + 1] - lon: ilon += 1 if lat != 90: if lat - lats[ilat] > lats[ilat + 1] - lat: ilat += 1 return (ilon, ilat)
[docs]class Flux1205: ''' Flux data provided from Jia in March 2012. >>> flux = Flux1205() The constructor will read the data from the data file. Data file should have the name as ``<pathname>/MHD_PrecipitationFlux_Jia.dat`` where ``<pathname>`` should be specified by ``.irfpyrc`` file via ``[pep]`` section with ``mhddatapath`` entry. The data is loaded as a numpy array internally. You can access the data via :meth:`xlist`, :meth:`ylist`, :meth:`zlist` for the coordinates of the data point and :meth:`flist` for the flux. The flux is in the unit of ``/cm2 /s``. Primitive way of accessing to data is using ``dataarray`` attribute. It has (20000, 6) shape. The 2nd loop is *X*, *Y*, *Z*, longitude, latitude and flux. The flux is in the unit of ``/cm2 /s``. ''' logger = logging.getLogger('Flux1205') logger.setLevel(logging.DEBUG) def __init__(self): rc = irfpy.util.irfpyrc.Rc() self.datapath = rc.get('pep', 'mhddatapath') if self.datapath == None: raise IOError('No key of [pep] mhddatapath in irfpyrc file') self.filename = os.path.join(self.datapath, 'MHD_PrecipitationFlux_Jia.dat') self.logger.debug('FIlename=%s' % self.filename) self.dataarray = np.loadtxt(self.filename, skiprows=1, delimiter=',') ''' Data array with (20000, 6) shape. '''
[docs] def reshape(self, array): ''' Reshape into 2D array. The obtained data is 20000 element flattened array. This could be reshaped into 2D array. The best way is to convert to (200, 100) shaped array. First argument is fixed longitude and the second is fixed latitude. This means that the >>> m = Flux1205() >>> lon2d = m.reshape(m.lonlist()) >>> (lon2d[0, :] == -180).all() True >>> lat2d = m.reshape(m.latlist()) >>> (lat2d[:, 0] == 90).all() True ''' return array.reshape((200, 100))
[docs] def xlist(self): ''' Return np.array of the x-coords. ''' return self.dataarray[:, 0]
[docs] def ylist(self): ''' Return np.array of the y-coords. ''' return self.dataarray[:, 1]
[docs] def zlist(self): ''' Return np.array of the z-coords. ''' return self.dataarray[:, 2]
[docs] def flist(self): ''' Return np.array of the flux. ''' return self.dataarray[:, 5]
[docs] def lonlist(self): ''' Return np.array of the longitude list in degrees. ''' return self.dataarray[:, 3]
[docs] def latlist(self): ''' Return np.array of the latitude list in degrees. ''' return self.dataarray[:, 4]
import unittest import doctest
[docs]def doctests(): return unittest.TestSuite(( doctest.DocTestSuite(), ))
if __name__ == '__main__': unittest.main(defaultTest='doctests')