Python內置庫:threading(多線程操作)


Python的線程操作在舊版本中使用的是thread模塊,在Python27和Python3中引入了threading模塊,同時thread模塊在Python3中改名為_thread模塊,threading模塊相較於thread模塊,對於線程的操作更加的豐富,而且threading模塊本身也是相當於對thread模塊的進一步封裝而成,thread模塊有的功能threading模塊也都有,所以涉及到對線程的操作,推薦使用threading模塊。

threading模塊中包含了關於線程操作的豐富功能,包括:常用線程函數,線程對象,鎖對象,遞歸鎖對象,事件對象,條件變量對象,信號量對象,定時器對象,柵欄對象。

注:本文使用的Python版本是Python3.6

 

 一、with語法

這個模塊中所有帶有acquire()和release()方法的對象,都可以使用with語句。當進入with語句塊時,acquire()方法被自動調用,當離開with語句塊時,release()語句塊被自動調用。包括Lock、RLock、Condition、Semaphore。

以下語句:

with some_lock:
    # do something
    pass

 

相當於:

some_lock.acquire()
try:
    # do something
    pass
finally:
    some_lock.release()

 

 

二、threading函數

在Python3中方法名和函數名統一成了以字母小寫加下划線的命令方式,但是Python2.x中threading模塊的某些以駝峰命名的方法和函數仍然可用,如threading.active_count()和threading.activeCount()是一樣的。

通常情況下,Python程序啟動時,Python解釋器會啟動一個繼承自threading.Thread的threading._MainThread線程對象作為主線程,所以涉及到threading.Thread的方法和函數時通常都算上了這個主線程的,比如在啟動程序時打印threading.active_count()的結果就已經是1了。

  • threading.active_count():返回當前存活的threading.Thread線程對象數量,等同於len(threading.enumerate())。
  • threading.current_thread():返回此函數的調用者控制的threading.Thread線程對象。如果當前調用者控制的線程不是通過threading.Thread創建的,則返回一個功能受限的虛擬線程對象。
  • threading.get_ident():返回當前線程的線程標識符。注意當一個線程退出時,它的線程標識符可能會被之后新創建的線程復用。
  • threading.enumerate():返回當前存活的threading.Thread線程對象列表。
  • threading.main_thread():返回主線程對象,通常情況下,就是程序啟動時Python解釋器創建的threading._MainThread線程對象。
  • threading.stack_size([size]):返回創建線程時使用的堆棧大小。也可以使用可選參數size指定之后創建線程時的堆棧大小,size可以是0或者一個不小於32KiB的正整數。如果參數沒有指定,則默認為0。如果系統或者其他原因不支持改變堆棧大小,則會報RuntimeError錯誤;如果指定的堆棧大小不合法,則會報ValueError,但並不會修改這個堆棧的大小。32KiB是保證能解釋器運行的最小堆棧大小,當然這個值會因為系統或者其他原因有限制,比如它要求的值是大於32KiB的某個值,只需根據要求修改即可。

 

三、threading常量

threading.TIMEOUT_MAX:指定阻塞函數(如Lock.acquire()、RLock.acquire()、Condition.wait()等)中參數timeout的最大值,在給這些阻塞函數傳參時如果超過了這個指定的最大值會拋出OverflowError錯誤。

 

四、線程對象:threading.Thread

threading.Thread目前還沒有優先級和線程組的功能,而且創建的線程也不能被銷毀、停止、暫定、恢復或中斷。

守護線程:只有所有守護線程都結束,整個Python程序才會退出,但並不是說Python程序會等待守護線程運行完畢,相反,當程序退出時,如果還有守護線程在運行,程序會去強制終結所有守護線程,當守所有護線程都終結后,程序才會真正退出。可以通過修改daemon屬性或者初始化線程時指定daemon參數來指定某個線程為守護線程。

非守護線程:一般創建的線程默認就是非守護線程,包括主線程也是,即在Python程序退出時,如果還有非守護線程在運行,程序會等待直到所有非守護線程都結束后才會退出。

注:守護線程會在程序關閉時突然關閉(如果守護線程在程序關閉時還在運行),它們占用的資源可能沒有被正確釋放,比如正在修改文檔內容等,需要謹慎使用。

threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

