本文將介紹使用multiprocessing進行多線程和多進程操作。
多線程
如果我需要對一個列表的每一個元素都要執行一個函數操作,並且每個元素執行的操作互不影響,那我們可以讓列表里的所有元素在同時進行執行,而不是使用for循環讓其一個一個執行,這種操作常常在獨寫文件中處理,比如我們要下載一系列的圖片,數量很多如果一個一個下載,就會讓很多cpu資源閑置,這時候我們就可以使用多線程及進行。
我們通過multiprocessing.pool.ThreadPool類創造一個線程池,可以指定內部線程個數(同時對列表的多少元素同時進行)。之后使用map方法,或者apply_async方法對數據進行異步同時對多個元素操作,下面直接看例子。
例子一:map方法
from multiprocessing.pool import ThreadPool import time def func(p): print(p) time.sleep(3) tic = time.time() my_task = [1,2,3,4,5,6,7,8,9,10] pool = ThreadPool(5) pool.map(func, my_task) # 把任務放入線程池,進行 toc = time.time() print('costs: {}s'.format(toc - tic))
1 2 3 4 5 6 7 8 9 10 costs: 6.009862422943115s
可以根據執行時間看到,首先是對前5個元素進行了函數func操作,等待了3秒后再對后面5個元素進行操作,完成了並行運算。不過我們要注意的是map方法的函數func只能接受1個傳參,所以如果你有多個參數需要傳入,那map方式就不適合了(當然你可以通過傳入一個元組然后在內部進行讀取也可以實現傳出“多個”參數的目的)。還有提醒一點的是,如果線程沒有結束,那整個程序是不會結束的(稱為父進程堵塞)
方式二:apply_async方法
from multiprocessing.pool import ThreadPool import time def func(p,s): print(p,s) time.sleep(3) tic = time.time() my_task = [1,2,3,4,5,6,7,8,9,10] pool = ThreadPool(5) for item in my_task: pool.apply_async(func, args=(item,"可以")) # 把任務放入線程池,進行 toc = time.time() pool.close() # 關閉線程池 pool.join() # 等待所有任務完成 print('costs: {}s'.format(toc - tic))
costs: 0.0029494762420654297s 1 可以 2 可以 3 可以 4 可以 5 可以 6 可以 7 可以 8 可以 9 可以 10 可以
大家可能會疑惑,為啥是先打印costs:0.00294呢?其實這就是apply_async方法相對於map方法的差別了,剛說到map方法是會阻塞父進程的,換句話說只有當你的元素都執行完之后,才會繼續執行pool.map后面的語句,而pool.apply_async則只需要把任務放進去就會立刻開始執行之后的命令,不需要等待放入線程池的任務執行完成,換句話說,這里的costs:0.00294只是刻畫的for循環的時間,並不是我的元素在func中只想完成的時間,這稱為不阻塞父進程。這個方法的優勢就是func是可以有多個傳參的。
以上是multiprocessing.pool進行多線程操作的簡單運用,推薦是使用多線程進行IO操作,互不影響。
多進程
在一些模塊的調用上需要開啟新的進程才能工作,例如使用ffmpeg這些包來抽取圖片單幀信息,這個包只能在一個進程中使用,因此如果你把這個抽去任務放到線程池中其實也就是在一個一個抽取,因此在一些任務上我們需要采用多進程的方式進行,我們就可以利用multiprocessing中的Pool模塊(注意這里是大寫)。以下是模板,其實和線程池的操作一樣這里采用的是map函數(具體見上文)
from multiprocessing import Pool
def func(item):
pass
pool = Pool(processes=8) # 設置進程池的大小
pool.map(func, load_dict) # 其中load_dict是一個列表或者可迭代對象
pool.close()
pool.join()
但是要注意這樣代表着開啟了新的進程,那意味着每個進程之間是無法通信的,也就是看不到對方的東西的,因此進程池一般用於儲存等操作。如果非要想進程間進行通訊,例如我想用一個列表儲存一下無法被讀取的文件名,則可以使用multiprocessing中的Manager模塊。使得各個進程中都能看到這個List,代碼模板。
from multiprocessing import Pool, Manager
manager = Manager()
transition_label_list = manager.list() # 注冊一個列表可以被所有進程看到的列表
def func(mp4):
transition_label_list.append(...)
pass
pool = Pool(processes=8)
pool.map(func, load_dict)
pool.close()
pool.join()
print('多進程完成')
這里要注意到,如果你要把transition_label_list儲存到本地,最好是先使用list(transition_label_list)將其轉化為列表(transition_label_list此時的類型並不是List而是一個Manager的一個類)之后再儲存。
參考網站:
python3的multiprocessing多進程-Pool進程池模塊 - 整合俠 - 博客園 (cnblogs.com)
Python 多進程之間共享變量 - 知乎 (zhihu.com)
【python】詳解multiprocessing多進程-Pool進程池模塊(二)_brucewong0516的博客-CSDN博客_python多進程pool