Concurrent Programming with concurrent.futures


Concurrent programming in Python generally relies on the threading and multiprocessing modules. Most concurrent workloads, however, follow the same pattern: spawn a batch of threads, pull work items from a queue, and collect the results through another queue. Tasks like these usually call for a thread pool, and the concurrent.futures module wraps threading and multiprocessing one level further, making pools very easy to use.

Installation

In Python 3, concurrent.futures is part of the standard library; on Python 2 you have to install the futures backport yourself:

pip install futures

Executor and Future

concurrent.futures provides two classes, ThreadPoolExecutor and ProcessPoolExecutor. Both inherit from Executor and are used to create thread pools and process pools respectively, and both accept a max_workers argument specifying the number of threads or processes to create. For ProcessPoolExecutor, max_workers may be omitted, in which case the pool automatically creates as many processes as the machine has CPUs.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests

def load_url(url):
    return requests.get(url)

url = 'http://httpbin.org'
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(load_url, url)

Executor defines the submit() method, which submits a callable task for execution and returns a Future instance. The future's done() method tells you whether the task has finished and never blocks, while its result() method retrieves the task's return value and does block.

print(future.done())
print(future.result().status_code)
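
result() also takes an optional timeout in seconds and raises concurrent.futures.TimeoutError if the task has not finished by then. A minimal sketch continuing the snippet above, with the 5-second cap chosen arbitrarily:

from concurrent.futures import TimeoutError

future = executor.submit(load_url, url)
try:
    # Block for at most 5 seconds instead of indefinitely.
    print(future.result(timeout=5).status_code)
except TimeoutError:
    print('task did not finish within 5 seconds')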

A Future is similar to a Promise in JavaScript, and callbacks can be attached to it:

future.add_done_callback(fn)

The callback fn runs once the future is cancelled or finished, and it receives the future itself as its argument.
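
A minimal runnable sketch of a callback, reusing load_url and executor from above; the name on_done is made up for illustration:

def on_done(future):
    # Receives the finished (or cancelled) future as its only argument.
    print('status_code:', future.result().status_code)

future = executor.submit(load_url, url)
future.add_done_callback(on_done)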

submit() only schedules a single task; to run several tasks concurrently you need map() and as_completed().

map

URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']

def load_url(url):
    return requests.get(url)

with ThreadPoolExecutor(max_workers=3) as executor:
    for url, data in zip(URLS, executor.map(load_url, URLS)):
        print('%r page status_code %s' % (url, data.status_code))

Result:

'http://httpbin.org' page status_code 200
'http://example.com/' page status_code 200
'https://api.github.com/' page status_code 200

The map() method takes two arguments: the function to execute and a sequence. It applies the function to every element of the sequence and returns the results as a generator.

As shown above, the results come back in the same order as the input sequence.
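
Two details worth noting: map() also accepts an optional timeout argument, and an exception raised inside a task is re-raised only when that task's slot in the result generator is consumed. A sketch assuming the URLS list above plus one deliberately unreachable URL:

BAD_URLS = URLS + ['http://nonexistent.invalid/']

with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(load_url, BAD_URLS, timeout=30)
    try:
        for data in results:
            print(data.status_code)
    except requests.exceptions.ConnectionError:
        # Raised here, during iteration, not at submission time.
        print('one of the requests failed')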

as_completed

The as_completed() method returns a generator of Futures. It blocks while no task has completed, yields each task's future as it finishes, and stops once all tasks are done.

from concurrent.futures import as_completed

def load_url(url):
    return url, requests.get(url).status_code

with ThreadPoolExecutor(max_workers=3) as executor:
    tasks = [executor.submit(load_url, url) for url in URLS]
    for future in as_completed(tasks):
        print(future.result())

Result:

('http://example.com/', 200)
('http://httpbin.org', 200)
('https://api.github.com/', 200)

As you can see, the results no longer match the input order: whichever task finishes first notifies the main thread first.
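
Because result() re-raises any exception the task threw, it is common to wrap it in try/except while iterating. A minimal sketch over the same URLS, using a dict to map each future back to its URL:

with ThreadPoolExecutor(max_workers=3) as executor:
    tasks = {executor.submit(load_url, url): url for url in URLS}
    for future in as_completed(tasks):
        url = tasks[future]
        try:
            _, status = future.result()
        except Exception as exc:
            print('%r raised %s' % (url, exc))
        else:
            print('%r returned %s' % (url, status))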

wait

The wait() method blocks the main thread until a given condition is met. There are three conditions: ALL_COMPLETED, FIRST_COMPLETED, and FIRST_EXCEPTION.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
from concurrent.futures import as_completed
import requests

URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']

def load_url(url):
    requests.get(url)
    print(url)

with ThreadPoolExecutor(max_workers=3) as executor:
    tasks = [executor.submit(load_url, url) for url in URLS]
    wait(tasks, return_when=ALL_COMPLETED)
    print('all_done')

Result:

http://example.com/
http://httpbin.org
https://api.github.com/
all_done

As you can see, the call blocks until all tasks have completed.
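
wait() also returns a named tuple of two sets, (done, not_done), which is mainly useful with the other two conditions. A minimal sketch using FIRST_COMPLETED with the same load_url and URLS:

with ThreadPoolExecutor(max_workers=3) as executor:
    tasks = [executor.submit(load_url, url) for url in URLS]
    done, not_done = wait(tasks, return_when=FIRST_COMPLETED)
    # Returns as soon as any one task finishes; the others keep running
    # (the with block still waits for them on exit).
    print('%d done, %d still pending' % (len(done), len(not_done)))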

ProcessPoolExecutor

ProcessPoolExecutor is used in essentially the same way as ThreadPoolExecutor, but note this sentence in the documentation:

The __main__ module must be importable by worker subprocesses. This means that ProcessPoolExecutor will not work in the interactive interpreter.

That is, the entry point must be guarded so that the __main__ module stays importable, as in the if __name__ == '__main__' block below.

# load_url must live at module level so worker processes can import it.
def load_url(url):
    return requests.get(url)

def main():
    with ProcessPoolExecutor() as executor:
        tasks = [executor.submit(load_url, url) for url in URLS]
        for f in as_completed(tasks):
            if f.done():
                print(f.result().status_code)

if __name__ == '__main__':
    main()
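
Process pools are most useful for CPU-bound work, where threads are limited by the GIL. A minimal sketch, not from the original article, that farms out a deliberately naive prime count:

from concurrent.futures import ProcessPoolExecutor

def count_primes(n):
    # CPU-bound on purpose: trial division up to sqrt(k).
    return sum(1 for k in range(2, n)
               if all(k % d for d in range(2, int(k ** 0.5) + 1)))

def main():
    # max_workers omitted: defaults to the machine's CPU count.
    with ProcessPoolExecutor() as executor:
        sizes = [10000, 20000, 30000]
        for n, primes in zip(sizes, executor.map(count_primes, sizes)):
            print('%d -> %d primes' % (n, primes))

if __name__ == '__main__':
    main()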

  