如果這個類的初始化方法被重寫,請確保在重寫的初始化方法中做任何事之前先調用threading.Thread類的__init__方法。

    • group:應該設為None,即不用設置,使用默認值就好,因為這個參數是為了以后實現ThreadGroup類而保留的。
    • target:在run方法中調用的可調用對象,即需要開啟線程的可調用對象,比如函數或方法。
    • name:線程名稱,默認為“Thread-N”形式的名稱,N為較小的十進制數。
    • args:在參數target中傳入的可調用對象的參數元組,默認為空元組()。
    • kwargs:在參數target中傳入的可調用對象的關鍵字參數字典,默認為空字典{}。
    • daemon:默認為None,即繼承當前調用者線程(即開啟線程的線程,一般就是主線程)的守護模式屬性,如果不為None,則無論該線程是否為守護模式,都會被設置為“守護模式”。
  • start():開啟線程活動。它將使得run()方法在一個獨立的控制線程中被調用,需要注意的是同一個線程對象的start()方法只能被調用一次,如果調用多次,則會報RuntimeError錯誤。
  • run():此方法代表線程活動。
  • join(timeout=None):讓當前調用者線程(即開啟線程的線程,一般就是主線程)等待,直到線程結束(無論它是什么原因結束的),timeout參數是以秒為單位的浮點數,用於設置操作超時的時間,返回值為None。如果想要判斷線程是否超時,只能通過線程的is_alive方法來進行判斷。join方法可以被調用多次。如果對當前線程使用join方法(即線程在內部調用自己的join方法),或者在線程沒有開始前使用join方法,都會報RuntimeError錯誤。
  • name:線程的名稱字符串,並沒有什么實際含義,多個線程可以賦予相同的名稱,初始值由初始化方法來設置。
  • ident:線程的標識符,如果線程還沒有啟動,則為None。ident是一個非零整數,參見threading.get_ident()函數。當線程結束后,它的ident可能被其他新創建的線程復用,當然就算該線程結束了,它的ident依舊是可用的。
  • is_alive():線程是否存活,返回True或者False。在線程的run()運行之后直到run()結束,該方法返回True。
  • daemon:表示該線程是否是守護線程,True或者False。設置一個線程的daemon必須在線程的start()方法之前,否則會報RuntimeError錯誤。這個值默認繼承自創建它的線程,主線程默認是非守護線程的,所以在主線程中創建的線程默認都是非守護線程的,即daemon=False。

使用threading.Thread類創建線程簡單示例:

"""
通過實例化threading.Thread類創建線程
"""
import time
import threading


def test_thread(para='hi', sleep=3):
    """線程運行函數"""
    time.sleep(sleep)
    print(para)


def main():
    # 創建線程
    thread_hi = threading.Thread(target=test_thread)
    thread_hello = threading.Thread(target=test_thread, args=('hello', 1))
    # 啟動線程
    thread_hi.start()
    thread_hello.start()
    print('Main thread has ended!')


if __name__ == '__main__':
    main()
Main thread has ended!
hello
hi

使用threading.Thread類的子類創建線程簡單示例:

"""
通過繼承threading.Thread的子類創建線程
"""
import time
import threading


class TestThread(threading.Thread):
    def __init__(self, para='hi', sleep=3):
        # 重寫threading.Thread的__init__方法時,確保在所有操作之前先調用threading.Thread.__init__方法
        super().__init__()
        self.para = para
        self.sleep = sleep

    def run(self):
        """線程內容"""
        time.sleep(self.sleep)
        print(self.para)


def main():
    # 創建線程
    thread_hi = TestThread()
    thread_hello = TestThread('hello', 1)
    # 啟動線程
    thread_hi.start()
    thread_hello.start()
    print('Main thread has ended!')


if __name__ == '__main__':
    main()
Main thread has ended!
hello
hi

join方法簡單示例:

"""
使用join方法阻塞主線程
"""
import time
import threading


def test_thread(para='hi', sleep=5):
    """線程運行函數"""
    time.sleep(sleep)
    print(para)


