From:https://www.cnblogs.com/weihengblog/p/9812110.html
concurrent.futures 官方文檔:https://docs.python.org/3/library/concurrent.futures.html
concurrent.futures: 線程池, 讓你更加高效, 並發的處理任務:https://www.h3399.cn/201906/703751.html
python 因為其全局解釋器鎖 GIL 而無法通過線程實現真正的平行計算。這個論斷我們不展開,但是有個概念我們要說明,
IO 密集型 vs 計算密集型:
- IO密集型:讀取文件,讀取網絡套接字頻繁。
- 計算密集型:大量消耗CPU的數學與邏輯運算,也就是我們這里說的平行計算。
而 concurrent.futures 模塊,可以利用 multiprocessing 實現真正的平行計算。
核心原理是:concurrent.futures 會以子進程的形式,平行的運行多個 python 解釋器,從而令 python 程序可以利用多核 CPU 來提升執行速度。由於 子進程 與 主解釋器 相分離,所以他們的全局解釋器鎖也是相互獨立的。每個子進程都能夠完整的使用一個CPU 內核。
Python 模塊 - Concurrent.futures
從 Python3.2開始,Python 標准庫提供了 concurrent.futures 模塊,為開發人員提供了啟動異步任務的高級接口。 它提供了 ThreadPoolExecutor 和 ProcessPoolExecutor 兩個類,實現了對 threading 和 multiprocessing 的更高級的抽象,對編寫 線程池/進程池 提供了直接的支持。 可以將相應的 tasks 直接放入線程池/進程池,不需要維護Queue來操心死鎖的問題,線程池/進程池會自動幫我們調度。
Future總結
1. python3自帶,python2需要安裝 2. Executer對象 它是一個抽象類,它提供了異步執行的方法,他不能直接使用,但可以通過它的子類 ThreadPoolExecuter和ProcessPoolExecuter 2.1 Executer.submit(fn, *args, **kwargs) fn: 需要異步執行的函數 *args,**kwargs fn 接受的參數 該方法的作用就是提交一個可執行的回調 task,它返回一個 Future 對象 2.2 map(fn, *iterables, timeout=None, chunksize=1) map(task,URLS) # 返回一個 map()迭代器,這個迭代器中的回調執行返回的結果是有序的 3. Future對象相關 future可以理解為一個在未來完成的操作,這是異步編程的基礎 通常情況下我們在遇到IO操作的時候,將會發生阻塞,cpu不能做其他事情 而future的引入幫助我們在這段等待時間可以完成其他的操作 3.1 done(): 如果當前線程已取消/已成功,返回True。 3.2 cance(): 如果當前線程正在執行,並且不能取消調用,返回Flase。否則調用取消,返回True 3.3 running(): 如果當前的線程正在執行,則返回True 3.4 result(): 返回調用返回的值,如果調用尚未完成,則此方法等待 如果等待超時,會拋出concurrent.futures.TimeoutError 如果沒有指定超時時間,則等待無時間限制 如果在完成之前,取消了Future,則會引發CancelledError 4. as_completed(): 在多個Future實例上的迭代器將會被返回 這些Future實例由fs完成時產生。 由fs返回的任何重復的Future,都會被返回一次。 里面保存的都是已經執行完成的Future對象 5. wait(): 返回一個元祖,元祖包含兩個元素 1. 已完成的future集合 2. 未完成的future集合
初體驗:
-
# coding=utf-8 from concurrent import futures from concurrent.futures import Future import time def return_future(msg): time.sleep(3) return msg pool = futures.ThreadPoolExecutor(max_workers=2) t1 = pool.submit(return_future,'hello') t2 = pool.submit(return_future,'world') time.sleep(3) print(t1.done()) # 如果順利完成,則返回True time.sleep(3) print(t2.done()) print(t1.result()) # 獲取future的返回值 time.sleep(3) print(t2.result()) print("主線程")
map(func,* iterables,timeout = None,chunksize = 1 )
# coding=utf-8 import time from concurrent.futures import Future,as_completed from concurrent.futures import ThreadPoolExecutor as Pool import requests import time URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com'] def task(url,timeout=10): return requests.get(url=url,timeout=timeout) pool = Pool() result = pool.map(task,URLS) start_time = time.time() # 按照 URLS 的順序返回 for res in result: print("{} {}".format(res.url,len(res.content))) # 無序的 with Pool(max_workers=3) as executer: future_task = [executer.submit(task,url) for url in URLS] for f in as_completed(future_task): if f.done(): f_ret = f.result() # f.result()得到task的返回值,requests對象 print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content))) print("耗時", time.time() - start_time) print("主線程")
Future對象
Future可以理解為一個未來完成的操作
當我們執行io操作的時候,在等待返回結果之前會產生阻塞
cpu不能做其他事情,而Future的引入幫助我們在等待的這段時間可以完成其他操作
from concurrent.futures import ThreadPoolExecutor as Pool from concurrent.futures import as_completed import requests import time URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com'] def task(url,timeout=10): return requests.get(url=url,timeout=timeout) # start_time = time.time() # for url in URLS: # ret = task(url) # print("{} {}".format(ret.url,len(ret.content))) # print("耗時",time.time() - start_time) with Pool(max_workers=3) as executor: # 創建future任務 future_task = [executor.submit(task,url) for url in URLS] for f in future_task: if f.running(): print("%s is running"%str(f)) for f in as_completed(future_task): try: ret = f.done() if ret: f_ret = f.result() print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content))) except Exception as e: f.cance() print(e) """ url不是按照順序返回的,說明並發時,當訪問某一個url時,如果沒有得到返回結果,不會發生阻塞 <Future at 0x1c63990e6d8 state=running> is running <Future at 0x1c639922780 state=running> is running <Future at 0x1c639922d30 state=running> is running <Future at 0x1c63990e6d8 state=finished returned Response>, done, result: http://www.baidu.com/, 2381 <Future at 0x1c639922780 state=finished returned Response>, done, result: https://www.qq.com?fromdefault, 243101 <Future at 0x1c639922d30 state=finished returned Response>, done, result: http://sina.com/, 23103 """
模塊方法
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
wait() 會返回一個tuple,tuple 會包含兩個集合:已完成的集合 和 未完成的集合。使用 wait() 會獲得更大的自由度,他接受三個參數:FIRST_COMPLETED、FIRST_EXCEPTION、ALL_COMPLETE。默認為 ALL_COMPLETE。
如果采用默認的 ALL_COMPLETED,程序會阻塞直到線程池里面的所有任務都完成,再執行主線程:
from concurrent.futures import Future from concurrent.futures import ThreadPoolExecutor as Pool from concurrent.futures import as_completed, wait import requests URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com'] def task(url, timeout=10): r = requests.get(url=url, timeout=timeout) print(r.status_code) with Pool(max_workers=3) as execute: future_task = [execute.submit(task, url) for url in URLS] for f in future_task: if f.running(): print("%s" % (str(f))) """ 並且wait還有timeout和return_when兩個參數 return_when有三個常量 (默認是 ALL_COMPLETED) FIRST_COMPLETED 任何一個future_task執行完成時/取消時,改函數返回 FIRST_EXCEPTION 任何一個future_task發生異常時,該函數返回,如果沒有異常發生,等同於ALL_COMPLETED ALL_COMPLETED 當所有的future_task執行完畢返回。 """ results = wait(future_task, return_when="FIRST_COMPLETED") # done = results[0] for d in done: print(d)
concurrent.futures.as_completed(fs, timeout=None)
在多個 Future 實例上的迭代器將會被返回,這些 Future 實例由 fs 完成時產生。由 fs 返回的任何重復的 Future,都會被返回一次。里面保存的都是已經執行完成的 Future 對象。
from concurrent.futures import ThreadPoolExecutor as Pool from concurrent.futures import as_completed import requests import time URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com'] def task(url,timeout=10): return requests.get(url=url,timeout=timeout) with Pool(max_workers=3) as executor: # 創建future任務 future_task = [executor.submit(task,url) for url in URLS] for f in future_task: if f.running(): print("%s is running"%str(f)) for f in as_completed(future_task): try: ret = f.done() if ret: f_ret = f.result() print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content))) except Exception as e: f.cance() print(e)
下面我們將學習 concurrent.futures
模塊中的類。concurrent.futures 基礎模塊是 executor 和 future。
使用示例代碼:
# -*- coding:utf-8 -*- import redis from redis import WatchError from concurrent.futures import ProcessPoolExecutor r = redis.Redis(host='127.0.0.1', port=6379) # 減庫存函數, 循環直到減庫存完成 # 庫存充足, 減庫存成功, 返回True # 庫存不足, 減庫存失敗, 返回False def reduce_stock(): # python中redis事務是通過pipeline的封裝實現的 with r.pipeline() as pipe: while True: try: # watch庫存鍵, multi后如果該key被其他客戶端改變, 事務操作會拋出WatchError異常 pipe.watch('stock:count') count = int(pipe.get('stock:count')) if count > 0: # 有庫存 # 事務開始 pipe.multi() pipe.decr('stock:count') # 把命令推送過去 # execute返回命令執行結果列表, 這里只有一個decr返回當前值 print(pipe.execute()[0]) return True else: return False except WatchError as ex: # 打印WatchError異常, 觀察被watch鎖住的情況 print(ex) pipe.unwatch() def worker(): while True: # 沒有庫存就退出 if not reduce_stock(): break if __name__ == "__main__": # 設置庫存為100 r.set("stock:count", 100) # 多進程模擬多個客戶端提交 with ProcessPoolExecutor() as pool: for _ in range(10): pool.submit(worker)
concurrent.futures 模塊詳解
1. Executor對象
class concurrent.futures.Executor
Executor是一個抽象類,它提供了異步執行調用的方法。它不能直接使用,但可以通過它的兩個子類ThreadPoolExecutor或者ProcessPoolExecutor進行調用。
1.1 Executor.submit(fn, *args, **kwargs)
fn:需要異步執行的函數
*args, **kwargs:fn 的參數
示例代碼:
# -*- coding:utf-8 -*- from concurrent import futures def test(num): import time return time.ctime(), num with futures.ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(test, 1) print(future.result())
1.2 Executor.map(func, *iterables, timeout=None)
相當於map(func, *iterables),但是func是異步執行。timeout的值可以是int或float,如果操作超時,會返回raisesTimeoutError;如果不指定timeout參數,則不設置超時間。
func:需要異步執行的函數
*iterables:可迭代對象,如列表等。每一次func執行,都會從iterables中取參數。
timeout:設置每次異步操作的超時時間
示例代碼:
# -*- coding:utf-8 -*- from concurrent import futures def test(num): import time return time.ctime(), num data = [1, 2, 3] with futures.ThreadPoolExecutor(max_workers=1) as executor: for future in executor.map(test, data): print(future)
1.3 Executor.shutdown(wait=True)
釋放系統資源,在Executor.submit()或 Executor.map()等異步操作后調用。使用with語句可以避免顯式調用此方法。
2. ThreadPoolExecutor對象
ThreadPoolExecutor類是Executor子類,使用線程池執行異步調用.
class concurrent.futures.ThreadPoolExecutor(max_workers),使用 max_workers 數目的線程池執行異步調用
3. ProcessPoolExecutor對象
ThreadPoolExecutor類是Executor子類,使用進程池執行異步調用.
class concurrent.futures.ProcessPoolExecutor(max_workers=None),使用 max_workers數目的進程池執行異步調用,如果max_workers為None則使用機器的處理器數目(如4核機器max_worker配置為None時,則使用4個進程進行異步並發)。
示例代碼:
# -*- coding:utf-8 -*- from concurrent import futures def test(num): import time return time.ctime(), num def muti_exec(m, n): # m 並發次數 # n 運行次數 with futures.ProcessPoolExecutor(max_workers=m) as executor: # 多進程 # with futures.ThreadPoolExecutor(max_workers=m) as executor: #多線程 executor_dict = dict((executor.submit(test, times), times) for times in range(m * n)) for future in futures.as_completed(executor_dict): times = executor_dict[future] if future.exception() is not None: print('%r generated an exception: %s' % (times, future.exception())) else: print('RunTimes:%d,Res:%s' % (times, future.result())) if __name__ == '__main__': muti_exec(5, 1)
調度單個任務
執行者類Executor調度單個任務,使用submit() 函數,然后用返回的 Future 實例等待任務結果。
Executor 是一個 Python concurrent.futures
模塊的抽象類。 它不能直接使用,我們需要使用以下具體子類之一 -
ThreadPoolExecutor:線程池
ProcessPoolExecutor:進程池
示例代碼:
from concurrent import futures import time import random def task(n): time.sleep(random.randint(1, 10)) return n executor = futures.ThreadPoolExecutor(max_workers=3) future = executor.submit(task, 5) print('future: {}'.format(future)) result = future.result() print('result: {}'.format(result))
線程池 和 進程池
- ThreadPoolExecutor 是
Executor
類的具體子類之一。 子類使用多線程,我們得到一個提交任務的線程池。 該池將任務分配給可用線程並安排它們運行。 - ProcessPoolExecutor 是
Executor
類的具體子類之一。 它使用多重處理,並且我們獲得提交任務的進程池。 此池將任務分配給可用的進程並安排它們運行。
如何創建一個 ThreadPoolExecutor 或者 ProcessPoolExecutor?
在concurrent.futures
模塊及其具體子類Executor
的幫助下,可以很容易地創建一個線程池或者進程池。 需要使用我們想要的池中的線程數構造一個ThreadPoolExecutor 或者 ProcessPoolExecutor
。 默認情況下,數字是5
。然后可以提交一個任務到線程池或者進程池。 當submit()
任務時,會返回Future
對象。 Future
對象有一個名為done()
的方法,它告訴Future
是否已經解決。 有了這個,為這個特定的Future
對象設定了一個值。 當任務完成時,線程池執行器將該值設置為Future
的對象。
線程池 示例代碼:
from concurrent.futures import ThreadPoolExecutor from time import sleep def task(message): sleep(2) return message def main(): executor = ThreadPoolExecutor(5) future = executor.submit(task, "Completed") print(future.done()) sleep(2) print(future.done()) print(future.result()) if __name__ == '__main__': main()
結果截圖:
在上面的例子中,一個ThreadPoolExecutor
已經由5個線程構造而成。 然后,在提供消息之前等待2秒的任務被提交給線程池執行器。 從輸出中可以看出,任務直到2
秒才完成,所以第一次調用done()
將返回False
。 2
秒后,任務完成,我們通過調用result()
方法得到future
的結果。
進程池 示例代碼:
from concurrent.futures import ProcessPoolExecutor from time import sleep def task(message): sleep(2) return message def main(): executor = ProcessPoolExecutor(5) future = executor.submit(task, ("Completed")) print(future.done()) sleep(2) print(future.done()) print(future.result()) if __name__ == '__main__': main()
實例化ThreadPoolExecutor 或者 ProcessPoolExecutor 之 上下文管理器
另一種實例化ThreadPoolExecutor
的方法是在上下文管理器的幫助下完成的。 它的工作方式與上例中使用的方法類似。 使用上下文管理器的主要優點是它在語法上看起來不錯。 實例化可以在下面的代碼的幫助下完成
with ThreadPoolExecutor(max_workers = 5) as executor
或者
with ProcessPoolExecutor(max_workers = 5) as executor
示例
以下示例是從 Python 文檔借用的。 在這個例子中,首先必須導入 concurrent.futures
模塊。 然后創建一個名為 load_url()
的函數,它將加載請求的url。 然后該函數用池中的5
個線程創建 ThreadPoolExecutor
。 ThreadPoolExecutor
已被用作上下文管理器。 我們可以通過調用 result()
方法來獲得 future
的結果。
import concurrent.futures import urllib.request URLS = [ 'http://www.foxnews.com/', 'https://www.yiibai.com/', 'http://europe.wsj.com/', 'http://www.bbc.co.uk/', 'http://some-made-up-domain.com/' ] def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data)))
以下將是上面的Python腳本的輸出 -
-
'http: //some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
-
'http: //www.foxnews.com/' page is 229313 bytes
-
'http: //www.yiibai.com/' page is 168933 bytes
-
'http: //www.bbc.co.uk/' page is 283893 bytes
-
'http: //europe.wsj.com/' page is 938109 bytes
進程池:
import concurrent.futures from concurrent.futures import ProcessPoolExecutor import urllib.request URLS = ['http://www.foxnews.com/', 'http://www.cnn.com/', 'http://europe.wsj.com/', 'http://www.bbc.co.uk/', 'http://some-made-up-domain.com/'] def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() def main(): with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor: future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data))) if __name__ == '__main__': main()
使用 map() 調度多任務,有序返回
使用map(),多個worker並發地從輸入迭代器里取數據,處理,然后按順序返回結果。
示例代碼:
from concurrent import futures import time import random def task(n): time.sleep(random.randint(1, 10)) return n executor = futures.ThreadPoolExecutor(max_workers=3) results = executor.map(task, range(1, 10)) print('unprocessed results: {}'.format(results)) real_results = list(results) print('real results: {}'.format(real_results))
使用 Executor.map() 函數
Python map()
函數廣泛用於許多任務。 一個這樣的任務是對可迭代內的每個元素應用某個函數。 同樣,可以將迭代器的所有元素映射到一個函數,並將這些作為獨立作業提交到ThreadPoolExecutor
之外。 考慮下面的Python腳本示例來理解函數的工作原理。
示例
在下面的示例中,map
函數用於將square()
函數應用於values
數組中的每個值。
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import as_completed values = [2, 3, 4, 5] def square(n): return n * n def main(): with ThreadPoolExecutor(max_workers=3) as executor: results = executor.map(square, values) for result in results: print(result) if __name__ == '__main__': main()
以下將是上面的Python腳本的輸出 :
進程池:
from concurrent.futures import ProcessPoolExecutor from concurrent.futures import as_completed values = [2, 3, 4, 5] def square(n): return n * n def main(): with ProcessPoolExecutor(max_workers=3) as executor: results = executor.map(square, values) for result in results: print(result) if __name__ == '__main__': main()
多任務調度,無序返回
不斷將任務submit到executor,返回future列表,使用as_completed無序產生每個任務的結果。
示例代碼:
from concurrent import futures import time import random def task(n): time.sleep(random.randint(1, 10)) return n executor = futures.ThreadPoolExecutor(max_workers=3) future_list = [executor.submit(task, i) for i in range(1, 10)] for f in futures.as_completed(future_list): print(f.result())
何時使用ProcessPoolExecutor 和 ThreadPoolExecutor ?
現在我們已經學習了兩個Executor
類 - ThreadPoolExecutor
和ProcessPoolExecutor
,我們需要知道何時使用哪個執行器。需要在受CPU限制的工作負載情況下選擇ProcessPoolExecutor
,而在受I/O限制的工作負載情況下則需要選擇ThreadPoolExecutor
。
如果使用ProcessPoolExecutor
,那么不需要擔心GIL,因為它使用多處理。 而且,與ThreadPoolExecution
相比,執行時間會更少。
轉載:https://blog.csdn.net/freeking101/article/details/97395745