高性能python


參考來源:Python金融大數據分析第八章

提高性能有如下方法

1、Cython,用於合並python和c語言靜態編譯泛型

2、IPython.parallel,用於在本地或者集群上並行執行代碼

3、numexpr,用於快速數值運算

4、multiprocessing,python內建的並行處理模塊

5、Numba,用於為cpu動態編譯python代碼

6、NumbaPro,用於為多核cpu和gpu動態編譯python代碼

 

為了驗證相同算法在上面不同實現上的的性能差異,我們先定義一個測試性能的函數

def perf_comp_data(func_list, data_list, rep=3, number=1): 
    '''Function to compare the performance of different functions. 
    Parameters 
    func_list : list 
    list with function names as strings

    data_list : list 
    list with data set names as strings 

    rep : int 
    number of repetitions of the whole comparison 
    
    number : int 
    number ofexecutions for every function 
    '''
    from timeit import repeat 
    res_list = {} 
    for name in enumerate(func_list): 
        stmt = name[1] + '(' + data_list[name[0]] + ')' 
        setup = "from __main__ import " + name[1] + ','+ data_list[name[0]] 
        results = repeat(stmt=stmt, setup=setup, repeat=rep, number=number) 
        res_list[name[1]] = sum(results) / rep
    res_sort = sorted(res_list.items(), key = lambda item : item[1])
    for item in res_sort: 
        rel = item[1] / res_sort[0][1]
        print ('function: ' + item[0] + ', av. time sec: %9.5f,   ' % item[1] + 'relative: %6.1f' % rel)

定義執行的算法如下

from math import * 
def f(x): 
    return abs(cos(x)) ** 0.5 + sin(2 + 3 * x)

對應的數學公式是

生成數據如下

i=500000
a_py = range(i)

第一個實現f1是在內部循環執行f函數,然后將每次的計算結果添加到列表中,實現如下

def f1(a): 
    res = [] 
    for x in a: 
        res.append(f(x)) 
    return res

當然實現這種方案的方法不止一種,可以使用迭代器或eval函數,我自己加入了使用生成器和map方法的測試,發現結果有明顯差距,不知道是否科學:

迭代器實現

def f2(a): 
    return [f(x) for x in a]

eval實現

def f3(a): 
    ex = 'abs(cos(x)) **0.5+ sin(2 + 3 * x)' 
    return [eval(ex) for x in a] 

生成器實現

def f7(a): 
    return (f(x) for x in a)

map實現

def f8(a): 
    return map(f, a)

接下來是使用numpy的narray結構的幾種實現

import numpy as np 
a_np = np.arange(i) 

def f4(a): 
    return (np.abs(np.cos(a)) ** 0.5 + np.sin(2 +  3 * a))

import numexpr as ne

def f5(a): 
    ex = 'abs(cos(a)) ** 0.5 + sin( 2 + 3 * a)' 
    ne.set_num_threads(1) 
    return ne.evaluate(ex)

def f6(a): 
    ex = 'abs(cos(a)) ** 0.5 + sin(2 + 3 * a)' 
    ne.set_num_threads(2) 
    return ne.evaluate(ex)

上面的f5和f6只是使用的處理器個數不同,可以根據自己電腦cpu的數目進行修改,也不是越大越好

下面進行測試

func_list = ['f1', 'f2', 'f3', 'f4', 'f5', 'f6', 'f7', 'f8'] 
data_list = ['a_py', 'a_py', 'a_py', 'a_np', 'a_np', 'a_np', 'a_py', 'a_py']
perf_comp_data(func_list, data_list)

測試結果如下

function: f8, av. time sec:   0.00000,   relative:    1.0
function: f7, av. time sec:   0.00001,   relative:    1.7
function: f6, av. time sec:   0.03787,   relative: 11982.7
function: f5, av. time sec:   0.05838,   relative: 18472.4
function: f4, av. time sec:   0.09711,   relative: 30726.8
function: f2, av. time sec:   0.82343,   relative: 260537.0
function: f1, av. time sec:   0.92557,   relative: 292855.2
function: f3, av. time sec:  32.80889,   relative: 10380938.6

發現f8的時間最短,調大一下時間精度再測一次

function: f8, av. time sec: 0.000002483,   relative:    1.0
function: f7, av. time sec: 0.000004741,   relative:    1.9
function: f5, av. time sec: 0.028068110,   relative: 11303.0
function: f6, av. time sec: 0.031389788,   relative: 12640.6
function: f4, av. time sec: 0.053619114,   relative: 21592.4
function: f1, av. time sec: 0.852619225,   relative: 343348.7
function: f2, av. time sec: 1.009691877,   relative: 406601.7
function: f3, av. time sec: 26.035869787,   relative: 10484613.6

發現使用map的性能最高,生成器次之,其他方法的性能就差的很遠了。但是使用narray數據的在一個數量級,使用python的list數據又在一個數量級。生成器的原理是並沒有生成一個完整的列表,而是在內部維護一個next函數,通過一邊循環迭代一遍生成下個元素的方法的實現的,所以他既不用在執行時遍歷整個循環,也不用分配整個空間,它花費的時間和空間跟列表的大小是沒有關系的,map與之類似,而其他實現都是跟列表大小有關系的。

 

