一. 進程池
引入進程池 : from multiprocessing import Pool
創建進程池 : Pool( numprocess , initializer , initargs )
參數 : numprocess : 要創建的進程數,如果省略,默認使用cpu_count() + 1的值
initializer : 每個工作進程啟動時要執行的可調用對象,默認為None
initargs : 是要傳給initializer的參數組
主要方法 :
實例化一個進程池 : p = Pool()
1. p.apply ( func,args = ()) : 同步的效率,也就是說池中的進程一個一個的去執行任務
func : 進程池中的進程要執行的任務函數
args : 可迭代對象性參數,是傳給任務函數的參數
同步處理任務時,不需要close和join,並且進程池中的所有進程都是普通進程(主進程等其執行完再結束)
from multiprocessing import Pool import time def func(num): num += 1 return num if __name__ == '__main__': p = Pool(5) start = time.time() for i in range(10000): res = p.apply(func,args=(i,))# 同步處理這100個任務,同步是指,哪怕我進程中有5個進程,也依舊是1個進程1個進程的去執行任務 # time.sleep(0.5) print(res) print(time.time() - start)
2. p.apply_async( func,args = () , callback = None) : 異步的效率,也就是池中的進程一次性都去執行任務.
func : 進程池中的進程執行的任務函數
args : 可迭代對象性的參數,是傳給任務函數的參數
callback : 回調函數,就是每當進程池中有進程處理完任務了,返回的結果可以交給回調函數,由回調函數進行進一步處理,回調函數只異步才有,同步沒有.回調函數是父進程調用.
from multiprocessing import Process,Pool def func(i): i+=1 return i#普通進程處理過的數據返回給主進程p1 def call_back(p1): p1+=1 print(p1) if __name__ == '__main__': p = Pool() for i in range(10): p1 = p.apply_async(func,args=(i,),callback = call_back)#p調用普通進程並且接受其返回值,將返回值給要執行的回調函數處理 p.close() p.join()
異步處理任務時 : 必須要加上close和join. 進程池的所有進程都是守護進程(主進程代碼執行結束,守護進程就結束).
from multiprocessing import Pool import time def func(num): num += 1 return num if __name__ == '__main__': p = Pool(5) start = time.time() l = [] for i in range(10000): res = p.apply_async(func,args=(i,))# 異步處理這100個任務,異步是指,進程中有5個進程,一下就處理5個任務,接下來哪個進程處理完任務了,就馬上去接收下一個任務 l.append(res) # print(res.get())#也可以使用這種方法打印返回結果,但是會使異步進程變成同步,因為get()只能一個一個取 p.close() p.join() [print(i.get()) for i in l] print(time.time() - start)
3. map( func,iterable)
func : 進程池中的進程執行的任務函數
iterable : 可迭代對象,是把可迭代對象那個中的每個元素一次傳給任務函數當參數.
map方法自帶close和join
from multiprocessing import Pool def func(num): num += 1 print(num) return num if __name__ == '__main__': p = Pool(5) res = p.map(func,[i for i in range(100)]) # p.close()#map方法自帶這兩種功能 # p.join() print('主進程中map的返回值',res)