我們已經知道,對公共資源進行互斥訪問,可以使用Lock上鎖,或者使用RLock去重入鎖。
但是這些都只是方便於處理簡單的同步現象,我們甚至還不能很合理的去解決使用Lock鎖帶來的死鎖問題。
要解決更復雜的同步問題,就必須考慮別的辦法了。
threading提供的Condition對象提供了對復雜線程同步問題的支持。
Condition被稱為條件變量,除了提供與Lock類似的acquire和release方法外,還提供了wait和notify方法。
使用Condition的主要方式為:
線程首先acquire一個條件變量,然后判斷一些條件。如果條件不滿足則wait;如果條件滿足,進行一些 處理改變條件后,通過notify方法通知其他線程,其他處於wait狀態的線程接到通知后會重新判斷條件。不斷的重復這一過程,從而解決復雜的同步問 題。
下面我們通過很著名的“生產者-消費者”模型來來演示下,在Python中使用Condition實現復雜同步。
生產者和消費者,各一個線程,雙方將會圍繞products來產生同步問題,首先是2個生成者生產products ,而接下來的10個消費者將會消耗products
import threading import time condition = threading.Condition() products = 0 class Producer(threading.Thread): '''生產者''' ix = [0] # 生產者實例個數 # 閉包,必須是數組,不能直接 ix = 0 def __init__(self, ix=0): super().__init__() self.ix[0] += 1 self.setName('生產者' + str(self.ix[0])) def run(self): global condition, products while True: if condition.acquire(): if products < 10: products += 1; print("{}:庫存不足(10-)。我努力生產了1件產品,現在產品總數量 {}".format(self.getName(), products)) condition.notify() else: print("{}:庫存充足(10+)。讓我休息會兒,現在產品總數量 {}".format(self.getName(), products)) condition.wait(); condition.release() time.sleep(2) class Consumer(threading.Thread): '''消費者''' ix = [0] # 消費者實例個數 # 閉包,必須是數組,不能直接 ix = 0 def __init__(self): super().__init__() self.ix[0] += 1 self.setName('消費者' + str(self.ix[0])) def run(self): global condition, products while True: if condition.acquire(): if products > 1: products -= 1 print("{}:我消費了1件產品,現在產品數量 {}".format(self.getName(), products)) condition.notify() else: print("{}:只剩下1件產品,我停止消費。現在產品數量 {}".format(self.getName(), products)) condition.wait(); condition.release() time.sleep(2) if __name__ == "__main__": for i in range(2): p = Producer() p.start() for i in range(10): c = Consumer() c.start()
另外:
Condition對象的構造函數可以接受一個Lock/RLock對象作為參數,
如果沒有指定,則Condition對象會在內部自行創建一個 RLock;
除了notify方法外,Condition對象還提供了notifyAll方法,可以通知waiting池中的所有線程嘗試acquire 內部鎖。
由於上述機制,處於waiting狀態的線程只能通過notify方法喚醒,所以notifyAll的作用在於防止有線程永遠處於沉默狀態。