前言
前面我們已經將線程並發編程與進程並行編程全部摸了個透,其實我第一次學習他們的時候感覺非常困難甚至是吃力。因為概念實在是太多了,各種鎖,數據共享同步,各種方法等等讓人十分頭痛。所以這邊要告訴你一個好消息,前面的所有學習的知識點其實都是為本章知識點做鋪墊,在學習了本章節的內容后關於如何使用多線程並發與多進程並行就采取本章節中介紹的方式即可。
這里要介紹一點與之前內容不同的地方,即如果使用隊列進行由進程池創建的進程之間數據共享的話不管是multiprocessing模塊下的Queue還是queue模塊下的Queue都不能為進程池中所創建的進程進行數據共享,我們需要用到另一個隊列即multiprocessing.Manager()中的Queue。當然這個我也會在下面介紹到。那么開始學習吧!
執行器
最早期的Python2中是沒有線程池這一概念的,只有進程池。直到Python3的出現才引入了線程池,其實關於他們的使用都是非常簡單,而且接口也是高度統一甚至說一模一樣的。而線程池與進程池的作用即是為了讓我們能夠更加便捷的管理線程或進程。
我們先說一下,如果需要使用線程池或進程池,需要導入模塊concurrent.futures。
from concurrent.futures import ThreadPoolExecutor# 線程池執行器
from concurrent.futures import ProcessPoolExecutor# 進程池執行器
這里介紹一下,關於線程池或者進程池創建出的線程與進程與我們使用multiprocessing模塊或者threading模塊中創建的線程或進程有什么區別。我們以多線程為例:
import threading def task(): ident = threading.get_ident() print(ident) # 銷毀當前執行任務的線程 if __name__ == '__main__': for i in range(10): t1 = threading.Thread(target=task,) # 領任務 t1.start() # 等待CPU調度,而不是立即執行 # 執行 # ==== 執行結果 ==== Ps:可以看到每個線程的id號都不一樣,這也印證了圖上說的。 """ 10392 12068 5708 13864 2604 7196 7324 9728 9664 472 """
import threading from concurrent.futures import ThreadPoolExecutor # 線程池執行器 def task(): ident = threading.get_ident() print(ident) # 結束任務,不銷毀當前執行任務的線程,直到所有任務都執行完畢。 if __name__ == '__main__': pool = ThreadPoolExecutor(max_workers=2) # 這里代表有2個線程可以領取任務 for i in range(10): pool.submit(task) # 執行器啟動任務,將這些任務給2個人分配,也就是說task這個任務會被這2個線程不斷的執行,直到執行完畢后這2個線程才會死亡 # ==== 執行結果 ==== Ps:可以看到這里都讓這2個線程把任務接了,內存開銷相比於上面的要小。 """ 7272 7272 7272 7272 11596 7272 11596 11596 11596 11596 """


方法大全
| 執行器方法大全 | |
|---|---|
| submit(fn, *args, **kwargs) | 調度可調用對象 fn,以 fn(*args **kwargs) 方式執行並返回 |
| map(func, *iterables, timeout=None, chunksize=1) | 類似於 |
| shutdown(wait=True) | 等待,類似join()方法,並且在所有的任務完成后關閉執行器。wait=True為關閉,為False則是不關閉執行器的意思。 |
| Ps:其實對於線程池或進程池來說,他們的池都有一個官方的名稱叫做執行器,接口都是一樣的。那么接下來我就將線程池進程池這樣的名字換做執行器了,也是方便理解。 | |
基本使用
其實關於執行器的使用,我們有兩種方式,一種是依賴於with語句,一種是不依賴於with語句,那么我在這里推薦使用依賴於wait語句的執行器。
不依賴於with語句的執行器使用:
import threading from concurrent.futures import ThreadPoolExecutor # 線程池執行器 def task(): print("執行了") if __name__ == '__main__': pool = ThreadPoolExecutor(max_workers=2) # 這里代表有2個線程可以領取任務 , 對於線程池來講它是默認值是CPU核心數+4,對於進程池來講最大開啟的進程數是CPU核心數。 for i in range(10): pool.submit(task) # 執行器啟動任務,將這些任務給2個人分配,也就是說task這個任務會被這2個線程不斷的執行,直到執行完畢后這2個線程才會死亡 # ==== 執行結果 ==== Ps:可以看到這里都讓這2個線程把任務接了,內存開銷相比於上面的要小。 """ 執行了 執行了 執行了 執行了 執行了 執行了 執行了 執行了 執行了 執行了 """
依賴於with語句的執行器使用:
import threading from concurrent.futures import ThreadPoolExecutor # 線程池執行器 def task(): print("執行了") # 銷毀 if __name__ == '__main__': with ThreadPoolExecutor(max_workers=2) as pool: # 這里代表有2個線程可以領取任務 , 對於線程池來講它是默認值是CPU核心數+4,對於進程池來講最大開啟的進程數是CPU核心數。 for i in range(10): pool.submit(task) # 執行器啟動任務,將這些任務給2個人分配,也就是說task這個任務會被這2個線程不斷的執行,直到執行完畢后這2個線程才會死亡 # ==== 執行結果 ==== Ps:可以看到這里都讓這2個線程把任務接了,內存開銷相比於上面的要小。 """ 執行了 執行了 執行了 執行了 執行了 執行了 執行了 執行了 執行了 執行了 """
期程對象
方法大全
| 期程對象(由執行器執行的任務的返回結果)方法大全 | |
|---|---|
| 方法/屬性名稱 | 功能描述 |
| cancel() | 嘗試取消調用。 如果調用正在執行或已結束運行不能被取消則該方法將返回 False,否則調用會被取消並且該方法將返回 True。 |
| cancelled() | 如果調用成功取消返回 True。 |
| running() | 如果調用正在執行而且不能被取消那么返回 True 。 |
| done() | 如果調用已被取消或正常結束那么返回 True。 |
| result(timeout=None) | 即獲取任務的返回結果,最大等待timeout秒,如不設置則死等,超時觸發CancelledError異常。 |
| add_done_callback(fn) | 增加回調函數fn,這個fn應該至少有一個形參來接收當前期程對象。 |
| exception(timeout=None) | 返回由調用引發的異常。如果調用還沒完成那么這個方法將等待 timeout 秒。如果在 timeout 秒內沒有執行完成,concurrent.futures.TimeoutError將會被觸發。timeout 可以是整數或浮點數。如果 timeout 沒有指定或為 None,那么等待時間就沒有限制。 |
| Ps:還有一些期程對象的方法沒有舉例出來。詳情參見文檔 | |
期程對象的作用
我們可以看到,我們上面的函數並沒有返回值,如果有返回值的話怎么辦呢?
import threading from concurrent.futures import ThreadPoolExecutor # 線程池執行器 def task(): print("執行了") return "玫瑰花" # 銷毀 if __name__ == '__main__': with ThreadPoolExecutor(max_workers=2) as pool: res = pool.submit(task) print(res) # <Future at 0x2539ea97850 state=finished returned str> 這個就是期程對象,可以看到他里面還有當前任務的執行狀態。 finished = 執行完了的意思 print(res.result()) # 通過該方法就可以拿到任務的返回結果 # ==== 執行結果 ==== """ 執行了 <Future at 0x2539ea97850 state=finished returned str> 玫瑰花 """
期程對象,也被稱為未來對象,是一個非常重要的概念。這里可以記一筆,在Django框架中也有些地方采取了期程對象這樣的設定,這是后話,后面再聊。
期程對象如何獲取返回結果
我們嘗試着將它的任務數量增多,發現使用期程對象直接獲取任務結果會導致阻塞,怎么解決?
import time import threading from concurrent.futures import ThreadPoolExecutor # 線程池執行器 def task(x): print("執行了,這是第%s個任務"%x) time.sleep(3) return "玫瑰花" # 銷毀 if __name__ == '__main__': with ThreadPoolExecutor(max_workers=2) as pool: for i in range(10): res = pool.submit(task,i) print(res.result()) # 每次獲取結果的時候都是阻塞,怎么辦?這個速率就變得非常的Low逼了。 # ==== 執行結果 ==== """ 執行了,這是第0個任務 玫瑰花 執行了,這是第1個任務 玫瑰花 執行了,這是第2個任務 玫瑰花 執行了,這是第3個任務 玫瑰花 執行了,這是第4個任務 玫瑰花 執行了,這是第5個任務 玫瑰花 執行了,這是第6個任務 玫瑰花 執行了,這是第7個任務 玫瑰花 執行了,這是第8個任務 玫瑰花 執行了,這是第9個任務 玫瑰花 """
我這里有一個辦法,可以值得嘗試一下。就是執行器本身有個方法shutdown(wait=True),它會導致當前主線程的阻塞。那么我們就可以這樣操作,主程序阻塞住,再將啟程對象全部放到一個列表中,當所有任務處理完畢后阻塞通行,這個時候我們再循環這個列表拿出其中的結果。
import time import threading from concurrent.futures import ThreadPoolExecutor # 線程池執行器 def task(x): print("執行了,這是第%s個任務"%x) time.sleep(3) return "玫瑰花" # 銷毀 if __name__ == '__main__': res_list = [] # 用於存放所有期程對象 with ThreadPoolExecutor(max_workers=2) as pool: for i in range(10): res = pool.submit(task,i) res_list.append(res) # 將期程對象放入列表 pool.shutdown(wait=True) # 代表必須將所有子線程的任務跑完再繼續向下執行主線程。 for i in res_list: print(i.result()) # ==== 執行結果 ==== """ 執行了,這是第0個任務 執行了,這是第1個任務 執行了,這是第2個任務 執行了,這是第3個任務 執行了,這是第4個任務 執行了,這是第5個任務 執行了,這是第6個任務 執行了,這是第7個任務 執行了,這是第8個任務 執行了,這是第9個任務 玫瑰花 玫瑰花 玫瑰花 玫瑰花 玫瑰花 玫瑰花 玫瑰花 玫瑰花 玫瑰花 玫瑰花 """
如果你覺得這種方法很贊,我只能送你兩個字,太low了。我們注意執行器的submit()方法,這玩意兒是異步提交。異步提交的結果需要用到回調函數來進行調用,我們來看一下它有多牛逼。
回調函數
import time import threading from concurrent.futures import ThreadPoolExecutor # 線程池執行器 def task(x): print("執行了,這是第%s個任務"%x) time.sleep(3) return "玫瑰花" # 銷毀 def callback(res): # 必須有一個形參,來接收期程對象 print(res.result()) # 打印結果,即task任務的返回結果 if __name__ == '__main__': with ThreadPoolExecutor(max_workers=2) as pool: for i in range(10): res = pool.submit(task,i) res.add_done_callback(callback) # <--- 增加回調函數,當期程對象中的任務處理狀態完畢后將自動調用回調函數 # ==== 執行結果 ==== # 異步提交牛逼不?只要任務返回了我們立馬就可以獲取到結果進行處理。 """ 執行了,這是第0個任務 執行了,這是第1個任務 玫瑰花 玫瑰花 執行了,這是第2個任務 執行了,這是第3個任務 玫瑰花 玫瑰花 執行了,這是第4個任務 執行了,這是第5個任務 玫瑰花 玫瑰花 執行了,這是第6個任務 執行了,這是第7個任務 玫瑰花 玫瑰花 執行了,這是第8個任務 執行了,這是第9個任務 玫瑰花 玫瑰花 """
擴展:進程池執行器任務數據共享
當我們使用進程池執行器啟動多進程執行任務時,如果想用數據共享,單純multiprocessing.Queue進程隊列並不支持。
import multiprocessing from concurrent.futures import ProcessPoolExecutor # 進程池執行器 def task_1(q): q.put("玫瑰花") print("放完了...") def task_2(q): print(q.get()) print("取到了") if __name__ == '__main__': q = multiprocessing.Queue() with ProcessPoolExecutor(max_workers=2) as pool: pool.submit(task_1,q) pool.submit(task_2,q) # ==== 執行結果 ==== # 阻塞住 """ """
這個時候我們需要用到multiprocessing中的Manager()中的Queue。
from multiprocessing import Manager from concurrent.futures import ProcessPoolExecutor # 進程池執行器 def task_1(q): q.put("玫瑰花") print("放完了...") def task_2(q): print(q.get()) print("取到了") if __name__ == '__main__': q = Manager().Queue() with ProcessPoolExecutor(max_workers=2) as pool: pool.submit(task_1,q) pool.submit(task_2,q) # ==== 執行結果 ==== # 成功 """ 放完了... 玫瑰花 取到了 """
