python多線程編程—同步原語入門(鎖Lock、信號量(Bounded)Semaphore)


摘錄python核心編程

一般的,多線程代碼中,總有一些特定的函數或者代碼塊不希望(或不應該)被多個線程同時執行(比如兩個線程運行的順序發生變化,就可能造成代碼的執行軌跡或者行為不相同,或者產生不一致的數據),比如修改數據庫、更新文件或其他會產生競態條件的類似情況。此時就需要同步了。

同步:任意數量的線程可以訪問臨界區的代碼,但在給定的時刻又只有一個線程可以通過時。

這里介紹兩個基本的同步類型原語:鎖/互斥、信號量

鎖有兩種狀態:鎖定和未鎖定。與之對應的是兩個函數:獲得鎖和釋放鎖。

  當多線程爭奪鎖時,允許第一個獲得鎖的線程進入臨界區,並執行代碼;所有之后到達的線程都將被阻塞,直到第一個線程執行結束,退出臨界區,並釋放鎖。此時其他的線程可以獲得鎖並進入臨界區。注意:那些被阻塞的線程是沒有順序的(並不是先到先得),意味着下一個獲得鎖的線程的順序並不是確定的。

mtsleepF.py腳本中派生了隨機數量的線程(沒有使用鎖):

from atexit import register
from random import randrange
from threading import Thread,currentThread
from time import ctime,sleep

#自定義一個集合對象,重寫__str__方法
class CleanOutputSet(set):
    def __str__(self):
        return ', '.join(x for x in self)

#列表生成式   randrange()用於生成一個隨機數,range()返回一個列表
loops = (randrange(2,5) for x in range(randrange(3,7)))
remaining = CleanOutputSet()

def loop(nsec):
    myname = currentThread().name
    remaining.add(myname)
    print('[%s] 開始了 %s' % (ctime(),myname))
    sleep(nsec)
    remaining.remove(myname)
    print('[%s] 結束了 %s (%s second)' % (ctime(),myname,nsec))
    print('  (還存在:%s)' % (remaining or 'NONE'))

def _main():
    #創建3~6個線程,每個線程睡眠2~4秒
    for pause in loops:
        Thread(target = loop,args = (pause,)).start()
#裝飾器,在腳本的最后執行       
@register
def _atexit():
    print('所有的完成於:',ctime())
    
if __name__ == '__main__':
    _main()

正常情況下,執行的結果:

PS C:\Users\WC> python E:\Python3.6.3\workspace\mtsleepF.py
[Mon Apr 16 17:47:31 2018] 開始了 Thread-1
[Mon Apr 16 17:47:31 2018] 開始了 Thread-2
[Mon Apr 16 17:47:31 2018] 開始了 Thread-3
[Mon Apr 16 17:47:31 2018] 開始了 Thread-4
[Mon Apr 16 17:47:33 2018] 結束了 Thread-1 (2 second)
  (還存在:Thread-4, Thread-3, Thread-2)
[Mon Apr 16 17:47:33 2018] 結束了 Thread-2 (2 second)
  (還存在:Thread-4, Thread-3)
[Mon Apr 16 17:47:34 2018] 結束了 Thread-3 (3 second)
  (還存在:Thread-4)
[Mon Apr 16 17:47:34 2018] 結束了 Thread-4 (3 second)
  (還存在:NONE)
所有的完成於: Mon Apr 16 17:47:34 2018

我們多運行幾次,有時會得到下面錯亂的結果:

PS C:\Users\WC> python E:\Python3.6.3\workspace\mtsleepF.py
[Mon Apr 16 17:50:09 2018] 開始了 Thread-1
[Mon Apr 16 17:50:09 2018] 開始了 Thread-2
[Mon Apr 16 17:50:09 2018] 開始了 Thread-3
[Mon Apr 16 17:50:12 2018] 結束了 Thread-3 (3 second)
  (還存在:Thread-2, Thread-1)
[Mon Apr 16 17:50:13 2018] 結束了 Thread-1 (4 second)
[Mon Apr 16 17:50:13 2018] 結束了 Thread-2 (4 second)
  (還存在:NONE)
  (還存在:NONE)
所有的完成於: Mon Apr 16 17:50:13 2018

我們發現輸出存在部分混亂的情況(多個線程可能並行執行IO),還有就是兩個線程修改同一個變量(剩余線程名集合)。IO和訪問相同的數據結構都屬於臨界區,因此需要引入鎖防止多個線程同時進入臨界區。

下面是引入鎖的腳本實例(mtsleepG.py):

# python 3.6
from atexit import register
from random import randrange
from threading import Thread,Lock,currentThread #2.6版本后重命名為current_thread()
from time import ctime,sleep

#自定義一個集合類,重寫—__str__方法,將默認輸出改變為將其所有元素按照逗號分隔的字符串
class CleanOutputSet(set):
    def __str__(self):
        return ', '.join(x for x in self)
#三個全局變量   
lock = Lock()#
loops = (randrange(2,5) for x in range(randrange(3,7)))#隨機數量的線程(3~6個),每個線程暫停2~4秒
remaining = CleanOutputSet()#自定義集合類的實例

def loop(nsec):
    myname = currentThread().name#獲得當前線程的名稱
    lock.acquire()#獲取鎖,阻止其他線程進入到臨界區
    remaining.add(myname)#將線程名添加到集合中
    print('[%s] 開始 %s' % (ctime(),myname))
    lock.release()#釋放鎖
    sleep(nsec)#線程睡眠操作
    lock.acquire()#重新獲得鎖
    remaining.remove(myname)#從集合中刪除當前線程
    print('[%s] 完成 %s (%s secs)' % (ctime(),myname,nsec))
    print('   (remaining: %s )' % (remaining or 'NONE'))
    lock.release()#最后釋放鎖
    
def _main():    #main函數前面添加‘_’是為了不在其他地方使用而導入。_main只能在命令行模式下才能執行
    for pause in loops:
        Thread(target = loop,args = (pause,)).start()   #循環派生並執行每個線程
#裝飾器,注冊_atexit()函數,使得解釋器在腳本退出的時候執行此函數      
@register
def _atexit():
    print('所有線程完成於:',ctime())
    
if __name__ == '__main__':
    _main()

多次執行,結果沒有再出現混亂的情況:

PS C:\Users\WC> python E:\Python3.6.3\workspace\mtsleepG.py
[Tue Apr 17 19:54:31 2018] 開始 Thread-1
[Tue Apr 17 19:54:31 2018] 開始 Thread-2
[Tue Apr 17 19:54:31 2018] 開始 Thread-3
[Tue Apr 17 19:54:31 2018] 開始 Thread-4
[Tue Apr 17 19:54:31 2018] 開始 Thread-5
[Tue Apr 17 19:54:31 2018] 開始 Thread-6
[Tue Apr 17 19:54:33 2018] 完成 Thread-1 (2 secs)
   (remaining: Thread-5, Thread-3, Thread-4, Thread-6, Thread-2 )
[Tue Apr 17 19:54:33 2018] 完成 Thread-5 (2 secs)
   (remaining: Thread-3, Thread-4, Thread-6, Thread-2 )
[Tue Apr 17 19:54:34 2018] 完成 Thread-3 (3 secs)
   (remaining: Thread-4, Thread-6, Thread-2 )
[Tue Apr 17 19:54:34 2018] 完成 Thread-2 (3 secs)
   (remaining: Thread-4, Thread-6 )
[Tue Apr 17 19:54:35 2018] 完成 Thread-4 (4 secs)
   (remaining: Thread-6 )
[Tue Apr 17 19:54:35 2018] 完成 Thread-6 (4 secs)
   (remaining: NONE )
所有線程完成於: Tue Apr 17 19:54:35 2018

信號量

當情況更加復雜的時候,還可以考慮使用信號量這個同步原語來代替鎖。

信號量(Semaphore),是一個計數器,當資源消耗時遞減(調用acquire),計數器會減1;當資源釋放是遞增(調用release),計數器會加1。計數器的值不會小於0;當等於0的時候,再調用acquire會阻塞,直到其他線程調用release為止。可以認為信號量代表他們的資源可用或不可用。

兩個函數簡介如下:

acquire(blocking=布爾值,timeout=None):

  • 本方法用於獲得Semaphore
  • blocking默認值是True,此時,如果內部計數器值大於0,則減一,並返回;如果等於0,則阻塞,等待其他線程調用release()以使計數器加1;本方法返回True,或無線阻塞
  • 如果blocking=False,則不阻塞,如若獲取失敗,則返回False
  • 當設定了timeout的值,最多阻塞timeout秒,如果超時,返回False。

release():

  • 釋放Semaphore,內部計數器加1,可以喚醒等待的線程

BoundedSemaphore正好和Semaphore相反:一個工廠函數,返回一個新的有界信號量對象。有界信號量會確保他的值不會超過初始值;如果超出則會拋出ValueError異常。初始值默認為1。

消耗資源使計數器遞減的操作習慣上成為P(),也稱為wait、try、acquire、pend、procure.

相對的,當一個線程對一個資源完成操作時,該資源需要返回資源池中,這種操作一般稱為V(),也稱為signal、increment、release、post、Vacate。

python簡化了所有的命名,使用和鎖的函數一樣的名字:acquire和release。信號量比鎖更加靈活,因為可以有多個線程,每個線程擁有有限資源的一個實例

下面,我們模仿一個簡化的糖果機:該糖果機中只有5個可用的槽來保持庫存(糖果),如果所有的槽都滿了,糖果就不能再加到這個機器中了;相似的,如果每個槽都空了,消費者就不能再購買到了。我們使用信號量來追蹤這些有限的資源(糖果槽)。

腳本實例candy.py:

#python 3.6
from atexit import register
from random import randrange
from threading import BoundedSemaphore,Lock,Thread#增加了信號量
from time import sleep,ctime
#3個全局變量
lock = Lock()#
MAX = 5 #表示庫存糖果最大值的常量
candytray = BoundedSemaphore(MAX)#‘糖果托盤’,一個信號量
#向庫存中添加糖果。這段代碼是一個臨界區,輸出用戶的行動,並在糖果超過最大庫存的時候給出警告
def refill():
    lock.acquire()
    print('重裝糖果……')
    try:
        candytray.release()
    except ValueError:
        print('滿了,跳過')
    else:
        print('成功')
    lock.release()
#購買糖果。也是一個臨界區,效果和refill函數相反   
def buy():
    lock.acquire()
    print('購買糖果中………')
    #檢查是否所有的資源都已經消費完。
    #計數器的值不能小於0,所以這個調用一般會在計數器再次增加之前被阻塞。傳入非阻塞標志False,讓調用不再阻塞,而在應當阻塞的時候返回一個false,表示沒有更多資源了。
    if candytray.acquire(False): 
        print('成功')
    else:
        print('空,跳過')
    lock.release()
#模擬糖果機的所有者   
def producer(loops):
    for i  in range(loops):
        refill()
        sleep(randrange(3))
#模擬消費者    
def consumer(loops):
    for i in range(loops):
        buy()
        sleep(randrange(3))
#_main表示從命令行執行    
def _main():
    print('開始於:',ctime())
    nloops = randrange(2,6)
    print('糖果機(一共 %s 個槽)' % MAX)
    #創建消費者和所有者線程
    #其中消費者線程中,增加了額外的操作,用於隨機給出正偏差,使得消費者真正消費的糖果數可能會比供應者放入機器的更多;否則代碼永遠不會進入消費者嘗試從空機器購買糖果的情況
    Thread(target = consumer,args = (randrange(nloops,nloops+MAX+2),)).start()#
    Thread(target = producer,args = (nloops,)).start()
#注冊退出函數
@register
def _atexit():
    print('結束於:',ctime())
    
if __name__ == '__main__':
    _main()

執行結果類似:

PS C:\Users\WC> python E:\Python3.6.3\workspace\candy.py
開始於: Wed Apr 18 19:56:19 2018
糖果機(一共 5 個槽)
購買糖果中………
成功
購買糖果中………
成功
重裝糖果……
成功
重裝糖果……
成功
重裝糖果……
滿了,跳過
購買糖果中………
成功
購買糖果中………
成功
購買糖果中………
成功
購買糖果中………
成功
購買糖果中………
成功
購買糖果中………
空,跳過
購買糖果中………
空,跳過
結束於: Wed Apr 18 19:56:27 2018

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM