一,前言
-
進程:是程序,資源集合,進程控制塊組成,是最小的資源單位
- 特點:就對Python而言,可以實現真正的並行效果
- 缺點:進程切換很容易消耗cpu資源,進程之間的通信相對線程來說比較麻煩
-
線程:是進程中最小的執行單位。
- 特點無法利用多核,無法實現真正意義上是並行效果。
- 優點:對於IO密集型的操作可以很好利用IO阻塞的時間
二,多進程
2.1 multiprocessing模塊介紹
在上一節多線程中講到,由於GIL的原因,多線程無法利用多核優勢,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。Python提供了非常好用的多進程包multiprocessing。multiprocessing模塊用來開啟子進程,並在子進程中執行我們定制的任務(比如函數),該模塊與多線程模塊threading的編程接口類似。multiprocessing模塊的功能眾多:支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。
2.2 process類介紹
類的實例化(創建進程) Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動) 強調: 1. 需要使用關鍵字的方式來指定參數 2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號 參數介紹 group參數未使用,值始終為None target表示調用對象,即子進程要執行的任務 args表示調用對象的位置參數元組,args=(1,2,'hexin',) kwargs表示調用對象的字典,kwargs={'name':'hexin','age':18} name為子進程的名稱 # process方法 p.start():啟動進程,並調用該子進程中的p.run() p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法 p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖 p.is_alive():如果p仍然運行,返回True p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程
# process 屬性
p.daemon:默認值為False,如果設為True,代表p為后台運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True后,p不能創建自己的新進程,必須在p.start()之前設置 p.name:進程的名稱 p.pid:進程的pid p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可) p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)
2.3 多進程創建
創建形式一:普通形式
import time import random from multiprocessing import Process def fun(name): print('%s begin' %name) time.sleep(random.randrange(1, 3)) print('%s end' % name) if __name__ == '__main__': p1 = Process(target=fun, args=('w',)) p2 = Process(target=fun,args=('a',)) p3 = Process(target=fun,args=('l',)) p4 = Process(target=fun,args=('l',)) p1.start() p2.start() p3.start() p4.start() print('主線程')
輸出結果:
主線程
w begin
a begin
l begin
l begin
a end
l end
w end
l end
創建方式二:繼承方式
import time import random from multiprocessing import Process class Sleep(Process): def __init__(self,name): super().__init__() self.name = name def run(self): print('%s sleep begin' % self.name) time.sleep(random.randrange(1,5)) print('%s end' % self.name) if __name__ == '__main__': for i in ['a', 'b', 'c']: Sleep(i).start() print("main")
輸出:
main
b sleep begin
a sleep begin
c sleep begin
c end
b end
a end
2.4 進程同步
進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或同一個打印終端,是沒有問題的。
共享同一打印終端,發現會有多行內容打印到一行的現象(多個進程共享並搶占同一個打印終端,亂了)
既然可以用文件共享數據,那么進程間通信用文件作為數據傳輸介質就可以了啊,可以,但是有問題:1.效率 2.需要自己加鎖處理
加鎖的目的是為了保證多個進程修改同一塊數據時,同一時間只能有一個修改,即串行的修改,沒錯,速度是慢了,犧牲了速度而保證了數據安全。
文件當做數據庫,模擬搶票(Lock互斥鎖)
#!/usr/bin/env python # -*- coding:utf-8 -*- #文件db的內容為:{"count":2} #注意一定要用雙引號,不然json無法識別 from multiprocessing import Process,Lock import json import time import random import os def work(filename,lock): #買票 # lock.acquire() with lock: with open(filename,encoding='utf-8') as f: dic=json.loads(f.read()) # print('剩余票數: %s' % dic['count']) if dic['count'] > 0: dic['count']-=1 time.sleep(random.randint(1,3)) #模擬網絡延遲 with open(filename,'w',encoding='utf-8') as f: f.write(json.dumps(dic)) print('%s 購票成功' %os.getpid()) else: print('%s 購票失敗' %os.getpid()) # lock.release() if __name__ == '__main__': lock=Lock() p_l=[] for i in range(10): p=Process(target=work,args=('db',lock)) p_l.append(p) p.start() for p in p_l: p.join() print('主線程')
輸出:
購票成功
購票成功
購票失敗
購票失敗
購票失敗
購票失敗
購票失敗
購票失敗
購票失敗
購票失敗
主線程
三,進程間通信
進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的。
3.1 進程間通信(IPC)方式一:隊列(推薦使用)
隊列方式一:Queue()
隊列先進先出,棧后進先出,創建隊列的類(底層就是以管道和鎖定的方式實現):
# 實例創建 Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。 # 參數說明 maxsize是隊列中允許最大項數,省略則無大小限制。 # 屬性介紹 q.put方法用以插入數據到隊列中 put方法還有兩個可選參數:blocked和timeout。 如果blocked為True(默認值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。 如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。 q.get方法可以從隊列讀取並且刪除一個元素。 get方法有兩個可選參數:blocked和timeout。 如果blocked為True(默認值),並且timeout為正值,那么在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。 如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常. q.get_nowait():同q.get(False) q.put_nowait():同q.put(False) q.empty():調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。 q.full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。 q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣
實例:基於隊列實現生產者和消費者模型
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: time.sleep(random.randint(1,3)) res=q.get() if res is None:break print('\033[45m消費者拿到了:%s\033[0m' %res) def producer(seq,q): for item in seq: time.sleep(random.randint(1,3)) print('\033[46m生產者生產了:%s\033[0m' %item) q.put(item) if __name__ == '__main__': q=Queue() c=Process(target=consumer,args=(q,)) c.start() producer(('包子%s' %i for i in range(5)),q) q.put(None) c.join() print('主線程')
輸出:
生產者生產了:包子0
消費者拿到了:包子0
生產者生產了:包子1
消費者拿到了:包子1
生產者生產了:包子2
消費者拿到了:包子2
生產者生產了:包子3
消費者拿到了:包子3
生產者生產了:包子4
消費者拿到了:包子4
主線程
隊列方式二:JoinableQueue()
JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
maxsize是隊列中允許最大項數,省略則無大小限制。
JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:
q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大於從隊列中刪除項目的數量,將引發ValueError異常
q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止
生產者消費者模型:
from multiprocessing import Process,JoinableQueue import time,random def consumer(q): while True: # time.sleep(random.randint(1,2)) res=q.get() print('消費者拿到了 %s' %res) q.task_done() def producer(seq,q): for item in seq: # time.sleep(random.randrange(1,2)) q.put(item) print('生產者做好了 %s' %item) q.join() if __name__ == '__main__': q=JoinableQueue() seq=('包子%s' %i for i in range(5)) p=Process(target=consumer,args=(q,)) p.daemon=True #設置為守護進程,在主線程停止時p也停止,但是不用擔心,producer內調用q.join保證了consumer已經處理完隊列中的所有元素 p.start() producer(seq,q) print('主線程')
3.2 進程間通信(IPC)方式二:管道
# 創建實例 Pipe([duplex]):在進程之間創建一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對象,強調一點:必須在產生Process對象之前產生管道 # 參數介紹 dumplex:默認管道是全雙工的,如果將duplex射成False,conn1只能用於接收,conn2只能用於發送。 # 方法介紹 conn1.recv():接收conn2.send(obj)發送的對象。如果沒有消息可接收,recv方法會一直阻塞。如果連接的另外一端已經關閉,那么recv方法會拋出EOFError。 conn1.send(obj):通過連接發送對象。obj是與序列化兼容的任意對象 conn1.close():關閉連接。如果conn1被垃圾回收,將自動調用此方法 conn1.fileno():返回連接使用的整數文件描述符 conn1.poll([timeout]):如果連接上的數據可用,返回True。timeout指定等待的最長時限。如果省略此參數,方法將立即返回結果。如果將timeout射成None,操作將無限期地等待數據到達。 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。如果進入的消息,超過了這個最大值,將引發IOError異常,
並且在連接上無法進行進一步讀取。如果連接的另外一端已經關閉,再也不存在任何數據,將引發EOFError異常。 conn.send_bytes(buffer [, offset [, size]]):通過連接發送字節數據緩沖區,buffer是支持緩沖區接口的任意對象,offset是緩沖區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,然后調用c.recv_bytes()函數進行接收 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩沖區接口(即bytearray對象或類似的對象)。
offset指定緩沖區中放置消息處的字節位移。返回值是收到的字節數。如果消息長度大於可用的緩沖區空間,將引發BufferTooShort異常。
生產者消費者實例:
from multiprocessing import Process,Pipe import time,os def consumer(p,name): left,right=p left.close() while True: try: baozi=right.recv() print('%s 收到包子:%s' %(name,baozi)) except EOFError: right.close() break def producer(seq,p): left,right=p right.close() for i in seq: left.send(i) # time.sleep(1) else: left.close() if __name__ == '__main__': left,right=Pipe() c1=Process(target=consumer,args=((left,right),'c1')) c1.start() seq=(i for i in range(10)) producer(seq,(left,right)) right.close() left.close() c1.join() print('主進程')
注意:生產者和消費者都沒有使用管道的某個端點,就應該將其關閉,如在生產者中關閉管道的右端,在消費者中關閉管道的左端。如果忘記執行這些步驟,程序可能再消費者中的recv()操作上掛起。管道是由操作系統進行引用計數的,必須在所有進程中關閉管道后才能生產EOFError異常。因此在生產者中關閉管道不會有任何效果,付費消費者中也關閉了相同的管道端點。
管道可以用於雙向通信,利用通常在客戶端/服務器中使用的請求/響應模型或遠程過程調用,就可以使用管道編寫與進程交互的程序,如下
from multiprocessing import Process,Pipe import time,os def adder(p,name): server,client=p client.close() while True: try: x,y=server.recv() except EOFError: server.close() break res=x+y server.send(res) print('server done') if __name__ == '__main__': server,client=Pipe() c1=Process(target=adder,args=((server,client),'c1')) c1.start() server.close() client.send((10,20)) print(client.recv()) client.close() c1.join() print('主進程')
輸出:
30
server done
主進程
四,進程池
4.1 進程池介紹
開多進程的目的是為了並發,如果有多核,通常有幾個核就開幾個進程,進程開啟過多,效率反而會下降(開啟進程是需要占用系統資源的,而且開啟多余核數目的進程也無法做到並行),但很明顯需要並發執行的任務要遠大於核數,這時我們就可以通過維護一個進程池來控制進程數目,比如httpd的進程模式,規定最小進程數和最大進程數。
當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。而且對於遠程過程調用的高級應用程序而言,應該使用進程池,Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,就重用進程池中的進程。在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多台主機,並行操作可以節約大量的時間。
4.2 進程池使用
4.2.1進程池方式一:
同步調用:
from multiprocessing import Pool import os,time def work(n): print('{} run'.format(os.getpid())) time.sleep(1) return n ** 2 # ret if __name__ == '__main__': p = Pool(3) # 創建3個進程 res_1 = [] for i in range(20): res = p.apply(work,args=(i,)) '''同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程中可能有阻塞也可能沒有阻塞, 但不管該任務是否存在阻塞,同步調用都會在原地等着,只是等的過程中若是任務發生了阻塞就會被奪走cpu的執行權限; 個人理解:程序判斷兩個子程序執行的間隔時間,過長則判斷存在阻塞,屆時奪走上次進程ID的使用權限,從進程池分配新的進程ID''' res_1.append(res) print(res_1)
異步調用:
from multiprocessing import Pool import os,time def work(n): print('%s run' %os.getpid()) time.sleep(3) return n**2 if __name__ == '__main__': p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務 res_l=[] for i in range(10): print(i) # for循環會提前運行完畢,進程池內的任務還未執行。 res=p.apply_async(work,args=(i,)) #同步運行,阻塞、直到本次任務執行完畢拿到res res_l.append(res) #將調用apply_async方法,得到返回進程內存地址結果 #異步apply_async用法:如果使用異步提交的任務,主進程需要使用join,等待進程池內任務都處理完,然后可以用get收集結果, # 否則,主進程結束,進程池可能還沒來得及執行,也就跟着一起結束了 p.close() p.join() for res in res_l: print(res.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get
apply_sync和apply方法
# apply_async方法 from multiprocessing import Process,Pool import time def func(msg): print( "msg:", msg) time.sleep(1) return msg if __name__ == "__main__": pool = Pool(processes = 3) res_l=[] for i in range(10): print(i) msg = "hello %d" %(i) res=pool.apply_async(func, (msg, )) #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去 res_l.append(res) # 將apply_async方法得到的內存地址結果加入列表 print("==============================>") #沒有后面的join,或get,則程序整體結束,進程池中的任務還沒來得及全部執行完 # 也都跟着主進程一起結束了 pool.close() #關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成 pool.join() #調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束 print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>對象組成的列表,而非最終的結果,但這一步 # 是在join后執行的,證明結果已經計算完畢,剩下的事情就是調用每個對象下的get方法去獲取結果 for i in res_l: print(i.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get #apply方法 from multiprocessing import Process,Pool import time def func(msg): print( "msg:", msg) time.sleep(0.1) return msg if __name__ == "__main__": pool = Pool(processes = 3) res_l=[] for i in range(10): msg = "hello %d" %(i) res=pool.apply(func, (msg, )) #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去 res_l.append(res) #同步執行,即執行完一個拿到結果,再去執行另外一個 print("==============================>") pool.close() pool.join() #調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束 print(res_l) #看到的就是最終的結果組成的列表 for i in res_l: #apply是同步的,所以直接得到結果,沒有get()方法 print(i)
map方法
#map import os,time from multiprocessing import Pool def func(i): time.sleep(1) print('子進程{}'.format(os.getpid())) return i if __name__ == '__main__': p = Pool(5) ret = p.map(func,range(10)) # func(next(range(10))) print(ret)
4.2.1 concurrent.futures 模塊實現‘池’
同步調用:
# #同步調用:提交/調用一個任務,然后就在原地等着,等到該任務執行完畢拿到結果,再執行下一行代碼 from concurrent.futures import ProcessPoolExecutor import time def save_test(name,n): n = n+1 time.sleep(n) print("{}的名字次數為{}".format(name,n)) return n if __name__ == '__main__': start = time.time() ex = ProcessPoolExecutor(max_workers=3) lista = ["Tom","Jerry","XiaoHua","Ming"] for i,j in enumerate(lista): #存在兩個以上的參數時,直接用逗號隔開,不需要用括號 task = ex.submit(save_test,j,i).result() print(task) #ex.shutdown(wait=True)是進程池內部的進程都執行完畢,才會關閉,然后執行后續代碼 ex.shutdown(wait=True) print("主進程直接運行") stop = time.time() print(stop-start)
異步調用:
#異步調用: 提交/調用一個任務,不在原地等着,直接執行下一行代碼 from concurrent.futures import ProcessPoolExecutor import time def save_test(name,n): n = n+1 time.sleep(n) print("{}的名字次數為{}".format(name,n)) return n if __name__ == '__main__': obj = list() start = time.time() ex = ProcessPoolExecutor(max_workers=3) lista = ["letme","Mlxg","XiaoHu","Ming"] for i,j in enumerate(lista): #存在兩個以上的參數時,直接用逗號隔開,不需要用括號 task = ex.submit(save_test,j,i) obj.append(task) #ex.shutdown(wait=True)是進程池內部的進程都執行完畢,才會關閉,然后執行后續代碼 ex.shutdown(wait=True) print("主進程直接運行") for i in obj: print(i.result()) stop = time.time() print(stop-start)
map方法使用
from concurrent.futures import ProcessPoolExecutor import urllib.request URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/'] def load_url(url): with urllib.request.urlopen(url, timeout=60) as conn: print('%r page is %d bytes' % (url, len(conn.read()))) if __name__ == '__main__': executor = ProcessPoolExecutor(max_workers=3) executor.map(load_url,URLS) print('主進程')
五,總結
用futures的寫法上更簡潔一些,concurrent.futures的性能並沒有更好,只是讓編碼變得更簡單。考慮並發編程的時候,任何簡化都是好事。從長遠來看,concurrent.futures編寫的代碼更容易維護。
使用map時,future是逐個迭代提交,multiprocessing.Pool是批量提交jobs,因此對於大批量jobs的處理,multiprocessing.Pool效率會更高一些。對於需要長時間運行的作業,用future更佳,future提供了更多的功能(callback, check status, cancel)。
concurrent.futures.ProcessPoolExecutor是對multiprocessing的封裝,在運行時需導入__main__,不能直接在交互窗口工作。