def main():
    # 創建線程
    thread_hi = threading.Thread(target=test_thread)
    thread_hello = threading.Thread(target=test_thread, args=('hello', 1))
    # 啟動線程
    thread_hi.start()
    thread_hello.start()
    time.sleep(2)
    print('馬上執行join方法了')
    # 執行join方法會阻塞調用線程(主線程),直到調用join方法的線程(thread_hi)結束
    thread_hi.join()
    print('線程thread_hi已結束')
    # 這里不會阻塞主線程,因為運行到這里的時候,線程thread_hello已經運行結束了
    thread_hello.join()
    print('Main thread has ended!')

    # 以上代碼只是為了展示join方法的效果
    # 如果想要等所有線程都運行完成后再做其他操作,可以使用for循環
    # for thd in (thread_hi, thread_hello):
    #     thd.join()
    #
    # print('所有線程執行結束后的其他操作')


if __name__ == '__main__':
    main()
hello
馬上執行join方法了
hi
線程thread_hi已結束
Main thread has ended!

 

五、鎖對象:threading.Lock

threading.Lock是直接通過_thread模塊擴展實現的。

當鎖在被鎖定時,它並不屬於某一個特定的線程。

鎖只有“鎖定”和“非鎖定”兩種狀態,當鎖被創建時,是處於“非鎖定”狀態的。當鎖已經被鎖定時,其他線程再次調用acquire()方法會被阻塞執行,直到鎖被獲得鎖的線程調用release()方法釋放掉鎖並將其狀態改為“非鎖定”。

同一個線程獲取鎖后,如果在釋放鎖之前再次獲取鎖會導致當前線程阻塞,除非有另外的線程來釋放鎖,如果只有一個線程,並且發生了這種情況,會導致這個線程一直阻塞下去,即形成了死鎖。所以在獲取鎖時需要保證鎖已經被釋放掉了,或者使用遞歸鎖來解決這種情況。

  • acquire(blocking=True, timeout=-1):獲取鎖,並將鎖的狀態改為“鎖定”,成功返回True,失敗返回False。當一個線程獲得鎖時,會阻塞其他嘗試獲取鎖的線程,直到這個鎖被釋放掉。timeout默認值為-1,即將無限阻塞等待直到獲得鎖,如果設為其他的值時(單位為秒的浮點數),將最多阻塞等待timeout指定的秒數。當blocking為False時,timeout參數被忽略,即沒有獲得鎖也不進行阻塞。
  • release():釋放一個鎖,並將其狀態改為“非鎖定”,需要注意的是任何線程都可以釋放鎖,不只是獲得鎖的線程(因為鎖不屬於特定的線程)。release()方法只能在鎖處於“鎖定”狀態時調用,如果在“非鎖定”狀態時調用則會報RuntimeError錯誤。

使用鎖實現線程同步的簡單示例:

"""
使用鎖實現線程同步
"""
import time
import threading

# 創建鎖
lock = threading.Lock()

# 全局變量
global_resource = [None] * 5


def change_resource(para, sleep):
    # 請求鎖
    lock.acquire()

    # 這段代碼如果不加鎖,第一個線程運行結束后global_resource中是亂的,輸出為:修改全局變量為: ['hello', 'hi', 'hi', 'hello', 'hello']
    # 第二個線程運行結束后,global_resource中還是亂的,輸出為:修改全局變量為: ['hello', 'hi', 'hi', 'hi', 'hi']
    global global_resource
    for i in range(len(global_resource)):
        global_resource[i] = para
        time.sleep(sleep)
    print("修改全局變量為:", global_resource)

    # 釋放鎖
    lock.release()


def main():
    thread_hi = threading.Thread(target=change_resource, args=('hi', 2))
    thread_hello = threading.Thread(target=change_resource, args=('hello', 1))
    thread_hi.start()
    thread_hello.start()


if __name__ == '__main__':
    main()
修改全局變量為: ['hi', 'hi', 'hi', 'hi', 'hi']
修改全局變量為: ['hello', 'hello', 'hello', 'hello', 'hello']

 

六、遞歸鎖對象:threading.RLock

遞歸鎖和普通鎖的差別在於加入了“所屬線程”和“遞歸等級”的概念,釋放鎖必須有獲取鎖的線程來進行釋放,同時,同一個線程在釋放鎖之前再次獲取鎖將不會阻塞當前線程,只是在鎖的遞歸等級上加了1(獲得鎖時的初始遞歸等級為1)。

