一,共享數據
展望未來,基於消息傳遞的並發編程是大勢所趨
即便是使用線程,推薦做法也是將程序設計為大量獨立的線程集合
通過消息隊列交換數據。這樣極大地減少了對使用鎖定和其他同步手段的需求,
還可以擴展到分布式系統中
進程間通信應該盡量避免使用本節所講的共享數據的方式
進程間數據是獨立的,可以借助於隊列或管道實現通信,二者都是基於消息傳遞的 雖然進程間數據獨立,但可以通過Manager實現數據共享,事實上Manager的功能遠不止於此 A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies. A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,
from multiprocessing import Manager,Process,Lock
import os
def work(d,lock):
# with lock: #不加鎖而操作共享的數據,肯定會出現數據錯亂
d['count']-=1
if __name__ == '__main__':
lock=Lock()
with Manager() as m:
dic=m.dict({'count':100})
p_l=[]
for i in range(100):
p=Process(target=work,args=(dic,lock))
p_l.append(p)
p.start()
for p in p_l:
p.join()
print(dic)
#{'count': 94}
1 # 共享文件,速度慢硬盤IO,沒有鎖 2 # 內存級別 3 # 隊列,有鎖,速度快 4 # 沒有鎖就是數據可能不安全 5 # 管道,隔離的進程之間通信,沒有鎖 6 7 # IPC就是進程之間怎么通信,2種方式,隊列和管道 8 9 # 共享數據,共享內存最原始的方式,比如共享字典,沒有鎖,跟管道一樣 10 # Manager 共享數據 11 # 代碼一 12 from multiprocessing import Manager,Process 13 def work(dic): 14 dic['count']-=1 15 # if __name__ == '__main__': 16 # m=Manager() 17 # share_dic=m.dict({'count':100}) 18 # p_l=[] 19 # for i in range(100): #開100個進程 20 # p=Process(target=work,args=(share_dic,)) 21 # p_l.append(p) 22 # p.start() #這里只是發信號 23 # # p.join() #放在這里就是串行 24 # for i in p_l: #可能出現同時寫,因為共享數據 25 # i.join() 26 # print(share_dic) 27 28 # 加鎖共享數據,不會對數據產生修改 29 from multiprocessing import Manager,Process,Lock 30 def work(dic,mutex): 31 # mutex.acquire() #加鎖的2重寫法 32 # dic['count']-=1 33 # mutex.release() 34 with mutex: 35 dic['count']-=1 36 37 if __name__ == '__main__': 38 mutex=Lock() 39 m=Manager() 40 share_dic=m.dict({'count':100}) 41 p_l=[] 42 for i in range(100): 43 p=Process(target=work,args=(share_dic,mutex)) 44 p_l.append(p) 45 p.start() 46 for i in p_l: 47 i.join() 48 print(share_dic)
二,開啟進程池
在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多台主機,並行操作可以節約大量的時間。多進程是實現並發的手段之一,需要注意的問題是:
- 很明顯需要並發執行的任務通常要遠大於核數
- 一個操作系統不可能無限開啟進程,通常有幾個核就開幾個進程
- 進程開啟過多,效率反而會下降(開啟進程是需要占用系統資源的,而且開啟多余核數目的進程也無法做到並行)
例如當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個。。。手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。
我們就可以通過維護一個進程池來控制進程數目,比如httpd的進程模式,規定最小進程數和最大進程數...
ps:對於遠程過程調用的高級應用程序而言,應該使用進程池,Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,就重用進程池中的進程。
創建進程池的類:如果指定numprocess為3,則進程池會從無到有創建三個進程,然后自始至終使用這三個進程去執行所有任務,不會開啟其他進程
1 Pool([numprocess [,initializer [, initargs]]]):創建進程池
參數介紹:
1 numprocess:要創建的進程數,如果省略,將默認使用cpu_count()的值 2 initializer:是每個工作進程啟動時要執行的可調用對象,默認為None 3 initargs:是要傳給initializer的參數組
方法介紹:
1 p.apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。需要強調的是:此操作並不會在所有池工作進程中並執行func函數。如果要通過不同參數並發地執行func函數,必須從不同線程調用p.apply()函數或者使用p.apply_async() 2 p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。 3 4 p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成 5 P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之后調用
其他方法(了解部分)
方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法 obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果遠程操作中引發了異常,它將在調用此方法時再次被引發。 obj.ready():如果調用完成,返回True obj.successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發異常 obj.wait([timeout]):等待結果變為可用。 obj.terminate():立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動調用此函數
應用:
from multiprocessing import Pool
import os,time
def work(n):
print('%s run' %os.getpid())
time.sleep(3)
return n**2
if __name__ == '__main__':
p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務
res_l=[]
for i in range(10):
res=p.apply(work,args=(i,)) #同步運行,阻塞、直到本次任務執行完畢拿到res
res_l.append(res)
print(res_l)
from multiprocessing import Pool
import os,time
def work(n):
print('%s run' %os.getpid())
time.sleep(3)
return n**2
if __name__ == '__main__':
p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務
res_l=[]
for i in range(10):
res=p.apply_async(work,args=(i,)) #同步運行,阻塞、直到本次任務執行完畢拿到res
res_l.append(res)
#異步apply_async用法:如果使用異步提交的任務,主進程需要使用jion,等待進程池內任務都處理完,然后可以用get收集結果,否則,主進程結束,進程池可能還沒來得及執行,也就跟着一起結束了
p.close()
p.join()
for res in res_l:
print(res.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get
#一:使用進程池(非阻塞,apply_async)
#coding: utf-8
from multiprocessing import Process,Pool
import time
def func(msg):
print( "msg:", msg)
time.sleep(1)
return msg
if __name__ == "__main__":
pool = Pool(processes = 3)
res_l=[]
for i in range(10):
msg = "hello %d" %(i)
res=pool.apply_async(func, (msg, )) #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去
res_l.append(res)
print("==============================>") #沒有后面的join,或get,則程序整體結束,進程池中的任務還沒來得及全部執行完也都跟着主進程一起結束了
pool.close() #關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成
pool.join() #調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束
print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>對象組成的列表,而非最終的結果,但這一步是在join后執行的,證明結果已經計算完畢,剩下的事情就是調用每個對象下的get方法去獲取結果
for i in res_l:
print(i.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get
#二:使用進程池(阻塞,apply)
#coding: utf-8
from multiprocessing import Process,Pool
import time
def func(msg):
print( "msg:", msg)
time.sleep(0.1)
return msg
if __name__ == "__main__":
pool = Pool(processes = 3)
res_l=[]
for i in range(10):
msg = "hello %d" %(i)
res=pool.apply(func, (msg, )) #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去
res_l.append(res) #同步執行,即執行完一個拿到結果,再去執行另外一個
print("==============================>")
pool.close()
pool.join() #調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束
print(res_l) #看到的就是最終的結果組成的列表
for i in res_l: #apply是同步的,所以直接得到結果,沒有get()方法
print(i)
練習2:使用進程池維護固定數目的進程(重寫練習1)
#Pool內的進程數默認是cpu核數,假設為4(查看方法os.cpu_count())
#開啟6個客戶端,會發現2個客戶端處於等待狀態
#在每個進程內查看pid,會發現pid使用為4個,即多個客戶端公用4個進程
from socket import *
from multiprocessing import Pool
import os
server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)
def talk(conn,client_addr):
print('進程pid: %s' %os.getpid())
while True:
try:
msg=conn.recv(1024)
if not msg:break
conn.send(msg.upper())
except Exception:
break
if __name__ == '__main__':
p=Pool()
while True:
conn,client_addr=server.accept()
p.apply_async(talk,args=(conn,client_addr))
# p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
msg=input('>>: ').strip()
if not msg:continue
client.send(msg.encode('utf-8'))
msg=client.recv(1024)
print(msg.decode('utf-8'))
發現:並發開啟多個客戶端,服務端同一時間只有3個不同的pid,干掉一個客戶端,另外一個客戶端才會進來,被3個進程之一處理
三,進程回調函數
回掉函數:
需要回調函數的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了額,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數
我們可以把耗時間(阻塞)的任務放到進程池中,然后指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。
from multiprocessing import Pool
import requests
import json
import os
def get_page(url):
print('<進程%s> get %s' %(os.getpid(),url))
respone=requests.get(url)
if respone.status_code == 200:
return {'url':url,'text':respone.text}
def pasrse_page(res):
print('<進程%s> parse %s' %(os.getpid(),res['url']))
parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
with open('db.txt','a') as f:
f.write(parse_res)
if __name__ == '__main__':
urls=[
'https://www.baidu.com',
'https://www.python.org',
'https://www.openstack.org',
'https://help.github.com/',
'http://www.sina.com.cn/'
]
p=Pool(3)
res_l=[]
for url in urls:
res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
res_l.append(res)
p.close()
p.join()
print([res.get() for res in res_l]) #拿到的是get_page的結果,其實完全沒必要拿該結果,該結果已經傳給回調函數處理了
'''
打印結果:
<進程3388> get https://www.baidu.com
<進程3389> get https://www.python.org
<進程3390> get https://www.openstack.org
<進程3388> get https://help.github.com/
<進程3387> parse https://www.baidu.com
<進程3389> get http://www.sina.com.cn/
<進程3387> parse https://www.python.org
<進程3387> parse https://help.github.com/
<進程3387> parse http://www.sina.com.cn/
<進程3387> parse https://www.openstack.org
[{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}]
'''
from multiprocessing import Pool
import time,random
import requests
import re
def get_page(url,pattern):
response=requests.get(url)
if response.status_code == 200:
return (response.text,pattern)
def parse_page(info):
page_content,pattern=info
res=re.findall(pattern,page_content)
for item in res:
dic={
'index':item[0],
'title':item[1],
'actor':item[2].strip()[3:],
'time':item[3][5:],
'score':item[4]+item[5]
}
print(dic)
if __name__ == '__main__':
pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)
url_dic={
'http://maoyan.com/board/7':pattern1,
}
p=Pool()
res_l=[]
for url,pattern in url_dic.items():
res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
res_l.append(res)
for i in res_l:
i.get()
# res=requests.get('http://maoyan.com/board/7')
# print(re.findall(pattern,res.text))
如果在主進程中等待進程池中所有任務都執行完畢后,再統一處理結果,則無需回調函數
from multiprocessing import Pool
import time,random,os
def work(n):
time.sleep(1)
return n**2
if __name__ == '__main__':
p=Pool()
res_l=[]
for i in range(10):
res=p.apply_async(work,args=(i,))
res_l.append(res)
p.close()
p.join() #等待進程池中所有進程執行完畢
nums=[]
for res in res_l:
nums.append(res.get()) #拿到所有結果
print(nums) #主進程拿到所有的處理結果,可以在主進程中進行統一進行處理

