使用 Python 进行性能计算的初学者指南

日期2015-10-30(最后修改),2006-03-23(创建)

比较 weave 与 NumPy、Pyrex、Psyco、Fortran(77 和 90)以及 C++ 用于求解拉普拉斯方程。本文最初由 Prabhu Ramachandran 撰写。

laplace.py 是下面讨论的完整 Python 代码。源代码包(perfpy_2.tgz)除了 Fortran 代码、纯 C++ 代码、Pyrex 源代码和一个 setup.py 脚本用于构建 f2py 和 Pyrex 模块。

简介

这是一份使用 Python 进行性能计算的简单入门文档。我们将使用 NumPy、SciPy 的 weave(使用 weave.blitz 和 weave.inline)以及 Pyrex。我们还将展示如何使用 f2py 包装 Fortran 子例程并在 Python 中调用它。

我们还将利用这个机会对在 Python 中解决特定数值问题的各种方法进行基准测试,并将它们与 C++ 中的算法实现进行比较。

问题描述

我们将考虑的示例是使用迭代有限差分方案(四点平均、高斯-赛德尔或高斯-约旦)求解二维拉普拉斯方程的非常简单(即微不足道)的情况。问题的正式规范如下。我们需要求解某个未知函数 u(x,y),使得 ∇2u = 0,并指定边界条件。为了方便起见,感兴趣的域被认为是矩形,并且给出了该矩形边上的边界值。

可以证明,这个问题可以使用简单的四点平均方案求解,如下所示。将域离散化为 (nx x ny) 个点的网格。然后,函数 u 可以表示为二维数组 - u(nx, ny)。矩形边上的 u 值是给定的。可以通过以下方式迭代求解。

    for i in range(1, nx-1):
        for j in range(1, ny-1):
            u[i,j] = ((u[i-1, j] + u[i+1, j])*dy**2 +
                      (u[i, j-1] + u[i, j+1])*dx**2)/(2.0*(dx**2 + dy**2))


其中 dx 和 dy 是离散域沿 x 轴和 y 轴的长度。

数值解

在纯 Python 中实现一个求解器非常简单。使用一个简单的 NumPy 数组来存储解矩阵 u。以下代码演示了一个简单的求解器。

   import numpy

   class Grid:
       """A simple grid class that stores the details and solution of the
       computational grid."""
       def __init__(self, nx=10, ny=10, xmin=0.0, xmax=1.0,
                    ymin=0.0, ymax=1.0):
           self.xmin, self.xmax, self.ymin, self.ymax = xmin, xmax, ymin, ymax
           self.dx = float(xmax-xmin)/(nx-1)
           self.dy = float(ymax-ymin)/(ny-1)
           self.u = numpy.zeros((nx, ny), 'd')
           # used to compute the change in solution in some of the methods.
           self.old_u = self.u.copy()

       def setBCFunc(self, func):
           """Sets the BC given a function of two variables."""
           xmin, ymin = self.xmin, self.ymin
           xmax, ymax = self.xmax, self.ymax
           x = numpy.arange(xmin, xmax + self.dx*0.5, self.dx)
           y = numpy.arange(ymin, ymax + self.dy*0.5, self.dy)
           self.u[0 ,:] = func(xmin,y)
           self.u[-1,:] = func(xmax,y)
           self.u[:, 0] = func(x,ymin)
           self.u[:,-1] = func(x,ymax)

       def computeError(self):
           """Computes absolute error using an L2 norm for the solution.
           This requires that self.u and self.old_u must be appropriately
           setup."""
           v = (self.u - self.old_u).flat
           return numpy.sqrt(numpy.dot(v,v))


   class LaplaceSolver:
       """A simple Laplacian solver that can use different schemes to
       solve the problem."""
       def __init__(self, grid, stepper='numeric'):
           self.grid = grid
           self.setTimeStepper(stepper)

       def slowTimeStep(self, dt=0.0):
           """Takes a time step using straight forward Python loops."""
           g = self.grid
           nx, ny = g.u.shape
           dx2, dy2 = g.dx**2, g.dy**2
           dnr_inv = 0.5/(dx2 + dy2)
           u = g.u

           err = 0.0
           for i in range(1, nx-1):
               for j in range(1, ny-1):
                   tmp = u[i,j]
                   u[i,j] = ((u[i-1, j] + u[i+1, j])*dy2 +
                            (u[i, j-1] + u[i, j+1])*dx2)*dnr_inv
                   diff = u[i,j] - tmp
                   err += diff*diff

           return numpy.sqrt(err)

       def setTimeStepper(self, stepper='numeric'):
           """Sets the time step scheme to be used while solving given a
           string which should be one of ['slow', 'numeric', 'blitz',
           'inline', 'fastinline', 'fortran']."""
            if stepper == 'slow':
              self.timeStep = self.slowTimeStep
            # ...
            else:
               self.timeStep = self.numericTimeStep

       def solve(self, n_iter=0, eps=1.0e-16):
           err = self.timeStep()
           count = 1

           while err > eps:
               if n_iter and count >= n_iter:
                   return err
               err = self.timeStep()
               count = count + 1

           return count


这段代码非常简单,很容易编写,但如果我们对任何较大的问题(比如 500 x 500 的网格点)运行它,我们会发现它需要很长时间才能运行。在这种情况下,CPU 的瓶颈是 slowTimeStep 方法。在下一节中,我们将使用 NumPy 来加速它。

使用 NumPy

事实证明,LaplaceSolver.slowTimeStep 方法的最内层循环可以用一个更简单的 NumPy 表达式来表示。这里是一个重写的 timeStep 方法。

       def numericTimeStep(self, dt=0.0):
           """Takes a time step using a NumPy expression."""
           g = self.grid
           dx2, dy2 = g.dx**2, g.dy**2
           dnr_inv = 0.5/(dx2 + dy2)
           u = g.u
           g.old_u = u.copy() # needed to compute the error.

           # The actual iteration
           u[1:-1, 1:-1] = ((u[0:-2, 1:-1] + u[2:, 1:-1])*dy2 +
                            (u[1:-1,0:-2] + u[1:-1, 2:])*dx2)*dnr_inv

           return g.computeError()

整个 for i 和 j 循环都被一个单独的 NumPy 表达式替换了。NumPy 表达式按元素进行操作,因此上面的表达式有效。它基本上计算了四点平均值。如果你已经学习了 NumPy 教程并使用过 NumPy,你应该能够理解它是如何工作的。这个表达式的妙处在于它完全在 C 中完成。这使得计算速度快得多。为了快速比较,以下是一些关于 500x500 网格上单次迭代的数字。在 PIII 450Mhz 和 192 MB RAM 上,上面的代码大约需要 0.3 秒,而之前的代码大约需要 15 秒。这接近 50 倍的速度提升。你还会注意到一些事情。

  1. 我们不能像之前在 for 循环中那样计算误差。我们需要复制数据,然后使用 computeError 函数来完成。这会消耗内存,而且不太美观。这当然是一个限制,但值得 50 倍的速度提升。
  2. 这个表达式将使用临时变量。因此,在一次迭代中,在已经计算过的位置计算的值不会在迭代过程中使用。例如,在原始的 for 循环中,一旦计算出 u[1,1] 的值,u[1,2] 的下一个值将使用新计算的 u[1,1] 而不是旧的值。但是,由于 NumPy 表达式在内部使用临时变量,因此只会使用 u[1,1] 的旧值。在本例中这不是一个严重的问题,因为已知即使发生这种情况,算法也会收敛(但时间会增加一倍,这会将收益降低一半,但仍然让我们获得了 25 倍的提升)。

除了这两个问题,很明显使用 NumPy 可以极大地提高速度。现在我们将使用神奇的 weave 包来进一步加速它。

使用 weave.blitz

