一、進程池
為什么要有進程池?進程池的概念。
在程序實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閑時可能只有零星任務。那么在成千上萬個任務需要被執行的時候,我們就需要去創建成千上萬個進程么?首先,創建進程需要消耗時間,銷毀進程也需要消耗時間。第二即便開啟了成千上萬的進程,操作系統也不能讓他們同時執行,這樣反而會影響程序的效率。因此我們不能無限制的根據任務開啟或者結束進程。那么我們要怎么做呢?
在這里,要給大家介紹一個進程池的概念,定義一個池子,在里面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等到處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。如果有很多任務需要執行,池中的進程數量不夠,任務就要等待之前的進程執行任務完畢歸來,拿到空閑進程才能繼續執行。也就是說,池中進程的數量是固定的,那么同一時間最多有固定數量的進程在運行。這樣不會增加操作系統的調度難度,還節省了開閉進程的時間,也一定程度上能夠實現並發效果。
二、概念介紹——multiprocess.Pool
Pool([numprocess [,initializer [, initargs]]])
:創建進程池
三、參數用法
- numprocess:要創建的進程數,如果省略,將默認使用
cpu_count()
的值 - initializer:是每個工作進程啟動時要執行的可調用對象,默認為None
- initargs:是要傳給initializer的參數組
四、主要方法
p.apply(func [, args [, kwargs]])
:在一個池工作進程中執行func(*args,**kwargs),然后返回結果。需要強調的是:此操作並不會在所有池工作進程中並執行func函數。如果要通過不同參數並發地執行func函數,必須從不同線程調用p.apply()
函數或者使用p.apply_async()
p.apply_async(func [, args [, kwargs]])
:在一個池工作進程中執行func(*args,**kwargs),然后返回結果。此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。
p.close()
:關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成
P.join()
:等待所有工作進程退出。此方法只能在close()
或teminate()
之后調用
五、其他方法(了解)
方法apply_async()
和map_async()
的返回值是AsyncResul的實例obj。實例具有以下方法:
obj.get()
:返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果遠程操作中引發了異常,它將在調用此方法時再次被引發。
obj.ready()
:如果調用完成,返回True
obj.successful()
:如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發異常
obj.wait([timeout])
:等待結果變為可用。
obj.terminate()
:立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動調用此函數
六、代碼實例——multiprocess.Pool
6.1 同步
import os,time
from multiprocessing import Pool
def work(n):
print('%s run' %os.getpid())
time.sleep(3)
return n**2
if __name__ == '__main__':
p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務
res_l=[]
for i in range(10):
res=p.apply(work,args=(i,)) # 同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程中可能有阻塞也可能沒有阻塞
# 但不管該任務是否存在阻塞,同步調用都會在原地等着
print(res_l)
6.2 異步
import os
import time
import random
from multiprocessing import Pool
def work(n):
print('%s run' %os.getpid())
time.sleep(random.random())
return n**2
if __name__ == '__main__':
p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務
res_l=[]
for i in range(10):
res=p.apply_async(work,args=(i,)) # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行
# 返回結果之后,將結果放入列表,歸還進程,之后再執行新的任務
# 需要注意的是,進程池中的三個進程不會同時開啟或者同時結束
# 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。
res_l.append(res)
# 異步apply_async用法:如果使用異步提交的任務,主進程需要使用jion,等待進程池內任務都處理完,然后可以用get收集結果
# 否則,主進程結束,進程池可能還沒來得及執行,也就跟着一起結束了
p.close()
p.join()
for res in res_l:
print(res.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get
七、進程池版socket並發聊天練習
7.1 server
#Pool內的進程數默認是cpu核數,假設為4(查看方法os.cpu_count())
#開啟6個客戶端,會發現2個客戶端處於等待狀態
#在每個進程內查看pid,會發現pid使用為4個,即多個客戶端公用4個進程
from socket import *
from multiprocessing import Pool
import os
server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)
def talk(conn):
print('進程pid: %s' %os.getpid())
while True:
try:
msg=conn.recv(1024)
if not msg:break
conn.send(msg.upper())
except Exception:
break
if __name__ == '__main__':
p=Pool(4)
while True:
conn,*_=server.accept()
p.apply_async(talk,args=(conn,))
# p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問
7.2 client
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
msg=input('>>: ').strip()
if not msg:continue
client.send(msg.encode('utf-8'))
msg=client.recv(1024)
print(msg.decode('utf-8'))
發現:並發開啟多個客戶端,服務端同一時間只有4個不同的pid,只能結束一個客戶端,另外一個客戶端才會進來。
八、回調函數
需要回調函數的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了額,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數
我們可以把耗時間(阻塞)的任務放到進程池中,然后指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。
8.1 使用多進程請求多個url來減少網絡等待浪費的時間
from multiprocessing import Pool
import requests
import json
import os
def get_page(url):
print('<進程%s> get %s' %(os.getpid(),url))
respone=requests.get(url)
if respone.status_code == 200:
return {'url':url,'text':respone.text}
def pasrse_page(res):
print('<進程%s> parse %s' %(os.getpid(),res['url']))
parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
with open('db.txt','a') as f:
f.write(parse_res)
if __name__ == '__main__':
urls=[
'https://www.baidu.com',
'https://www.python.org',
'https://www.openstack.org',
'https://help.github.com/',
'http://www.sina.com.cn/'
]
p=Pool(3)
res_l=[]
for url in urls:
res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
res_l.append(res)
p.close()
p.join()
print([res.get() for res in res_l]) #拿到的是get_page的結果,其實完全沒必要拿該結果,該結果已經傳給回調函數處理了
'''
打印結果:
<進程3388> get https://www.baidu.com
<進程3389> get https://www.python.org
<進程3390> get https://www.openstack.org
<進程3388> get https://help.github.com/
<進程3387> parse https://www.baidu.com
<進程3389> get http://www.sina.com.cn/
<進程3387> parse https://www.python.org
<進程3387> parse https://help.github.com/
<進程3387> parse http://www.sina.com.cn/
<進程3387> parse https://www.openstack.org
[{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}]
'''
8.2 爬蟲實例
import re
from urllib.request import urlopen
from multiprocessing import Pool
def get_page(url,pattern):
response=urlopen(url).read().decode('utf-8')
return pattern,response
def parse_page(info):
pattern,page_content=info
res=re.findall(pattern,page_content)
for item in res:
dic={
'index':item[0].strip(),
'title':item[1].strip(),
'actor':item[2].strip(),
'time':item[3].strip(),
}
print(dic)
if __name__ == '__main__':
regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'
pattern1=re.compile(regex,re.S)
url_dic={
'http://maoyan.com/board/7':pattern1,
}
p=Pool()
res_l=[]
for url,pattern in url_dic.items():
res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
res_l.append(res)
for i in res_l:
i.get()
九、無需回調函數
如果在主進程中等待進程池中所有任務都執行完畢后,再統一處理結果,則無需回調函數。
from multiprocessing import Pool
import time,random,os
def work(n):
time.sleep(1)
return n**2
if __name__ == '__main__':
p=Pool()
res_l=[]
for i in range(10):
res=p.apply_async(work,args=(i,))
res_l.append(res)
p.close()
p.join() #等待進程池中所有進程執行完畢
nums=[]
for res in res_l:
nums.append(res.get()) #拿到所有結果
print(nums) #主進程拿到所有的處理結果,可以在主進程中進行統一進行處理
進程池的其他實現方法:https://docs.python.org/dev/library/concurrent.futures.html