Python並發編程05 /死鎖現象、遞歸鎖、信號量、GIL鎖、計算密集型/IO密集型效率驗證、進程池/線程池
1. 死鎖現象
-
死鎖:
是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。 -
死鎖現象:
①連續鎖多次,②鎖嵌套引起的死鎖現象 -
代碼示例:
from threading import Thread from threading import Lock import time lock_A = Lock() lock_B = Lock() class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): lock_A.acquire() print(f'{self.name}拿到了A鎖') lock_B.acquire() print(f'{self.name}拿到了B鎖') lock_B.release() lock_A.release() def f2(self): lock_B.acquire() print(f'{self.name}拿到了B鎖') time.sleep(0.1) lock_A.acquire() print(f'{self.name}拿到了A鎖') lock_A.release() lock_B.release() if __name__ == '__main__': for i in range(3): t = MyThread() t.start()
2. 遞歸鎖
-
遞歸鎖
作用:遞歸鎖可以解決死鎖現象,業務需要多個鎖時,先要考慮遞歸鎖
工作原理:遞歸鎖有一個計數的功能, 原數字為0,上一次鎖,計數+1,釋放一次鎖,計數-1,
只要遞歸鎖上面的數字不為零,其他線程就不能搶鎖. -
代碼示例
使用方式一:
from threading import Thread from threading import RLock import time lock_A = lock_B = RLock() class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): lock_A.acquire() print(f'{self.name}拿到了A鎖') lock_B.acquire() print(f'{self.name}拿到了B鎖') lock_B.release() lock_A.release() def f2(self): lock_B.acquire() print(f'{self.name}拿到了B鎖') time.sleep(0.1) lock_A.acquire() print(f'{self.name}拿到了A鎖') lock_A.release() lock_B.release() if __name__ == '__main__': for i in range(3): t = MyThread() t.start()
使用方式二:利用上下文管理
from threading import RLock def task(): with RLock: print(111) print(222) # 執行完with內的語句會釋放鎖
3. 信號量
-
可以並發的數量,本質上也是一種鎖,可以設置同一時刻搶鎖線程的數量
-
代碼示例
from threading import Thread, Semaphore, current_thread import time import random sem = Semaphore(5) def task(): sem.acquire() print(f'{current_thread().name} 吃飯中...') time.sleep(random.randint(1,3)) sem.release() if __name__ == '__main__': for i in range(20): t = Thread(target=task,) t.start()
4. GIL全局解釋器鎖
1. 背景
-
理論上來說:單個進程的多線程可以利用多核.
-
但是,開發Cpython解釋器的程序員,給進入解釋器的線程加了鎖.
2. 加鎖的原因:
-
當時都是單核時代,而且cpu價格非常貴.
-
如果不加全局解釋器鎖, 開發Cpython解釋器的程序員就會在源碼內部各種主動加鎖,解鎖,非常麻煩,各種死鎖現象等等.為了省事兒,直接進入解釋器時給線程加一個鎖.
-
優缺點:
優點: 保證了Cpython解釋器的數據資源的安全.
缺點: 單個進程的多線程不能利用多核. -
Jpython沒有GIL鎖,pypy也沒有GIL鎖
-
現在多核時代, 我將Cpython的GIL鎖去掉行么?
因為Cpython解釋器所有的業務邏輯都是圍繞着單個線程實現的,去掉這個GIL鎖,幾乎不可能.
-
單個進程的多線程可以並發,但是不能利用多核,不能並行,多個進程可以並發,並行.
3. GIL與Lock鎖的區別
- 相同點: 都是同種鎖,互斥鎖.
- 不同點:
GIL鎖全局解釋器鎖,保護解釋器內部的資源數據的安全.
GIL鎖 上鎖,釋放無需手動操作.
自己代碼中定義的互斥鎖保護進程線程中的資源數據的安全.
自己定義的互斥鎖必須自己手動上鎖,釋放鎖.
4. 為什么GIL保證不了自己數據的安全?
-
一個線程去修改一個數據的時候,由於網絡延遲或者其它原因,被另一個線程搶到GIL鎖,拿到這個數據,此時就造成了該數據的不安全。
5. 驗證計算密集型、IO密集型的效率
-
IO密集型:單個進程的多線程並發 vs 多個進程的並發並行
def task(): count = 0 time.sleep(random.randint(1,3)) count += 1 if __name__ == '__main__': # 多進程的並發,並行 start_time = time.time() l1 = [] for i in range(50): p = Process(target=task,) l1.append(p) p.start() for p in l1: p.join() print(f'執行效率:{time.time()- start_time}') # 8.000000000 # 多線程的並發 start_time = time.time() l1 = [] for i in range(50): p = Thread(target=task,) l1.append(p) p.start() for p in l1: p.join() print(f'執行效率:{time.time()- start_time}') # 3.0294392108917236 # 結論:對於IO密集型: 單個進程的多線程的並發效率高.
-
計算密集型:單個進程的多線程並發 vs 多個進程的並發並行
from threading import Thread from multiprocessing import Process import time import random def task(): count = 0 for i in range(10000000): count += 1 if __name__ == '__main__': # 多進程的並發,並行 start_time = time.time() l1 = [] for i in range(4): p = Process(target=task,) l1.append(p) p.start() for p in l1: p.join() print(f'執行效率:{time.time()- start_time}') # 3.1402080059051514 # 多線程的並發 start_time = time.time() l1 = [] for i in range(4): p = Thread(target=task,) l1.append(p) p.start() for p in l1: p.join() print(f'執行效率:{time.time()- start_time}') # 4.5913777351379395 # 結論:對於計算密集型: 多進程的並發並行效率高.
6. 多線程實現socket通信
-
無論是多線程還是多進程,都是一樣的寫法,來一個客戶端請求,我就開一個線程,來一個請求開一個線程,在計算機允許范圍內,開啟的線程進程數量越多越好.
-
服務端
import socket from threading import Thread def communicate(conn,addr): while 1: try: from_client_data = conn.recv(1024) print(f'來自客戶端{addr[1]}的消息: {from_client_data.decode("utf-8")}') to_client_data = input('>>>').strip() conn.send(to_client_data.encode('utf-8')) except Exception: break conn.close() def _accept(): server = socket.socket() server.bind(('127.0.0.1', 8848)) server.listen(5) while 1: conn, addr = server.accept() t = Thread(target=communicate,args=(conn,addr)) t.start() if __name__ == '__main__': _accept()
-
客戶端
import socket client = socket.socket() client.connect(('127.0.0.1',8848)) while 1: try: to_server_data = input('>>>').strip() client.send(to_server_data.encode('utf-8')) from_server_data = client.recv(1024) print(f'來自服務端的消息: {from_server_data.decode("utf-8")}') except Exception: break client.close()
7. 進程池,線程池
-
定義:進程池線程池就是:控制開啟線程或者進程的數量
線程池: 一個容器,這個容器限制住開啟線程的數量,比如4個,第一次肯定只能並發的處理4個任務,只要有任務完成,線程馬上就會接下一個任務.
-
代碼示例
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor import os import time import random def task(n): print(f'{os.getpid()} 接客') time.sleep(random.randint(1,3)) if __name__ == '__main__': # 開啟進程池 p = ProcessPoolExecutor() # 默認不寫,進程池里面的進程數與cpu個數相等 for i in range(20): p.submit(task,i) # 開啟線程池 t = ThreadPoolExecutor(100) # 100個線程,不寫默認是cpu個數*5 線程數 for i in range(20): t.submit(task,i)
總結:
-
信號量與進程池、線程池的區別
1.使用Seamphore,你創建了多少線程,實際就會有多少線程進行執行,只是可同時執行的線程數量會受到限制。但使用線程池,你創建的線程只是作為任務提交給線程池執行,實際工作的線程由線程池創建,並且實際工作的線程數量由線程池自己管理。
2.簡單來說,線程池實際工作的線程是work線程,不是你自己創建的,是由線程池創建的,並由線程池自動控制實際並發的work線程數量。而Seamphore相當於一個信號燈,作用是對線程做限流,Seamphore可以對你自己創建的的線程做限流(也可以對線程池的work線程做限流),Seamphore的限流必須通過手動acquire和release來實現。