內存布局

numpy的ndarray構造函數形式為

np.zeros(shape, dtype=float, order='C')

np.array(object, dtype=None, copy=True, order=None, subok=False, ndmin=0)

 shape或object定義了數組的大小或是引用了另一個一個數組

dtype用於定於元素的數據類型,可以是int8,int32,float8,float64等等

order定義了元素在內存中的存儲順序,c表示行優先,F表示列優先

下面來比較一下內存布局在數組很大時的差異,先構造同樣的的基於C和基於F的數組,代碼如下:

x = np.random.standard_normal(( 3, 1500000))
c  = np.array(x, order='C') 
f = np.array(x, order='F') 

下面來測試性能

%timeit c.sum(axis=0)
%timeit c.std(axis=0)
%timeit f.sum(axis=0)
%timeit f.std(axis=0)
%timeit c.sum(axis=1)
%timeit c.std(axis=1)
%timeit f.sum(axis=1)
%timeit f.std(axis=1)

輸出如下

100 loops, best of 3: 12.1 ms per loop
10 loops, best of 3: 83.3 ms per loop
10 loops, best of 3: 70.2 ms per loop
1 loop, best of 3: 235 ms per loop
100 loops, best of 3: 7.11 ms per loop
10 loops, best of 3: 37.2 ms per loop
10 loops, best of 3: 54.7 ms per loop
10 loops, best of 3: 193 ms per loop

可知,C內存布局要優於F內存布局

 

並行計算multiprocessing

首先要pip install multiprocessing安裝這個並行庫

利用Pool創建進程池的方法來實現並行計算

先看一個簡單的例子

from multiprocessing import Pool
import os, time, random
def long_time_task(name):
    print ('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print ('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print ('Parent process %s.' % os.getpid())
    p = Pool()
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print ('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print ('All subprocesses done.')

輸出結果為:

Parent process 54034.
Waiting for all subprocesses done...
Run task 1 (54875)...
Run task 2 (54877)...
Run task 0 (54873)...
Run task 3 (54878)...
Task 0 runs 1.06 seconds.
Task 2 runs 1.22 seconds.
Run task 4 (54873)...
Task 1 runs 2.60 seconds.
Task 3 runs 2.88 seconds.
Task 4 runs 1.88 seconds.
All subprocesses done.

對Pool對象調用join()方法會等待所有子進程執行完畢,調用join()之前必須先調用close(),調用close()之后就不能繼續添加新的Process了。
請注意輸出的結果,task 0,1,2,3是立刻執行的,而task 4要等待前面某個task完成后才執行,這是因為Pool的默認大小在我的電腦上是4,因此,最多同時執行4個進程。這是Pool有意設計的限制,並不是操作系統的限制。如果改成:

p = Pool(5)

就可以同時跑5個進程。
由於Pool的默認大小是CPU的核數,如果你不幸擁有8核CPU,你要提交至少9個子進
程才能看到上面的等待效果。

上面說明了使用並行計算的方法,下面我們給出一個相同任務,測試它在不同的時間下所花費的時間

%pylab
import multiprocessing as mp 
import math 

def simulate_geometric_brownian_motion(p) : 
    M,I = p 
    # time steps, paths 
    S0 = 100; r = 0.05; sigma = 0.2; T = 1.0 
    # model parameters
    dt = T / M
    paths = np.zeros((M+1, I)) 
    paths[0] = S0 
    for t in range(1,M+1):
        paths[t] = paths[t -1]*np.exp((r-0.5 * sigma **2)* 
            dt + sigma*math.sqrt(dt)*np.random.standard_normal(I))
    return paths

I = 10000 # number of paths 
M = 100 # number of time steps 
t = 100 # number of tasks/simulations 

# running on server with 8 cores/16 threads 
from time import time 
times = [] 
for w in range(1, 17): 
    t0 = time() 
    pool = mp.Pool(processes=w) 
    # the pool of workers 
    result = pool.map(simulate_geometric_brownian_motion, t * [(M,I),]) 
    # the mapping of the function to the list of parameter tuples 
    times.append(time() -t0)

plt.plot(range(1, 17) , times)
plt.plot(range(1, 17) , times , 'ro')
plt.grid(True)
plt.xlabel('number of processes')
plt.ylabel('time in seconds')
plt. title( '%d Monte Carlo simulations' % t)

這是書上的源代碼對於simulate_geometric_brownian_motion算法,計算其在1到17個線程下所花費時間的不同,原書是在8核16cpu下測試的,測試圖如下

實際是在4核的ubuntu虛擬機測試的,並且計算量減少了很多,實際參數為

I = 100 # number of paths 
M = 10 # number of time steps 
t = 10 # number of tasks/simulations 

測試結果如下

差距太大了,要換電腦了,還以為死機了

 

 

 

 

未完,待續。。。。。。。。。。。。。。。。。。。。。。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM