一 互斥鎖
進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或同一個打印終端,是沒有問題的,而共享帶來的是競爭,競爭帶來的結果就是錯亂,如下
#並發運行,效率高,但競爭同一打印終端,帶來了打印錯亂 from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('%s is done' %os.getpid()) if __name__ == '__main__': for i in range(3): p=Process(target=work) p.start()
如何控制,就是加鎖處理。而互斥鎖的意思就是互相排斥,如果把多個進程比喻為多個人,互斥鎖的工作原理就是多個人都要去爭搶同一個資源:衛生間,一個人搶到衛生間后上一把鎖,其他人都要等着,等到這個完成任務后釋放鎖,其他人才有可能有一個搶到......所以互斥鎖的原理,就是把並發改成穿行,降低了效率,但保證了數據安全不錯亂
#由並發變成了串行,犧牲了運行效率,但避免了競爭 from multiprocessing import Process,Lock import os,time def work(lock): lock.acquire() #加鎖 print('%s is running' %os.getpid()) time.sleep(2) print('%s is done' %os.getpid()) lock.release() #釋放鎖 if __name__ == '__main__': lock=Lock() for i in range(3): p=Process(target=work,args=(lock,)) p.start()
二 模擬搶票練習
多個進程共享同一文件,我們可以把文件當數據庫,用多個進程模擬多個人執行搶票任務
#文件db.txt的內容為:{"count":1} #注意一定要用雙引號,不然json無法識別 from multiprocessing import Process import time,json def search(name): dic=json.load(open('db.txt')) time.sleep(1) print('\033[43m%s 查到剩余票數%s\033[0m' %(name,dic['count'])) def get(name): dic=json.load(open('db.txt')) time.sleep(1) #模擬讀數據的網絡延遲 if dic['count'] >0: dic['count']-=1 time.sleep(1) #模擬寫數據的網絡延遲 json.dump(dic,open('db.txt','w')) print('\033[46m%s 購票成功\033[0m' %name) def task(name): search(name) get(name) if __name__ == '__main__': for i in range(10): #模擬並發10個客戶端搶票 name='<路人%s>' %i p=Process(target=task,args=(name,)) p.start()
並發運行,效率高,但競爭寫同一文件,數據寫入錯亂,只有一張票,賣成功給了10個人
<路人0> 查到剩余票數1 <路人1> 查到剩余票數1 <路人2> 查到剩余票數1 <路人3> 查到剩余票數1 <路人4> 查到剩余票數1 <路人5> 查到剩余票數1 <路人6> 查到剩余票數1 <路人7> 查到剩余票數1 <路人8> 查到剩余票數1 <路人9> 查到剩余票數1 <路人0> 購票成功 <路人4> 購票成功 <路人1> 購票成功 <路人5> 購票成功 <路人3> 購票成功 <路人7> 購票成功 <路人2> 購票成功 <路人6> 購票成功 <路人8> 購票成功 <路人9> 購票成功
加鎖處理:購票行為由並發變成了串行,犧牲了運行效率,但保證了數據安全
#把文件db.txt的內容重置為:{"count":1} from multiprocessing import Process,Lock import time,json def search(name): dic=json.load(open('db.txt')) time.sleep(1) print('\033[43m%s 查到剩余票數%s\033[0m' %(name,dic['count'])) def get(name): dic=json.load(open('db.txt')) time.sleep(1) #模擬讀數據的網絡延遲 if dic['count'] >0: dic['count']-=1 time.sleep(1) #模擬寫數據的網絡延遲 json.dump(dic,open('db.txt','w')) print('\033[46m%s 購票成功\033[0m' %name) def task(name,lock): search(name) with lock: #相當於lock.acquire(),執行完自代碼塊自動執行lock.release() get(name) if __name__ == '__main__': lock=Lock() for i in range(10): #模擬並發10個客戶端搶票 name='<路人%s>' %i p=Process(target=task,args=(name,lock)) p.start()
執行結果
<路人0> 查到剩余票數1 <路人1> 查到剩余票數1 <路人2> 查到剩余票數1 <路人3> 查到剩余票數1 <路人4> 查到剩余票數1 <路人5> 查到剩余票數1 <路人6> 查到剩余票數1 <路人7> 查到剩余票數1 <路人8> 查到剩余票數1 <路人9> 查到剩余票數1 <路人0> 購票成功
三 互斥鎖與join
使用join可以將並發變成串行,互斥鎖的原理也是將並發變成穿行,那我們直接使用join就可以了啊,為何還要互斥鎖,說到這里我趕緊試了一下
#把文件db.txt的內容重置為:{"count":1} from multiprocessing import Process,Lock import time,json def search(name): dic=json.load(open('db.txt')) print('\033[43m%s 查到剩余票數%s\033[0m' %(name,dic['count'])) def get(name): dic=json.load(open('db.txt')) time.sleep(1) #模擬讀數據的網絡延遲 if dic['count'] >0: dic['count']-=1 time.sleep(1) #模擬寫數據的網絡延遲 json.dump(dic,open('db.txt','w')) print('\033[46m%s 購票成功\033[0m' %name) def task(name,): search(name) get(name) if __name__ == '__main__': for i in range(10): name='<路人%s>' %i p=Process(target=task,args=(name,)) p.start() p.join()
執行結果
<路人0> 查到剩余票數1 <路人0> 購票成功 <路人1> 查到剩余票數0 <路人2> 查到剩余票數0 <路人3> 查到剩余票數0 <路人4> 查到剩余票數0 <路人5> 查到剩余票數0 <路人6> 查到剩余票數0 <路人7> 查到剩余票數0 <路人8> 查到剩余票數0 <路人9> 查到剩余票數0
發現使用join將並發改成穿行,確實能保證數據安全,但問題是連查票操作也變成只能一個一個人去查了,很明顯大家查票時應該是並發地去查詢而無需考慮數據准確與否,此時join與互斥鎖的區別就顯而易見了,join是將一個任務整體串行,而互斥鎖的好處則是可以將一個任務中的某一段代碼串行,比如只讓task函數中的get任務串行
def task(name,): search(name) # 並發執行 lock.acquire() get(name) #串行執行 lock.release()
四 總結
加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行地修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然可以用文件共享數據實現進程間通信,但問題是:
1、效率低(共享數據基於文件,而文件是硬盤上的數據)
2、需要自己加鎖處理
因此我們最好找尋一種解決方案能夠兼顧:
1、效率高(多個進程共享一塊內存的數據)
2、幫我們處理好鎖問題。
這就是mutiprocessing模塊為我們提供的基於消息的IPC通信機制:隊列和管道。
隊列和管道都是將數據存放於內存中,而隊列又是基於(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,因而隊列才是進程間通信的最佳選擇。
我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。