通過tqdm,可以很方便地查看一項長耗時任務的執行進度。
為了提升效率,有時可以將任務拆分,提交到多個進程上執行,再將結果匯總。那么,利用tqdm是否可以對多進程中的任務進行進度監控呢?
我對此進行了實驗。
1、環境及版本
操作系統:macOS Big Sur
python版本:3.7.6
tqdm版本:4.42.1
2、測試代碼
2.1 方法一
已經知道,使用tqdm
最直接的方法是:
from tqdm import tqdm
for i in tqdm(range(int(1e6))):
pass
通過查看tqdm
類的源碼可以發現,按照上述方式調用時,range(...)
部分對應的形參實際是iterable
,即tqdm(iterable=range(int(1e6)))
。此外,tqdm
還可以接受一個total
參數,指的是需要計算的總次數。
基於上述原理,我們可以使用multiprocessing.Pool
對象的imap(imap_unordered)
方法來實現對多進程任務的進度追蹤。
具體地,代碼如下:
from multiprocessing import Pool
import tqdm
import time
def worker(i):
# 執行任務的函數
time.sleep(3)
pass
if __name__ == '__main__':
p = Pool(4)
start_time = time.time()
list(tqdm.tqdm(iterable=(p.imap(worker, range(10))), total=10))
print(f"共耗時{int(time.time() - start_time)}s")
p.close()
p.join()
由於Pool.imap()
返回的也是一個迭代器,且每個進程的worker
完成后就馬上返回結果,因此,上述方式可以監控到整體的進度。
實際上,如果不關注返回值的順序,使用Pool.imap_unordered()
能夠獲取更准確的進度,這是因為它的返回值是無序的。而imap
的有序將導致如果前面的任務較為耗時,即使后面的任務已經完成,也必須等到前面的完成后才能告訴tqdm更新進度。關於這一點,可以將上述worker
函數改為如下的形式,然后分別使用imap()
和imap_unordered()
進行驗證。
...
def worker(i):
# 執行任務的函數
if i == 0:
sleep(5)
else:
sleep(0.1)
...
不能使用
map()
/map_async()
方法。關於它們的區別,可以看我的上一篇文章。
2.2 方法二
tqdm其實通過concurrent.futures
包裝了對多進程的支持,使用起來更加的簡單:
from time import sleep
from tqdm.contrib.concurrent import process_map
def worker(i):
# 執行任務的函數
if i == 0:
sleep(5)
else:
sleep(0.1)
return i
results = process_map(worker, range(10), max_workers=4)
results
會存儲每個參數對應的計算結果。
不過這種方法的進度是按照參數傳入的順序進行更新的。也就是說,如果執行上述代碼,那么進度會卡在第一個位置上5s,然后瞬間變為100%;如果將range(1)
改為range(9, -1, -1)
那么進度的表現為:在不到1s的時間內(0.9s)完成90%,然后再等待(4.1s)完成剩余的10%。