python並發模塊之concurrent.futures(一)


 

Python3.2開始,標准庫為我們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現了對threading和multiprocessing的進一步抽象,對編寫線程池/進程池提供了直接的支持,他屬於上層的封裝,對於用戶來說,不用在考慮那么多東西了。

官方參考資料:https://pythonhosted.org/futures/

1.Executor

Exectuor是基礎模塊,這是一個抽象類,其子類分為ThreadPoolExecutor和ProcessPoolExecutor,分別被用來創建線程池和進程池。

提供的方法如下:

Executor.submit(fn, *args, **kwargs)

fn:為需要異步執行的函數
args,kwargs:為給函數傳遞的參數
就來看看官網的這個例子:

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

  

我們使用submit方法來往線程池中加入一個task(pow函數),submit返回一個Future對象。其中future.result()的result方法的作用是拿到調用返回的結果。如果沒有執行完畢就會去等待。這里我們使用with操作符,使得當任務執行完成之后,自動執行shutdown函數,而無需編寫相關釋放代碼。
關於更多future的具體方法說明看后面的future部分解釋。

Executor.map(fn, *args, **kwargs)

map(func, *iterables, timeout=None)
此map函數和python自帶的map函數功能類似,只不過concurrent模塊的map函數從迭代器獲得參數后異步執行。並且,每一個異步操作,能用timeout參數來設置超時時間,timeout的值可以是int或float型,如果操作timeout的話,會raisesTimeoutError。如果timeout參數不指定的話,則不設置超時間。

func:為需要異步執行的函數
iterables:可以是一個能迭代的對象.
timeout:設置每次異步操作的超時時間

from concurrent.futures import ThreadPoolExecutor
import requests
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
        req= requests.get(url, timeout=60)
        print('%r page is %d bytes' % (url, len(req.content)))
executor = ThreadPoolExecutor(max_workers=3)
executor.map(load_url,URLS)
print('主線程結束')

  

submit函數和map函數,根據需要,選一個使用即可。

map返回一個迭代器,其中的回調函數的參數 最好是可以迭代的數據類型,如list;如果有 多個參數 則 多個參數的 數據長度相同;
如: pool.map(work,[[1,2],[3,4]],[0,1]]) 中 [1,2]對應0 ;[3,4]對應1 ;其實內部執行的函數為 work([1,2],0) ; work([3,4],1)
map返回的結果 是 有序結果;是根據迭代函數執行順序返回的結果
使用map的優點是 每次調用回調函數的結果不用手動的放入結果list中
Executor.shutdown(wait=True)

此函數用於釋放異步執行操作后的系統資源。Executor實現了enter__和__exit使得其對象可以使用with操作符。
在這里可以使用with上下文關鍵字代替,如上面第一個submit的例子。

2.Future對象

submit函數返回future對象,future提供了跟蹤任務執行狀態的方法,Future實例可以被Executor.submit()方法創建。除了測試之外不應該直接創建。

cancel():嘗試去取消調用。如果調用當前正在執行,不能被取消。這個方法將返回False,否則調用將會被取消,方法將返回True

cancelled():如果調用被成功取消返回True

running():如果當前正在被執行不能被取消返回True

done():如果調用被成功取消或者完成running返回True

result(Timeout = None):拿到調用返回的結果。如果沒有執行完畢就會去等待

exception(timeout=None):捕獲程序執行過程中的異常

add_done_callback(fn):將fn綁定到future對象上。當future對象被取消或完成運行時,fn函數將會被調用

3.wait方法

 wait方法接會返回一個tuple(元組),tuple中包含兩個set(集合),一個是completed(已完成的)另外一個是uncompleted(未完成的)。使用wait方法的一個優勢就是獲得更大的自由度,它接收三個參數FIRST_COMPLETED, FIRST_EXCEPTION 和ALL_COMPLETE,默認設置為ALL_COMPLETED。

  如果采用默認的ALL_COMPLETED,程序會阻塞直到線程池里面的所有任務都完成,再執行主線程:

#!/usr/bin/env python  
# encoding: utf-8  
from concurrent.futures import ThreadPoolExecutor,wait,as_completed
import requests
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
    req = requests.get(url, timeout=60)
    print('%r page is %d bytes' % (url, len(req.content)))
executor = ThreadPoolExecutor(max_workers=3)
f_list = []
for url in URLS:
    future = executor.submit(load_url,url)
    f_list.append(future)
print(wait(f_list))
print('主線程結束')

  

如果采用FIRST_COMPLETED參數,程序並不會等到線程池里面所有的任務都完成。

from concurrent.futures import ThreadPoolExecutor,wait,as_completed
import requests
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
    req=requests.get(url, timeout=60)
    print('%r page is %d bytes' % (url, len(req.content)))
executor = ThreadPoolExecutor(max_workers=3)
f_list = []
for url in URLS:
    future = executor.submit(load_url,url)
    f_list.append(future)
print(wait(f_list,return_when='FIRST_COMPLETED'))
print('主線程結束')

  

關於模塊的基本使用就是上面的這些。后續會做一些拓展或者案例。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM