python異步並發模塊concurrent.futures入門詳解


concurrent.futures是一個非常簡單易用的庫,主要用來實現多線程和多進程的異步並發。

本文主要對concurrent.futures庫相關模塊進行詳解,並分別提供了詳細的示例demo。

1. 模塊安裝

1) python 3.x中自帶了concurrent.futures模塊

2) python 2.7需要安裝futures模塊,使用命令pip install futures安裝即可

pypi地址:https://pypi.python.org/pypi/futures/

2. concurrent.futures模塊詳解

2.1 Executor對象

class concurrent.futures.Executor

Executor是一個抽象類,它提供了異步執行調用的方法。它不能直接使用,但可以通過它的兩個子類ThreadPoolExecutor或者ProcessPoolExecutor進行調用。

2.1.1 Executor.submit(fn, *args, **kwargs)

fn:需要異步執行的函數

*args, **kwargs:fn參數

示例:

#-*- coding:utf-8 -*-
from concurrent import futures
 
def test(num):
    import time
    return time.ctime(),num
 
with futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(test,1)
    print future.result()
 
>>>
('Tue Jan 17 15:23:10 2017', 1)

2.1.2 Executor.map(func, *iterables, timeout=None)

相當於map(func, *iterables),但是func是異步執行。timeout的值可以是int或float,如果操作超時,會返回raisesTimeoutError;如果不指定timeout參數,則不設置超時間。

func:需要異步執行的函數

*iterables:可迭代對象,如列表等。每一次func執行,都會從iterables中取參數。

timeout:設置每次異步操作的超時時間

示例:

#-*- coding:utf-8 -*-
from concurrent import futures
 
def test(num):
    import time
    return time.ctime(),num
 
data=[1,2,3]
with futures.ThreadPoolExecutor(max_workers=1) as executor:
    for future in executor.map(test,data):
        print future
 
>>>
('Tue Jan 17 15:23:47 2017', 1)
('Tue Jan 17 15:23:47 2017', 2)
('Tue Jan 17 15:23:47 2017', 3)

 

2.1.3 Executor.shutdown(wait=True)

釋放系統資源,在Executor.submit()或 Executor.map()等異步操作后調用。使用with語句可以避免顯式調用此方法

2.3 ThreadPoolExecutor對象

ThreadPoolExecutor類是Executor子類,使用線程池執行異步調用.

class concurrent.futures.ThreadPoolExecutor(max_workers)

使用max_workers數目的線程池執行異步調用

2.4 ProcessPoolExecutor對象

ThreadPoolExecutor類是Executor子類,使用進程池執行異步調用.

class concurrent.futures.ProcessPoolExecutor(max_workers=None)

使用max_workers數目的進程池執行異步調用,如果max_workers為None則使用機器的處理器數目(如4核機器max_worker配置為None時,則使用4個進程進行異步並發)。

示例:

#-*- coding:utf-8 -*-
from concurrent import futures
 
def test(num):
    import time
    return time.ctime(),num
 
def muti_exec(m,n):
    #m 並發次數
    #n 運行次數
 
    with futures.ProcessPoolExecutor(max_workers=m) as executor: #多進程
    #with futures.ThreadPoolExecutor(max_workers=m) as executor: #多線程
        executor_dict=dict((executor.submit(test,times), times) for times in range(m*n))
 
    for future in futures.as_completed(executor_dict):
        times = executor_dict[future]
        if future.exception() is not None:
            print('%r generated an exception: %s' % (times,future.exception()))
        else:
            print('RunTimes:%d,Res:%s'% (times, future.result()))
 
if __name__ == '__main__':
    muti_exec(5,1)
 
>>>
RunTimes:0,Res:('Tue Jan 17 15:56:53 2017', 0)
RunTimes:4,Res:('Tue Jan 17 15:56:53 2017', 4)
RunTimes:3,Res:('Tue Jan 17 15:56:53 2017', 3)
RunTimes:1,Res:('Tue Jan 17 15:56:53 2017', 1)
RunTimes:2,Res:('Tue Jan 17 15:56:53 2017', 2)

3. 附錄:Python GIL相關

要理解GIL的含義,我們需要從Python的基礎講起。像C++這樣的語言是編譯型語言,所謂編譯型語言,是指程序輸入到編譯器,編譯器再根據語言的語 法進行解析,然后翻譯成語言獨立的中間表示,最終鏈接成具有高度優化的機器碼的可執行程序。編譯器之所以可以深層次的對代碼進行優化,是因為它可以看到整 個程序(或者一大塊獨立的部分)。這使得它可以對不同的語言指令之間的交互進行推理,從而給出更有效的優化手段。

與此相反,Python是解釋型語言。程序被輸入到解釋器來運行。解釋器在程序執行之前對其並不了解;它所知道的只是Python的規則,以及在執行過程 中怎樣去動態的應用這些規則。它也有一些優化,但是這基本上只是另一個級別的優化。由於解釋器沒法很好的對程序進行推導,Python的大部分優化其實是 解釋器自身的優化。

現在我們來看一下問題的症結所在。要想利用多核系統,Python必須支持多線程運行。作為解釋型語言,Python的解釋器必須做到既安全又高效。我們都知道多線程編程會遇到的問題,解釋器要留意的是避免在不同的線程操作內部共享的數據,同時它還要保證在管理用戶線程時保證總是有最大化的計算資源。

那么,不同線程同時訪問時,數據的保護機制是怎樣的呢?答案是解釋器全局鎖。從名字上看能告訴我們很多東西,很顯然,這是一個加在解釋器上的全局(從解釋器的角度看)鎖(從互斥或者類似角度看)。這種方式當然很安全,但是它有一層隱含的意思(Python初學者需要了解這個):對於任何Python程序,不管有多少的處理器,任何時候都總是只有一個線程在執行。

”為什么我全新的多線程Python程序運行得比其只有一個線程的時候還要慢?“許多人在問這個問題時還是非常犯暈的,因為顯然一個具有兩個線程的程序要比其只有一個線程時要快(假設該程序確實是可並行的)。事實上,這個問題被問得如此頻繁以至於Python的專家們精心制作了一個標准答案:”不要使用多線程,請使用多進程”。

所以,對於計算密集型的,我還是建議不要使用python的多線程而是使用多進程方式,而對於IO密集型的,還是勸你使用多進程方式,因為使用多線程方式出了問題,最后都不知道問題出在了哪里,這是多么讓人沮喪的一件事情!

建議使用多進程並發而不是多線程並發!

4. 參考文檔

http://pythonhosted.org/futures/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM