主要內容
python的multiprocess模塊和用法
在python程序中的進程操作
之前我們已經了解了很多進程相關的理論知識,了解進程是什么應該不再困難了,剛剛我們已經了解了,運行中的程序就是一個進程。所有的進程都是通過它的父進程來創建的。因此,運行起來的python程序也是一個進程,那么我們也可以在程序中再創建進程。多個進程可以實現並發效果,也就是說,當我們的程序中存在多個進程的時候,在某些時候,就會讓程序的執行速度變快。以我們之前所學的知識,並不能實現創建進程這個功能,所以我們就需要借助python中強大的模塊。
multiprocess模塊
仔細說來,multiprocess不是一個模塊而是python中一個操作、管理進程的包。 之所以叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和進程有關的所有子模塊。由於提供的子模塊非常多,為了方便大家歸類記憶,我將這部分大致分為四個部分:創建進程部分,進程同步部分,進程池部分,進程之間數據共享。
multiprocess.process模塊
process模塊介紹
process模塊是一個創建進程的模塊,借助這個模塊,就可以完成進程的創建。
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動) 強調: 1. 需要使用關鍵字的方式來指定參數 2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號 參數介紹: group參數未使用,值始終為None target表示調用對象,即子進程要執行的任務 args表示調用對象的位置參數元組,args=(1,2,'egon',) kwargs表示調用對象的字典,kwargs={'name':'egon','age':18} name為子進程的名稱
p.start():啟動進程,並調用該子進程中的p.run()
p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法
p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖
p.is_alive():如果p仍然運行,返回True
p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程
方法介紹
p.daemon:默認值為False,如果設為True,代表p為后台運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True后,p不能創建自己的新進程,必須在p.start()之前設置
p.name:進程的名稱
p.pid:進程的pid
p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可)
p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)
屬性介紹
在Windows操作系統中由於沒有fork(linux操作系統中創建進程的機制),在創建子進程的時候會自動 import 啟動它的這個文件,而在 import 的時候又執行了整個文件。因此如果將process()直接寫在文件中就會無限遞歸創建子進程報錯。所以必須把創建子進程的部分使用if __name__ ==‘__main__’ 判斷保護起來,import 的時候 ,就不會遞歸運行了。
使用process模塊創建進程
在一個python進程中開啟子進程,start方法和並發效果。
import time from multiprocessing import Process def f(name): print('hello', name) print('我是子進程') if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() time.sleep(1) print('執行主進程的內容了') 在python中啟動的第一個子進程
2.子進程中進行傳參操作
import os import time from multiprocessing import Process def process1(n,name,num = 20): print('process1 : ',os.getpid()) print('n : ',n,name,num) time.sleep(10) if __name__ == '__main__': print(os.getpid()) p = Process(target=process1,args=[1,'alex',30]) p.start()
# 主進程默認會等待子進程執行完畢之后才結束
# 主進程和子進程之間的代碼是異步的
# 為什么主進程要等待子進程結束 回收一些子進程的資源
# 開啟一個進程是有時間開銷的 :操作系統響應開啟進程指令,給這個進程分配必要的資源
import os import time from multiprocessing import Process def func(): print(os.getpid(),os.getppid()) time.sleep(1) if __name__ == '__main__': print(os.getpid(),os.getppid()) # process id,parent process id Process(target=func).start() # func print('*'*20) time.sleep(0.5) print('*'*40)
3. join的用法
import os from multiprocessing import Process def func(exp): print(os.getpid(),os.getppid()) result = eval(exp) with open('file','w') as f: f.write(str(result)) if __name__ == '__main__': print(os.getpid(),os.getppid()) # process id,parent process id # 3*5+5/6 p = Process(target=func,args=['3*5']) # func p.start() ret = 5/6 p.join() # join方法能夠檢測到p進程是否已經執行完了,阻塞知道p執行結束 with open('file') as f: result = f.read() ret = ret + int(result) print(ret)
4.開啟多個子進程

import os import time from multiprocessing import Process def process(n): print(os.getpid(),os.getppid()) time.sleep(1) print(n) if __name__ == '__main__': p_lst = [] for i in range(10): p = Process(target=process,args=[i,]) p.start() p_lst.append(p) for p in p_lst:p.join() # 檢測p是否結束 如果沒有結束就阻塞直到結束 如果已經結束了就不阻塞 print('求和')
5.開啟子進程的第二種方式
除了上面這些開啟進程的方法,還有一種以繼承Process類的形式開啟進程的方式
import os from multiprocessing import Process class Myprocess(Process): def __init__(self,*args): super().__init__() self.args = args def run(self): print(os.getpid(),self.name,self.pid) for name in self.args: print('%s和女主播聊天'%name) if __name__ == '__main__': print(os.getpid()) p = Myprocess('yuan','wusir') p.start() # 在執行start的時候,會幫我們主動執行run方法中的內容
6.進程之間的數據隔離問題
from multiprocessing import Process def work(): global n n=0 print('子進程內: ',n) if __name__ == '__main__': n = 100 p=Process(target=work) p.start() print('主進程內: ',n) 進程之間的數據隔離問題
守護進程
會隨着主進程的結束而結束。
主進程創建守護進程
其一:守護進程會在主進程代碼執行結束后就終止
其二:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止
import os import time from multiprocessing import Process class Myprocess(Process): def __init__(self,person): super().__init__() self.person = person def run(self): print(os.getpid(),self.name) print('%s正在和女主播聊天' %self.person) p=Myprocess('哪吒') p.daemon=True #一定要在p.start()前設置,設置p為守護進程,禁止p創建子進程,並且父進程代碼執行結束,p即終止運行 p.start() time.sleep(10) # 在sleep時查看進程id對應的進程ps -ef|grep id print('主') 守護進程的啟動
from multiprocessing import Process def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() time.sleep(0.1) print("main-------")#打印該行則主進程代碼結束,則守護進程p1應該被終止.#可能會有p1任務執行的打印信息123,因為主進程打印main----時,p1也執行了,但是隨即被終止. 主進程代碼執行結束守護進程立即結束
from multiprocessing import Process import time import random class Myprocess(Process): def __init__(self,person): self.name=person super().__init__() def run(self): print('%s正在和網紅臉聊天' %self.name) time.sleep(random.randrange(1,5)) print('%s還在和網紅臉聊天' %self.name) p1=Myprocess('哪吒') p1.start() p1.terminate()#關閉進程,不會立即關閉,所以is_alive立刻查看的結果可能還是存活 print(p1.is_alive()) #結果為True print('開始') print(p1.is_alive()) #結果為False 進程對象的其他方法:terminate,is_alive
class Myprocess(Process): def __init__(self,person): self.name=person # name屬性是Process中的屬性,標示進程的名字 super().__init__() # 執行父類的初始化方法會覆蓋name屬性 #self.name = person # 在這里設置就可以修改進程名字了 #self.person = person #如果不想覆蓋進程名,就修改屬性名稱就可以了 def run(self): print('%s正在和網紅臉聊天' %self.name) # print('%s正在和網紅臉聊天' %self.person) time.sleep(random.randrange(1,5)) print('%s正在和網紅臉聊天' %self.name) # print('%s正在和網紅臉聊天' %self.person) p1=Myprocess('哪吒') p1.start() print(p1.pid) #可以查看子進程的進程id 進程對象的其他屬性:pid和name
進程同步(multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event)
鎖 —— multiprocess.Lock
通過剛剛的學習,我們千方百計實現了程序的異步,讓多個任務可以同時在幾個進程中並發處理,他們之間的運行沒有順序,一旦開啟也不受我們控制。盡管並發編程讓我們能更加充分的利用IO資源,但是也給我們帶來了新的問題。
當多個進程使用同一份數據資源的時候,就會引發數據安全或順序混亂問題
import os import time import random from multiprocessing import Process def work(n): print('%s: %s is running' %(n,os.getpid())) time.sleep(random.random()) print('%s:%s is done' %(n,os.getpid())) if __name__ == '__main__': for i in range(3): p=Process(target=work,args=(i,)) p.start() 多進程搶占輸出資源
# 由並發變成了串行,犧牲了運行效率,但避免了競爭 import os import time import random from multiprocessing import Process,Lock def work(lock,n): lock.acquire() print('%s: %s is running' % (n, os.getpid())) time.sleep(random.random()) print('%s: %s is done' % (n, os.getpid())) lock.release() if __name__ == '__main__': lock=Lock() for i in range(3): p=Process(target=work,args=(lock,i)) p.start() 使用鎖維護執行順序
上面這種情況雖然使用加鎖的形式實現了順序的執行,但是程序又重新變成串行了,這樣確實會浪費了時間,卻保證了數據的安全。
接下來,我們以模擬搶票為例,來看看數據安全的重要性
#文件db的內容為:{"count":1} #注意一定要用雙引號,不然json無法識別 #並發運行,效率高,但競爭寫同一文件,數據寫入錯亂 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db')) print('\033[43m剩余票數%s\033[0m' %dic['count']) def get(): dic=json.load(open('db')) time.sleep(0.1) #模擬讀數據的網絡延遲 if dic['count'] >0: dic['count']-=1 time.sleep(0.2) #模擬寫數據的網絡延遲 json.dump(dic,open('db','w')) print('\033[43m購票成功\033[0m') def task(): search() get() if __name__ == '__main__': for i in range(100): #模擬並發100個客戶端搶票 p=Process(target=task) p.start() 多進程同時搶購余票
#文件db的內容為:{"count":5} #注意一定要用雙引號,不然json無法識別 #並發運行,效率高,但競爭寫同一文件,數據寫入錯亂 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db')) print('\033[43m剩余票數%s\033[0m' %dic['count']) def get(): dic=json.load(open('db')) time.sleep(random.random()) #模擬讀數據的網絡延遲 if dic['count'] >0: dic['count']-=1 time.sleep(random.random()) #模擬寫數據的網絡延遲 json.dump(dic,open('db','w')) print('\033[32m購票成功\033[0m') else: print('\033[31m購票失敗\033[0m') def task(lock): search() lock.acquire() get() lock.release() if __name__ == '__main__': lock = Lock() for i in range(100): #模擬並發100個客戶端搶票 p=Process(target=task,args=(lock,)) p.start() 使用鎖來保證數據安全
#加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。 雖然可以用文件共享數據實現進程間通信,但問題是: 1.效率低(共享數據基於文件,而文件是硬盤上的數據) 2.需要自己加鎖處理 #因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個進程共享一塊內存的數據)2、幫我們處理好鎖問題。這就是mutiprocessing模塊為我們提供的基於消息的IPC通信機制:隊列和管道。 隊列和管道都是將數據存放於內存中 隊列又是基於(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來, 我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。
總結:
# 同步控制
# 只要用到了鎖 鎖之內的代碼就變成同步的了
# 鎖 :控制一段代碼 同一時間 只能被一個進程執行
信號量 —— multiprocess.Semaphore(了解)
互斥鎖同時只允許一個線程更改數據,而信號量Semaphore是同時允許一定數量的線程更改數據 。
假設商場里有4個迷你唱吧,所以同時可以進去4個人,如果來了第五個人就要在外面等待,等到有人出來才能再進去玩。
實現:
信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。
信號量與進程池的概念很像,但是要區分開,信號量涉及到加鎖的概念
from multiprocessing import Process,Semaphore import time,random def go_ktv(sem,user): sem.acquire() print('%s 占到一間ktv小屋' %user) time.sleep(random.randint(0,3)) #模擬每個人在ktv中待的時間不同 sem.release() if __name__ == '__main__': sem=Semaphore(4) p_l=[] for i in range(13): p=Process(target=go_ktv,args=(sem,'user%s' %i,)) p.start() p_l.append(p) for i in p_l: i.join() print('============》') 例子
事件 —— multiprocess.Event(了解)
python線程的事件用於主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那么event.wait 方法時便不再阻塞。
clear:將“Flag”設置為False
set:將“Flag”設置為True
事件介紹
# 事件
# wait的方法 根據一個狀態來決定自己是否要阻塞
# 狀態相關的方法
# set 將狀態改為T
# clear 將狀態改為F
# is_set 判斷當前的狀態是否為T
import time import random from multiprocessing import Process,Event def car(i,e): # 感知狀態的變化 if not e.is_set(): # 當前這個事件的狀態如果是False print('car%s正在等待'%i) # 這輛車正在等待通過路口 e.wait() # 阻塞 直到有一個e.set行為 # 等紅燈 print('car%s通過路口'%i) def traffic_light(e): # 修改事件的狀態 print('\033[1;31m紅燈亮\033[0m') # 事件在創立之初的狀態是False,相當於我程序中的紅燈 time.sleep(2) # 紅燈亮2s while True: if not e.is_set(): # False print('\033[1;32m綠燈亮\033[0m') e.set() elif e.is_set(): print('\033[1;31m紅燈亮\033[0m') e.clear() time.sleep(2) if __name__ == '__main__': e = Event() Process(target=traffic_light,args=[e,]).start() for i in range(50): time.sleep(random.randrange(0,5,2)) Process(target=car,args=[i,e]).start()