如果我们使用 weave.blitz,NumPy 表达式可以加速很多。以下是新的函数。

   # import necessary modules and functions
   from scipy import weave
   # ...
       def blitzTimeStep(self, dt=0.0):
           """Takes a time step using a NumPy expression that has been
           blitzed using weave."""
           g = self.grid
           dx2, dy2 = g.dx**2, g.dy**2
           dnr_inv = 0.5/(dx2 + dy2)
           u = g.u
           g.old_u = u.copy()

           # The actual iteration
           expr = "u[1:-1, 1:-1] = ((u[0:-2, 1:-1] + u[2:, 1:-1])*dy2 + "\
                  "(u[1:-1,0:-2] + u[1:-1, 2:])*dx2)*dnr_inv"
           weave.blitz(expr, check_size=0)

           return g.computeError()


如果你注意到,唯一改变的是我们用引号括起原始的数字表达式,并将这个字符串称为 'expr',然后调用 weave.blitz。当 'check_size' 关键字设置为 1 时,它会进行一些健全性检查,在调试代码时使用。但是,为了纯粹的速度,最好将其设置为 0。这次当我们为 500x500 数组的单次迭代计时代码时,它只花了大约 0.1 秒,这大约是三倍的增长!这里还有几点需要注意。

  1. 第一次调用此方法时,它将花费很长时间在幕后进行一些操作。下次调用它时,它将立即运行。有关此的更多详细信息,请参阅 weave 文档。基本上,weave.blitz 将 NumPy 表达式转换为 C++ 代码,并使用 blitz++ 进行数组表达式,构建一个 Python 模块,将其存储在特殊位置,并在下次调用函数时调用它。
  2. 同样,我们需要使用一个临时数组来计算误差。
  3. blitz 使用临时变量进行计算,因此其行为更像原始的(慢速)for 循环,即计算的值会立即被重新使用。

除了这些要点之外,结果与原始的 for 循环相同。它比原始代码快大约 170 倍!现在我们将看看另一种加速原始 for 循环的方法。进入 weave.inline!

使用 weave.inline

Inline 允许将 C 或 C++ 代码直接嵌入到 Python 代码中。以下是在代码中嵌入的简单版本。

   from scipy.weave import converters
   # ...
       def inlineTimeStep(self, dt=0.0):
           """Takes a time step using inlined C code -- this version uses
           blitz arrays."""
           g = self.grid
           nx, ny = g.u.shape
           dx2, dy2 = g.dx**2, g.dy**2
           dnr_inv = 0.5/(dx2 + dy2)
           u = g.u

           code = """
                  #line 120 "laplace.py" (This is only useful for debugging)
                  double tmp, err, diff;
                  err = 0.0;
                  for (int i=1; i<nx-1; ++i) {
                      for (int j=1; j<ny-1; ++j) {
                          tmp = u(i,j);
                          u(i,j) = ((u(i-1,j) + u(i+1,j))*dy2 +
                                    (u(i,j-1) + u(i,j+1))*dx2)*dnr_inv;
                          diff = u(i,j) - tmp;
                          err += diff*diff;
                      }
                 }
                  return_val = sqrt(err);
                  """
           # compiler keyword only needed on windows with MSVC installed
           err = weave.inline(code,
                              ['u', 'dx2', 'dy2', 'dnr_inv', 'nx', 'ny'],
                              type_converters=converters.blitz,
                              compiler = 'gcc')
           return err


代码本身看起来非常直观(这也是内联如此酷的原因)。内联函数调用的参数都是自解释的。带有 '#line 120 ...' 的行仅用于调试,不会以任何方式影响速度。同样,第一次运行此函数时,它需要很长时间才能在幕后完成一些操作,而下一次运行时,它就会飞速完成。这次要注意,我们在循环内部有更大的灵活性,并且可以轻松地计算误差项,而无需使用临时数组。对该版本的计时结果表明,对于 500x500 数组,每次迭代仅需 0.04 秒!这对应于比普通 for 循环快 375 倍的速度提升。请记住,我们没有牺牲 Python 的任何令人难以置信的灵活性!此循环包含看起来非常漂亮的代码,但如果需要,我们可以通过编写一些脏代码来进一步提高速度。我们在这里不会深入讨论,但足以说明,通过使用不同的方法可以获得两倍的速度提升。此代码基本上对 NumPy 数组数据执行指针运算,而不是使用 blitz++ 数组。此代码由 Eric Jones 贡献。本文附带的源代码包含此代码。

