進擊のpython
並發編程——重要方法
前面提到的方法都是比較基本的,比較基礎
這個小節我們着重了解一下以下幾個方法:
1.守護進程
2.互斥鎖
3.隊列
守護進程
守護進程,就有點像古時候的封建奴隸制下的皇上與太監,皇上死了,太監就得陪着殉葬,守護着皇上
守護進程就是這樣的關系,當主進程結束,守護進程隨之消失
守護進程一般是怎么用呢?如果我們有兩個進程並發執行,那我們就可以開一個主進程,再開一個子進程
但是呢,如果我們開的這個子進程在主進程結束之后就沒什么用了,那這個時候就建議用守護進程了
p.daemon:默認值為False,如果設為True,代表p為后台運行的守護進程
當p的父進程終止時,p也隨之終止,並且設定為True后,p不能創建自己的新進程
必須在p.start()之前設置
from multiprocessing import Process
def func(name, *args, **kwargs):
print(f'{name}執行了')
pass
if __name__ == '__main__':
p = Process(target=func, args=('子進程',))
# p.daemon = True
p.start()
print('我是主進程... ...')
正常開啟子進程,都不是守護進程,由於聲明之后還要有一段時間才能執行子進程
所以先打印 “我是主進程... ...” ,再打印 “子進程被執行了”
但是當我把注釋的語句打開,p就變成了守護進程(注意,是在start上面)
由於還是主進程會先打印,就會導致,子進程還沒打印就陪葬了
就只會打印“我是主進程... ...”
互斥鎖
首先啊,原則上進程之間是不能夠相互通信的,但是捏,進程之間還是共享一套文件資源的
所以訪問同一個文件或者同一個打印終端,是沒有問題的
但是計算機是貪婪地,有共享,就會有競爭
有競爭,如果沒有限制,沒有秩序,就會混亂
import time
from multiprocessing import Process
def func(name, *args, **kwargs):
print(f'{name}正在執行.. ..')
time.sleep(2)
print(f'{name}執行完畢!')
pass
if __name__ == '__main__':
for i in range(3):
p = Process(target=func, args=(f'進程{i}',))
p.start()
我想達到的效果是不是應該是
進程0正在執行.. ..
進程0執行完畢!
進程1正在執行.. ..
進程1執行完畢!
進程2正在執行.. ..
進程2執行完畢!
而實際情況打印的是:
進程1正在執行.. ..
進程0正在執行.. ..
進程2正在執行.. ..
進程1執行完畢!
進程0執行完畢!
進程2執行完畢!
這就出現了競爭(多打印幾次,還會出現別的結果)
就好像你去上廁所,但是大家都不排隊,你擠進去了,別人還往里擠
還有的時候開門看看你,所以為了解決這個問題,我們一般都在衛生間加把鎖
上廁所的時候鎖一下門,我不完事,誰也別想進來
是對於程序也是,加一個鎖,就讓程序變得有序了
其實也就相當於把並發變成了串行,誠然降低了效率,但是保證了數據的安全
import time
from multiprocessing import Process, Lock
def func(name, lock, *args, **kwargs):
lock.acquire()
print(f'{name}正在執行.. ..')
time.sleep(2)
print(f'{name}執行完畢!')
lock.release()
pass
if __name__ == '__main__':
lock = Lock()
for i in range(3):
p = Process(target=func, args=(f'進程{i}', lock))
p.start()
lock = Lock(),對象的實例化,實例化出一個鎖
然后鎖,有關鎖(acquire)就有開鎖(release)
這樣就保證某個子進程在用打印的時候,別的子進程只能在外面等待,就達到了我們的目的
搶票系統
這個經典的例子也可以通過這個搶票系統來幫助理解
首先所有人都可以查票,如果有票就可以買
我們可以先寫一下
import json
from multiprocessing import Process
def search(name, *args, **kwargs):
msg = json.load(open('db.json'))
print(f'{name}查看票數,當前有{msg["count"]}張')
pass
def get(name, *args, **kwargs):
msg = json.load(open('db.json'))
if msg["count"] > 0:
msg["count"] -= 1
json.dump(msg, open('db.json', 'w'))
print(f'{name}買到票了,當前有{msg["count"]}張')
def main(name):
search(name)
get(name)
if __name__ == '__main__':
for i in range(1, 5):
p = Process(target=main, args=(f"第{i}個人",))
p.start()
json.dump({'count': 1}, open("db.json", 'w'))
執行一下
第2個人查看票數,當前有1張
第2個人買到票了,當前有0張
第4個人查看票數,當前有1張
第3個人查看票數,當前有1張
第4個人買到票了,當前有0張
第3個人買到票了,當前有0張
第1個人查看票數,當前有1張
第1個人買到票了,當前有0張
問題出現了,只有一張票,但是我們卻賣出去四張
這是不對的,應該是查詢大家一起查,買票只能一個一個買
所以可以這樣:
import json
import time
from multiprocessing import Process, Lock
def search(name, *args, **kwargs):
msg = json.load(open('db.json'))
print(f'{name}查看票數,當前有{msg["count"]}張')
pass
def get(name,lock, *args, **kwargs):
lock.acquire()
msg = json.load(open('db.json'))
if msg["count"] > 0:
msg["count"] -= 1
json.dump(msg, open('db.json', 'w'))
print(f'{name}買到票了,當前有{msg["count"]}張')
else:
print(f'{name}沒買到票!')
lock.release()
def main(name,lock):
time.sleep(1) # 模擬網絡延遲
search(name)
get(name,lock)
if __name__ == '__main__':
lock = Lock()
for i in range(1, 5):
p = Process(target=main, args=(f"第{i}個人",lock))
p.start()
json.dump({'count': 1}, open("db.json", 'w'))
輸出結果就是我們想的那樣了,只有一個人買到了
第2個人查看票數,當前有1張
第2個人買到票了,當前有0張
第4個人查看票數,當前有0張
第4個人沒買到票!
第3個人查看票數,當前有0張
第3個人沒買到票!
第1個人查看票數,當前有0張
第1個人沒買到票!
橫向對比join方法,在剛開始的時候我們也利用join方法進行了等待的操作
我們看看用join怎么做
import json
import time
from multiprocessing import Process, Lock
def search(name, *args, **kwargs):
msg = json.load(open('db.json'))
print(f'{name}查看票數,當前有{msg["count"]}張')
pass
def get(name, lock, *args, **kwargs):
lock.acquire()
msg = json.load(open('db.json'))
if msg["count"] > 0:
msg["count"] -= 1
json.dump(msg, open('db.json', 'w'))
print(f'{name}買到票了,當前有{msg["count"]}張')
else:
print(f'{name}沒買到票!')
lock.release()
def main(name, lock):
time.sleep(1) # 模擬網絡延遲
search(name)
get(name, lock)
if __name__ == '__main__':
lock = Lock()
for i in range(1, 5):
p = Process(target=main, args=(f"第{i}個人", lock))
p.start()
p.join()
發現也可以達到我哦們想要的效果,但是查票的過程也變成串行了
很明顯大家查票時應該是並發地去查詢而無需考慮數據准確與否
此時join與互斥鎖的區別就顯而易見了:
join是將一個任務整體串行,而互斥鎖的好處則是可以將一個任務中的某一段代碼串行
比如只讓main函數中的get任務串行
隊列
進程彼此之間互相隔離,要實現進程間通信(IPC)
multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的
是消息傳遞,不是大數據!
看一下隊列中有什么
class Queue(object):
def __init__(self, maxsize=-1):
self._maxsize = maxsize
maxsize:可放入的允許的最大項數,如果不設置,默認無限
但是隊列是基於內存的,真正的無限其實取決於內存
def qsize(self):
return 0
def empty(self):
return False
def full(self):
return False
def put(self, obj, block=True, timeout=None):
pass
def put_nowait(self, obj):
pass
def get(self, block=True, timeout=None):
pass
def get_nowait(self):
pass
def close(self):
pass
def join_thread(self):
pass
def cancel_join_thread(self):
pass
隊列的方法大概是上述這些
put():往隊列里放信息
get():從隊列里拿數據
empty():判斷隊列是否為空
full():判斷隊列滿沒滿
同時也要記住,隊列是“先進先出”
當隊列滿了,再往里放,就會阻塞
當隊列空了,再往外取,也會阻塞
from multiprocessing import Queue
q = Queue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.full())
print(q.get())
print(q.get())
print(q.get())
print(q.empty())
可以看到打印出來的是1 2 3
因為我們放的順序就是1 2 3