python的Lock鎖,線程同步


一、Lock鎖

  • 凡是存在共享資源爭搶的地方都可以使用鎖,從而保證只有一個使用者可以完全使用這個資源一旦線程獲得鎖,其他試圖獲取鎖的線程將被阻塞
  • acquire(blocking=True,timeout=-1): 默認阻塞,阻塞可以設置超時時間,非阻塞時,timeout禁止設置,成功獲取鎖,返回True,否則返回False
  • releas() : 釋放鎖,可以從任何線程調用釋放,已上鎖的鎖,會被重置為unlocked未上鎖的鎖上調用,拋出RuntimeError異常
例如:
    訂單要求生成1000個杯子,組織10個工人生產
    
舉例1:

    import threading
    from threading import Thread, Lock
    import time
    import logging

    FORAMT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
    logging.basicConfig(format=FORAMT, level=logging.INFO)

    cups = []

    def worker(count=10):
        logging.info("I'm working for u")
        flag = False
        while True:
            if len(cups) >= count:
                flag = True
            time.sleep(0.001)   # 為了看出現場切換效果
            if not flag:
                cups.append(1)
            if flag:
                break
        logging.info("{} finished.cups= {}".format(threading.current_thread().name, len(cups)))

    for _ in range(10):  # 開啟10個線程
        Thread(target=worker, args=(1000,)).start()

    從上例的運行結果看出,多線程調度,導致了判斷失效,多生成了杯子
    如何修改,加鎖
    
    
舉例2:
    import threading
    from threading import Thread, Lock
    import time
    import logging

    FORAMT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
    logging.basicConfig(format=FORAMT, level=logging.INFO)

    cups = []
    lock = Lock()

    def worker(count=10):
        logging.info("I'm working for u")
        flag = False
        while True:
            lock.acquire()
            if len(cups) >= count:
                flag = True
            #lock.release()   # 這里釋放鎖合適嗎?
            time.sleep(0.001)   # 為了看出現場切換效果
            if not flag:
                cups.append(1)
            lock.release()  # 這里釋放鎖對不對
            if flag:
                break
        logging.info("{} finished.cups= {}".format(threading.current_thread().name, len(cups)))

    for _ in range(10):  # 開啟10個線程
        Thread(target=worker, args=(1000,)).start()

    假設第一句lock.release()合適,分析如下:
    有一個時刻len(cups),正好是999,flag=True,釋放鎖,線程被打斷,另一個線程判斷發現是
    999,flag=True,可能線程被打斷,可能另一個線程判斷也是999,flag也設置為True
    這三個線程只要繼續執行到cups.append(1),一定會導致cups的長度超過1000
    
    假設第二句lock.release()合適,分析如下:
    在某一時刻len(cups),正好是999,flag=True,其他線程試圖訪問這段代碼的線程都被阻塞獲取
    不到鎖,直到當前線程安全的增加了一個數據,然后釋放鎖,其它線程有一個搶到鎖,但發現
    已經是1000了,只好break打印退出,所有其他線程接着都退出
    


二、加鎖、解鎖

 

  • 一般來說加鎖后還要一些代碼實現,在釋放鎖之前還有可能拋異常,一旦出現異常所無法釋放,但是當前線程可能因為這個異常被終止了,這就產生了死鎖
  • 加鎖、解鎖常用語句:
    • 使用try...finally語句保證鎖的釋放
    • with上下文管理,鎖對象支持上下文管理
import threading
from threading import Thread, Lock
import time

class Counter:
    def __init__(self):
        self._val = 0
        self.__lock = Lock()

    @property
    def value(self):
        with self.__lock:
            return self._val

    def inc(self):
        try:
            self.__lock.acquire()
            self._val += 1
        finally:
            self.__lock.release()

    def dec(self):
        with self.__lock:
            self._val -= 1

def run(c:Counter, count=100):
    for _ in range(count):
        for i in range(-50,50):
            if i < 0:
                c.dec()
            else:
                c.inc()

