Python幾種並發實現方案的性能比較


http://blog.csdn.net/permike/article/details/54846831

Python幾種並發實現方案的性能比較

分類:

目錄(?)[+]

1.  前言

偶然看到Erlang vs. Stackless python: a first benchmark,對Erlang和Stackless Python的並發處理性能進行了實驗比較,基本結論認為二者有比較相近的性能。我看完產生的問題是,Stackless python與Python的其他並發實現機制性能又會有多大區別呢,比如線程和進程。因此我采用與這篇文章相同的辦法來對Stackless Python、普通Python的thread模塊、普通Python的threading模塊、普通Python的processing模塊這四種並發實現方案進行了性能實驗,並將實驗過程和基本結果記錄在這里。

后來看到了基於greenlet實現的高性能網絡框架Eventlet,因而更新了實驗方案,將greenlet也加入了比較,雖然greenlet並非是一種真正意義上的並發處理,而是在單個線程下對程序塊進行切換輪流執行。

2.  實驗方案

實驗方案與Erlang vs. Stackless python: a first benchmark是相同的,用每種方案分別給出如下問題的實現,記錄完成整個處理過程的總時間來作為評判性能的依據:

  1. 由n個節點組成一個環狀網絡,在上面傳送共m個消息。
  2. 將每個消息(共m個),逐個發送給1號節點。
  3. 第1到n-1號節點在接收到消息后,都轉發給下一號節點。
  4. 第n號節點每次收到消息后,不再繼續轉發。
  5. 當m個消息都從1號逐個到達第n號節點時,認為全部處理結束。

2.1  硬件平台

Macbook Pro 3,1上的Vmware Fusion 1.0虛擬機中,注意這里給虛擬機只啟用了cpu的單個核心:

  • 原始Cpu:Core 2 Duo,2.4 GHz,2核心,4 MB L2 緩存,總線速度800 MHz
  • 分配給虛擬機的內存:796M

2.2  軟件平台

Vmware Fusion 1.0下的Debian etch:

  • 原始Python:Debian發行版自帶Python 2.4.4
  • Python 2.4.4 Stackless 3.1b3 060516
  • processing-0.52-py2.4-linux-i686.egg
  • 原始Python下的greenlet實現:py lib 0.9.2

3.  實驗過程及結果

各方案的實現代碼見后文。實驗時使用time指令記錄每次運行的總時間,選用的都是不做任何輸出的no_io實現(Python的print指令還是挺耗資源的,如果不注釋掉十有八九得影響測試結果),每次執行時設定n=300,m=10000(Erlang vs. Stackless python: a first benchmark文章中認為n可以設置為300,m則可以取10000到90000之間的數值分別進行測試)。

3.1  Stackless Python的實驗結果

real0m4.749s
user0m4.716s
sys0m0.028s

3.2  使用thread模塊的實驗結果

real1m9.222s
user0m34.418s
sys0m34.622s

也有時這樣:

real3m43.539s
user0m15.345s
sys3m27.953s

3.5  greenlet模塊的實驗結果

real    0m21.610s
user    0m20.713s
sys     0m0.215s

4.  結論與分析

4.1  Stackless Python

毫無疑問,Stackless Python幾乎有匪夷所思的並發性能,比其他方案快上幾十倍,而且借助Stackless Python提供的channel機制,實現也相當簡單。也許這個結果向我們部分揭示了沈仙人基於Stackless Python實現的Eurasia3能夠提供相當於C語言效果的恐怖並發性能的原因。

4.2  Python線程

從道理上來講,thread模塊似乎應該和threading提供基本相同的性能,畢竟threading只是對thread的一種封裝嘛,后台機制應該是一致的。或許threading由於本身類實例維護方面的開銷,應該會比直接用thread慢一點。從實驗結果來看,二者性能也確實差不多。只是不大明白為何threading方案的測試結果不是很穩定,即使對其他方案的測試運行多次,誤差也不會像threading這么飄。從代碼實現體驗來說,用threading配合Queue比直接用thread實在是輕松太多了,並且出錯的機會也要少很多。

4.3  Python進程

processing模塊給出的進程方案大致比thread線程要慢一倍,並且這是在我特意調整虛擬機給它預備了足夠空閑內存、避免使用交換分區的情況下取得的(特意分給虛擬機700多M內存就是為了這個)。而其他方案僅僅占用數M內存,完全無需特意調大可用內存總量。當然,如果給虛擬機多啟用幾個核心的話,processing也許會占上點便宜,畢竟目前thread模塊是不能有效利用多cpu資源的(經實驗,Stackless Python在開啟雙核的情況下表現的性能和單核是一樣的,說明也是不能有效利用多cpu)。因此一種比較合理的做法是根據cpu的數量,啟用少量幾個進程,而在進程內部再開啟線程進行實際業務處理,這也是目前Python社區推薦的有效利用多cpu資源的辦法。好在processing配合其自身提供的Queue模塊,編程體驗還是比較輕松的。

4.4  greenlet超輕量級方案

基於greenlet的實現則性能僅次於Stackless Python,大致比Stackless Python慢一倍,比其他方案快接近一個數量級。其實greenlet不是一種真正的並發機制,而是在同一線程內,在不同函數的執行代碼塊之間切換,實施“你運行一會、我運行一會”,並且在進行切換時必須指定何時切換以及切換到哪。greenlet的接口是比較簡單易用的,但是使用greenlet時的思考方式與其他並發方案存在一定區別。線程/進程模型在大邏輯上通常從並發角度開始考慮,把能夠並行處理的並且值得並行處理的任務分離出來,在不同的線程/進程下運行,然后考慮分離過程可能造成哪些互斥、沖突問題,將互斥的資源加鎖保護來保證並發處理的正確性。greenlet則是要求從避免阻塞的角度來進行開發,當出現阻塞時,就顯式切換到另一段沒有被阻塞的代碼段執行,直到原先的阻塞狀況消失以后,再人工切換回原來的代碼段繼續處理。因此,greenlet本質是一種合理安排了的串行,實驗中greenlet方案能夠得到比較好的性能表現,主要也是因為通過合理的代碼執行流程切換,完全避免了死鎖和阻塞等情況(執行帶屏幕輸出的ring_greenlet.py我們會看到腳本總是一個一個地處理消息,把一個消息在環上從頭傳到尾之后,再開始處理下一個消息)。因為greenlet本質是串行,因此在沒有進行顯式切換時,代碼的其他部分是無法被執行到的,如果要避免代碼長時間占用運算資源造成程序假死,那么還是要將greenlet與線程/進程機制結合使用(每個線程、進程下都可以建立多個greenlet,但是跨線程/進程時greenlet之間無法切換或通訊)。

Stackless則比較特別,對很多資源從底層進行了並發改造,並且提供了channel等更適合“並發”的通訊機制實現,使得資源互斥沖突的可能性大大減小,並發性能自然得以提高。粗糙來講,greenlet是“阻塞了我就先干點兒別的,但是程序員得明確告訴greenlet能先干點兒啥以及什么時候回來”;Stackless則是“東西我已經改造好了,你只要用我的東西,並發沖突就不用操心,只管放心大膽地並發好了”。greenlet應該是學習了Stackless的上下文切換機制,但是對底層資源沒有進行適合並發的改造。並且實際上greenlet也沒有必要改造底層資源的並發性,因為它本質是串行的單線程,不與其他並發模型混合使用的話是無法造成對資源的並發訪問的。

greenlet 封裝后的 eventlet 方案

eventlet 是基於 greenlet 實現的面向網絡應用的並發處理框架,提供“線程”池、隊列等與其他 Python 線程、進程模型非常相似的 api,並且提供了對 Python 發行版自帶庫及其他模塊的超輕量並發適應性調整方法,比直接使用 greenlet 要方便得多。並且這個解決方案源自著名虛擬現實游戲“第二人生”,可以說是久經考驗的新興並發處理模型。其基本原理是調整 Python 的 socket 調用,當發生阻塞時則切換到其他 greenlet 執行,這樣來保證資源的有效利用。需要注意的是:

  • eventlet 提供的函數只能對 Python 代碼中的 socket 調用進行處理,而不能對模塊的 C 語言部分的 socket 調用進行修改。對后者這類模塊,仍然需要把調用模塊的代碼封裝在 Python 標准線程調用中,之后利用 eventlet 提供的適配器實現 eventlet 與標准線程之間的協作。
  • 再有,雖然 eventlet 把 api 封裝成了非常類似標准線程庫的形式,但兩者的實際並發執行流程仍然有明顯區別。在沒有出現 I/O 阻塞時,除非顯式聲明,否則當前正在執行的 eventlet 永遠不會把 cpu 交給其他的 eventlet,而標准線程則是無論是否出現阻塞,總是由所有線程一起爭奪運行資源。所有 eventlet 對 I/O 阻塞無關的大運算量耗時操作基本沒有什么幫助。

在性能測試結果方面,eventlet 消耗的運行時間大致是 greenlet 方案的 3 到 5 倍,而 Python 標准線程模型的 thread 方式消耗的運行時間大致是 eventlet 測試代碼的 8 到 10 倍。其中前者可能是因為我們在 eventlet 的測試代碼中,使用隊列機制來完成所有的消息傳遞,而隊列上的訪問互斥保護可能額外消耗了一些運算資源。總體而言,eventlet 模型的並發性能雖然比 Stackless Python 和直接使用 greenlet 有一定差距,但仍然比標准線程模型有大約一個數量級的優勢,這也就不奇怪近期很多強調並發性能的網絡服務器實現采取 eventlet 、線程、進程三者組合使用的實現方案。

5.  實驗代碼

實驗代碼下載:

  • 版本3 下載:增加了 eventlet 方案的實驗代碼。
  • 版本2 下載:增加了 greenlet 方案的實驗代碼。
  • 版本1 下載:包括 Stackless Python 、 thread 、 threading 、 processing 四種方案的實驗代碼。

為方便閱讀,將實驗中用到的幾個腳本的代碼粘貼如下,其中Stackless Python方案的代碼實現直接取自Erlang vs. Stackless python: a first benchmark

5.1  ring_no_io_slp.py

復制代碼
#!/Library/Frameworks/Python.framework/Versions/2.5/bin/python # encoding: utf-8 import sys import stackless as SL def run_benchmark(n, m): # print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m)) firstP = cin = SL.channel() for s in xrange(1, n): seqn = s cout = SL.channel() # # print("*> s = %d" % (seqn, )) t = SL.tasklet(loop)(seqn, cin, cout) cin = cout else: seqn = s+1 # # print("$> s = %d" % (seqn, )) t = SL.tasklet(mloop)(seqn, cin) for r in xrange(m-1, -1, -1): # # print("+ sending Msg# %d" % r) firstP.send(r) SL.schedule() def loop(s, cin, cout): while True: r = cin.receive() cout.send(r) if r > 0: # print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r)) pass else: # print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid(), s)) break def mloop(s, cin): while True: r = cin.receive() if r > 0: # print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r)) pass else: # print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid(), s)) break def pid(): return repr(SL.getcurrent()).split()[-1][2:-1] if __name__ == '__main__': run_benchmark(int(sys.argv[1]), int(sys.argv[2]))
復制代碼

5.2  ring_no_io_thread.py

復制代碼
#!/Library/Frameworks/Python.framework/Versions/2.5/bin/python # encoding: utf-8 import sys, time import thread SLEEP_TIME = 0.0001 def run_benchmark(n, m): # print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m)) locks = [thread.allocate_lock() for i in xrange(n)] firstP = cin = [] cin_lock_id = 0 for s in xrange(1, n): seqn = s cout = [] cout_lock_id = s # print("*> s = %d" % (seqn, )) thread.start_new_thread(loop, (seqn, locks, cin, cin_lock_id, cout, cout_lock_id)) cin = cout cin_lock_id = cout_lock_id else: seqn = s+1 # print("$> s = %d" % (seqn, )) thread.start_new_thread(mloop, (seqn, locks, cin, cin_lock_id)) for r in xrange(m-1, -1, -1): # print("+ sending Msg# %d" % r) lock = locks[0] lock.acquire() firstP.append(r) lock.release() time.sleep(SLEEP_TIME) try: while True: time.sleep(SLEEP_TIME) except: pass def loop(s, locks, cin, cin_lock_id, cout, cout_lock_id): while True: lock = locks[cin_lock_id] lock.acquire() if len(cin) > 0: r = cin.pop(0) lock.release() else: lock.release() time.sleep(SLEEP_TIME) continue lock = locks[cout_lock_id] lock.acquire() cout.append(r) lock.release() if r > 0: # print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r)) pass else: # print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid(), s)) break def mloop(s, locks, cin, cin_lock_id): while True: lock = locks[cin_lock_id] lock.acquire() if len(cin) > 0: r = cin.pop(0) lock.release() else: lock.release() time.sleep(SLEEP_TIME) continue if r > 0: # print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r)) pass else: # print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid(), s)) break thread.interrupt_main() def pid(): return thread.get_ident() if __name__ == '__main__': run_benchmark(int(sys.argv[1]), int(sys.argv[2]))
復制代碼

5.3  ring_no_io_queue.py

