前言
本章節繼續探討threading
模塊下關於鎖的應用,注意。這一期很重要,依然是圍繞着理論篇來講,這一章節主要圍繞理論篇中的線程切換做講解,因此一定要有一些線程切換方面的知識。
線程安全
線程安全是多線程編程時的計算機程序代碼中的一個概念。在擁有共享數據的多條線程並行執行的程序中,線程安全的代碼會通過同步機制保證各個線程都可以正常且正確的執行,不會出現數據污染等意外情況。
在聊線程安全之前,我還是決定拿之前的那個例子來描述它。
這里關於線程安全的代碼測試我決定不用上面例舉的這個例子,而是用多線程做密集型計算做演示,數值越大效果越明顯:
import threading num = 0 def add(): global num for i in range(10000000): # 一千萬次 num += 1 def sub(): global num for i in range(10000000): # 一千萬次 num -= 1 if __name__ == '__main__': t1 = threading.Thread(target=add,) t2 = threading.Thread(target=sub,) t1.start() t2.start() t1.join() t2.join() print("最終結果:",num) # ==== 執行結果 ==== 三次采集 """ 最終結果: -1472151 最終結果: -2814372 最終結果: -5149396 """
一個加一千萬次,一個減一千萬次,按理來說最后的結果應該是0,但是為什么相差這么大?更加恐怖的是每次的結果都完全不一樣呢?
其實這就是由於線程切換導致出的線程安全問題,因為我們不能精確的知道CPU會在什么時候進行線程的切換,那么如何控制它呢?我們就需要用到鎖,其實通俗點來講你可以這么認為他:
線程安全的目標:
線程之間共同操作一個資源 如何保持資源的同步性
因為線程的切換是由CPU控制的,所以我們要控制它的切換
線程安全的定義:
線程安全:多線程操作時,內部會讓所有線程排隊處理
線程不安全:我們需要一個機制來讓所有線程進行排隊處理
那么什么時候我們要考慮線程安全的問題呢?
多個線程對同一數據源進行寫入操作時
Ps:在CPython的八大基本數據類型中,
list
和dict
本身就是屬於線程安全的容器。
鎖的作用
鎖就提供給我們能夠自行操控線程切換的一種手段,而並非系統自帶的切換機制進行切換。
Lock同步鎖
方法大全
Lock同步(互斥)鎖方法方法大全 | |
---|---|
方法/屬性名稱 | 功能描述 |
acquire(blocking=True, timeout=-1) | 上鎖,在上鎖狀態中的代碼塊運行時不允許切換至其他線程運行。 |
release() | 解鎖,解鎖后系統可以切換至其他線程運行。 |
locked() | 如果獲得了鎖則返回真值。 |
使用方式
被上鎖和解鎖期間的代碼塊執行時,不會切換至其他線程,有一點要注意的是對於
lock
鎖而言,一次acquire()
必須對應一次release()
,不能出現重復的使用兩次acquire()
的操作,這會造成死鎖!注意:同步鎖是一次只能放行一個線程。
import threading num = 0 def add(): lock.acquire() # 上鎖 global num for i in range(10000000): # 一千萬次 num += 1 lock.release() # 解鎖 def sub(): lock.acquire() # 上鎖 global num for i in range(10000000): # 一千萬次 num -= 1 lock.release() # 解鎖 if __name__ == '__main__': lock = threading.Lock() # 實例化同步鎖對象 t1 = threading.Thread(target=add, ) t2 = threading.Thread(target=sub, ) t1.start() t2.start() t1.join() t2.join() print("最終結果:", num) # ==== 執行結果 ==== 三次采集 """ 最終結果: 0 最終結果: 0 最終結果: 0 """

import threading num = 0 def add(): lock.acquire() # 上鎖 lock.acquire() # 死鎖 global num for i in range(10000000): # 一千萬次 num += 1 lock.release() lock.release() def sub(): lock.acquire() # 上鎖 lock.acquire() # 死鎖 global num for i in range(10000000): # 一千萬次 num -= 1 lock.release() lock.release() if __name__ == '__main__': lock = threading.Lock() # 實例化同步鎖對象 t1 = threading.Thread(target=add,) t2 = threading.Thread(target=sub,) t1.start() t2.start() t1.join() t2.join() print("最終結果:",num) # ==== 執行結果 ==== 三次采集 """ 卡住不動了 """
上下文管理
threading.Lock()
對象中實現了__enter__
與__exit__
方法,因此我們可以使用with
語句進行上下文管理式的加鎖。

import threading num = 0 def add(): with lock: # 自動加鎖與解鎖 global num for i in range(10000000): # 一千萬次 num += 1 def sub(): with lock: # 自動加鎖與解鎖 global num for i in range(10000000): # 一千萬次 num -= 1 if __name__ == '__main__': lock = threading.Lock() t1 = threading.Thread(target=add,) t2 = threading.Thread(target=sub,) t1.start() t2.start() t1.join() t2.join() print("最終結果:",num) # ==== 執行結果 ==== 三次采集 """ 最終結果: 0 最終結果: 0 最終結果: 0 """
RLock遞歸鎖
方法大全
Rlock遞歸鎖方法方法大全 | |
---|---|
方法/屬性名稱 | 功能描述 |
acquire(blocking=True, timeout=-1) | 上鎖,在上鎖狀態中的代碼塊運行時不允許切換至其他線程運行。如果已上鎖,遞歸層數增加一層。 |
release() | 解鎖,解鎖后系統可以切換至其他線程運行。 |
使用方式
RLock
遞歸鎖的使用方式與同步鎖相同,唯一的不同點就是可以多次acquire()
,這並不會產生死鎖,但是有幾次acquire()
就應該有相應的幾次release()
,否則依然會造成死鎖!注意:遞歸鎖與同步鎖相同,也是一次只能放行一個線程。並且也支持上下文管理。
import threading num = 0 def add(): lock.acquire() # 上鎖 + 1 lock.acquire() # 上鎖 + 1 global num for i in range(10000000): # 一千萬次 num += 1 lock.release() # 解鎖 - 1 lock.release() # 解鎖 - 1 def sub(): lock.acquire() # 上鎖 lock.acquire() # 死鎖 global num for i in range(10000000): # 一千萬次 num -= 1 lock.release() # 解鎖 - 1 lock.release() # 解鎖 - 1 if __name__ == '__main__': lock = threading.RLock() # 實例化遞歸鎖對象 t1 = threading.Thread(target=add,) t2 = threading.Thread(target=sub,) t1.start() t2.start() t1.join() t2.join() print("最終結果:",num) # ==== 執行結果 ==== 三次采集 """ 最終結果: 0 最終結果: 0 最終結果: 0 """
Condition條件鎖
方法大全
Condition條件鎖方法方法大全 | |
---|---|
方法/屬性名稱 | 功能描述 |
acquire(blocking=True, timeout=-1) | 上鎖,在上鎖狀態中的代碼塊運行時不允許切換至其他線程運行。 |
release() | 解鎖,解鎖后系統可以切換至其他線程運行。 |
wait(timeout=None) | 等待喚醒,此時該線程是處於暫停運行狀態,要么等待通知要么等待到超時時間后繼續執行該線程。 |
wait_for(predicate, timeout=None) | 等待喚醒,直到返回了一個True 。predicate 是一個可調用的對象,其返回值應該是True 或者False 。 |
notify(n=1) | 通知喚醒,可以喚醒多個處於wait() 的線程,默認為1個。 |
notify_all() | 通知喚醒,喚醒所有處於wait() 的線程。 |
使用方式
Condition
條件鎖是在遞歸鎖的基礎上增加了能夠暫停線程運行的功能。並且我們可以使用wait()
與notify()
來控制每個線程執行的個數。注意:條件鎖可以自由設定一次放行幾個線程。
import threading num = 0 def task(): obj = threading.current_thread() print("當前是線程[{0}],已經開始運行了...".format(obj.getName())) cond.acquire() # 上鎖 global num print("當前是線程[{0}],處於等待狀態...".format(obj.getName())) cond.wait() # 暫停,等待喚醒 num += 1 print("當前是線程[{0}],等待狀態結束,繼續運行...".format(obj.getName())) cond.release() # 解鎖 if __name__ == '__main__': cond = threading.Condition() # 實例條件鎖對象 for i in range(10): t1 = threading.Thread(target=task,) # 開啟10條線程 t1.start() # 等待CPU調度執行 while num < 10: task_num = int(input("請輸入你要執行的線程數量:")) cond.acquire() cond.notify(task_num) # 通知喚醒 cond.release() print("最終結果:",num) # ==== 執行結果 ==== """ 當前是線程[Thread-1],已經開始運行了... 當前是線程[Thread-1],處於等待狀態... 當前是線程[Thread-2],已經開始運行了... 當前是線程[Thread-2],處於等待狀態... 當前是線程[Thread-3],已經開始運行了... 當前是線程[Thread-3],處於等待狀態... 當前是線程[Thread-4],已經開始運行了... 當前是線程[Thread-4],處於等待狀態... 當前是線程[Thread-5],已經開始運行了... 當前是線程[Thread-5],處於等待狀態... 當前是線程[Thread-6],已經開始運行了... 當前是線程[Thread-6],處於等待狀態... 當前是線程[Thread-7],已經開始運行了... 當前是線程[Thread-7],處於等待狀態... 當前是線程[Thread-8],已經開始運行了... 當前是線程[Thread-8],處於等待狀態... 當前是線程[Thread-9],已經開始運行了... 當前是線程[Thread-9],處於等待狀態... 當前是線程[Thread-10],已經開始運行了... 當前是線程[Thread-10],處於等待狀態... 請輸入你要執行的線程數量:2 當前是線程[Thread-1],等待狀態結束,繼續運行... 當前是線程[Thread-2],等待狀態結束,繼續運行... 請輸入你要執行的線程數量:3 當前是線程[Thread-3],等待狀態結束,繼續運行... 當前是線程[Thread-5],等待狀態結束,繼續運行... 當前是線程[Thread-4],等待狀態結束,繼續運行... 請輸入你要執行的線程數量:5 當前是線程[Thread-6],等待狀態結束,繼續運行... 當前是線程[Thread-7],等待狀態結束,繼續運行... 當前是線程[Thread-8],等待狀態結束,繼續運行... 當前是線程[Thread-10],等待狀態結束,繼續運行... 當前是線程[Thread-9],等待狀態結束,繼續運行... 請輸入你要執行的線程數量:1 最終結果: 10 """
上下文管理

import threading num = 0 def task(): obj = threading.current_thread() print("當前是線程[{0}],已經開始運行了...".format(obj.getName())) with cond: global num print("當前是線程[{0}],處於等待狀態...".format(obj.getName())) cond.wait() # 暫停,等待喚醒 num += 1 print("當前是線程[{0}],等待狀態結束,繼續運行...".format(obj.getName())) if __name__ == '__main__': cond = threading.Condition() # 實例化遞歸鎖對象 for i in range(10): t1 = threading.Thread(target=task,) # 開啟10條線程 t1.start() # 等待CPU調度執行 while num < 10: task_num = int(input("請輸入你要執行的線程數量:")) with cond: cond.notify(task_num) # 通知喚醒 print("最終結果:",num) # ==== 執行結果 ==== """ 當前是線程[Thread-1],已經開始運行了... 當前是線程[Thread-1],處於等待狀態... 當前是線程[Thread-2],已經開始運行了... 當前是線程[Thread-2],處於等待狀態... 當前是線程[Thread-3],已經開始運行了... 當前是線程[Thread-3],處於等待狀態... 當前是線程[Thread-4],已經開始運行了... 當前是線程[Thread-4],處於等待狀態... 當前是線程[Thread-5],已經開始運行了... 當前是線程[Thread-5],處於等待狀態... 當前是線程[Thread-6],已經開始運行了... 當前是線程[Thread-6],處於等待狀態... 當前是線程[Thread-7],已經開始運行了... 當前是線程[Thread-7],處於等待狀態... 當前是線程[Thread-8],已經開始運行了... 當前是線程[Thread-8],處於等待狀態... 當前是線程[Thread-9],已經開始運行了... 當前是線程[Thread-9],處於等待狀態... 當前是線程[Thread-10],已經開始運行了... 當前是線程[Thread-10],處於等待狀態... 請輸入你要執行的線程數量:2 當前是線程[Thread-1],等待狀態結束,繼續運行... 當前是線程[Thread-2],等待狀態結束,繼續運行... 請輸入你要執行的線程數量:3 當前是線程[Thread-3],等待狀態結束,繼續運行... 當前是線程[Thread-5],等待狀態結束,繼續運行... 當前是線程[Thread-4],等待狀態結束,繼續運行... 請輸入你要執行的線程數量:5 當前是線程[Thread-6],等待狀態結束,繼續運行... 當前是線程[Thread-7],等待狀態結束,繼續運行... 當前是線程[Thread-8],等待狀態結束,繼續運行... 當前是線程[Thread-10],等待狀態結束,繼續運行... 當前是線程[Thread-9],等待狀態結束,繼續運行... 請輸入你要執行的線程數量:1 最終結果: 10 """
Event事件鎖
方法大全
Event事件鎖方法方法大全 | |
---|---|
方法/屬性名稱 | 功能描述 |
is_set() | 用來判斷當前紅綠燈(標志位)的狀態,紅燈為False ,綠燈為True 。 |
set() | 通知所有處於紅燈狀態的線程開始運行,這相當於將標志位改為True 。 |
clear() | 將所有處於綠燈狀態的線程暫停,這相當於將標志位改為False 。 |
wait(timeout=None) | 阻塞當前線程直到被放行,即等待紅綠燈的通知,紅燈停綠燈行。 |
使用方式
這玩意兒是基於
Condition
條件鎖做的,跟條件鎖的區別就在於他是非常干脆利落的。注意:事件鎖只能一次全部放行,相當於紅綠燈一樣,等車的紅燈全停,綠燈全過。
此外,事件鎖不支持上下文管理協議。
import threading def task(): obj = threading.current_thread() print("當前是線程[{0}],已經開始運行了...".format(obj.getName())) print("當前是線程[{0}],紅燈了,停車...".format(obj.getName())) event.wait() # 暫停,等待綠燈通行 print("當前是線程[{0}],綠燈了放行...".format(obj.getName())) print("當前是線程[{0}],卧槽怎么又是紅燈,停車...".format(obj.getName())) event.wait() # 暫停,等待綠燈通行 print("當前是線程[{0}],繼續運行...".format(obj.getName())) if __name__ == '__main__': event = threading.Event() # 實例化事件鎖對象 for i in range(10): t1 = threading.Thread(target=task,) # 開啟10條線程 t1.start() # 等待CPU調度執行 event.set() # 設置為綠燈 針對第一次 event.clear() # 設置為紅燈,如果沒有他那么上面不管wait()多少次都沒用了。因為全都是綠燈 event.set() # 再次設置為綠燈,針對第二次 # ==== 執行結果 ==== """ 當前是線程[Thread-1],已經開始運行了... 當前是線程[Thread-1],紅燈了,停車... 當前是線程[Thread-2],已經開始運行了... 當前是線程[Thread-2],紅燈了,停車... 當前是線程[Thread-3],已經開始運行了... 當前是線程[Thread-3],紅燈了,停車... 當前是線程[Thread-4],已經開始運行了... 當前是線程[Thread-4],紅燈了,停車... 當前是線程[Thread-5],已經開始運行了... 當前是線程[Thread-5],紅燈了,停車... 當前是線程[Thread-6],已經開始運行了... 當前是線程[Thread-6],紅燈了,停車... 當前是線程[Thread-7],已經開始運行了... 當前是線程[Thread-7],紅燈了,停車... 當前是線程[Thread-8],已經開始運行了... 當前是線程[Thread-8],紅燈了,停車... 當前是線程[Thread-9],已經開始運行了... 當前是線程[Thread-9],紅燈了,停車... 當前是線程[Thread-10],已經開始運行了... 當前是線程[Thread-10],紅燈了,停車... 當前是線程[Thread-10],綠燈了放行... 當前是線程[Thread-10],卧槽怎么又是紅燈,停車... 當前是線程[Thread-5],綠燈了放行... 當前是線程[Thread-9],綠燈了放行... 當前是線程[Thread-9],卧槽怎么又是紅燈,停車... 當前是線程[Thread-1],綠燈了放行... 當前是線程[Thread-1],卧槽怎么又是紅燈,停車... 當前是線程[Thread-6],綠燈了放行... 當前是線程[Thread-6],卧槽怎么又是紅燈,停車... 當前是線程[Thread-3],綠燈了放行... 當前是線程[Thread-4],綠燈了放行... 當前是線程[Thread-2],綠燈了放行... 當前是線程[Thread-1],繼續運行... 當前是線程[Thread-5],卧槽怎么又是紅燈,停車... 當前是線程[Thread-3],卧槽怎么又是紅燈,停車... 當前是線程[Thread-7],綠燈了放行... 當前是線程[Thread-7],卧槽怎么又是紅燈,停車... 當前是線程[Thread-7],繼續運行... 當前是線程[Thread-4],卧槽怎么又是紅燈,停車... 當前是線程[Thread-3],繼續運行... 當前是線程[Thread-10],繼續運行... 當前是線程[Thread-5],繼續運行... 當前是線程[Thread-8],綠燈了放行... 當前是線程[Thread-6],繼續運行... 當前是線程[Thread-4],繼續運行... """
Semaphore信號量鎖
方法大全
Semaphore信號量鎖方法方法大全 | |
---|---|
方法/屬性名稱 | 功能描述 |
acquire(blocking=True, timeout=-1) | 上鎖,在上鎖狀態中的代碼塊運行時不允許切換至其他線程運行。 |
release() | 解鎖,解鎖后系統可以切換至其他線程運行。 |
使用方式
這玩意兒是可以規定一次最多跑多少線程的一個東西。也是基於
Condition
條件鎖做的。注意:區分與
Condition
條件鎖的區別,Semaphore
信號量鎖不能自由規定,只能規定一次,而條件鎖可以多次規定。
import threading import time def task(): sema.acquire() time.sleep(1) obj = threading.current_thread() print("當前是線程[{0}],已經開始運行了...".format(obj.getName())) sema.release() if __name__ == '__main__': sema = threading.Semaphore(3) # 實例化信號量鎖對象,代表每次都跑3條。 for i in range(10): t1 = threading.Thread(target=task,) # 開啟10條線程 t1.start() # 等待CPU調度執行 # ==== 執行結果 ==== """ 當前是線程[Thread-1],已經開始運行了... 當前是線程[Thread-3],已經開始運行了... 當前是線程[Thread-2],已經開始運行了... 當前是線程[Thread-4],已經開始運行了... 當前是線程[Thread-6],已經開始運行了... 當前是線程[Thread-5],已經開始運行了... 當前是線程[Thread-7],已經開始運行了... 當前是線程[Thread-9],已經開始運行了... 當前是線程[Thread-8],已經開始運行了... 當前是線程[Thread-10],已經開始運行了... """
上下文管理

import threading import time def task(): with sema: time.sleep(1) obj = threading.current_thread() print("當前是線程[{0}],已經開始運行了...".format(obj.getName())) if __name__ == '__main__': sema = threading.Semaphore(3) # 實例化信號量鎖對象,代表每次都跑3條。 for i in range(10): t1 = threading.Thread(target=task,) # 開啟10條線程 t1.start() # 等待CPU調度執行 # ==== 執行結果 ==== """ 當前是線程[Thread-1],已經開始運行了... 當前是線程[Thread-3],已經開始運行了... 當前是線程[Thread-2],已經開始運行了... 當前是線程[Thread-4],已經開始運行了... 當前是線程[Thread-6],已經開始運行了... 當前是線程[Thread-5],已經開始運行了... 當前是線程[Thread-7],已經開始運行了... 當前是線程[Thread-9],已經開始運行了... 當前是線程[Thread-8],已經開始運行了... 當前是線程[Thread-10],已經開始運行了... """
擴展:練習題
Condition條件鎖的應用
需求:一個空列表,兩個線程輪番往里面加值(一個加偶數,一個加奇數),讓該列表中的值為 1 - 100。
import threading import time li = [] def even(): """加偶數""" with cond: # 加鎖 for i in range(2, 101, 2): if len(li) % 2 != 0: li.append(i) cond.notify() # notify()並不會立即終止當前線程的執行,而是告訴另一線程。你可以走了,不過得等我wait()之后 cond.wait() # 阻塞住,執行另一線程,直到另一線程發送了notify()並且它wait()了之后。 else: cond.wait() li.append(i) cond.notify() else: cond.notify() def odd(): """加奇數""" with cond: for i in range(1, 101, 2): if len(li) %2 == 0: li.append(i) cond.notify() cond.wait() else: cond.notify() if __name__ == '__main__': cond = threading.Condition() t1 = threading.Thread(target=odd) t2 = threading.Thread(target=even) t1.start() t2.start() t1.join() t2.join() print(li)
Event事件鎖的應用
有兩個線程,如何讓他們一人一句對答?文本如下:
杜甫:老李啊,來喝酒!
李白:老杜啊,不喝了我喝不下了!
杜甫:老李啊,再來一壺?
杜甫:...老李?
李白:呼呼呼...睡着了..
import threading def libai(): event.wait() print("李白:老杜啊,不喝了我喝不下了!") event.set() event.clear() event.wait() print("李白:呼呼呼...睡着了..") def dufu(): print("杜甫:老李啊,來喝酒!") event.set() event.clear() event.wait() print("杜甫:老李啊,再來一壺?") print("杜甫:...老李?") event.set() if __name__ == '__main__': event = threading.Event() t1 = threading.Thread(target=libai) t2 = threading.Thread(target=dufu) t1.start() t2.start() t1.join() t2.join()
擴展:鎖的關系淺析
這里我們來聊一聊鎖的關系。
Rlock
遞歸鎖,Condition
條件鎖,Event
事件鎖以及Semaphore
信號量鎖內部都是以同步鎖為基礎的。
RLock
遞歸鎖的實現方式非常簡單,因為內部維護着一個計數器。當計數器不為0的時候該線程不能被I/O
操作和時間輪詢機制切換。但是當計數器為0的時候便不會如此了。
我們可以看一下遞歸鎖的源碼:
def __init__(self): self._block = _allocate_lock() self._owner = None self._count = 0 # 計數器
而Condition
條件鎖的內部其實是有兩把鎖的,一把底層鎖(同步鎖)一把高級鎖(遞歸鎖)。而低層鎖的解鎖方式有兩種,使用wait()
方法會暫時解開底層鎖同時加上一把高級鎖,只有當接收到別的線程里的notfiy()
后才會解開高級鎖和重新上鎖低層鎖.
def __init__(self, lock=None): if lock is None: lock = RLock() # 可以看到條件鎖的內部是基於遞歸鎖,而遞歸鎖又是基於同步鎖來做的 self._lock = lock self.acquire = lock.acquire self.release = lock.release try: self._release_save = lock._release_save except AttributeError: pass try: self._acquire_restore = lock._acquire_restore except AttributeError: pass try: self._is_owned = lock._is_owned except AttributeError: pass self._waiters = _deque()
Event
事件鎖內部是基於條件鎖來做的。
class Event: def __init__(self): self._cond = Condition(Lock()) # 實例化出了一個條件鎖。 self._flag = False def _reset_internal_locks(self): # private! called by Thread._reset_internal_locks by _after_fork() self._cond.__init__(Lock()) def is_set(self): """Return true if and only if the internal flag is true.""" return self._flag isSet = is_set
Semaphore
信號量鎖內部也是基於條件鎖來做的。
class Semaphore: def __init__(self, value=1): if value < 0: raise ValueError("semaphore initial value must be >= 0") self._cond = Condition(Lock()) # 可以看到,這里是實例化出了一個條件鎖 self._value = value