Python並發編程05 /死鎖現象、遞歸鎖、信號量、GIL鎖、計算密集型/IO密集型效率驗證、進程池/線程池


Python並發編程05 /死鎖現象、遞歸鎖、信號量、GIL鎖、計算密集型/IO密集型效率驗證、進程池/線程池

1. 死鎖現象

  • 死鎖:
    是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。

  • 死鎖現象:
    ①連續鎖多次,②鎖嵌套引起的死鎖現象

  • 代碼示例:

    from threading import Thread
    from threading import Lock
    import time
    
    lock_A = Lock()
    lock_B = Lock()
    class MyThread(Thread):
        def run(self):
            self.f1()
            self.f2()
            
        def f1(self):
            lock_A.acquire()
            print(f'{self.name}拿到了A鎖')
            lock_B.acquire()
            print(f'{self.name}拿到了B鎖')
            lock_B.release()
            lock_A.release()
    
        def f2(self):
            lock_B.acquire()
            print(f'{self.name}拿到了B鎖')
            time.sleep(0.1)
            lock_A.acquire()
            print(f'{self.name}拿到了A鎖')
            lock_A.release()
            lock_B.release()
    
    if __name__ == '__main__':
        for i in range(3):
            t = MyThread()
            t.start()
    

2. 遞歸鎖

  • 遞歸鎖

    作用:遞歸鎖可以解決死鎖現象,業務需要多個鎖時,先要考慮遞歸鎖

    工作原理:遞歸鎖有一個計數的功能, 原數字為0,上一次鎖,計數+1,釋放一次鎖,計數-1,
    只要遞歸鎖上面的數字不為零,其他線程就不能搶鎖.

  • 代碼示例

    使用方式一:

    from threading import Thread
    from threading import RLock
    import time
    
    lock_A = lock_B = RLock()
    class MyThread(Thread):
    
        def run(self):
            self.f1()
            self.f2()
    
        def f1(self):
            lock_A.acquire()
            print(f'{self.name}拿到了A鎖')
            lock_B.acquire()
            print(f'{self.name}拿到了B鎖')
            lock_B.release()
            lock_A.release()
    
        def f2(self):
            lock_B.acquire()
            print(f'{self.name}拿到了B鎖')
            time.sleep(0.1)
            lock_A.acquire()
            print(f'{self.name}拿到了A鎖')
            lock_A.release()
            lock_B.release()
    
    if __name__ == '__main__':
        for i in range(3):
            t = MyThread()
            t.start()
    

    使用方式二:利用上下文管理

    from threading import RLock
    
    def task():
        with RLock:
            print(111)
            print(222)
    
    # 執行完with內的語句會釋放鎖
    

3. 信號量

  • 可以並發的數量,本質上也是一種鎖,可以設置同一時刻搶鎖線程的數量

  • 代碼示例

    from threading import Thread, Semaphore, current_thread
    import time
    import random
    sem = Semaphore(5)
    
    def task():
        sem.acquire()
        print(f'{current_thread().name} 吃飯中...')
        time.sleep(random.randint(1,3))
        sem.release()
    
    if __name__ == '__main__':
        for i in range(20):
            t = Thread(target=task,)
            t.start()
    

4. GIL全局解釋器鎖

1. 背景

  • 理論上來說:單個進程的多線程可以利用多核.

  • 但是,開發Cpython解釋器的程序員,給進入解釋器的線程加了鎖.

2. 加鎖的原因:

  1. 當時都是單核時代,而且cpu價格非常貴.

  2. 如果不加全局解釋器鎖, 開發Cpython解釋器的程序員就會在源碼內部各種主動加鎖,解鎖,非常麻煩,各種死鎖現象等等.為了省事兒,直接進入解釋器時給線程加一個鎖.

  3. 優缺點:

    優點: 保證了Cpython解釋器的數據資源的安全.
    缺點: 單個進程的多線程不能利用多核.

  4. Jpython沒有GIL鎖,pypy也沒有GIL鎖

  5. 現在多核時代, 我將Cpython的GIL鎖去掉行么?

    因為Cpython解釋器所有的業務邏輯都是圍繞着單個線程實現的,去掉這個GIL鎖,幾乎不可能.

  6. 單個進程的多線程可以並發,但是不能利用多核,不能並行,多個進程可以並發,並行.

3. GIL與Lock鎖的區別

  • 相同點: 都是同種鎖,互斥鎖.
  • 不同點:
    GIL鎖全局解釋器鎖,保護解釋器內部的資源數據的安全.
    GIL鎖 上鎖,釋放無需手動操作.
    自己代碼中定義的互斥鎖保護進程線程中的資源數據的安全.
    自己定義的互斥鎖必須自己手動上鎖,釋放鎖.

4. 為什么GIL保證不了自己數據的安全?

  • 一個線程去修改一個數據的時候,由於網絡延遲或者其它原因,被另一個線程搶到GIL鎖,拿到這個數據,此時就造成了該數據的不安全。

