使用 Python 进行性能计算的初学者指南¶
日期 | 2015-10-30(最后修改),2006-03-23(创建) |
---|
比较 weave 与 NumPy、Pyrex、Psyco、Fortran(77 和 90)以及 C++ 用于求解拉普拉斯方程。本文最初由 Prabhu Ramachandran 撰写。
laplace.py 是下面讨论的完整 Python 代码。源代码包(perfpy_2.tgz)除了 Fortran 代码、纯 C++ 代码、Pyrex 源代码和一个 setup.py 脚本用于构建 f2py 和 Pyrex 模块。
简介¶
这是一份使用 Python 进行性能计算的简单入门文档。我们将使用 NumPy、SciPy 的 weave(使用 weave.blitz 和 weave.inline)以及 Pyrex。我们还将展示如何使用 f2py 包装 Fortran 子例程并在 Python 中调用它。
我们还将利用这个机会对在 Python 中解决特定数值问题的各种方法进行基准测试,并将它们与 C++ 中的算法实现进行比较。
问题描述¶
我们将考虑的示例是使用迭代有限差分方案(四点平均、高斯-赛德尔或高斯-约旦)求解二维拉普拉斯方程的非常简单(即微不足道)的情况。问题的正式规范如下。我们需要求解某个未知函数 u(x,y),使得 ∇2u = 0,并指定边界条件。为了方便起见,感兴趣的域被认为是矩形,并且给出了该矩形边上的边界值。
可以证明,这个问题可以使用简单的四点平均方案求解,如下所示。将域离散化为 (nx x ny) 个点的网格。然后,函数 u 可以表示为二维数组 - u(nx, ny)。矩形边上的 u 值是给定的。可以通过以下方式迭代求解。
for i in range(1, nx-1):
for j in range(1, ny-1):
u[i,j] = ((u[i-1, j] + u[i+1, j])*dy**2 +
(u[i, j-1] + u[i, j+1])*dx**2)/(2.0*(dx**2 + dy**2))
其中 dx 和 dy 是离散域沿 x 轴和 y 轴的长度。
数值解¶
在纯 Python 中实现一个求解器非常简单。使用一个简单的 NumPy 数组来存储解矩阵 u。以下代码演示了一个简单的求解器。
import numpy
class Grid:
"""A simple grid class that stores the details and solution of the
computational grid."""
def __init__(self, nx=10, ny=10, xmin=0.0, xmax=1.0,
ymin=0.0, ymax=1.0):
self.xmin, self.xmax, self.ymin, self.ymax = xmin, xmax, ymin, ymax
self.dx = float(xmax-xmin)/(nx-1)
self.dy = float(ymax-ymin)/(ny-1)
self.u = numpy.zeros((nx, ny), 'd')
# used to compute the change in solution in some of the methods.
self.old_u = self.u.copy()
def setBCFunc(self, func):
"""Sets the BC given a function of two variables."""
xmin, ymin = self.xmin, self.ymin
xmax, ymax = self.xmax, self.ymax
x = numpy.arange(xmin, xmax + self.dx*0.5, self.dx)
y = numpy.arange(ymin, ymax + self.dy*0.5, self.dy)
self.u[0 ,:] = func(xmin,y)
self.u[-1,:] = func(xmax,y)
self.u[:, 0] = func(x,ymin)
self.u[:,-1] = func(x,ymax)
def computeError(self):
"""Computes absolute error using an L2 norm for the solution.
This requires that self.u and self.old_u must be appropriately
setup."""
v = (self.u - self.old_u).flat
return numpy.sqrt(numpy.dot(v,v))
class LaplaceSolver:
"""A simple Laplacian solver that can use different schemes to
solve the problem."""
def __init__(self, grid, stepper='numeric'):
self.grid = grid
self.setTimeStepper(stepper)
def slowTimeStep(self, dt=0.0):
"""Takes a time step using straight forward Python loops."""
g = self.grid
nx, ny = g.u.shape
dx2, dy2 = g.dx**2, g.dy**2
dnr_inv = 0.5/(dx2 + dy2)
u = g.u
err = 0.0
for i in range(1, nx-1):
for j in range(1, ny-1):
tmp = u[i,j]
u[i,j] = ((u[i-1, j] + u[i+1, j])*dy2 +
(u[i, j-1] + u[i, j+1])*dx2)*dnr_inv
diff = u[i,j] - tmp
err += diff*diff
return numpy.sqrt(err)
def setTimeStepper(self, stepper='numeric'):
"""Sets the time step scheme to be used while solving given a
string which should be one of ['slow', 'numeric', 'blitz',
'inline', 'fastinline', 'fortran']."""
if stepper == 'slow':
self.timeStep = self.slowTimeStep
# ...
else:
self.timeStep = self.numericTimeStep
def solve(self, n_iter=0, eps=1.0e-16):
err = self.timeStep()
count = 1
while err > eps:
if n_iter and count >= n_iter:
return err
err = self.timeStep()
count = count + 1
return count
这段代码非常简单,很容易编写,但如果我们对任何较大的问题(比如 500 x 500 的网格点)运行它,我们会发现它需要很长时间才能运行。在这种情况下,CPU 的瓶颈是 slowTimeStep
方法。在下一节中,我们将使用 NumPy 来加速它。
使用 NumPy¶
事实证明,LaplaceSolver.slowTimeStep
方法的最内层循环可以用一个更简单的 NumPy 表达式来表示。这里是一个重写的 timeStep 方法。
def numericTimeStep(self, dt=0.0):
"""Takes a time step using a NumPy expression."""
g = self.grid
dx2, dy2 = g.dx**2, g.dy**2
dnr_inv = 0.5/(dx2 + dy2)
u = g.u
g.old_u = u.copy() # needed to compute the error.
# The actual iteration
u[1:-1, 1:-1] = ((u[0:-2, 1:-1] + u[2:, 1:-1])*dy2 +
(u[1:-1,0:-2] + u[1:-1, 2:])*dx2)*dnr_inv
return g.computeError()
整个 for i 和 j 循环都被一个单独的 NumPy 表达式替换了。NumPy 表达式按元素进行操作,因此上面的表达式有效。它基本上计算了四点平均值。如果你已经学习了 NumPy 教程并使用过 NumPy,你应该能够理解它是如何工作的。这个表达式的妙处在于它完全在 C 中完成。这使得计算速度快得多。为了快速比较,以下是一些关于 500x500 网格上单次迭代的数字。在 PIII 450Mhz 和 192 MB RAM 上,上面的代码大约需要 0.3 秒,而之前的代码大约需要 15 秒。这接近 50 倍的速度提升。你还会注意到一些事情。
- 我们不能像之前在 for 循环中那样计算误差。我们需要复制数据,然后使用 computeError 函数来完成。这会消耗内存,而且不太美观。这当然是一个限制,但值得 50 倍的速度提升。
- 这个表达式将使用临时变量。因此,在一次迭代中,在已经计算过的位置计算的值不会在迭代过程中使用。例如,在原始的 for 循环中,一旦计算出 u[1,1] 的值,u[1,2] 的下一个值将使用新计算的 u[1,1] 而不是旧的值。但是,由于 NumPy 表达式在内部使用临时变量,因此只会使用 u[1,1] 的旧值。在本例中这不是一个严重的问题,因为已知即使发生这种情况,算法也会收敛(但时间会增加一倍,这会将收益降低一半,但仍然让我们获得了 25 倍的提升)。
除了这两个问题,很明显使用 NumPy 可以极大地提高速度。现在我们将使用神奇的 weave 包来进一步加速它。
使用 weave.blitz¶
如果我们使用 weave.blitz,NumPy 表达式可以加速很多。以下是新的函数。
# import necessary modules and functions
from scipy import weave
# ...
def blitzTimeStep(self, dt=0.0):
"""Takes a time step using a NumPy expression that has been
blitzed using weave."""
g = self.grid
dx2, dy2 = g.dx**2, g.dy**2
dnr_inv = 0.5/(dx2 + dy2)
u = g.u
g.old_u = u.copy()
# The actual iteration
expr = "u[1:-1, 1:-1] = ((u[0:-2, 1:-1] + u[2:, 1:-1])*dy2 + "\
"(u[1:-1,0:-2] + u[1:-1, 2:])*dx2)*dnr_inv"
weave.blitz(expr, check_size=0)
return g.computeError()
如果你注意到,唯一改变的是我们用引号括起原始的数字表达式,并将这个字符串称为 'expr',然后调用 weave.blitz。当 'check_size' 关键字设置为 1 时,它会进行一些健全性检查,在调试代码时使用。但是,为了纯粹的速度,最好将其设置为 0。这次当我们为 500x500 数组的单次迭代计时代码时,它只花了大约 0.1 秒,这大约是三倍的增长!这里还有几点需要注意。
- 第一次调用此方法时,它将花费很长时间在幕后进行一些操作。下次调用它时,它将立即运行。有关此的更多详细信息,请参阅 weave 文档。基本上,weave.blitz 将 NumPy 表达式转换为 C++ 代码,并使用 blitz++ 进行数组表达式,构建一个 Python 模块,将其存储在特殊位置,并在下次调用函数时调用它。
- 同样,我们需要使用一个临时数组来计算误差。
- blitz 不使用临时变量进行计算,因此其行为更像原始的(慢速)for 循环,即计算的值会立即被重新使用。
除了这些要点之外,结果与原始的 for 循环相同。它比原始代码快大约 170 倍!现在我们将看看另一种加速原始 for 循环的方法。进入 weave.inline!
使用 weave.inline¶
Inline 允许将 C 或 C++ 代码直接嵌入到 Python 代码中。以下是在代码中嵌入的简单版本。
from scipy.weave import converters
# ...
def inlineTimeStep(self, dt=0.0):
"""Takes a time step using inlined C code -- this version uses
blitz arrays."""
g = self.grid
nx, ny = g.u.shape
dx2, dy2 = g.dx**2, g.dy**2
dnr_inv = 0.5/(dx2 + dy2)
u = g.u
code = """
#line 120 "laplace.py" (This is only useful for debugging)
double tmp, err, diff;
err = 0.0;
for (int i=1; i<nx-1; ++i) {
for (int j=1; j<ny-1; ++j) {
tmp = u(i,j);
u(i,j) = ((u(i-1,j) + u(i+1,j))*dy2 +
(u(i,j-1) + u(i,j+1))*dx2)*dnr_inv;
diff = u(i,j) - tmp;
err += diff*diff;
}
}
return_val = sqrt(err);
"""
# compiler keyword only needed on windows with MSVC installed
err = weave.inline(code,
['u', 'dx2', 'dy2', 'dnr_inv', 'nx', 'ny'],
type_converters=converters.blitz,
compiler = 'gcc')
return err
代码本身看起来非常直观(这也是内联如此酷的原因)。内联函数调用的参数都是自解释的。带有 '#line 120 ...' 的行仅用于调试,不会以任何方式影响速度。同样,第一次运行此函数时,它需要很长时间才能在幕后完成一些操作,而下一次运行时,它就会飞速完成。这次要注意,我们在循环内部有更大的灵活性,并且可以轻松地计算误差项,而无需使用临时数组。对该版本的计时结果表明,对于 500x500 数组,每次迭代仅需 0.04 秒!这对应于比普通 for 循环快 375 倍的速度提升。请记住,我们没有牺牲 Python 的任何令人难以置信的灵活性!此循环包含看起来非常漂亮的代码,但如果需要,我们可以通过编写一些脏代码来进一步提高速度。我们在这里不会深入讨论,但足以说明,通过使用不同的方法可以获得两倍的速度提升。此代码基本上对 NumPy 数组数据执行指针运算,而不是使用 blitz++ 数组。此代码由 Eric Jones 贡献。本文附带的源代码包含此代码。
接下来,我们看看如何使用 f2py 在 Fortran 中轻松实现循环并从 Python 中调用它。
使用 f2py¶
f2py 是一款很棒的工具,它允许您轻松地从 Python 中调用 Fortran 函数。首先,我们将编写一个小型 Fortran77 子例程来执行我们的计算。以下是代码。
c File flaplace.f
subroutine timestep(u,n,m,dx,dy,error)
double precision u(n,m)
double precision dx,dy,dx2,dy2,dnr_inv,tmp,diff
integer n,m,i,j
cf2py intent(in) :: dx,dy
cf2py intent(in,out) :: u
cf2py intent(out) :: error
cf2py intent(hide) :: n,m
dx2 = dx*dx
dy2 = dy*dy
dnr_inv = 0.5d0 / (dx2+dy2)
error = 0
do 200,j=2,m-1
do 100,i=2,n-1
tmp = u(i,j)
u(i,j) = ((u(i-1,j) + u(i+1,j))*dy2+
& (u(i,j-1) + u(i,j+1))*dx2)*dnr_inv
diff = u(i,j) - tmp
error = error + diff*diff
100 continue
200 continue
error = sqrt(error)
end
以 cf2py 开头的行是特殊的 f2py 指令,在 f2py 中有文档记录。对于那些了解一些 Fortran 的人来说,其余代码非常直观。我们使用以下命令为其轻松创建 Python 模块。
% f2py -c flaplace.f -m flaplace
以下是 Python 方面的代码。
import flaplace
def fortranTimeStep(self, dt=0.0):
"""Takes a time step using a simple fortran module that
implements the loop in Fortran. """
g = self.grid
g.u, err = flaplace.timestep(g.u, g.dx, g.dy)
return err
就是这样!希望有一天 scipy.weave 能够让我们在内联中执行此操作,而无需我们编写单独的 Fortran 文件。Fortran 代码和 f2py 示例由 f2py 的作者 Pearu Peterson 贡献。无论如何,使用此模块,对于 500x500 网格,每次迭代大约需要 0.029 秒!这比原始代码快了大约 500 倍。
f2py 还可以与更现代的 Fortran 版本一起使用。这很有用,因为 Fortran90 具有特殊的数组功能,允许更紧凑、更美观和更快的代码。以下是 Fortran90 中的相同子例程
! File flaplace90_arrays.f90
subroutine timestep(u,n,m,dx,dy,error)
implicit none
!Pythonic array indices, from 0 to n-1.
real (kind=8), dimension(0:n-1,0:m-1), intent(inout):: u
real (kind=8), intent(in) :: dx,dy
real (kind=8), intent(out) :: error
integer, intent(in) :: n,m
real (kind=8), dimension(0:n-1,0:m-1) :: diff
real (kind=8) :: dx2,dy2,dnr_inv
!f2py intent(in) :: dx,dy
!f2py intent(in,out) :: u
!f2py intent(out) :: error
!f2py intent(hide) :: n,m
dx2 = dx*dx
dy2 = dy*dy
dnr_inv = 0.5d0 / (dx2+dy2)
diff=u
u(1:n-2, 1:m-2) = ((u(0:n-3, 1:m-2) + u(2:n-1, 1:m-2))*dy2 + &
(u(1:n-2,0:m-3) + u(1:n-2, 2:m-1))*dx2)*dnr_inv
error=sqrt(sum((u-diff)**2))
end subroutine
请注意,操作是在一行中执行的,非常类似于 Numpy。编译步骤完全相同。在 1000x1000 网格中,此代码比 fortran77 循环快 2.2 倍。
源代码包(perfpy_2.tgz)还包含一个使用 forall 结构的 Fortran95 子例程,其性能非常相似。
使用 Pyrex/Cython¶
我们还使用 Pyrex 中的代码实现了 timeStep 函数,该代码来自快速内联版本。Pyrex 源代码比 weave、blitz 或 Fortran 代码略长,因为我们必须公开 NumPy 数组结构。基本函数如下所示。
Travis Oliphant 写了一个 Cython 版本 的示例。
def pyrexTimeStep(ndarray u, double dx, double dy):
if chr(u.descr.type) <> "d":
raise TypeError("Double array required")
if u.nd <> 2:
raise ValueError("2 dimensional array required")
cdef int nx, ny
cdef double dx2, dy2, dnr_inv, err
cdef double *elem
nx = u.dimensions[0]
ny = u.dimensions[1]
dx2, dy2 = dx**2, dy**2
dnr_inv = 0.5/(dx2 + dy2)
elem = u.data
err = 0.0
cdef int i, j
cdef double *uc, *uu, *ud, *ul, *ur
cdef double diff, tmp
for i from 1 <= i < nx-1:
uc = elem + i*ny + 1
ur = elem + i*ny + 2
ul = elem + i*ny
uu = elem + (i+1)*ny + 1
ud = elem + (i-1)*ny + 1
for j from 1 <= j < ny-1:
tmp = uc[0]
uc[0] = ((ul[0] + ur[0])*dy2 +
(uu[0] + ud[0])*dx2)*dnr_inv
diff = uc[0] - tmp
err = err + diff*diff
uc = uc + 1; ur = ur + 1; ul = ul + 1
uu = uu + 1; ud = ud + 1
return sqrt(err)
该函数看起来很长,但写起来并不难。也可以在不进行指针运算的情况下编写,方法是提供方便的函数来访问数组。但是,上面显示的代码速度很快。本文提供的源代码包含完整的 Pyrex 文件以及用于构建它的 setup.py 脚本。对该版本进行计时,我们发现该版本与快速内联版本一样快,仅需 0.025 秒。
使用 Matlab 和 Octave¶
我们在 Matlab 和 Octave 中实现了 Numeric 版本(laplace.m),并在不同的计算机上运行测试(因此下表中的“估计”值)。我们发现 Matlab 中没有获得明显的加速,而 Octave 的运行速度比 NumPy 慢两倍。详细的图表可以在这里找到 这里。
C++ 中的实现¶
最后,为了比较,我们在简单的 C++ 中实现了这一点(没有花哨的东西),没有任何 Python。人们会期望 C++ 代码更快,但令人惊讶的是,并没有快多少!考虑到用 Python 开发非常容易,这种速度降低并不重要。
最终比较¶
以下是 500x500 网格在 100 次迭代中的计时结果。请注意,我们还比较了使用慢速 Python 版本以及 Psyco 的结果。
解决方案类型 | 耗时(秒) |
---|---|
Python(估计) | 1500.0 |
Python + Psyco(估计) | 1138.0 |
Python + NumPy 表达式 | 29.3 |
Blitz | 9.5 |
内联 | 4.3 |
快速内联 | 2.3 |
Python/Fortran | 2.9 |
Pyrex | 2.5 |
Matlab(估计) | 29.0 |
Octave(估计) | 60.0 |
纯 C++ | 2.16 |
考虑到 Python 的灵活性和强大功能,这真是太棒了。
从此处下载本指南的源代码:perfpy_2.tgz
查看示例的完整 Python 代码:laplace.py
查看示例的完整 Matlab/Octave 代码:laplace.m
章节作者:Unknown[6], Unknown[4], DavidLinke, Unknown[156], Unknown[157], MartinSpacek, Unknown[158], Pauli Virtanen
附件