c = Counter()
c1 = 10
c2 = 100
for i in range(c1):
    Thread(target=run, args=(c,c2)).start()

while True:
    time.sleep(1)
    if threading.active_count() == 1:
        print(threading.enumerate())
        print(c.value)
    else:
        print(threading.enumerate())


三、鎖的應用場景

  • 鎖適用於訪問和修改同一個共享資源的時候,即讀寫同一個資源的時候,如果全部都是讀取同一個共享資源需要鎖嗎,不需要,因為這時可以認為共享資源是不可變的,每一次讀取它都是一樣的值,所有不用加鎖
  •  使用鎖的注意事項
    • 少用鎖,必要時用鎖,使用了鎖,多線程訪問被鎖的資源時,就成了串行,要么排隊,要么爭搶
    • 加鎖時間越短越好,不需要就立即釋放鎖
    • 一定要避免死鎖

 1、可重入鎖Rlock

  • 可重入鎖,是線程相關的鎖,線程A獲得可重復鎖,並可以多次成功獲取,不會阻塞,最后要在線程A中做和acquire次數相同的relea
import threading
import time

lock = threading.RLock()
print(lock.acquire())
print('-------------------------')

print(lock.acquire(blocking=False))
print(lock.acquire())
print(lock.acquire(timeout=3.55))
print(lock.acquire(blocking=False))

lock.release()
lock.release()
lock.release()
lock.release()
lock.release()


四、Condition

  • 構造方法Condition(lock=None),可以傳入一個Lock或RLock對象,默認是RLock
  • Conditon用於生產者,消費者模型,為了解決生產者消費者速度匹配問題
  •  acquire(*args) : 獲取鎖
  •  wait(self,time=None) : 等待或超市
  •  notify(n=1) : 喚醒至多指定數目個數的等待線程,沒有等待的線程就沒有任何操作
  •  notif_all() : 喚醒所有等待的線程
舉例1:  消費者消費速度大於生產者生成速度


    from threading import Thread, Lock, Event, Condition
    import time, random
    import logging

    FORAMT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
    logging.basicConfig(format=FORAMT, level=logging.INFO)

    # 此例只是為了演示,不考慮線程安全問題
    class Dispatcher:
        def __init__(self):
            self.data = None
            self.event = Event()  #event只是為了使用方便,與邏輯無關
            self.cond = Condition()

        def produce(self, total):
            for _ in range(total):
                data = random.randint(0,100)
                with self.cond:
                    logging.info(data)
                    self.data = data
                    self.cond.notify_all()   #  通知消費者消費,啟動所有線程
                self.event.wait(1)  # 模擬產生數據速度
            self.event.set()

        def consume(self):
            while not self.event.is_set():
                with self.cond:   # 消費者等待
                    self.cond.wait()
                    logging.info("received {}".format(self.data))
                    self.data = None
                self.event.wait(0.5)  #模擬消費的速度

    d = Dispatcher()
    p = Thread(target=d.produce, args=(10,), name='prodcer')
    c = Thread(target=d.consume, name='consumer')  
    c.start()
    p.start()

    消費者等待數據,如果生產者准備好了會通知消費者消費,省得消費者反復來查看數據是否就緒
    
    Condition用於生產者消費者模型中,解決生產者消費者速度匹配的問題
    采用了通知機制,非常有效率
    使用方式,使用Condition,必須先acquire,用完了要release,因為內部使用了鎖,默認使用了RLock鎖,
    最好方式是使用with上下文管理
    消費者wait,等待通知,生產者生產好消息,對消費者發通知,可以使用notify或者notify_all方法


