[Python 多線程] Barrier (十一)


 

Barrier

柵欄,也叫屏障。可以想象成路障、道閘。

Python 3.2引入的新功能。

 

構造方法:

threading.Barrier(parties, action=None, timeout=None)

構建Barrier對象,parties 指定參與方數目,timeout是wait方法未指定時超時的默認值。

n_waiting    當前在柵欄中等待的線程數

parties        通過柵欄所需的線程數

wait(timeout=None) 等待通過柵欄,返回0到線程數-1的整數(barrier_id),每個線程返回不同。如果wait方法設置了超時,並超時發送,柵欄將處於broken狀態。

 

例1:

#Barrier 柵欄
import threading,logging
logging.basicConfig(level=logging.INFO,format="[-] %(threadName)s %(message)s")

def work(barrier:threading.Barrier):
    logging.info("n_waiting = {}".format(barrier.n_waiting))   # 等待的線程數
    bid = barrier.wait()   # 參與者的id,返回0到線程數減1的數值
    logging.info("after barrier {}".format(bid))  # 柵欄之后

barrier = threading.Barrier(3) # 3個參與者,每3個開閘放行,0,1,2  4,5,6

for x in range(1,4):  # 所有參數者個數,4,5,6,10,15
    threading.Event().wait(1)
    threading.Thread(target=work,args=(barrier,),name="Barrier-{}".format(x)).start()

運行結果:
[-] Barrier-1 n_waiting = 0
[-] Barrier-2 n_waiting = 1
[-] Barrier-3 n_waiting = 2
[-] Barrier-3 after barrier 2
[-] Barrier-2 after barrier 1
[-] Barrier-1 after barrier 0

  每一個進來就等待,不夠3個就阻塞,直到夠3個就開閘放行。

 

Barrier實例的方法:

broken  檢測柵欄是否處於打破的狀態,返回True或False

abort()  將柵欄置於broken狀態,等待中的線程或者調用等待方法的線程都會拋出threading.BrokenBarrieError異常,直到reset方法來恢復柵欄

reset()  恢復柵欄,重新開始攔截

 

例2:

#Barrier 柵欄
import threading,logging
logging.basicConfig(level=logging.INFO,format="[-] %(threadName)s %(message)s")

def work(barrier:threading.Barrier):
    logging.info("n_waiting = {}".format(barrier.n_waiting))
    try:
        bid = barrier.wait()
        logging.info("after barrier {}".format(bid))
    except threading.BrokenBarrierError:
        logging.info("Broken Barrier in {}".format(threading.current_thread()))

barrier = threading.Barrier(3)

for x in range(1,12): #12個
    if x == 3:
        barrier.abort() #有一個人壞了規矩
    elif x == 6:
        barrier.reset()
    threading.Event().wait(1)
    threading.Thread(target=work,args=(barrier,),name="Barrier-{}".format(x)).start()

運行結果:
[-] Barrier-1 n_waiting = 0 #0,1
[-] Barrier-2 n_waiting = 1
[-] Barrier-2 Broken Barrier in <Thread(Barrier-2, started 3124)>
[-] Barrier-1 Broken Barrier in <Thread(Barrier-1, started 8036)>
[-] Barrier-3 n_waiting = 0
[-] Barrier-3 Broken Barrier in <Thread(Barrier-3, started 7428)>
[-] Barrier-4 n_waiting = 0
[-] Barrier-4 Broken Barrier in <Thread(Barrier-4, started 1828)>
[-] Barrier-5 n_waiting = 0
[-] Barrier-5 Broken Barrier in <Thread(Barrier-5, started 7416)>
[-] Barrier-6 n_waiting = 0 #6,7,8
[-] Barrier-7 n_waiting = 1
[-] Barrier-8 n_waiting = 2
[-] Barrier-8 after barrier 2
[-] Barrier-7 after barrier 1
[-] Barrier-6 after barrier 0
[-] Barrier-9 n_waiting = 0  #9,10,11
[-] Barrier-10 n_waiting = 1
[-] Barrier-11 n_waiting = 2
[-] Barrier-11 after barrier 2
[-] Barrier-9 after barrier 0
[-] Barrier-10 after barrier 1

  一共有12個參與者,依次開始,1和2處於等待狀態,到達第3的時候,進入了broken狀態,則直到第6個,才恢復柵欄,從6開始繼續攔截,達到3個(6,7,8)就放行,9,10,11也放行。

 

例3:

wait方法

#Barrier 柵欄
import threading,logging
logging.basicConfig(level=logging.INFO,format="[-] %(threadName)s %(message)s")

def work(barrier:threading.Barrier,i:int):
    logging.info("n_waiting = {}".format(barrier.n_waiting))
    try:
        if i < 3:
            bid = barrier.wait(1)  #超時1秒就將柵欄置於broken狀態,拋出異常后續語句不會執行
        else:
            if i == 6:
                barrier.reset() #恢復柵欄
            bid = barrier.wait()
        # logging.info("broken status = {}".format(barrier.broken))  #是否處於broken狀態
        logging.info("after barrier {}".format(bid))
    except threading.BrokenBarrierError:
        logging.info("Broken Barrier in {}".format(threading.current_thread()))

barrier = threading.Barrier(3)

for i in range(1,11): #10個
    threading.Event().wait(2) #強制延遲2秒,讓出時間片
    threading.Thread(target=work,args=(barrier,i),name="Barrier-{}".format(i)).start()

運行結果:
[-] Barrier-1 n_waiting = 0
[-] Barrier-1 Broken Barrier in <Thread(Barrier-1, started 3100)>
[-] Barrier-2 n_waiting = 0
[-] Barrier-2 Broken Barrier in <Thread(Barrier-2, started 8836)>
[-] Barrier-3 n_waiting = 0
[-] Barrier-3 Broken Barrier in <Thread(Barrier-3, started 8428)>
[-] Barrier-4 n_waiting = 0
[-] Barrier-4 Broken Barrier in <Thread(Barrier-4, started 1204)>
[-] Barrier-5 n_waiting = 0
[-] Barrier-5 Broken Barrier in <Thread(Barrier-5, started 1556)>
[-] Barrier-6 n_waiting = 0
[-] Barrier-7 n_waiting = 1
[-] Barrier-8 n_waiting = 2
[-] Barrier-8 after barrier 2
[-] Barrier-7 after barrier 1
[-] Barrier-6 after barrier 0
[-] Barrier-9 n_waiting = 0
[-] Barrier-10 n_waiting = 1
阻塞中

  wait方法在等待超時1秒后,就強制將柵欄置於broken狀態,直到第6個的時候才reset恢復,然后6,7,8放行,9,10,繼續阻塞。如果此時有第11個,就會9,10,11放行。

 

應用場景:

並發初始化

所有線程都必須初始化完成后,才能繼續工作,例如運行前加載數據,檢查,如果這些工作沒完成就不能正常工作運行。

10個線程做10種工作准備,每個線程負責一種工作,只有10個線程都完成后,才能繼續工作,先完成的要等待后完成的線程。

例如,啟動一個程序,需要先加載磁盤文件、緩存預熱、初始化連接池等工作,這些工作可以齊頭並進,不過只有都滿足了,程序才能繼續向后執行。假設數據庫鏈接失敗,則初始化工作失敗,就要abort,柵欄broken,所有線程收到異常退出。

 

工作量

有10個計算任務,完成6個,就算工作完成。

 

 

 

PPTBYG

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM