一、Lock鎖
- 凡是存在共享資源爭搶的地方都可以使用鎖,從而保證只有一個使用者可以完全使用這個資源一旦線程獲得鎖,其他試圖獲取鎖的線程將被阻塞
- acquire(blocking=True,timeout=-1): 默認阻塞,阻塞可以設置超時時間,非阻塞時,timeout禁止設置,成功獲取鎖,返回True,否則返回False
- releas() : 釋放鎖,可以從任何線程調用釋放,已上鎖的鎖,會被重置為unlocked未上鎖的鎖上調用,拋出RuntimeError異常
例如: 訂單要求生成1000個杯子,組織10個工人生產 舉例1: import threading from threading import Thread, Lock import time import logging FORAMT = '%(asctime)s %(threadName)s %(thread)d %(message)s' logging.basicConfig(format=FORAMT, level=logging.INFO) cups = [] def worker(count=10): logging.info("I'm working for u") flag = False while True: if len(cups) >= count: flag = True time.sleep(0.001) # 為了看出現場切換效果 if not flag: cups.append(1) if flag: break logging.info("{} finished.cups= {}".format(threading.current_thread().name, len(cups))) for _ in range(10): # 開啟10個線程 Thread(target=worker, args=(1000,)).start() 從上例的運行結果看出,多線程調度,導致了判斷失效,多生成了杯子 如何修改,加鎖 舉例2: import threading from threading import Thread, Lock import time import logging FORAMT = '%(asctime)s %(threadName)s %(thread)d %(message)s' logging.basicConfig(format=FORAMT, level=logging.INFO) cups = [] lock = Lock() def worker(count=10): logging.info("I'm working for u") flag = False while True: lock.acquire() if len(cups) >= count: flag = True #lock.release() # 這里釋放鎖合適嗎? time.sleep(0.001) # 為了看出現場切換效果 if not flag: cups.append(1) lock.release() # 這里釋放鎖對不對 if flag: break logging.info("{} finished.cups= {}".format(threading.current_thread().name, len(cups))) for _ in range(10): # 開啟10個線程 Thread(target=worker, args=(1000,)).start() 假設第一句lock.release()合適,分析如下: 有一個時刻len(cups),正好是999,flag=True,釋放鎖,線程被打斷,另一個線程判斷發現是 999,flag=True,可能線程被打斷,可能另一個線程判斷也是999,flag也設置為True 這三個線程只要繼續執行到cups.append(1),一定會導致cups的長度超過1000 假設第二句lock.release()合適,分析如下: 在某一時刻len(cups),正好是999,flag=True,其他線程試圖訪問這段代碼的線程都被阻塞獲取 不到鎖,直到當前線程安全的增加了一個數據,然后釋放鎖,其它線程有一個搶到鎖,但發現 已經是1000了,只好break打印退出,所有其他線程接着都退出
二、加鎖、解鎖
- 一般來說加鎖后還要一些代碼實現,在釋放鎖之前還有可能拋異常,一旦出現異常所無法釋放,但是當前線程可能因為這個異常被終止了,這就產生了死鎖
- 加鎖、解鎖常用語句:
- 使用try...finally語句保證鎖的釋放
- with上下文管理,鎖對象支持上下文管理
import threading from threading import Thread, Lock import time class Counter: def __init__(self): self._val = 0 self.__lock = Lock() @property def value(self): with self.__lock: return self._val def inc(self): try: self.__lock.acquire() self._val += 1 finally: self.__lock.release() def dec(self): with self.__lock: self._val -= 1 def run(c:Counter, count=100): for _ in range(count): for i in range(-50,50): if i < 0: c.dec() else: c.inc() c = Counter() c1 = 10 c2 = 100 for i in range(c1): Thread(target=run, args=(c,c2)).start() while True: time.sleep(1) if threading.active_count() == 1: print(threading.enumerate()) print(c.value) else: print(threading.enumerate())
三、鎖的應用場景
- 鎖適用於訪問和修改同一個共享資源的時候,即讀寫同一個資源的時候,如果全部都是讀取同一個共享資源需要鎖嗎,不需要,因為這時可以認為共享資源是不可變的,每一次讀取它都是一樣的值,所有不用加鎖
- 使用鎖的注意事項
- 少用鎖,必要時用鎖,使用了鎖,多線程訪問被鎖的資源時,就成了串行,要么排隊,要么爭搶
- 加鎖時間越短越好,不需要就立即釋放鎖
- 一定要避免死鎖
1、可重入鎖Rlock
- 可重入鎖,是線程相關的鎖,線程A獲得可重復鎖,並可以多次成功獲取,不會阻塞,最后要在線程A中做和acquire次數相同的relea
import threading import time lock = threading.RLock() print(lock.acquire()) print('-------------------------') print(lock.acquire(blocking=False)) print(lock.acquire()) print(lock.acquire(timeout=3.55)) print(lock.acquire(blocking=False)) lock.release() lock.release() lock.release() lock.release() lock.release()
四、Condition
- 構造方法Condition(lock=None),可以傳入一個Lock或RLock對象,默認是RLock
- Conditon用於生產者,消費者模型,為了解決生產者消費者速度匹配問題
- acquire(*args) : 獲取鎖
- wait(self,time=None) : 等待或超市
- notify(n=1) : 喚醒至多指定數目個數的等待線程,沒有等待的線程就沒有任何操作
- notif_all() : 喚醒所有等待的線程
舉例1: 消費者消費速度大於生產者生成速度 from threading import Thread, Lock, Event, Condition import time, random import logging FORAMT = '%(asctime)s %(threadName)s %(thread)d %(message)s' logging.basicConfig(format=FORAMT, level=logging.INFO) # 此例只是為了演示,不考慮線程安全問題 class Dispatcher: def __init__(self): self.data = None self.event = Event() #event只是為了使用方便,與邏輯無關 self.cond = Condition() def produce(self, total): for _ in range(total): data = random.randint(0,100) with self.cond: logging.info(data) self.data = data self.cond.notify_all() # 通知消費者消費,啟動所有線程 self.event.wait(1) # 模擬產生數據速度 self.event.set() def consume(self): while not self.event.is_set(): with self.cond: # 消費者等待 self.cond.wait() logging.info("received {}".format(self.data)) self.data = None self.event.wait(0.5) #模擬消費的速度 d = Dispatcher() p = Thread(target=d.produce, args=(10,), name='prodcer') c = Thread(target=d.consume, name='consumer') c.start() p.start() 消費者等待數據,如果生產者准備好了會通知消費者消費,省得消費者反復來查看數據是否就緒 Condition用於生產者消費者模型中,解決生產者消費者速度匹配的問題 采用了通知機制,非常有效率 使用方式,使用Condition,必須先acquire,用完了要release,因為內部使用了鎖,默認使用了RLock鎖, 最好方式是使用with上下文管理 消費者wait,等待通知,生產者生產好消息,對消費者發通知,可以使用notify或者notify_all方法
五、線程同步Barrier
- 有人翻譯成柵欄,建設理解成屏障,可以想象成路障,道閘
- Barrier(parties, action=None, timeout=None): 構建Barrier對象,指定參與方數目,timeout是wait方法未指定超時的默認值
- n_waiting : 當前在屏障中等待的線程數
- parties : 各方數,就是需要多少個等待
- wait(timeout=None) : 等待通過屏障,返回0到線程數-1的整數,每個線程返回不同,如果wait方法設置了超時,並超時發送,屏障將處於broken狀態
1、Barrier實例
- broken :如果屏障處於打破的狀態,返回True
- abort() : 將屏障置於broken狀態,等待中的線程或者調用等待方法的線程中都會拋出BrokenBarrierError異常,直到reset方法來恢復屏障
- reset(): 恢復屏障,重新開始攔截
import threading import logging #輸出格式定義 FORMAT = '%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s' logging.basicConfig(level=logging.INFO, format=FORMAT) def worker(barrier:threading.Barrier): logging.info('waiting for {} threads'.format(barrier.n_waiting)) try: barrier_id = barrier.wait() logging.info('after barrier {}'.format(barrier_id)) except threading.BrokenBarrierError: logging.info('Broken Barrier') barrier = threading.Barrier(3) #設置攔截線程數 for x in range(3): threading.Thread(target=worker, name='worker-{}'.format(x), args=(barrier,)).start() logging.info('started') 從運行結果看出: 所有線程沖到了Barrier前等待,直到到達parties的數目,屏障打開,所有線程停止等待,繼續執行 再有線程wait,屏障就緒等到到達參數方數目 舉例 ,賽馬比賽所有馬匹就位,開閘 舉例2: import threading import logging #輸出格式定義 FORMAT = '%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s' logging.basicConfig(level=logging.INFO, format=FORMAT) def worker(barrier:threading.Barrier): logging.info('waiting for {} threads'.format(barrier.n_waiting)) try: barrier_id = barrier.wait() logging.info('after barrier {}'.format(barrier_id)) except threading.BrokenBarrierError: logging.info('Broken Barrier run') barrier = threading.Barrier(3) for x in range(0,9): if x == 2: barrier.abort() if x == 6: barrier.reset() threading.Event().wait(1) threading.Thread(target=worker, name='worker-{}'.format(x), args=(barrier,)).start() logging.info('started') 上例中等待了2個,屏障就被break了,waiting的線程拋了BrokenBarrierError異常 新wait的線程也拋異常,直到屏障恢復,才繼續按照parties數目要求繼續攔截線程
2、wait方法超時實例
- 如果wait方法超時發生,屏障將處於broken狀態,直到reset
import threading import logging #輸出格式定義 FORMAT = '%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s' logging.basicConfig(level=logging.INFO, format=FORMAT) def worker(barrier:threading.Barrier, i:int): logging.info('waiting for {} threads'.format(barrier.n_waiting)) try: logging.info(barrier.broken) # 是否broken if i < 3: barrier_id = barrier.wait(1) # 超時后,屏障broken else: if i == 6: barrier.reset() #恢復屏障 barrier_id = barrier.wait(1) logging.info('after barrier {}'.format(barrier_id)) except threading.BrokenBarrierError: logging.info('Broken Barrier run') barrier = threading.Barrier(3) for x in range(0,9): threading.Event().wait(2) threading.Thread(target=worker, name='worker-{}'.format(x), args=(barrier,x)).start() logging.info('started')
3、Barrier應用
- 所有線程都必須初始化完成,才能繼續工作,例如運行前加載數據,檢查,如果這些工作沒有完成,就開始運行,將不能正常工作
- 10個線程做10種工作准備,每個線程負責一種工作,只有這10個線程都完成后,才能繼續工作,先完成的要等待后完成的線程
六、semaphore信號量
- 和Lock很像,信號量對內部維護一個到計數器,每一次acquire都會減1,
- 當acquire方法發現計數為0就阻塞請求的線程,直到其他線程對信號量release后,計數大於0,恢復阻塞線程
- Semaphore(value=1): 構造方法,value小於0,拋出異常ValueError異常
- acquire(blocking=True,time=None) : 獲取信號量,計數器減1,獲取成功返回True
- release() : 釋放信號量,計數器加1
- 計數器永遠不會低於0,因為acquire的時候,發現是0,都會被阻塞
import threading import logging import time #輸出格式定義 FORMAT = '%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s' logging.basicConfig(level=logging.INFO, format=FORMAT) def worker(barrier:threading.Semaphore): logging.info('in sub thread') logging.info(s.acquire()) logging.info('sub thread over') #信號量 s = threading.Semaphore(3) logging.info(s.acquire()) logging.info(s.acquire()) logging.info(s.acquire()) threading.Thread(target=worker, args=(s,)).start() time.sleep(2) logging.info(s.acquire(False)) logging.info(s.acquire(timeout=3)) #阻塞 #釋放 logging.info('released') s.release() 應用舉例: 實現一個簡單的連接池,連接池應該有容量,有一個工廠方法可以獲取連接,能夠被不要的連接返回,供其他調用者使用 import threading import logging import random #輸出格式定義 FORMAT = '%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s' logging.basicConfig(level=logging.INFO, format=FORMAT) class Conn: def __init__(self, name): self.name = name def __repr__(self): return self.name class Pool: def __init__(self, count:int): self.count = count #池中是連接對象列表 self.pool = [self._connect("conn-{}".format(x)) for x in range(self.count)] self.semaphore = threading.Semaphore(count) def _connect(self, conn_name): #返回一個名稱 return Conn(conn_name) def get_conn(self): #從池中拿走一個連接 self.semaphore.acquire() conn = self.pool.pop() return conn def return_conn(self, conn:Conn): #向池中添加一個連接 self.pool.append(conn) self.semaphore.release() # 連接池初始化 pool = Pool(3) def worker(pool:Pool): conn = pool.get_conn() logging.info(conn) #模擬使用了一段時間 threading.Event().wait(random.randint(1,4)) pool.return_conn(conn) for i in range(6): threading.Thread(target=worker, name="worker-{}".format(i), args=(pool,)).start() 上例中,使用信號量解決資源有限的問題 如果池中有資源,請求者獲取資源時信號量減1,拿走資源,當請求超過資源數,請求者只能等待, 當使用者用完歸還資源后信號量加1,等待線程拿到就可以喚醒拿走資源
1、信號量和鎖
- 鎖,只允許同一個時間一個線程獨占資源,它是特殊的信號量,即信號量計數器初值為1
- 信號量,允許多個線程訪問共享資源,但是這個共享資源數量有限
- 鎖可以看做是特殊的信號量
2、數據結構和GIL
- Queue :標准庫queue模塊,提供FIFO的Queue,LIFO的隊列,優先隊列
- Queue類是線程安全的,適用於多線程間安全的交換數據,內部使用了Lock和Condition
3、GIL全局解釋器鎖
- Cpython在解釋器進程級別有一把鎖,叫做GIL全局解釋器
- GIL保證CPython進程中,只有一個線程執行字節碼,甚至是在多核CPU的情況下,也是如此
- CPython中,IO密集型,由於線程阻塞,就會調度其他線程,CPU密集型,當前線程可能會連續的獲得GIL,導致其他線程幾乎無法使用CPU