5. 驗證計算密集型、IO密集型的效率

  • IO密集型:單個進程的多線程並發 vs 多個進程的並發並行

    def task():
        count = 0
        time.sleep(random.randint(1,3))
        count += 1
    
    if __name__ == '__main__':
        # 多進程的並發,並行
        start_time = time.time()
        l1 = []
        for i in range(50):
            p = Process(target=task,)
            l1.append(p)
            p.start()
        for p in l1:
            p.join()
        print(f'執行效率:{time.time()- start_time}')  #  8.000000000
    
        # 多線程的並發
        start_time = time.time()
        l1 = []
        for i in range(50):
            p = Thread(target=task,)
            l1.append(p)
            p.start()
        for p in l1:
            p.join()
        print(f'執行效率:{time.time()- start_time}')  # 3.0294392108917236
        
    # 結論:對於IO密集型: 單個進程的多線程的並發效率高.
    
  • 計算密集型:單個進程的多線程並發 vs 多個進程的並發並行

    from threading import Thread
    from multiprocessing import Process
    import time
    import random
    
    def task():
        count = 0
        for i in range(10000000):
            count += 1
    
    if __name__ == '__main__':
        # 多進程的並發,並行
        start_time = time.time()
        l1 = []
        for i in range(4):
            p = Process(target=task,)
            l1.append(p)
            p.start()
        for p in l1:
            p.join()
        print(f'執行效率:{time.time()- start_time}')  # 3.1402080059051514
    
        # 多線程的並發
        start_time = time.time()
        l1 = []
        for i in range(4):
            p = Thread(target=task,)
            l1.append(p)
            p.start()
        for p in l1:
            p.join()
        print(f'執行效率:{time.time()- start_time}')  # 4.5913777351379395
        
    # 結論:對於計算密集型: 多進程的並發並行效率高.
    

6. 多線程實現socket通信

  • 無論是多線程還是多進程,都是一樣的寫法,來一個客戶端請求,我就開一個線程,來一個請求開一個線程,在計算機允許范圍內,開啟的線程進程數量越多越好.

  • 服務端

    import socket
    from threading import Thread
    
    def communicate(conn,addr):
        while 1:
            try:
                from_client_data = conn.recv(1024)
                print(f'來自客戶端{addr[1]}的消息: {from_client_data.decode("utf-8")}')
                to_client_data = input('>>>').strip()
                conn.send(to_client_data.encode('utf-8'))
            except Exception:
                break
        conn.close()
    
    def _accept():
        server = socket.socket()
        server.bind(('127.0.0.1', 8848))
        server.listen(5)
        while 1:
            conn, addr = server.accept()
            t = Thread(target=communicate,args=(conn,addr))
            t.start()
    
    if __name__ == '__main__':
        _accept()
    
  • 客戶端

    import socket
    client = socket.socket()
    client.connect(('127.0.0.1',8848))
    
    while 1:
        try:
            to_server_data = input('>>>').strip()
            client.send(to_server_data.encode('utf-8'))
            from_server_data = client.recv(1024)
            print(f'來自服務端的消息: {from_server_data.decode("utf-8")}')
        except Exception:
            break
    client.close()
    
    

7. 進程池,線程池

  • 定義:進程池線程池就是:控制開啟線程或者進程的數量

    線程池: 一個容器,這個容器限制住開啟線程的數量,比如4個,第一次肯定只能並發的處理4個任務,只要有任務完成,線程馬上就會接下一個任務.

  • 代碼示例

    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
    import os
    import time
    import random
    
    def task(n):
        print(f'{os.getpid()} 接客')
        time.sleep(random.randint(1,3))
    
    if __name__ == '__main__':
        # 開啟進程池 
        p = ProcessPoolExecutor()  # 默認不寫,進程池里面的進程數與cpu個數相等
        for i in range(20):
            p.submit(task,i)
        
        # 開啟線程池
        t = ThreadPoolExecutor(100)  # 100個線程,不寫默認是cpu個數*5 線程數
        for i in range(20):
            t.submit(task,i)
    

總結:

  • 信號量與進程池、線程池的區別

    1.使用Seamphore,你創建了多少線程,實際就會有多少線程進行執行,只是可同時執行的線程數量會受到限制。但使用線程池,你創建的線程只是作為任務提交給線程池執行,實際工作的線程由線程池創建,並且實際工作的線程數量由線程池自己管理。

    2.簡單來說,線程池實際工作的線程是work線程,不是你自己創建的,是由線程池創建的,並由線程池自動控制實際並發的work線程數量。而Seamphore相當於一個信號燈,作用是對線程做限流,Seamphore可以對你自己創建的的線程做限流(也可以對線程池的work線程做限流),Seamphore的限流必須通過手動acquire和release來實現。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM