Barrier
柵欄,也叫屏障。可以想象成路障、道閘。
Python 3.2引入的新功能。
構造方法:
threading.Barrier(parties, action=None, timeout=None)
構建Barrier對象,parties 指定參與方數目,timeout是wait方法未指定時超時的默認值。
n_waiting 當前在柵欄中等待的線程數
parties 通過柵欄所需的線程數
wait(timeout=None) 等待通過柵欄,返回0到線程數-1的整數(barrier_id),每個線程返回不同。如果wait方法設置了超時,並超時發送,柵欄將處於broken狀態。
例1:
#Barrier 柵欄
import threading,logging
logging.basicConfig(level=logging.INFO,format="[-] %(threadName)s %(message)s")
def work(barrier:threading.Barrier):
logging.info("n_waiting = {}".format(barrier.n_waiting)) # 等待的線程數
bid = barrier.wait() # 參與者的id,返回0到線程數減1的數值
logging.info("after barrier {}".format(bid)) # 柵欄之后
barrier = threading.Barrier(3) # 3個參與者,每3個開閘放行,0,1,2 4,5,6
for x in range(1,4): # 所有參數者個數,4,5,6,10,15
threading.Event().wait(1)
threading.Thread(target=work,args=(barrier,),name="Barrier-{}".format(x)).start()
運行結果:
[-] Barrier-1 n_waiting = 0
[-] Barrier-2 n_waiting = 1
[-] Barrier-3 n_waiting = 2
[-] Barrier-3 after barrier 2
[-] Barrier-2 after barrier 1
[-] Barrier-1 after barrier 0
每一個進來就等待,不夠3個就阻塞,直到夠3個就開閘放行。
Barrier實例的方法:
broken 檢測柵欄是否處於打破的狀態,返回True或False
abort() 將柵欄置於broken狀態,等待中的線程或者調用等待方法的線程都會拋出threading.BrokenBarrieError異常,直到reset方法來恢復柵欄
reset() 恢復柵欄,重新開始攔截
例2:
#Barrier 柵欄
import threading,logging
logging.basicConfig(level=logging.INFO,format="[-] %(threadName)s %(message)s")
def work(barrier:threading.Barrier):
logging.info("n_waiting = {}".format(barrier.n_waiting))
try:
bid = barrier.wait()
logging.info("after barrier {}".format(bid))
except threading.BrokenBarrierError:
logging.info("Broken Barrier in {}".format(threading.current_thread()))
barrier = threading.Barrier(3)
for x in range(1,12): #12個
if x == 3:
barrier.abort() #有一個人壞了規矩
elif x == 6:
barrier.reset()
threading.Event().wait(1)
threading.Thread(target=work,args=(barrier,),name="Barrier-{}".format(x)).start()
運行結果:
[-] Barrier-1 n_waiting = 0 #0,1
[-] Barrier-2 n_waiting = 1
[-] Barrier-2 Broken Barrier in <Thread(Barrier-2, started 3124)>
[-] Barrier-1 Broken Barrier in <Thread(Barrier-1, started 8036)>
[-] Barrier-3 n_waiting = 0
[-] Barrier-3 Broken Barrier in <Thread(Barrier-3, started 7428)>
[-] Barrier-4 n_waiting = 0
[-] Barrier-4 Broken Barrier in <Thread(Barrier-4, started 1828)>
[-] Barrier-5 n_waiting = 0
[-] Barrier-5 Broken Barrier in <Thread(Barrier-5, started 7416)>
[-] Barrier-6 n_waiting = 0 #6,7,8
[-] Barrier-7 n_waiting = 1
[-] Barrier-8 n_waiting = 2
[-] Barrier-8 after barrier 2
[-] Barrier-7 after barrier 1
[-] Barrier-6 after barrier 0
[-] Barrier-9 n_waiting = 0 #9,10,11
[-] Barrier-10 n_waiting = 1
[-] Barrier-11 n_waiting = 2
[-] Barrier-11 after barrier 2
[-] Barrier-9 after barrier 0
[-] Barrier-10 after barrier 1
一共有12個參與者,依次開始,1和2處於等待狀態,到達第3的時候,進入了broken狀態,則直到第6個,才恢復柵欄,從6開始繼續攔截,達到3個(6,7,8)就放行,9,10,11也放行。
例3:
wait方法
#Barrier 柵欄
import threading,logging
logging.basicConfig(level=logging.INFO,format="[-] %(threadName)s %(message)s")
def work(barrier:threading.Barrier,i:int):
logging.info("n_waiting = {}".format(barrier.n_waiting))
try:
if i < 3:
bid = barrier.wait(1) #超時1秒就將柵欄置於broken狀態,拋出異常后續語句不會執行
else:
if i == 6:
barrier.reset() #恢復柵欄
bid = barrier.wait()
# logging.info("broken status = {}".format(barrier.broken)) #是否處於broken狀態
logging.info("after barrier {}".format(bid))
except threading.BrokenBarrierError:
logging.info("Broken Barrier in {}".format(threading.current_thread()))
barrier = threading.Barrier(3)
for i in range(1,11): #10個
threading.Event().wait(2) #強制延遲2秒,讓出時間片
threading.Thread(target=work,args=(barrier,i),name="Barrier-{}".format(i)).start()
運行結果:
[-] Barrier-1 n_waiting = 0
[-] Barrier-1 Broken Barrier in <Thread(Barrier-1, started 3100)>
[-] Barrier-2 n_waiting = 0
[-] Barrier-2 Broken Barrier in <Thread(Barrier-2, started 8836)>
[-] Barrier-3 n_waiting = 0
[-] Barrier-3 Broken Barrier in <Thread(Barrier-3, started 8428)>
[-] Barrier-4 n_waiting = 0
[-] Barrier-4 Broken Barrier in <Thread(Barrier-4, started 1204)>
[-] Barrier-5 n_waiting = 0
[-] Barrier-5 Broken Barrier in <Thread(Barrier-5, started 1556)>
[-] Barrier-6 n_waiting = 0
[-] Barrier-7 n_waiting = 1
[-] Barrier-8 n_waiting = 2
[-] Barrier-8 after barrier 2
[-] Barrier-7 after barrier 1
[-] Barrier-6 after barrier 0
[-] Barrier-9 n_waiting = 0
[-] Barrier-10 n_waiting = 1
阻塞中
wait方法在等待超時1秒后,就強制將柵欄置於broken狀態,直到第6個的時候才reset恢復,然后6,7,8放行,9,10,繼續阻塞。如果此時有第11個,就會9,10,11放行。
應用場景:
並發初始化
所有線程都必須初始化完成后,才能繼續工作,例如運行前加載數據,檢查,如果這些工作沒完成就不能正常工作運行。
10個線程做10種工作准備,每個線程負責一種工作,只有10個線程都完成后,才能繼續工作,先完成的要等待后完成的線程。
例如,啟動一個程序,需要先加載磁盤文件、緩存預熱、初始化連接池等工作,這些工作可以齊頭並進,不過只有都滿足了,程序才能繼續向后執行。假設數據庫鏈接失敗,則初始化工作失敗,就要abort,柵欄broken,所有線程收到異常退出。
工作量
有10個計算任務,完成6個,就算工作完成。
PPTBYG