五、線程同步Barrier

  • 有人翻譯成柵欄,建設理解成屏障,可以想象成路障,道閘
  • Barrier(parties, action=None, timeout=None): 構建Barrier對象,指定參與方數目,timeout是wait方法未指定超時的默認值
  •  n_waiting : 當前在屏障中等待的線程數
  •  parties : 各方數,就是需要多少個等待
  •  wait(timeout=None) : 等待通過屏障,返回0到線程數-1的整數,每個線程返回不同,如果wait方法設置了超時,並超時發送,屏障將處於broken狀態

1、Barrier實例

  •  broken :如果屏障處於打破的狀態,返回True
  •  abort() : 將屏障置於broken狀態,等待中的線程或者調用等待方法的線程中都會拋出BrokenBarrierError異常,直到reset方法來恢復屏障
  •  reset(): 恢復屏障,重新開始攔截
import threading
    import logging

    #輸出格式定義
    FORMAT = '%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s'
    logging.basicConfig(level=logging.INFO, format=FORMAT)

    def worker(barrier:threading.Barrier):
        logging.info('waiting for {} threads'.format(barrier.n_waiting))
        try:
            barrier_id = barrier.wait()
            logging.info('after barrier {}'.format(barrier_id))
        except threading.BrokenBarrierError:
            logging.info('Broken Barrier')

    barrier = threading.Barrier(3)  #設置攔截線程數

    for x in range(3):
        threading.Thread(target=worker, name='worker-{}'.format(x), args=(barrier,)).start()

    logging.info('started')

從運行結果看出:
    所有線程沖到了Barrier前等待,直到到達parties的數目,屏障打開,所有線程停止等待,繼續執行
    再有線程wait,屏障就緒等到到達參數方數目
    舉例 ,賽馬比賽所有馬匹就位,開閘
    
舉例2:

    import threading
    import logging

    #輸出格式定義
    FORMAT = '%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s'
    logging.basicConfig(level=logging.INFO, format=FORMAT)

    def worker(barrier:threading.Barrier):
        logging.info('waiting for {} threads'.format(barrier.n_waiting))
        try:
            barrier_id = barrier.wait()
            logging.info('after barrier {}'.format(barrier_id))
        except threading.BrokenBarrierError:
            logging.info('Broken Barrier run')

    barrier = threading.Barrier(3)

    for x in range(0,9):
        if x == 2:
            barrier.abort()
        if x == 6:
            barrier.reset()
        threading.Event().wait(1)
        threading.Thread(target=worker, name='worker-{}'.format(x), args=(barrier,)).start()

    logging.info('started')

    上例中等待了2個,屏障就被break了,waiting的線程拋了BrokenBarrierError異常
    新wait的線程也拋異常,直到屏障恢復,才繼續按照parties數目要求繼續攔截線程
    


2、wait方法超時實例

  • 如果wait方法超時發生,屏障將處於broken狀態,直到reset
import threading
    import logging

    #輸出格式定義
    FORMAT = '%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s'
    logging.basicConfig(level=logging.INFO, format=FORMAT)

    def worker(barrier:threading.Barrier, i:int):

        logging.info('waiting for {} threads'.format(barrier.n_waiting))
        try:
            logging.info(barrier.broken)  # 是否broken
            if i < 3:
                barrier_id = barrier.wait(1)  # 超時后,屏障broken
            else:
                if i == 6:
                    barrier.reset() #恢復屏障
                barrier_id = barrier.wait(1)
            logging.info('after barrier {}'.format(barrier_id))
        except threading.BrokenBarrierError:
            logging.info('Broken Barrier run')

    barrier = threading.Barrier(3)

    for x in range(0,9):

        threading.Event().wait(2)
        threading.Thread(target=worker, name='worker-{}'.format(x), args=(barrier,x)).start()

    logging.info('started')


3、Barrier應用

  • 所有線程都必須初始化完成,才能繼續工作,例如運行前加載數據,檢查,如果這些工作沒有完成,就開始運行,將不能正常工作
  • 10個線程做10種工作准備,每個線程負責一種工作,只有這10個線程都完成后,才能繼續工作,先完成的要等待后完成的線程

 