接下来,我们看看如何使用 f2py 在 Fortran 中轻松实现循环并从 Python 中调用它。

使用 f2py

f2py 是一款很棒的工具,它允许您轻松地从 Python 中调用 Fortran 函数。首先,我们将编写一个小型 Fortran77 子例程来执行我们的计算。以下是代码。

c File flaplace.f
      subroutine timestep(u,n,m,dx,dy,error)
      double precision u(n,m)
      double precision dx,dy,dx2,dy2,dnr_inv,tmp,diff
      integer n,m,i,j
cf2py intent(in) :: dx,dy
cf2py intent(in,out) :: u
cf2py intent(out) :: error
cf2py intent(hide) :: n,m
      dx2 = dx*dx
      dy2 = dy*dy
      dnr_inv = 0.5d0 / (dx2+dy2)
      error = 0
      do 200,j=2,m-1
         do 100,i=2,n-1
            tmp = u(i,j)
            u(i,j) = ((u(i-1,j) + u(i+1,j))*dy2+
     &           (u(i,j-1) + u(i,j+1))*dx2)*dnr_inv
            diff = u(i,j) - tmp
            error = error + diff*diff
 100     continue
 200  continue
      error = sqrt(error)
      end

以 cf2py 开头的行是特殊的 f2py 指令,在 f2py 中有文档记录。对于那些了解一些 Fortran 的人来说,其余代码非常直观。我们使用以下命令为其轻松创建 Python 模块。

      % f2py -c flaplace.f -m flaplace

以下是 Python 方面的代码。

    import flaplace

        def fortranTimeStep(self, dt=0.0):
            """Takes a time step using a simple fortran module that
            implements the loop in Fortran.  """
            g = self.grid
            g.u, err = flaplace.timestep(g.u, g.dx, g.dy)
            return err


就是这样!希望有一天 scipy.weave 能够让我们在内联中执行此操作,而无需我们编写单独的 Fortran 文件。Fortran 代码和 f2py 示例由 f2py 的作者 Pearu Peterson 贡献。无论如何,使用此模块,对于 500x500 网格,每次迭代大约需要 0.029 秒!这比原始代码快了大约 500 倍。

f2py 还可以与更现代的 Fortran 版本一起使用。这很有用,因为 Fortran90 具有特殊的数组功能,允许更紧凑、更美观和更快的代码。以下是 Fortran90 中的相同子例程

! File flaplace90_arrays.f90
subroutine timestep(u,n,m,dx,dy,error)
implicit none
!Pythonic array indices, from 0 to n-1.
real (kind=8), dimension(0:n-1,0:m-1), intent(inout):: u
real (kind=8), intent(in) :: dx,dy
real (kind=8), intent(out) :: error
integer, intent(in) :: n,m
real (kind=8), dimension(0:n-1,0:m-1) :: diff
real (kind=8) :: dx2,dy2,dnr_inv
!f2py intent(in) :: dx,dy
!f2py intent(in,out) :: u
!f2py intent(out) :: error
!f2py intent(hide) :: n,m
dx2 = dx*dx
dy2 = dy*dy
dnr_inv = 0.5d0 / (dx2+dy2)
diff=u
u(1:n-2, 1:m-2) = ((u(0:n-3, 1:m-2) + u(2:n-1, 1:m-2))*dy2 + &
                         (u(1:n-2,0:m-3) + u(1:n-2, 2:m-1))*dx2)*dnr_inv
error=sqrt(sum((u-diff)**2))
end subroutine

请注意,操作是在一行中执行的,非常类似于 Numpy。编译步骤完全相同。在 1000x1000 网格中,此代码比 fortran77 循环快 2.2 倍。

源代码包(perfpy_2.tgz)还包含一个使用 forall 结构的 Fortran95 子例程,其性能非常相似。

使用 Pyrex/Cython

我们还使用 Pyrex 中的代码实现了 timeStep 函数,该代码来自快速内联版本。Pyrex 源代码比 weave、blitz 或 Fortran 代码略长,因为我们必须公开 NumPy 数组结构。基本函数如下所示。

Travis Oliphant 写了一个 Cython 版本 的示例。

   def pyrexTimeStep(ndarray u, double dx, double dy):
       if chr(u.descr.type) <> "d":
           raise TypeError("Double array required")
       if u.nd <> 2:
           raise ValueError("2 dimensional array required")
       cdef int nx, ny
       cdef double dx2, dy2, dnr_inv, err
       cdef double *elem

       nx = u.dimensions[0]
       ny = u.dimensions[1]
       dx2, dy2 = dx**2, dy**2
       dnr_inv = 0.5/(dx2 + dy2)
       elem = u.data

       err = 0.0
       cdef int i, j
       cdef double *uc, *uu, *ud, *ul, *ur
       cdef double diff, tmp
       for i from 1 <= i < nx-1:
           uc = elem + i*ny + 1
           ur = elem + i*ny + 2
           ul = elem + i*ny
           uu = elem + (i+1)*ny + 1
           ud = elem + (i-1)*ny + 1

           for j from 1 <= j < ny-1:
               tmp = uc[0]
               uc[0] = ((ul[0] + ur[0])*dy2 +
                        (uu[0] + ud[0])*dx2)*dnr_inv
               diff = uc[0] - tmp
               err = err + diff*diff
               uc = uc + 1; ur = ur + 1;  ul = ul + 1
               uu = uu + 1; ud = ud + 1

       return sqrt(err)


该函数看起来很长,但写起来并不难。也可以在不进行指针运算的情况下编写,方法是提供方便的函数来访问数组。但是,上面显示的代码速度很快。本文提供的源代码包含完整的 Pyrex 文件以及用于构建它的 setup.py 脚本。对该版本进行计时,我们发现该版本与快速内联版本一样快,仅需 0.025 秒。

使用 Matlab 和 Octave

我们在 Matlab 和 Octave 中实现了 Numeric 版本(laplace.m),并在不同的计算机上运行测试(因此下表中的“估计”值)。我们发现 Matlab 中没有获得明显的加速,而 Octave 的运行速度比 NumPy 慢两倍。详细的图表可以在这里找到 这里

C++ 中的实现

最后,为了比较,我们在简单的 C++ 中实现了这一点(没有花哨的东西),没有任何 Python。人们会期望 C++ 代码更快,但令人惊讶的是,并没有快多少!考虑到用 Python 开发非常容易,这种速度降低并不重要。

最终比较

以下是 500x500 网格在 100 次迭代中的计时结果。请注意,我们还比较了使用慢速 Python 版本以及 Psyco 的结果。

解决方案类型 耗时(秒)
Python(估计) 1500.0
Python + Psyco(估计) 1138.0
Python + NumPy 表达式 29.3
Blitz 9.5
内联 4.3
快速内联 2.3
Python/Fortran 2.9
Pyrex 2.5
Matlab(估计) 29.0
Octave(估计) 60.0
纯 C++ 2.16

考虑到 Python 的灵活性和强大功能,这真是太棒了。

从此处下载本指南的源代码:perfpy_2.tgz

查看示例的完整 Python 代码:laplace.py

查看示例的完整 Matlab/Octave 代码:laplace.m

章节作者:Unknown[6], Unknown[4], DavidLinke, Unknown[156], Unknown[157], MartinSpacek, Unknown[158], Pauli Virtanen

附件