Source code for ximpol.core.spline

#!/usr/bin/env python
#
# Copyright (C) 2015, the ximpol team.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.


"""Spline utility module, building on top of the scipy.interpolate modules.
"""


import numpy
from scipy.interpolate import UnivariateSpline, InterpolatedUnivariateSpline
from scipy.interpolate import RectBivariateSpline

from ximpol.utils.logging_ import logger, abort


[docs]def interpolate(xa, ya, xb, yb, x): """Simple two-point linear interpolation/extrapolation. """ return ya + (yb - ya)/(xb - xa)*(x - xa)
[docs]def optimize_grid_linear(x, y, tolerance=1e-4): """Optimize a pair of (x, y) arrays for the corresponding spline definition. This loops over the input arrays and removes unnecessary data points to minimize the length of the arrays necessary to the spline definition. Args ---- x : array The input x-array. y : array The input y-array. tolerance : float The maximum relative difference between the generic yi value and the\ estrapolation of the two previous optimized data points for the point\ i to be removed. """ assert(len(x) == len(y)) logger.info('Optimizing grid with %d starting points...' % len(x)) # Start a new series with the first two points of the input arrays. _x = [x[0], x[1]] _y = [y[0], y[1]] # Loop over the points 3 ... (N - 1). for i, (_xi, _yi) in enumerate(zip(x, y)[2:-1]): # Extrapolate the last two points of the new series to xi and # see how far we are from the actual yi. delta = interpolate(_x[-2], _y[-2], _x[-1], _y[-1], _xi) - _yi if abs(delta/_yi) > tolerance: # If the difference is larger than the tolerance, add a point. # (This has the drawback that we tend to add pairs of point at # each change of slope.) _x.append(_xi) _y.append(_yi) # Interpolate the points last and (last - 2) to (last - 1). delta = interpolate(_x[-3], _y[-3], _x[-1], _y[-1], _x[-2]) - _y[-2] if abs(delta/_y[-2]) < tolerance: # If the penultimate point was not necessary, remove it. _x.remove(_x[-2]) _y.remove(_y[-2]) # Append the last point of the original array to the list. _x.append(x[-1]) _y.append(y[-1]) _x, _y = numpy.array(_x), numpy.array(_y) logger.info('Done, %d points remaining.' % len(_x)) return _x, _y
[docs]class xUnivariateSplineBase: """Base class for all the univariate spline classes. The basic idea is to keep track of the original arrays passed to the interpolator and to support arithmetic operations. We also allow the user to supply optional arguments to control the ranges and specify names and units for the quantities involved. Args ---- x : array Input x values. y : array Input y values. xname: str, optional The name of the quantity on the x-axis. xunits: str, optional The units for the x-axis. yname: str, optional The name of the quantity on the y-axis. yunits: str, optional The units for the y-axis. Note ---- This is a do-nothing class to be subclassed and not instantiated directly. """ def __init__(self, x, y, xname=None, xunits=None, yname=None, yunits=None): """Constructor. """ # Make sure the input vectors have the same lengths. assert len(x) == len(y) # numpy.unique is returning a sorted copy of the unique values of the # x arrays, so this is effectively sorting x. self.x, _index = numpy.unique(x, return_index=True) # If some of the values were not unique, give up. assert len(self.x) == len(x) # Need to grab the y in the right order. self.y = y[_index] self.xname = xname self.xunits = xunits self.yname = yname self.yunits = yunits
[docs] def dist(self, x, y): """ """ return abs((self(x) - y)/y)
[docs] def xmin(self): """Return the minimum of the underlying x-array. """ return self.x[0]
[docs] def xmax(self): """Return the maximum of the underlying x-array. """ return self.x[-1]
def __mul__(self, other): """Overloaded multiplication operator. """ assert(self.__class__.__name__ == other.__class__.__name__) _x = numpy.union1d(self.x, other.x) _y = self(_x)*other(_x) return self.__class__(_x, _y) def __div__(self, other): """Overloaded division operator. """ assert(self.__class__.__name__ == other.__class__.__name__) _x = numpy.union1d(self.x, other.x) _x = _x[other(_x) != 0] _y = self(_x)/other(_x) return self.__class__(_x, _y) def __add__(self, other): """Overloaded sum operator. """ assert(self.__class__.__name__ == other.__class__.__name__) _x = numpy.union1d(self.x, other.x) _y = self(_x) + other(_x) return self.__class__(_x, _y) def __sub__(self, other): """Overloaded sum operator. """ assert(self.__class__.__name__ == other.__class__.__name__) _x = numpy.union1d(self.x, other.x) _y = self(_x) - other(_x) return self.__class__(_x, _y) def __len__(self): """Return the lenght of the arrays used to construct the spline. """ return len(self.x)
[docs] def scale(self, scale, yname=None, yunits=None): """Scale the spline y values. Warning ------- Need to set the names and units properly---the main issue is ths for derived classes the names of the corresponding members can be different. """ _x = numpy.copy(self.x) _y = numpy.copy(self.y)*scale if yname is None: yname = self.yname if yunits is None: yunits = self.yunits fmt = dict(xname=self.xname, xunits=self.xunits, yname=yname, yunits=yunits) return self.__class__(_x, _y, **fmt)
@classmethod
[docs] def label(cls, name, units=None): """Compose an axis label given a name and some units. """ if units is None or units == '': return name else: return '%s [%s]' % (name, units)
[docs] def xlabel(self): """Return the x-label for a plot. """ return self.label(self.xname, self.xunits)
[docs] def ylabel(self): """Return the y-label for a plot. """ return self.label(self.yname, self.yunits)
[docs] def norm(self): """Return the integral over the entire spline domain. """ return self.integral(self.xmin(), self.xmax())
[docs] def build_cdf(self): """Create the cumulative distribution function. Note that the cdf is built using a linear interpolated spline, no matter what the class of the original spline is. """ _x = self.x _y = numpy.array([self.integral(_x[0], _xp) for _xp in _x])/self.norm() return xInterpolatedUnivariateSplineLinear(_x, _y)
[docs] def build_ppf(self): """Create the percent point function (or inverse of the cdf). Note that the cdf is built using a linear interpolated spline, no matter what the class of the original spline is. """ _y = self.x _x = numpy.array([self.integral(_y[0], _xp) for _xp in _y]) _x, _mask = numpy.unique(_x, return_index=True) _x/= self.norm() _y = _y[_mask] fmt = dict(xname='Normalized integral', yname=self.xname, yunits=self.xunits) return xInterpolatedUnivariateSplineLinear(_x, _y, **fmt)
[docs] def plot(self, num_points=1000, overlay=False, logx=False, logy=False, scale=1., offset=0., show=True, **kwargs): """Plot the spline. Args ---- num_points : int, optional The number of sampling points to be used to draw the spline. overlay : bool, optional If True, the original arrays passed to the spline are overlaid. show : bool, optional If True, `plt.show()` is called at the end, interrupting the flow. """ from ximpol.utils.matplotlib_ import pyplot as plt if not logx: _x = numpy.linspace(self.xmin(), self.xmax(), num_points) else: _x = numpy.logspace(numpy.log10(self.xmin()), numpy.log10(self.xmax()), num_points) _y = scale*self(_x) + offset if overlay: plt.plot(_x, _y, '-', self.x, self.y, 'o', **kwargs) else: plt.plot(_x, _y, '-', **kwargs) if self.xname is not None: plt.xlabel(self.xlabel()) if self.yname is not None: plt.ylabel(self.ylabel()) if logx: plt.gca().set_xscale('log') if logy: plt.gca().set_yscale('log') if show: plt.show()
[docs]class xUnivariateSpline(xUnivariateSplineBase, UnivariateSpline): """Light-weight wrapper over the scipy `UnivariateSpline <http://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.UnivariateSpline.html>`_ class (see the corresponding documentation for the meaning of the parameters passed to the constructor). Note ---- Note that the interface to the base class has changed from numpy 0.14. An `ext` argument can be passed to the constructor starting with scipy 0.15 to control the extrapolation behavior and a `check_finite` argument is available in 0.16 to avoid `nans` in the input data. We currently do not use either one. """ def __init__(self, x, y, w=None, bbox=[None, None], k=3, s=None, xname=None, xunits=None, yname=None, yunits=None): """Constructor. """ xUnivariateSplineBase.__init__(self, x, y, xname, xunits, yname, yunits) UnivariateSpline.__init__(self, self.x, self.y, w, bbox, k, s)
[docs]class xInterpolatedUnivariateSpline(xUnivariateSplineBase, InterpolatedUnivariateSpline): """Light-weight wrapper over the scipy `InterpolatedUnivariateSpline <http://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.InterpolatedUnivariateSpline.html>`_ class (see the corresponding documentation for the meaning of the parameters passed to the constructor). Note ---- Note that the interface to the base class has changed from numpy 0.14. An `ext` argument can be passed to the constructor starting with scipy 0.15 to control the extrapolation behavior and a `check_finite` argument is available in 0.16 to avoid `nans` in the input data. We currently do not use either one. """ def __init__(self, x, y, w=None, bbox=[None, None], k=3, xname=None, xunits=None, yname=None, yunits=None): """Constructor. """ xUnivariateSplineBase.__init__(self, x, y, xname, xunits, yname, yunits) InterpolatedUnivariateSpline.__init__(self, self.x, self.y, w, bbox, k)
[docs]class xInterpolatedUnivariateSplineLinear(xInterpolatedUnivariateSpline): """xInterpolatedUnivariateSplineLinear subclass implementing the simplest possible linear interpolator. This particular class is offering the facility to optimize the input arrays via the `optimize` argument. Args ---- x : array Input x values (assumed to be sorted). y : array Input y values. xname: str, optional The name of the quantity on the x-axis. xunits: str, optional The units for the x-axis. yname: str, optional The name of the quantity on the y-axis. yunits: str, optional The units for the y-axis. optimize : bool If `True`, the input arrays are optimized via the\ `optimize_grid_linear()` function. tolerance : float The tolerance for the input array optimization. (If `optimize` is\ `False`, this has no effect.) Example ------- >>> from ximpol.core.spline import xInterpolatedUnivariateSplineLinear >>> >>> x = numpy.linspace(0, 2*numpy.pi, 100) >>> y = numpy.sin(x) >>> s = xInterpolatedUnivariateSplineLinear(x, y, xname='x', xunits='au') >>> s.plot() """ def __init__(self, x, y, xname=None, xunits=None, yname=None, yunits=None, optimize=False, tolerance=1e-4): """ Constructor. """ if optimize: oldx, oldy = x, y x, y = optimize_grid_linear(x, y, tolerance) xInterpolatedUnivariateSpline.__init__(self, x, y, None, [None, None], 1, xname, xunits, yname, yunits) if optimize: dist = self.dist(oldx, oldy) logger.info('Relative (max/ave) dist. to original array: %e/%e' %\ (dist.max(), dist.sum()/len(dist)))
[docs]class xInterpolatedUnivariateLogSpline(xUnivariateSplineBase, UnivariateSpline): """Poor man's attempt at a spline in logarithmic space. The basic interpolation is trivial, as we simply take the logarithms of the x and y values in the constructor and overload the __call__ method to raise 10 to the power of the interpolated value. The integral is more tricky, as obviously the implementation in the base class operates in logarirthmic space and the result is just nonsense. To this end we proceed by brute force and create a second spline, this time in linear space, with a relatively large number of points to guarantee the accuracy of the integral. Warning ------- This is not supporting the calculation of the derivatives or roots in any sensible way, so just refrain from calling any other methods than the overloaded ones, i.e., __call__() and integral(). In the future it might make sense to overload all the other methods and simply abort the executions whenever any of them is called. """ def __init__(self, x, y, w=None, bbox=[None, None], k=3, xname=None, xunits=None, yname=None, yunits=None): """Constructor. """ xUnivariateSplineBase.__init__(self, x, y, xname, xunits, yname, yunits) _x = numpy.log10(x) _y = numpy.log10(y) UnivariateSpline.__init__(self, _x, _y, w, bbox, k, s=None) self.__integral_spline = None def __call__(self, x): """Overloaded call method. """ return numpy.power(10., UnivariateSpline.__call__(self, numpy.log10(x))) def __build_integral_spline(self, num_points=1000): """Build the underlying univariate linear interpolated spline to be used to evaluate integrals. Note that we try to save something in terms of speed by optimizing the number of points, here. """ _x = numpy.logspace(numpy.log10(self.xmin()), numpy.log10(self.xmax()), num_points) _y = self(_x) fmt = dict(xname=self.xname, xunits=self.xunits, yname=self.yname, yunits=self.yunits) return xInterpolatedUnivariateSplineLinear(_x, _y, optimize=True, **fmt)
[docs] def integral(self, x1, x2): """Overloaded integral method. The integral spline is calculated and cached the first time this method is called. """ if self.__integral_spline is None: self.__integral_spline = self.__build_integral_spline() return self.__integral_spline.integral(x1, x2)
[docs]class xInterpolatedUnivariateLogSplineLinear(xInterpolatedUnivariateLogSpline): """Subclass of xInterpolatedUnivariateLogSpline with k=1. """ def __init__(self, x, y, xname=None, xunits=None, yname=None, yunits=None): """Constructor. """ xInterpolatedUnivariateLogSpline.__init__(self, x, y, None, [None, None], 1, xname, xunits, yname, yunits)
[docs]class xBivariateSplineBase: """Base class for all the bivariate spline classes. This is somewhat similar in spirit to the corresponding univariate base class, except that the additional functionalities are, for the moment, limited to bookkeeping and plotting facilities. Args ---- x : array Input x values (assumed to be sorted). y : array Input y values (assumed to be sorted). z : array Input z values, with shape (x.size,y.size). xname: str, optional The name of the quantity on the x-axis. xunits: str, optional The units for the x-axis. yname: str, optional The name of the quantity on the y-axis. yunits: str, optional The units for the y-axis. zname: str, optional The name of the quantity on the z-axis. zunits: str, optional The units for the z-axis. Note ---- This is a do-nothing class to be subclassed and not instantiated directly. """ def __init__(self, x, y, z, xname=None, xunits=None, yname=None, yunits=None, zname=None, zunits=None): """Constructor. """ self.x = x self.y = y self.z = z self.xname = xname self.xunits = xunits self.yname = yname self.yunits = yunits self.zname = zname self.zunits = zunits
[docs] def xmin(self): """Return the minimum of the underlying x-array. """ return self.x[0]
[docs] def xmax(self): """Return the maximum of the underlying x-array. """ return self.x[-1]
[docs] def ymin(self): """Return the minimum of the underlying y-array. """ return self.y[0]
[docs] def ymax(self): """Return the maximum of the underlying y-array. """ return self.y[-1]
[docs] def xlabel(self): """Return the x-label for a plot. """ return xUnivariateSplineBase.label(self.xname, self.xunits)
[docs] def ylabel(self): """Return the y-label for a plot. """ return xUnivariateSplineBase.label(self.yname, self.yunits)
[docs] def zlabel(self): """Return the z-label for a plot. """ return xUnivariateSplineBase.label(self.zname, self.zunits)
[docs] def scale(self, scale_factor): """ """ return self.__class__(self.x.copy(), self.y.copy(), self.z*scale_factor)
[docs]class xInterpolatedBivariateSpline(xBivariateSplineBase, RectBivariateSpline): """Bivariate interpolated spline on a rectangular grid. """ def __init__(self, x, y, z, kx=1, ky=1, xname=None, xunits=None, yname=None, yunits=None, zname=None, zunits=None): """Constructor. """ if hasattr(z, '__call__'): _x, _y = numpy.meshgrid(y, x) z = z(_x, _y) xBivariateSplineBase.__init__(self, x, y, z, xname, xunits, yname, yunits, zname, zunits) RectBivariateSpline.__init__(self, x, y, z, bbox=[None, None, None, None], kx=kx, ky=ky, s=0) def __call__(self, x, y, dx=0, dy=0, grid=False): """Overloaded __call__method. Here we basically override the default value of the `grid` parameter from `True` to `False`, since we're typically interested in evaluating the splined at given physical coordinates, rather than grid points. """ return RectBivariateSpline.__call__(self, x, y, None, dx, dy, grid)
[docs] def vslice(self, x): """Return a vertical slice at a given x of the bivariate spline. Args ---- x : float The x value at which the vertical slice should be calculated. """ _x = self.y _y = self(x, _x) fmt = dict(xname=self.yname, xunits=self.yunits, yname=self.zname, yunits=self.zunits) return xInterpolatedUnivariateSplineLinear(_x, _y, **fmt)
[docs] def hslice(self, y): """Return an horizontal slice at a given y of the bivariate spline. Args ---- y : float The y value at which the horizontal slice should be calculated. """ _x = self.x _y = self(_x, y) fmt = dict(xname=self.xname, xunits=self.xunits, yname=self.zname, yunits=self.zunits) return xInterpolatedUnivariateSplineLinear(_x, _y, **fmt)
[docs] def build_vppf(self): """Create the vertical percent point function (or inverse of cdf). Warning ------- This really, really need to be fixed. Instead of grabbing a vertical slice at xmean, we should pass an argument to the function so that the subclasses can implement whatever is right for them. """ _xmean = 0.5*(self.xmin() + self.xmax()) _refppf = self.vslice(_xmean).build_ppf() _x = self.x.copy() _y = _refppf.x _z = numpy.zeros(shape = (_x.size, _y.size)) for i, _xp in enumerate(_x): _ppf = self.vslice(_xp).build_ppf() for j, _yp in enumerate(_y): _z[i, j] = _ppf(_yp) fmt = dict(yname='Normalized integral', xname=self.xname, xunits=self.xunits, zname=self.yname, zunits=self.yunits) return xInterpolatedBivariateSplineLinear(_x, _y, _z, **fmt)
[docs] def plot(self, num_pointsx=100, num_pointsy=100, num_contours=75, logz=False, show=True): """Plot the spline. Args ---- num_pointsx : int The number of x sampling points to be used to draw the spline. num_pointsy : int The number of y sampling points to be used to draw the spline. num_contours : int The number of contours for the color plot. show : bool, optional If True, `plt.show()` is called at the end, interrupting the flow. """ from ximpol.utils.matplotlib_ import pyplot as plt _x = numpy.linspace(self.xmin(), self.xmax(), num_pointsx) _y = numpy.linspace(self.ymin(), self.ymax(), num_pointsy) _x, _y = numpy.meshgrid(_x, _y) _z = self(_x, _y, grid=False) if logz: from matplotlib.colors import LogNorm _levels = numpy.logspace(-4, numpy.log10(_z.max()), num_contours) contour = plt.contourf(_x, _y, _z, levels=_levels, norm = LogNorm()) else: contour = plt.contourf(_x, _y, _z, num_contours) bar = plt.colorbar() if self.xname is not None: plt.xlabel(self.xlabel()) if self.yname is not None: plt.ylabel(self.ylabel()) if self.zname is not None: bar.set_label(self.zlabel()) if show: plt.show()
[docs]class xInterpolatedBivariateSplineLinear(xInterpolatedBivariateSpline): """Bivariate linear interpolated spline on a rectangular grid. """ def __init__(self, x, y, z, xname=None, xunits=None, yname=None, yunits=None, zname=None, zunits=None): xInterpolatedBivariateSpline.__init__(self, x, y, z, 1, 1, xname, xunits, yname, yunits, zname, zunits)
def main(): x = numpy.linspace(0, 2*numpy.pi, 100) y = numpy.sin(x) s = xInterpolatedUnivariateSplineLinear(x, y, 'x', 'au', 'y') s.plot() if __name__ == '__main__': main()