Barrier
柵欄,也叫屏障。可以想象成路障、道閘。
Python 3.2引入的新功能。
構造方法:
threading.Barrier(parties, action=None, timeout=None)
構建Barrier對象,parties 指定參與方數目,timeout是wait方法未指定時超時的默認值。
n_waiting 當前在柵欄中等待的線程數
parties 通過柵欄所需的線程數
wait(timeout=None) 等待通過柵欄,返回0到線程數-1的整數(barrier_id),每個線程返回不同。如果wait方法設置了超時,並超時發送,柵欄將處於broken狀態。
例1:
#Barrier 柵欄 import threading,logging logging.basicConfig(level=logging.INFO,format="[-] %(threadName)s %(message)s") def work(barrier:threading.Barrier): logging.info("n_waiting = {}".format(barrier.n_waiting)) # 等待的線程數 bid = barrier.wait() # 參與者的id,返回0到線程數減1的數值 logging.info("after barrier {}".format(bid)) # 柵欄之后 barrier = threading.Barrier(3) # 3個參與者,每3個開閘放行,0,1,2 4,5,6 for x in range(1,4): # 所有參數者個數,4,5,6,10,15 threading.Event().wait(1) threading.Thread(target=work,args=(barrier,),name="Barrier-{}".format(x)).start() 運行結果: [-] Barrier-1 n_waiting = 0 [-] Barrier-2 n_waiting = 1 [-] Barrier-3 n_waiting = 2 [-] Barrier-3 after barrier 2 [-] Barrier-2 after barrier 1 [-] Barrier-1 after barrier 0
每一個進來就等待,不夠3個就阻塞,直到夠3個就開閘放行。
Barrier實例的方法:
broken 檢測柵欄是否處於打破的狀態,返回True或False
abort() 將柵欄置於broken狀態,等待中的線程或者調用等待方法的線程都會拋出threading.BrokenBarrieError異常,直到reset方法來恢復柵欄
reset() 恢復柵欄,重新開始攔截
例2:
#Barrier 柵欄 import threading,logging logging.basicConfig(level=logging.INFO,format="[-] %(threadName)s %(message)s") def work(barrier:threading.Barrier): logging.info("n_waiting = {}".format(barrier.n_waiting)) try: bid = barrier.wait() logging.info("after barrier {}".format(bid)) except threading.BrokenBarrierError: logging.info("Broken Barrier in {}".format(threading.current_thread())) barrier = threading.Barrier(3) for x in range(1,12): #12個 if x == 3: barrier.abort() #有一個人壞了規矩 elif x == 6: barrier.reset() threading.Event().wait(1) threading.Thread(target=work,args=(barrier,),name="Barrier-{}".format(x)).start() 運行結果: [-] Barrier-1 n_waiting = 0 #0,1 [-] Barrier-2 n_waiting = 1 [-] Barrier-2 Broken Barrier in <Thread(Barrier-2, started 3124)> [-] Barrier-1 Broken Barrier in <Thread(Barrier-1, started 8036)> [-] Barrier-3 n_waiting = 0 [-] Barrier-3 Broken Barrier in <Thread(Barrier-3, started 7428)> [-] Barrier-4 n_waiting = 0 [-] Barrier-4 Broken Barrier in <Thread(Barrier-4, started 1828)> [-] Barrier-5 n_waiting = 0 [-] Barrier-5 Broken Barrier in <Thread(Barrier-5, started 7416)> [-] Barrier-6 n_waiting = 0 #6,7,8 [-] Barrier-7 n_waiting = 1 [-] Barrier-8 n_waiting = 2 [-] Barrier-8 after barrier 2 [-] Barrier-7 after barrier 1 [-] Barrier-6 after barrier 0 [-] Barrier-9 n_waiting = 0 #9,10,11 [-] Barrier-10 n_waiting = 1 [-] Barrier-11 n_waiting = 2 [-] Barrier-11 after barrier 2 [-] Barrier-9 after barrier 0 [-] Barrier-10 after barrier 1
一共有12個參與者,依次開始,1和2處於等待狀態,到達第3的時候,進入了broken狀態,則直到第6個,才恢復柵欄,從6開始繼續攔截,達到3個(6,7,8)就放行,9,10,11也放行。
例3:
wait方法
#Barrier 柵欄 import threading,logging logging.basicConfig(level=logging.INFO,format="[-] %(threadName)s %(message)s") def work(barrier:threading.Barrier,i:int): logging.info("n_waiting = {}".format(barrier.n_waiting)) try: if i < 3: bid = barrier.wait(1) #超時1秒就將柵欄置於broken狀態,拋出異常后續語句不會執行 else: if i == 6: barrier.reset() #恢復柵欄 bid = barrier.wait() # logging.info("broken status = {}".format(barrier.broken)) #是否處於broken狀態 logging.info("after barrier {}".format(bid)) except threading.BrokenBarrierError: logging.info("Broken Barrier in {}".format(threading.current_thread())) barrier = threading.Barrier(3) for i in range(1,11): #10個 threading.Event().wait(2) #強制延遲2秒,讓出時間片 threading.Thread(target=work,args=(barrier,i),name="Barrier-{}".format(i)).start() 運行結果: [-] Barrier-1 n_waiting = 0 [-] Barrier-1 Broken Barrier in <Thread(Barrier-1, started 3100)> [-] Barrier-2 n_waiting = 0 [-] Barrier-2 Broken Barrier in <Thread(Barrier-2, started 8836)> [-] Barrier-3 n_waiting = 0 [-] Barrier-3 Broken Barrier in <Thread(Barrier-3, started 8428)> [-] Barrier-4 n_waiting = 0 [-] Barrier-4 Broken Barrier in <Thread(Barrier-4, started 1204)> [-] Barrier-5 n_waiting = 0 [-] Barrier-5 Broken Barrier in <Thread(Barrier-5, started 1556)> [-] Barrier-6 n_waiting = 0 [-] Barrier-7 n_waiting = 1 [-] Barrier-8 n_waiting = 2 [-] Barrier-8 after barrier 2 [-] Barrier-7 after barrier 1 [-] Barrier-6 after barrier 0 [-] Barrier-9 n_waiting = 0 [-] Barrier-10 n_waiting = 1 阻塞中
wait方法在等待超時1秒后,就強制將柵欄置於broken狀態,直到第6個的時候才reset恢復,然后6,7,8放行,9,10,繼續阻塞。如果此時有第11個,就會9,10,11放行。
應用場景:
並發初始化
所有線程都必須初始化完成后,才能繼續工作,例如運行前加載數據,檢查,如果這些工作沒完成就不能正常工作運行。
10個線程做10種工作准備,每個線程負責一種工作,只有10個線程都完成后,才能繼續工作,先完成的要等待后完成的線程。
例如,啟動一個程序,需要先加載磁盤文件、緩存預熱、初始化連接池等工作,這些工作可以齊頭並進,不過只有都滿足了,程序才能繼續向后執行。假設數據庫鏈接失敗,則初始化工作失敗,就要abort,柵欄broken,所有線程收到異常退出。
工作量
有10個計算任務,完成6個,就算工作完成。
PPTBYG