一 進程池
在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多台主機,並行操作可以節約大量的時間。多進程是實現並發的手段之一,需要注意的問題是:
- 很明顯需要並發執行的任務通常要遠大於核數
- 一個操作系統不可能無限開啟進程,通常有幾個核就開幾個進程
- 進程開啟過多,效率反而會下降(開啟進程是需要占用系統資源的,而且開啟多余核數目的進程也無法做到並行)
例如當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個。。。手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。
我們就可以通過維護一個進程池來控制進程數目,比如httpd的進程模式,規定最小進程數和最大進程數...
ps:對於遠程過程調用的高級應用程序而言,應該使用進程池,Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,就重用進程池中的進程。
創建進程池的類:如果指定numprocess為3,則進程池會從無到有創建三個進程,然后自始至終使用這三個進程去執行所有任務,不會開啟其他進程
Pool([numprocess [,initializer [, initargs]]]):創建進程池
參數介紹:
1 numprocess:要創建的進程數,如果省略,將默認使用cpu_count()的值 2 initializer:是每個工作進程啟動時要執行的可調用對象,默認為None 3 initargs:是要傳給initializer的參數組
方法介紹:
p.apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。需要強調的是:此操作並不會在所有池工作進程中並執行func函數。如果要通過不同參數並發地執行func函數,必須從不同線程調用p.apply()函數或者使用p.apply_async() p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。 p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成 P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之后調用
其他方法(了解部分)
方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法
obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果遠程操作中引發了異常,它將在調用此方法時再次被引發。
obj.ready():如果調用完成,返回True
obj.successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發異常
obj.wait([timeout]):等待結果變為可用。
obj.terminate():立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動調用此函數
應用:

from multiprocessing import Pool import os,time def work(n): print('%s run' %os.getpid()) time.sleep(3) return n**2 if __name__ == '__main__': p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務 res_l=[] for i in range(10): res=p.apply(work,args=(i,)) #同步運行,阻塞、直到本次任務執行完畢拿到res res_l.append(res) print(res_l)

from multiprocessing import Pool import os,time def work(n): print('%s run' %os.getpid()) time.sleep(3) return n**2 if __name__ == '__main__': p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務 res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) res_l.append(res) #異步apply_async用法:如果使用異步提交的任務,主進程需要使用jion,等待進程池內任務都處理完,然后可以用get收集結果,否則,主進程結束,進程池可能還沒來得及執行,也就跟着一起結束了 p.close() #禁止往進程池內再添加任務 p.join() for res in res_l: print(res.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get

#一:使用進程池(非阻塞,apply_async) #coding: utf-8 from multiprocessing import Pool import time def func(msg): print( "msg:", msg) time.sleep(0.1) return msg if __name__ == "__main__": pool = Pool(processes = 3) res_l=[] for i in range(10): msg = "hello %d" %(i) res=pool.apply_async(func, (msg, )) #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去 res_l.append(res) print("==============================>") #沒有后面的join,或get,則程序整體結束,進程池中的任務還沒來得及全部執行完也都跟着主進程一起結束了 pool.close() #關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成 pool.join() #調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束 print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>對象組成的列表,而非最終的結果,但這一步是在join后執行的,證明結果已經計算完畢,剩下的事情就是調用每個對象下的get方法去獲取結果 for i in res_l: print(i.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get #二:使用進程池(阻塞,apply) #coding: utf-8 from multiprocessing import Pool import time def func(msg): print( "msg:", msg) time.sleep(0.1) return msg if __name__ == "__main__": pool = Pool(processes = 3) res_l=[] for i in range(10): msg = "hello %d" %(i) res=pool.apply(func, (msg, )) #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去 res_l.append(res) #同步執行,即執行完一個拿到結果,再去執行另外一個 print("==============================>") pool.close() pool.join() #調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束 print(res_l) #看到的就是最終的結果組成的列表 for i in res_l: #apply是同步的,所以直接得到結果,沒有get()方法 print(i)
使用進程池維護固定數目的進程

#Pool內的進程數默認是cpu核數,假設為4(查看方法os.cpu_count()) #開啟6個客戶端,會發現2個客戶端處於等待狀態 #在每個進程內查看pid,會發現pid使用為4個,即多個客戶端公用4個進程 from socket import * from multiprocessing import Pool import os server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,client_addr): print('進程pid: %s' %os.getpid()) while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': p=Pool() while True: conn,client_addr=server.accept() p.apply_async(talk,args=(conn,client_addr)) # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問

from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
發現:並發開啟多個客戶端,服務端同一時間只有4個不同的pid,干掉一個客戶端,另外一個客戶端才會進來,被4個進程之一處理
二 回調函數
需要回調函數的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了額,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數
我們可以把耗時間(阻塞)的任務放到進程池中,然后指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。

from multiprocessing import Pool import requests import os def get_page(url): print('<進程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def pasrse_page(res): print('<進程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=Pool(3) res_l=[] for url in urls: res=p.apply_async(get_page,args=(url,),callback=pasrse_page) res_l.append(res) p.close() p.join() print([res.get() for res in res_l]) #拿到的是get_page的結果,其實完全沒必要拿該結果,該結果已經傳給回調函數處理了 ''' 打印結果: <進程3388> get https://www.baidu.com <進程3389> get https://www.python.org <進程3390> get https://www.openstack.org <進程3388> get https://help.github.com/ <進程3387> parse https://www.baidu.com <進程3389> get http://www.sina.com.cn/ <進程3387> parse https://www.python.org <進程3387> parse https://help.github.com/ <進程3387> parse http://www.sina.com.cn/ <進程3387> parse https://www.openstack.org [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}] '''

from multiprocessing import Pool import requests import re def get_page(url,pattern): response=requests.get(url) if response.status_code == 200: return (response.text,pattern) def parse_page(info): page_content,pattern=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0], 'title':item[1], 'actor':item[2].strip()[3:], 'time':item[3][5:], 'score':item[4]+item[5] } print(dic) if __name__ == '__main__': pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S) url_dic={ 'http://maoyan.com/board/7':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) res_l.append(res) for i in res_l: i.get() # res=requests.get('http://maoyan.com/board/7') # print(re.findall(pattern,res.text))
如果在主進程中等待進程池中所有任務都執行完畢后,再統一處理結果,則無需回調函數

from multiprocessing import Pool import time def work(n): time.sleep(1) return n**2 if __name__ == '__main__': p=Pool() res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) res_l.append(res) p.close() p.join() #等待進程池中所有進程執行完畢 nums=[] for res in res_l: nums.append(res.get()) #拿到所有結果 print(nums) #主進程拿到所有的處理結果,可以在主進程中進行統一進行處理