使用普通鎖時,對於一些可能造成死鎖的情況,可以考慮使用遞歸鎖來解決。

  • acquire(blocking=True, timeout=-1):與普通鎖的不同之處在於:當使用默認值時,如果這個線程已經擁有鎖,那么鎖的遞歸等級加1。線程獲得鎖時,該鎖的遞歸等級被初始化為1。當多個線程被阻塞時,只有一個線程能在鎖被解時獲得鎖,這種情況下,acquire()是沒有返回值的。
  • release():沒有返回值,調用一次則遞歸等級減1,遞歸等級為零時表示這個線程的鎖已經被釋放掉,其他線程可以獲取鎖了。可能在一個線程中調用了多次acquire(),導致鎖的遞歸等級大於了1,那么就需要調用對應次數的release()來完全釋放鎖,並將它的遞歸等級減到零,其他的線程才能獲取鎖,不然就會一直被阻塞着。

遞歸鎖使用簡單示例:

"""
在普通鎖中可能造成死鎖的情況,可以考慮使用遞歸鎖解決
"""
import time
import threading


# 如果是使用的兩個普通鎖,那么就會造成死鎖的情況,程序一直阻塞而不會退出
# rlock_hi = threading.Lock()
# rlock_hello = threading.Lock()

# 使用成一個遞歸鎖就可以解決當前這種死鎖情況
rlock_hi = rlock_hello = threading.RLock()


def test_thread_hi():
    # 初始時鎖內部的遞歸等級為1
    rlock_hi.acquire()
    print('線程test_thread_hi獲得了鎖rlock_hi')
    time.sleep(2)
    # 如果再次獲取同樣一把鎖,則不會阻塞,只是內部的遞歸等級加1
    rlock_hello.acquire()
    print('線程test_thread_hi獲得了鎖rlock_hello')
    # 釋放一次鎖,內部遞歸等級減1
    rlock_hello.release()
    # 這里再次減,當遞歸等級為0時,其他線程才可獲取到此鎖
    rlock_hi.release()


def test_thread_hello():
    rlock_hello.acquire()
    print('線程test_thread_hello獲得了鎖rlock_hello')
    time.sleep(2)
    rlock_hi.acquire()
    print('線程test_thread_hello獲得了鎖rlock_hi')
    rlock_hi.release()
    rlock_hello.release()


def main():
    thread_hi = threading.Thread(target=test_thread_hi)
    thread_hello = threading.Thread(target=test_thread_hello)
    thread_hi.start()
    thread_hello.start()


if __name__ == '__main__':
    main()
線程test_thread_hi獲得了鎖rlock_hi
線程test_thread_hi獲得了鎖rlock_hello
線程test_thread_hello獲得了鎖rlock_hello
線程test_thread_hello獲得了鎖rlock_hi

 

 

七、條件變量對象:threading.Condition

它的wait()方法釋放鎖,並阻塞程序直到其他線程調用notify()或者notify_all()方法喚醒,然后wait()方法重新獲取鎖,這個方法也可以指定timeout超時時間。
它的notify()方法喚醒一個正在等待的線程,notify_all()則是喚醒所有正在等待的線程。notify()或者notify_all()並不會釋放鎖,所以被喚醒的線程並不會立即從它們的wait()方法出返回並執行,只有在調用了notify()或者notify_all()方法的線程放棄了鎖的所有權后才會返回對應的線程並執行,即先通知再釋放鎖。

threading.Condition(lock=None):一個條件變量對象允許一個或多個線程等待,直到被另一個線程通知。lock參數必須是一個Lock對象或者RLock對象,並且會作為底層鎖使用,默認使用RLock。

  • acquire(*args):請求底層鎖。此方法調用底層鎖對應的方法和返回對應方法的返回值。
  • release():釋放底層鎖。此方法調用底層所對應的方法,沒有返回值。
  • wait(timeout=None):釋放鎖,等待直到被通知(再獲取鎖)或者發生超時事件。如果線程在調用此方法時本身並沒有鎖(即線程首先得有鎖),則會報RuntimeError錯誤。這個方法釋放底層鎖,然后阻塞線程,直到另一個線程中的同一個條件變量使用notify()或notify_all()喚醒,或者超時事件發生,一旦被喚醒或者超時,則會重新去獲取鎖並返回(成功返回True,否則返回False)。timeout參數為浮點類型的秒數。在RLock中使用一次release方法,可能並不能釋放鎖,因為鎖可能被acquire()了多次,但是在條件變量對象中,它調用了RLock類的內部方法,可以一次就完全釋放鎖,重新獲取鎖時也會重置鎖的遞歸等級。
  • wait_for(predicate, timeout=None):與wait方法相似,等待,直到條件計算為True,返回最后一次的predicate的返回值。predicate參數為一個返回值為布爾值的可調用對象。調用此方法的時候會先調用predicate對象,如果返回的就是True,則不會釋放鎖,直接往后執行。另一個線程通知后,在它釋放鎖時,才會觸發wait_for方法等待事件,這時如果predicate結果為True,則嘗試獲取鎖,獲取成功后則繼續往后執行,如果為False,則會一直阻塞下去。此方法如果忽略timeout參數,就相當於:while not predicate(): condition_lock.wait()。
  • notify(n=1):喚醒一個等待這個條件的線程,如果調用這個方法的線程在沒有獲得鎖的情況下調用這個方法,會報RuntimeError錯誤。默認喚醒一個線程,可以通過參數n設置喚醒n個正在等待這個條件變量的線程,如果沒有線程在等待,調用這個方法不會發生任何事。如果等待的線程中正好有n個線程,那么這個方法可以准確的喚醒這n個線程,但是等待的線程超過指定的n個,有時候可能會喚醒超過n個的線程,所以依賴參數n是不安全的行為。
  • notify_all():喚醒所有等待這個條件的線程。這個方法與notify()不同之處在於它喚醒所有線程,而不是特定n個。