復制代碼
#!/Library/Frameworks/Python.framework/Versions/2.5/bin/python # encoding: utf-8 import sys import threading, Queue def run_benchmark(n, m): # print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m)) firstP = cin = Queue.Queue() for s in xrange(1, n): seqn = s cout = Queue.Queue() # print("*> s = %d" % (seqn, )) t = Loop(seqn, cin, cout) t.setDaemon(False) t.start() cin = cout else: seqn = s+1 # print("$> s = %d" % (seqn, )) t = MLoop(seqn, cin) t.setDaemon(False) t.start() for r in xrange(m-1, -1, -1): # print("+ sending Msg# %d" % r) firstP.put(r) class Loop(threading.Thread): def __init__(self, s, cin, cout): threading.Thread.__init__(self) self.cin = cin self.cout = cout self.s = s def run(self): while True: r = self.cin.get() self.cout.put(r) if r > 0: # print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), self.s, r)) pass else: # print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid(), self.s)) break class MLoop(threading.Thread): def __init__(self, s, cin): threading.Thread.__init__(self) self.cin = cin self.s = s def run(self): while True: r = self.cin.get() if r > 0: # print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), self.s, r)) pass else: # print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid(), self.s)) break def pid(): return threading.currentThread() if __name__ == '__main__': run_benchmark(int(sys.argv[1]), int(sys.argv[2]))
復制代碼

5.4  ring_no_io_proc.py

復制代碼
#!/Library/Frameworks/Python.framework/Versions/2.5/bin/python # encoding: utf-8 import sys import processing, Queue def run_benchmark(n, m): # print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m)) firstP = cin = processing.Queue() for s in xrange(1, n): seqn = s cout = processing.Queue() # print("*> s = %d" % (seqn, )) p = processing.Process(target = loop, args = [seqn, cin, cout]) p.start() cin = cout else: seqn = s+1 # print("$> s = %d" % (seqn, )) p = processing.Process(target = mloop, args = [seqn, cin]) p.start() for r in xrange(m-1, -1, -1): # print("+ sending Msg# %d" % r) firstP.put(r) p.join() def loop(s, cin, cout): while True: r = cin.get() cout.put(r) if r > 0: # print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r)) pass else: # print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid(), s)) break def mloop(s, cin): while True: r = cin.get() if r > 0: # print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r)) pass else: # print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid(), s)) break def pid(): return processing.currentProcess() if __name__ == '__main__': run_benchmark(int(sys.argv[1]), int(sys.argv[2]))
復制代碼

5.5  ring_no_io_greenlet.py

復制代碼
#!/Library/Frameworks/Python.framework/Versions/2.5/bin/python # encoding: utf-8 import sys from py.magic import greenlet def run_benchmark(n, m): # print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m)) glets = [greenlet.getcurrent()] for s in xrange(1, n): seqn = s glets.append(greenlet(loop)) # print("*> s = %d" % (seqn, )) else: seqn = s+1 glets.append(greenlet(mloop)) # print("$> s = %d" % (seqn, )) glets[-1].switch(seqn, glets) for r in xrange(m-1, -1, -1): # print("+ sending Msg# %d" % r) glets[1].switch(r) def loop(s, glets): previous = glets[s - 1] next = glets[s + 1] if s > 1: r = previous.switch(s - 1, glets) else: r = previous.switch() while True: if r > 0: # print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid("loop", s), s, r)) pass else: # print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid("loop", s), s)) break next.switch(r) r = previous.switch() next.switch(r) def mloop(s, glets): previous = glets[s - 1] r = previous.switch(s - 1, glets) while True: if r > 0: # print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid("mloop", s), s, r)) pass else: # print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid("mloop", s), s)) break r = previous.switch() def pid(func, s): return "<<%s(Greenlet-%d, started)>>" % (func, s) if __name__ == '__main__': run_benchmark(int(sys.argv[1]), int(sys.argv[2]))
復制代碼

5.6  ring_no_io_eventlet.py

復制代碼
#!/Library/Frameworks/Python.framework/Versions/2.5/bin/python # encoding: utf-8 import sys import eventlet def run_benchmark(n, m): # print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m)) firstP = cin = eventlet.Queue() for s in xrange(1, n): seqn = s cout = eventlet.Queue() # print("*> s = %d" % (seqn, )) eventlet.spawn_n(loop, seqn, cin, cout) cin = cout else: seqn = s+1 # print("$> s = %d" % (seqn, )) for r in xrange(m-1, -1, -1): # print("+ sending Msg# %d" % r) firstP.put(r) mloop(seqn, cin) def loop(s, cin, cout): while True: r = cin.get() cout.put(r) if r > 0: # print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r)) pass else: # print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid(), s)) break def mloop(s, cin): while True: r = cin.get() if r > 0: # print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r)) pass else: # print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid(), s)) break def pid(): return eventlet.greenthread.getcurrent() if __name__ == '__main__': run_benchmark(int(sys.argv[1]), int(sys.argv[2]))


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM