一、為什么要有進程池
首先,創建進程需要消耗時間,銷毀進程也需要時間。其次,即使開啟了成千上萬的進程,操作系統也不能讓它們同時執行,這樣反而會影響程序的效率。因此我們不能無限制的根據任務開啟或者結束進程。
進程池:定義了一個池子,在里面放上固定數量的進程,有需求來了,就拿這個池中的一個進程來處理任務,等到處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待認為。如果有許多任務需要執行,池中的進程數量不夠,任務就要等待之前的進程執行任務完畢歸來,拿到空閑進程才能繼續執行。
總結:也就是說,池中進程的數量是固定的,那么同一時間最多有固定數量的進程再運行。這樣 不會增加操作系統的調度難度,還節省了開閉進程的時間,也一定程度上能夠實現並發效果。
二、進程池和多進程效率對比
import os import time import random from multiprocessing import Pool from multiprocessing import Process def func(i): i += 1 if __name__ == '__main__': p = Pool(5) # 創建了5個進程 start = time.time() p.map(func,range(1000))
p.close() # 是不允許再向進程池中添加任務 p.join() print(time.time() - start) # 0.35544490814208984 start = time.time() l = [] for i in range(1000): p = Process(target=func,args=(i,)) # 創建了一百個進程 p.start() l.append(p) [i.join() for i in l] print(time.time() - start) # 101.00088691711426

import os,time from multiprocessing import Pool def work(n): print('%s run' %os.getpid()) time.sleep(3) return n**2 if __name__ == '__main__': p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務 res_l=[] for i in range(10): res=p.apply(work,args=(i,)) # 同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程中可能有阻塞也可能沒有阻塞 # 但不管該任務是否存在阻塞,同步調用都會在原地等着 print(res_l)

import os import time import random from multiprocessing import Pool def work(n): print('%s run' %os.getpid()) time.sleep(random.random()) return n**2 if __name__ == '__main__': p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務 res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行 # 返回結果之后,將結果放入列表,歸還進程,之后再執行新的任務 # 需要注意的是,進程池中的三個進程不會同時開啟或者同時結束 # 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。 res_l.append(res) # 異步apply_async用法:如果使用異步提交的任務,主進程需要使用jion,等待進程池內任務都處理完,然后可以用get收集結果 # 否則,主進程結束,進程池可能還沒來得及執行,也就跟着一起結束了 p.close() p.join() for res in res_l: print(res.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get
通過進程池利用socket實現並發聊天

#Pool內的進程數默認是cpu核數,假設為4(查看方法os.cpu_count()) #開啟6個客戶端,會發現2個客戶端處於等待狀態 #在每個進程內查看pid,會發現pid使用為4個,即多個客戶端公用4個進程 from socket import * from multiprocessing import Pool import os server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn): print('進程pid: %s' %os.getpid()) while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': p=Pool(4) while True: conn,*_=server.accept() p.apply_async(talk,args=(conn,)) # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問

from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))

import os import time from multiprocessing import Pool # 參數 概念 回調函數 def func(i): # 多進程中的io多, print('子進程%s:%s'%(i,os.getpid())) return i*'*' def call(arg): # 回調函數是在主進程中完成的,不能傳參數,只能接受多進程中函數的返回值 print('回調 :',os.getpid()) print(arg) if __name__ == '__main__': print('---->',os.getpid()) p = Pool(5) for i in range(10): p.apply_async(func,args=(i,),callback=call) p.close() p.join()
如果在主進程中等待進程池中所有任務都執行完畢后,再統一處理結果,則無需回調函數。

from multiprocessing import Pool import time,random,os def work(n): time.sleep(1) return n**2 if __name__ == '__main__': p=Pool() res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) res_l.append(res) p.close() p.join() #等待進程池中所有進程執行完畢 nums=[] for res in res_l: nums.append(res.get()) #拿到所有結果 print(nums) #主進程拿到所有的處理結果,可以在主進程中進行統一進行處理