條件變量簡單示例:

"""
讓一個線程等待,直到另一個線程通知
"""
import time
import threading


# 創建條件變量對象
condition_lock = threading.Condition()

PRE = 0


# predicate可調用函數
def pre():
    print(PRE)
    return PRE


def test_thread_hi():
    # 在使用wait/wait_for之前必須先獲得鎖
    condition_lock.acquire()

    print('等待線程test_thread_hello的通知')
    # 先執行一次pre,返回False后釋放掉鎖,等另一個線程釋放掉鎖后再次執行pre,返回True后再次獲取鎖
    # wait_for的返回值不是True和False,而是predicate參數的返回值
    condition_lock.wait_for(pre)
    # condition_lock.wait()
    print('繼續執行')

    # 不要忘記使用wait/wait_for之后要釋放鎖
    condition_lock.release()


def test_thread_hello():
    time.sleep(1)
    condition_lock.acquire()

    global PRE
    PRE = 1
    print('修改PRE值為1')

    print('通知線程test_thread_hi可以准備獲取鎖了')
    condition_lock.notify()
    
    # 先notify/notify_all之后在釋放鎖
    condition_lock.release()
    print('你獲取鎖吧')


def main():
    thread_hi = threading.Thread(target=test_thread_hi)
    thread_hello = threading.Thread(target=test_thread_hello)
    thread_hi.start()
    thread_hello.start()


if __name__ == '__main__':
    main()
等待線程test_thread_hello的通知
0
修改PRE值為1
通知線程test_thread_hi可以准備獲取鎖了
你獲取鎖吧
1
繼續執行

 

 

八、信號量對象:threading.Semaphore

一個信號量管理一個內部計數器,acquire()方法會減少計數器,release()方法則增加計數器,計數器的值永遠不會小於零,當調用acquire()時,如果發現該計數器為零,則阻塞線程,直到調用release()方法使計數器增加。

threading.Semaphore(value=1):value參數默認值為1,如果指定的值小於0,則會報ValueError錯誤。一個信號量對象管理一個原子性的計數器,代表release()方法調用的次數減去acquire()方法的調用次數,再加上一個初始值。

  • acquire(blocking=True, timeout=None):默認情況下,在進入時,如果計數器大於0,則減1並返回True,如果等於0,則阻塞直到使用release()方法喚醒,然后減1並返回True。被喚醒的線程順序是不確定的。如果blocking設置為False,調用這個方法將不會發生阻塞。timeout用於設置超時的時間,在timeout秒的時間內沒有獲取到信號量,則返回False,否則返回True。
  • release():釋放一個信號量,將內部計數器增加1。當計數器的值為0,且有其他線程正在等待它大於0時,喚醒這個線程。

信號量對象簡單示例:

"""
通過信號量對象管理一次性運行的線程數量
"""
import time
import threading

# 創建信號量對象,初始化計數器值為3
semaphore3 = threading.Semaphore(3)


def thread_semaphore(index):
    # 信號量計數器減1
    semaphore3.acquire()
    time.sleep(2)
    print('thread_%s is running...' % index)
    # 信號量計數器加1
    semaphore3.release()


def main():
    # 雖然會有9個線程運行,但是通過信號量控制同時只能有3個線程運行
    # 第4個線程啟動時,調用acquire發現計數器為0了,所以就會阻塞等待計數器大於0的時候
    for index in range(9):
        threading.Thread(target=thread_semaphore, args=(index, )).start()


if __name__ == '__main__':
    main()

 

 

九、事件對象:threading.Event

一個事件對象管理一個內部標志,初始狀態默認為False,set()方法可將它設置為True,clear()方法可將它設置為False,wait()方法將線程阻塞直到內部標志的值為True。

如果一個或多個線程需要知道另一個線程的某個狀態才能進行下一步的操作,就可以使用線程的event事件對象來處理。

  • is_set():當內部標志為True時返回True。
  • set():設置內部標志為True。此時所有等待中的線程將被喚醒,調用wait()方法的線程將不會被阻塞。
  • clear():將內部標志設置為False。所有調用wait()方法的線程將被阻塞,直到調用set()方法將內部標志設置為True。
  • wait(timeout=None):阻塞線程直到內部標志為True,或者發生超時事件。如果調用時內部標志就是True,那么不會被阻塞,否則將被阻塞。timeout為浮點類型的秒數。

事件對象簡單示例:

"""
事件對象使用實例
"""
import time
import threading

# 創建事件對象,內部標志默認為False
event = threading.Event()


def student_exam(student_id):
    print('學生%s等監考老師發卷。。。' % student_id)
    event.wait()
    print('開始考試了!')


def invigilate_teacher():
    time.sleep(5)
    print('考試時間到,學生們可以開始考試了!')
    # 設置內部標志為True,並喚醒所有等待的線程
    event.set()


def main():
    for student_id in range(3):
        threading.Thread(target=student_exam, args=(student_id, )).start()

    threading.Thread(target=invigilate_teacher).start()


if __name__ == '__main__':
    main()
學生0等監考老師發卷。。。
學生1等監考老師發卷。。。
學生2等監考老師發卷。。。
考試時間到,學生們可以開始考試了!
開始考試了!
開始考試了!
開始考試了!

 

 

十、定時器對象:threading.Timer

表示一個操作需要在等待一定時間之后執行,相當於一個定時器。Timer類是threading.Thread的子類,所以它可以像一個自定義線程一樣工作。
和線程一樣,可以通過start()方法啟動定時器,在定時器計時結束之前(線程開啟之前)可以使用cancel()方法停止計時器。計時器等待的時間可能與用戶設置的時間不完全一樣。

threading.Timer(interval, function, args=None, kwargs=None)

  • interval:間隔時間,即定時器秒數。
  • function:執行的函數。
  • args:傳入function的參數,如果為None,則會傳入一個空列表。
  • kwargs:傳入function的關鍵字參數,如果為None,則會傳入一個空字典。

cancel():停止計時器,並取消對應函數的執行,這個方法只有在計時器還沒有計時結束之前才會生效,如果已經開始執行函數,則不會生效。

 

十一、柵欄對象:threading.Barrier

柵欄對象用於一個固定數量的線程,而這些線程需要等待彼此的情況。這些線程中的每個線程都會嘗試調用wait()方法,然后阻塞,直到所有線程都調用了wait()方法,然后所有線程會被同時釋放。

threading.Barrier(parties, action=None, timeout=None)

    • parties:指定需要創建的柵欄對象的線程數。
    • action:一個可調用對象,當所有線程都被釋放時,在釋放前,其中一個線程(隨機)會自動調用這個對象。
    • timeout:設置wait()方法的超時時間。
  • wait(timeout=None):通過柵欄。當所有線程都調用了這個方法,則所有線程會被同時釋放。如果提供了timeout參數,那么此參數是優先於初始化方法中的timeout參數的。返回值為range(parties)中的一個整數,每個線程的返回值都不同。如果提供了action參數,那么在所有線程釋放前,其中一個線程(隨機)會調用這個action參數對象,並且如果這個action調用發生了異常,則柵欄對象將進入破損狀態。如果wait()方法發生了超時事件,那么柵欄對象也將進入破損狀態。如果柵欄對象進入破損狀態或者重置柵欄對象時仍有線程在等待釋放,則會報BrokenBarrierError異常。
  • reset():將一個柵欄對象重置為默認的初始態。如果此時有任何線程正在等待釋放,那么將會報BrokenBarrierError異常。如果barrier中有線程的狀態是未知的,那么可能需要外部的某種同步來確保線程已被釋放。如果柵欄對象已經破損,那么最好是丟棄它並重新創建一個新的柵欄對象。
  • abort():使柵欄對象進入破損狀態。這將導致所有已經調用和未調用的wait()方法引發BrokenBarrierError異常。比如,需要放棄一個線程,但又不想引發死鎖的情況,就可以調用這個方法。一個更好的辦法是提供一個合理的timeout參數值,來自動避免某個線程出錯。
  • parties:通過柵欄的線程數量。
  • n_waiting:在柵欄中正在等待的線程數量。
  • broken:如果柵欄對象為破損狀態則返回True。

柵欄對象簡單示例:

"""
柵欄對象使用示例
"""
import time
import threading


def test_action():
    print('所有柵欄線程釋放前調用此函數!')


# 創建線程數為3的柵欄對象,當“攔住”3個線程的wait后放行,然后又繼續“攔”(如果有的話)
barrier = threading.Barrier(3, test_action)


def barrier_thread(sleep):
    time.sleep(sleep)
    print('barrier thread-%s wait...' % sleep)
    # 阻塞線程,直到阻塞線程數達到柵欄指定數量
    barrier.wait()
    print('barrier thread-%s end!' % sleep)


def main():
    # 這里開啟了6個線程,則一次會攔截3個
    for sleep in range(6):
        threading.Thread(target=barrier_thread, args=(sleep, )).start()


if __name__ == '__main__':
    main()

 

barrier thread-0 wait...
barrier thread-1 wait...
barrier thread-2 wait...
所有柵欄線程釋放前調用此函數!
barrier thread-2 end!
barrier thread-0 end!
barrier thread-1 end!
barrier thread-3 wait...
barrier thread-4 wait...
barrier thread-5 wait...
所有柵欄線程釋放前調用此函數!
barrier thread-5 end!
barrier thread-3 end!
barrier thread-4 end!

 

 

十二、多線程與多進程的理解

在網上查多線程資料的時候,很多文章講到了對Python的多線程與多進程的理解,包括相同之處和它們之間的區別,我就整理了一些點放在這里:

進程ID:多線程的主進程和它的子線程的進程ID,即os.getpid(),都是相同的,都是主進程的進程ID。多進程則是主進程和它的子進程都有各自的進程ID,都不相同。

共享數據:多線程可以共享主進程內的數據,但是多進程用的都是各自的數據,無法共享。

主線程:由Python解釋器運行主py時,也就是開啟了一個Python進程,而這個py是這個進程內的一個線程,不過不同於其他線程,它是主線程,同時這個進程內還有其他的比如垃圾回收等解釋器級別的線程,所以進程就等於主線程這種理解是有誤的。

CPU多核利用:Python解釋器的線程只能在CPU單核上運行,開銷小,但是這也是缺點,因為沒有利用CPU多核的特點。Python的多進程是可以利用多個CPU核心的,但也有其他語言的多線程是可以利用多核的。

單核與多核:一個CPU的主要作用是用來做計算的,多個CPU核心如果都用來做計算,那么效率肯定會提高很多,但是對於IO來說,多個CPU核心也沒有太大用處,因為沒有輸入,后面的動作也無法執行。所以如果一個程序是計算密集型的,那么就該利用多核的優勢(比如使用Python的多進程),如果是IO密集型的,那么使用單核的多線程就完全夠了。

線程或進程間的切換:線程間的切換是要快於進程間的切換的。

死鎖:指的是兩個或兩個以上的線程或進程在請求鎖的時候形成了互相等待阻塞的情況,導致這些線程或進程無法繼續執行下去,這時候稱系統處於死鎖狀態或者系統產生了死鎖,這些線程或進程就稱為死鎖線程或死鎖進程。解決死鎖的辦法可以使用遞歸鎖,即threading.RLock,然后線程或進程就可以隨意請求和釋放鎖了,而不用擔心別的線程或進程也在請求鎖而產生死鎖的情況。

信號量與進程池:進程池Pool(n)只能是“池”中的n個進程運行,不能有新的進程,信號量只要保證最大線程數就行,而不是只有這幾個線程,舊的線程運行結束,就可以繼續來新的線程。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM