摘錄python核心編程
一般的,多線程代碼中,總有一些特定的函數或者代碼塊不希望(或不應該)被多個線程同時執行(比如兩個線程運行的順序發生變化,就可能造成代碼的執行軌跡或者行為不相同,或者產生不一致的數據),比如修改數據庫、更新文件或其他會產生競態條件的類似情況。此時就需要同步了。
同步:任意數量的線程可以訪問臨界區的代碼,但在給定的時刻又只有一個線程可以通過時。
這里介紹兩個基本的同步類型原語:鎖/互斥、信號量
鎖
鎖有兩種狀態:鎖定和未鎖定。與之對應的是兩個函數:獲得鎖和釋放鎖。
當多線程爭奪鎖時,允許第一個獲得鎖的線程進入臨界區,並執行代碼;所有之后到達的線程都將被阻塞,直到第一個線程執行結束,退出臨界區,並釋放鎖。此時其他的線程可以獲得鎖並進入臨界區。注意:那些被阻塞的線程是沒有順序的(並不是先到先得),意味着下一個獲得鎖的線程的順序並不是確定的。
mtsleepF.py腳本中派生了隨機數量的線程(沒有使用鎖):
from atexit import register from random import randrange from threading import Thread,currentThread from time import ctime,sleep #自定義一個集合對象,重寫__str__方法 class CleanOutputSet(set): def __str__(self): return ', '.join(x for x in self) #列表生成式 randrange()用於生成一個隨機數,range()返回一個列表 loops = (randrange(2,5) for x in range(randrange(3,7))) remaining = CleanOutputSet() def loop(nsec): myname = currentThread().name remaining.add(myname) print('[%s] 開始了 %s' % (ctime(),myname)) sleep(nsec) remaining.remove(myname) print('[%s] 結束了 %s (%s second)' % (ctime(),myname,nsec)) print(' (還存在:%s)' % (remaining or 'NONE')) def _main(): #創建3~6個線程,每個線程睡眠2~4秒 for pause in loops: Thread(target = loop,args = (pause,)).start() #裝飾器,在腳本的最后執行 @register def _atexit(): print('所有的完成於:',ctime()) if __name__ == '__main__': _main()
正常情況下,執行的結果:
PS C:\Users\WC> python E:\Python3.6.3\workspace\mtsleepF.py [Mon Apr 16 17:47:31 2018] 開始了 Thread-1 [Mon Apr 16 17:47:31 2018] 開始了 Thread-2 [Mon Apr 16 17:47:31 2018] 開始了 Thread-3 [Mon Apr 16 17:47:31 2018] 開始了 Thread-4 [Mon Apr 16 17:47:33 2018] 結束了 Thread-1 (2 second) (還存在:Thread-4, Thread-3, Thread-2) [Mon Apr 16 17:47:33 2018] 結束了 Thread-2 (2 second) (還存在:Thread-4, Thread-3) [Mon Apr 16 17:47:34 2018] 結束了 Thread-3 (3 second) (還存在:Thread-4) [Mon Apr 16 17:47:34 2018] 結束了 Thread-4 (3 second) (還存在:NONE) 所有的完成於: Mon Apr 16 17:47:34 2018
我們多運行幾次,有時會得到下面錯亂的結果:
PS C:\Users\WC> python E:\Python3.6.3\workspace\mtsleepF.py [Mon Apr 16 17:50:09 2018] 開始了 Thread-1 [Mon Apr 16 17:50:09 2018] 開始了 Thread-2 [Mon Apr 16 17:50:09 2018] 開始了 Thread-3 [Mon Apr 16 17:50:12 2018] 結束了 Thread-3 (3 second) (還存在:Thread-2, Thread-1) [Mon Apr 16 17:50:13 2018] 結束了 Thread-1 (4 second) [Mon Apr 16 17:50:13 2018] 結束了 Thread-2 (4 second) (還存在:NONE) (還存在:NONE) 所有的完成於: Mon Apr 16 17:50:13 2018
我們發現輸出存在部分混亂的情況(多個線程可能並行執行IO),還有就是兩個線程修改同一個變量(剩余線程名集合)。IO和訪問相同的數據結構都屬於臨界區,因此需要引入鎖防止多個線程同時進入臨界區。
下面是引入鎖的腳本實例(mtsleepG.py):
# python 3.6 from atexit import register from random import randrange from threading import Thread,Lock,currentThread #2.6版本后重命名為current_thread() from time import ctime,sleep #自定義一個集合類,重寫—__str__方法,將默認輸出改變為將其所有元素按照逗號分隔的字符串 class CleanOutputSet(set): def __str__(self): return ', '.join(x for x in self) #三個全局變量 lock = Lock()#鎖 loops = (randrange(2,5) for x in range(randrange(3,7)))#隨機數量的線程(3~6個),每個線程暫停2~4秒 remaining = CleanOutputSet()#自定義集合類的實例 def loop(nsec): myname = currentThread().name#獲得當前線程的名稱 lock.acquire()#獲取鎖,阻止其他線程進入到臨界區 remaining.add(myname)#將線程名添加到集合中 print('[%s] 開始 %s' % (ctime(),myname)) lock.release()#釋放鎖 sleep(nsec)#線程睡眠操作 lock.acquire()#重新獲得鎖 remaining.remove(myname)#從集合中刪除當前線程 print('[%s] 完成 %s (%s secs)' % (ctime(),myname,nsec)) print(' (remaining: %s )' % (remaining or 'NONE')) lock.release()#最后釋放鎖 def _main(): #main函數前面添加‘_’是為了不在其他地方使用而導入。_main只能在命令行模式下才能執行 for pause in loops: Thread(target = loop,args = (pause,)).start() #循環派生並執行每個線程 #裝飾器,注冊_atexit()函數,使得解釋器在腳本退出的時候執行此函數 @register def _atexit(): print('所有線程完成於:',ctime()) if __name__ == '__main__': _main()
多次執行,結果沒有再出現混亂的情況:
PS C:\Users\WC> python E:\Python3.6.3\workspace\mtsleepG.py [Tue Apr 17 19:54:31 2018] 開始 Thread-1 [Tue Apr 17 19:54:31 2018] 開始 Thread-2 [Tue Apr 17 19:54:31 2018] 開始 Thread-3 [Tue Apr 17 19:54:31 2018] 開始 Thread-4 [Tue Apr 17 19:54:31 2018] 開始 Thread-5 [Tue Apr 17 19:54:31 2018] 開始 Thread-6 [Tue Apr 17 19:54:33 2018] 完成 Thread-1 (2 secs) (remaining: Thread-5, Thread-3, Thread-4, Thread-6, Thread-2 ) [Tue Apr 17 19:54:33 2018] 完成 Thread-5 (2 secs) (remaining: Thread-3, Thread-4, Thread-6, Thread-2 ) [Tue Apr 17 19:54:34 2018] 完成 Thread-3 (3 secs) (remaining: Thread-4, Thread-6, Thread-2 ) [Tue Apr 17 19:54:34 2018] 完成 Thread-2 (3 secs) (remaining: Thread-4, Thread-6 ) [Tue Apr 17 19:54:35 2018] 完成 Thread-4 (4 secs) (remaining: Thread-6 ) [Tue Apr 17 19:54:35 2018] 完成 Thread-6 (4 secs) (remaining: NONE ) 所有線程完成於: Tue Apr 17 19:54:35 2018
信號量
當情況更加復雜的時候,還可以考慮使用信號量這個同步原語來代替鎖。
信號量(Semaphore),是一個計數器,當資源消耗時遞減(調用acquire),計數器會減1;當資源釋放是遞增(調用release),計數器會加1。計數器的值不會小於0;當等於0的時候,再調用acquire會阻塞,直到其他線程調用release為止。可以認為信號量代表他們的資源可用或不可用。
兩個函數簡介如下:
acquire(blocking=布爾值,timeout=None):
- 本方法用於獲得Semaphore
- blocking默認值是True,此時,如果內部計數器值大於0,則減一,並返回;如果等於0,則阻塞,等待其他線程調用release()以使計數器加1;本方法返回True,或無線阻塞
- 如果blocking=False,則不阻塞,如若獲取失敗,則返回False
- 當設定了timeout的值,最多阻塞timeout秒,如果超時,返回False。
release():
- 釋放Semaphore,內部計數器加1,可以喚醒等待的線程
BoundedSemaphore正好和Semaphore相反:一個工廠函數,返回一個新的有界信號量對象。有界信號量會確保他的值不會超過初始值;如果超出則會拋出ValueError異常。初始值默認為1。
消耗資源使計數器遞減的操作習慣上成為P(),也稱為wait、try、acquire、pend、procure.
相對的,當一個線程對一個資源完成操作時,該資源需要返回資源池中,這種操作一般稱為V(),也稱為signal、increment、release、post、Vacate。
python簡化了所有的命名,使用和鎖的函數一樣的名字:acquire和release。信號量比鎖更加靈活,因為可以有多個線程,每個線程擁有有限資源的一個實例
下面,我們模仿一個簡化的糖果機:該糖果機中只有5個可用的槽來保持庫存(糖果),如果所有的槽都滿了,糖果就不能再加到這個機器中了;相似的,如果每個槽都空了,消費者就不能再購買到了。我們使用信號量來追蹤這些有限的資源(糖果槽)。
腳本實例candy.py:
#python 3.6 from atexit import register from random import randrange from threading import BoundedSemaphore,Lock,Thread#增加了信號量 from time import sleep,ctime #3個全局變量 lock = Lock()#鎖 MAX = 5 #表示庫存糖果最大值的常量 candytray = BoundedSemaphore(MAX)#‘糖果托盤’,一個信號量 #向庫存中添加糖果。這段代碼是一個臨界區,輸出用戶的行動,並在糖果超過最大庫存的時候給出警告 def refill(): lock.acquire() print('重裝糖果……') try: candytray.release() except ValueError: print('滿了,跳過') else: print('成功') lock.release() #購買糖果。也是一個臨界區,效果和refill函數相反 def buy(): lock.acquire() print('購買糖果中………') #檢查是否所有的資源都已經消費完。 #計數器的值不能小於0,所以這個調用一般會在計數器再次增加之前被阻塞。傳入非阻塞標志False,讓調用不再阻塞,而在應當阻塞的時候返回一個false,表示沒有更多資源了。 if candytray.acquire(False): print('成功') else: print('空,跳過') lock.release() #模擬糖果機的所有者 def producer(loops): for i in range(loops): refill() sleep(randrange(3)) #模擬消費者 def consumer(loops): for i in range(loops): buy() sleep(randrange(3)) #_main表示從命令行執行 def _main(): print('開始於:',ctime()) nloops = randrange(2,6) print('糖果機(一共 %s 個槽)' % MAX) #創建消費者和所有者線程 #其中消費者線程中,增加了額外的操作,用於隨機給出正偏差,使得消費者真正消費的糖果數可能會比供應者放入機器的更多;否則代碼永遠不會進入消費者嘗試從空機器購買糖果的情況 Thread(target = consumer,args = (randrange(nloops,nloops+MAX+2),)).start()# Thread(target = producer,args = (nloops,)).start() #注冊退出函數 @register def _atexit(): print('結束於:',ctime()) if __name__ == '__main__': _main()
執行結果類似:
PS C:\Users\WC> python E:\Python3.6.3\workspace\candy.py 開始於: Wed Apr 18 19:56:19 2018 糖果機(一共 5 個槽) 購買糖果中……… 成功 購買糖果中……… 成功 重裝糖果…… 成功 重裝糖果…… 成功 重裝糖果…… 滿了,跳過 購買糖果中……… 成功 購買糖果中……… 成功 購買糖果中……… 成功 購買糖果中……… 成功 購買糖果中……… 成功 購買糖果中……… 空,跳過 購買糖果中……… 空,跳過 結束於: Wed Apr 18 19:56:27 2018
