一.死鎖現象與遞歸鎖
鎖:Lock線程安全,多線程操作時,內部會讓所有線程排隊處理。如:list/dict/Queue
線程不安全 + 人 => 排隊處理。
import threading import time v = [] lock = threading.Lock() def func(arg): lock.acquire() v.append(arg) time.sleep(0.01) m = v[-1] print(arg,m) lock.release() for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start()
鎖:RLock
import threading import time v = [] lock = threading.RLock() def func(arg): lock.acquire() lock.acquire() v.append(arg) time.sleep(0.01) m = v[-1] print(arg,m) lock.release() lock.release() for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start()
鎖:BoundedSemaphore
import time import threading lock = threading.BoundedSemaphore(3) def func(arg): lock.acquire() print(arg) time.sleep(1) lock.release() for i in range(20): t =threading.Thread(target=func,args=(i,)) t.start()
鎖:condition
import time import threading lock = threading.Condition() # ############## 方式一 ############## def func(arg): print('線程進來了') lock.acquire() lock.wait() # 加鎖 print(arg) time.sleep(1) lock.release() for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start() while True: inp = int(input('>>>')) lock.acquire() lock.notify(inp) lock.release() # ############## 方式二 ############## """ def xxxx(): print('來執行函數了') input(">>>") # ct = threading.current_thread() # 獲取當前線程 # ct.getName() return True def func(arg): print('線程進來了') lock.wait_for(xxxx) print(arg) time.sleep(1) for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start() """
鎖:Event
import time import threading lock = threading.Event() def func(arg): print('線程來了') lock.wait() # 加鎖:紅燈 print(arg) for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start() input(">>>>") lock.set() # 綠燈 lock.clear() # 再次變紅燈 for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start() input(">>>>") lock.set()
二.threading.local 的作用及原理
作用:內部自動為每個線程維護一個空間(字典),用於存取屬於自己的值.保證線程之間的數據隔離.
import time import threading v = threading.local() def func(arg): # 內部會為當前線程創建一個空間用於存儲:phone=自己的值 v.phone = arg time.sleep(2) print(v.phone,arg) # 去當前線程自己空間取值 for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start()
原理:
#原理1 import time import threading DATA_DICT = {} def func(arg): ident = threading.get_ident() DATA_DICT[ident] = arg time.sleep(1) print(DATA_DICT[ident],arg) for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start()
#原理2 import time import threading INFO = {} class Local(object): def __getattr__(self, item): ident = threading.get_ident() return INFO[ident][item] def __setattr__(self, key, value): ident = threading.get_ident() if ident in INFO: INFO[ident][key] = value else: INFO[ident] = {key:value} obj = Local() def func(arg): obj.phone = arg # 調用對象的 __setattr__方法(“phone”,1) time.sleep(2) print(obj.phone,arg) for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start()
三.線程池
線程池是一種多線程處理形式,處理過程中將任務添加到隊列,然后在創建線程后自動啟動這些任務。線程池線程都是后台線程。每個線程都使用默認的堆棧大小,以默認的優先級運行,並處於多線程單元中。如果某個線程在托管代碼中空閑(如正在等待某個事件),則線程池將插入另一個輔助線程來使所有處理器保持繁忙。如果所有線程池線程都始終保持繁忙,但隊列中包含掛起的工作,則線程池將在一段時間后創建另一個輔助線程但線程的數目永遠不會超過最大值。超過最大值的線程可以排隊,但他們要等到其他線程完成后才啟動。
from concurrent.futures import ThreadPoolExecutor import time def task(a1,a2): time.sleep(2) print(a1,a2) # 創建了一個線程池(最多5個線程) pool = ThreadPoolExecutor(5) for i in range(40): # 去線程池中申請一個線程,讓線程執行task函數。 pool.submit(task,i,8)
四.生產者消費者模型
產生數據的模塊,就形象地稱為生產者;而處理數據的模塊,就稱為消費者;生產者和消費者之間的中介就叫做緩沖區。

import time import queue import threading q = queue.Queue() # 線程安全 def producer(id): """ 生產者 :return: """ while True: time.sleep(2) q.put('包子') print('廚師%s 生產了一個包子' %id ) for i in range(1,4): t = threading.Thread(target=producer,args=(i,)) t.start() def consumer(id): """ 消費者 :return: """ while True: time.sleep(1) v1 = q.get() print('顧客 %s 吃了一個包子' % id) for i in range(1,3): t = threading.Thread(target=consumer,args=(i,)) t.start()