六、semaphore信號量

  • 和Lock很像,信號量對內部維護一個到計數器,每一次acquire都會減1,
  • 當acquire方法發現計數為0就阻塞請求的線程,直到其他線程對信號量release后,計數大於0,恢復阻塞線程
  • Semaphore(value=1): 構造方法,value小於0,拋出異常ValueError異常
  • acquire(blocking=True,time=None) : 獲取信號量,計數器減1,獲取成功返回True
  • release() : 釋放信號量,計數器加1
  • 計數器永遠不會低於0,因為acquire的時候,發現是0,都會被阻塞
import threading
    import logging
    import time

    #輸出格式定義
    FORMAT = '%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s'
    logging.basicConfig(level=logging.INFO, format=FORMAT)

    def worker(barrier:threading.Semaphore):
        logging.info('in sub thread')
        logging.info(s.acquire())
        logging.info('sub thread over')

    #信號量
    s = threading.Semaphore(3)
    logging.info(s.acquire())
    logging.info(s.acquire())
    logging.info(s.acquire())

    threading.Thread(target=worker, args=(s,)).start()
    time.sleep(2)

    logging.info(s.acquire(False))
    logging.info(s.acquire(timeout=3))   #阻塞

    #釋放
    logging.info('released')
    s.release()




應用舉例:
    實現一個簡單的連接池,連接池應該有容量,有一個工廠方法可以獲取連接,能夠被不要的連接返回,供其他調用者使用


    import threading
    import logging
    import random

    #輸出格式定義
    FORMAT = '%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s'
    logging.basicConfig(level=logging.INFO, format=FORMAT)

    class Conn:
        def __init__(self, name):
            self.name = name

        def __repr__(self):
            return self.name

    class Pool:
        def __init__(self, count:int):
            self.count = count
            #池中是連接對象列表
            self.pool = [self._connect("conn-{}".format(x)) for x in range(self.count)]
            self.semaphore = threading.Semaphore(count)

        def _connect(self, conn_name):
            #返回一個名稱
            return Conn(conn_name)

        def get_conn(self):
            #從池中拿走一個連接
            self.semaphore.acquire()
            conn = self.pool.pop()
            return conn

        def return_conn(self, conn:Conn):
            #向池中添加一個連接
            self.pool.append(conn)
            self.semaphore.release()

    # 連接池初始化
    pool = Pool(3)

    def worker(pool:Pool):
        conn = pool.get_conn()
        logging.info(conn)
        #模擬使用了一段時間
        threading.Event().wait(random.randint(1,4))
        pool.return_conn(conn)

    for i in range(6):
        threading.Thread(target=worker, name="worker-{}".format(i), args=(pool,)).start()


    上例中,使用信號量解決資源有限的問題
    如果池中有資源,請求者獲取資源時信號量減1,拿走資源,當請求超過資源數,請求者只能等待,
    當使用者用完歸還資源后信號量加1,等待線程拿到就可以喚醒拿走資源
    


1、信號量和鎖

  •  鎖,只允許同一個時間一個線程獨占資源,它是特殊的信號量,即信號量計數器初值為1
  •  信號量,允許多個線程訪問共享資源,但是這個共享資源數量有限
  •  鎖可以看做是特殊的信號量

2、數據結構和GIL

  •  Queue :標准庫queue模塊,提供FIFO的Queue,LIFO的隊列,優先隊列
  •  Queue類是線程安全的,適用於多線程間安全的交換數據,內部使用了Lock和Condition

3、GIL全局解釋器鎖

  • Cpython在解釋器進程級別有一把鎖,叫做GIL全局解釋器
  • GIL保證CPython進程中,只有一個線程執行字節碼,甚至是在多核CPU的情況下,也是如此
  •  CPython中,IO密集型,由於線程阻塞,就會調度其他線程,CPU密集型,當前線程可能會連續的獲得GIL,導致其他線程幾乎無法使用CPU

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM