進程池和multiprocess.Pool模塊


一、為什么要有進程池

  首先,創建進程需要消耗時間,銷毀進程也需要時間。其次,即使開啟了成千上萬的進程,操作系統也不能讓它們同時執行,這樣反而會影響程序的效率。因此我們不能無限制的根據任務開啟或者結束進程。

進程池:定義了一個池子,在里面放上固定數量的進程,有需求來了,就拿這個池中的一個進程來處理任務,等到處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待認為。如果有許多任務需要執行,池中的進程數量不夠,任務就要等待之前的進程執行任務完畢歸來,拿到空閑進程才能繼續執行。

總結:也就是說,池中進程的數量是固定的,那么同一時間最多有固定數量的進程再運行。這樣 不會增加操作系統的調度難度,還節省了開閉進程的時間,也一定程度上能夠實現並發效果。

二、進程池和多進程效率對比

import os
import time
import random
from multiprocessing import Pool
from multiprocessing import Process
def func(i):
    i += 1
if __name__ == '__main__':
    p = Pool(5)          # 創建了5個進程
    start = time.time()
    p.map(func,range(1000))  
p.close() # 是不允許再向進程池中添加任務 p.join() print(time.time() - start) # 0.35544490814208984 start = time.time() l = [] for i in range(1000): p = Process(target=func,args=(i,)) # 創建了一百個進程 p.start() l.append(p) [i.join() for i in l] print(time.time() - start) # 101.00088691711426
import os,time
from multiprocessing import Pool

def work(n):
    print('%s run' %os.getpid())
    time.sleep(3)
    return n**2

if __name__ == '__main__':
    p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務
    res_l=[]
    for i in range(10):
        res=p.apply(work,args=(i,)) # 同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程中可能有阻塞也可能沒有阻塞
                                    # 但不管該任務是否存在阻塞,同步調用都會在原地等着
    print(res_l)
進程池的同步調用
import os
import time
import random
from multiprocessing import Pool

def work(n):
    print('%s run' %os.getpid())
    time.sleep(random.random())
    return n**2

if __name__ == '__main__':
    p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務
    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,)) # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行
                                          # 返回結果之后,將結果放入列表,歸還進程,之后再執行新的任務
                                          # 需要注意的是,進程池中的三個進程不會同時開啟或者同時結束
                                          # 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。  
        res_l.append(res)

    # 異步apply_async用法:如果使用異步提交的任務,主進程需要使用jion,等待進程池內任務都處理完,然后可以用get收集結果
    # 否則,主進程結束,進程池可能還沒來得及執行,也就跟着一起結束了
    p.close()
    p.join()
    for res in res_l:
        print(res.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get
進程池的異步調用

 

通過進程池利用socket實現並發聊天

#Pool內的進程數默認是cpu核數,假設為4(查看方法os.cpu_count())
#開啟6個客戶端,會發現2個客戶端處於等待狀態
#在每個進程內查看pid,會發現pid使用為4個,即多個客戶端公用4個進程
from socket import *
from multiprocessing import Pool
import os

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn):
    print('進程pid: %s' %os.getpid())
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    p=Pool(4)
    while True:
        conn,*_=server.accept()
        p.apply_async(talk,args=(conn,))
        # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問
server端
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
    msg=input('>>: ').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
client端

 

import os
import time
from multiprocessing import Pool
# 參數 概念 回調函數
def func(i):    # 多進程中的io多,
    print('子進程%s:%s'%(i,os.getpid()))
    return i*'*'

def call(arg):   # 回調函數是在主進程中完成的,不能傳參數,只能接受多進程中函數的返回值
    print('回調 :',os.getpid())
    print(arg)

if __name__ == '__main__':
    print('---->',os.getpid())
    p = Pool(5)
    for i in range(10):
        p.apply_async(func,args=(i,),callback=call)
    p.close()
    p.join()
回調函數

如果在主進程中等待進程池中所有任務都執行完畢后,再統一處理結果,則無需回調函數。

from multiprocessing import Pool
import time,random,os

def work(n):
    time.sleep(1)
    return n**2
if __name__ == '__main__':
    p=Pool()

    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,))
        res_l.append(res)

    p.close()
    p.join() #等待進程池中所有進程執行完畢

    nums=[]
    for res in res_l:
        nums.append(res.get()) #拿到所有結果
    print(nums) #主進程拿到所有的處理結果,可以在主進程中進行統一進行處理
無需回調函數

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM