生產者消費者模型
生產者消費者模型
在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。
為什么要使用生產者和消費者模式
在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。
什么是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。
基於隊列實現生產者消費者模型
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
def producer(q):
for i in range(10):
time.sleep(random.randint(1,3))
res='包子%s' %i
q.put(res)
print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
if __name__ == '__main__':
q=Queue()
#生產者們:即廚師們
p1=Process(target=producer,args=(q,))
#消費者們:即吃貨們
c1=Process(target=consumer,args=(q,))
#開始
p1.start()
c1.start()
print('主')
生產者消費者模型總結:
#程序中有兩類角色
一類負責生產數據(生產者)
一類負責處理數據(消費者)
#引入生產者消費者模型為了解決的問題是:
平衡生產者與消費者之間的工作能力,從而提高程序整體處理數據的速度
#如何實現:
生產者<-->隊列<——>消費者
#生產者消費者模型實現程序的解耦和
多線程
什么是線程
線程指的是一條流水線的工作過程的總稱
線程是CPU的基本執行單位
對比進程而言,進程僅僅是一個資源單位其包含了程序運行所需的資源,就像一個車間
而單有資源是無法生產出產品的,必須有具體的生產產品的邏輯代碼
線程就相當於車間中的一條流水線,而你的代碼就是流水線上的一道道工序
特點
1.每個進程都會有一個默認的線程
2.每個進程可以存在多個線程
3.同一進程中的所有線程之間數據是共享的
4.創建線程的開銷遠比創建進程小的多
主線程與子線程的區別
1.線程之間是沒有父子之分,是平等的
2.主線程是由操作系統自動開啟的,而子線是由程序主動開啟
3.即時主線程的代碼執行完畢,也不會結束進程,會等待所有線程執行完畢,進程才結束
開啟線程的兩種方式
1.實例化Tread類,target參數用於指定子線程要執行的任務
from threading import Thread
def task():
print("子線程 run........")
t = Thread(target=task)
t.start()
print("over")
2.繼承Tread類,覆蓋run方法
from threading import Thread
class MyThread(Thread):
def run(self):
print("子線程 run........")
t = MyThread()
t.start()
print("over")
與進程在使用方法上沒有任何區別,不同的是開啟子線程的代碼可以寫在任意位置
之所以使用方法完全相同是因為,多進程其實是為了彌補多線程的缺憾而誕生的。詳見GIL鎖
線程與進程區別
1.同一進程中 線程之間數據共享
a = 100
def task():
global a
print("子線程 run........")
a = 1
t = Thread(target=task)
t.start()
print(a) # 1
print("over")
2.創建線程的開銷遠比創建進程小的多
from threading import Thread
from multiprocessing import Process
import time
def task():
pass
if __name__ == '__main__':
start = time.time()
for i in range(100):
p = Thread(target=task)
p.start()
print(time.time()-start)
# 修改Thread 為Process類 查看結果
3.無論開啟了多少子線程PID是不會變的
from threading import Thread
import os
def task():
print(os.getpid())
for i in range(100):
p = Thread(target=task)
p.start()
Tread類的常用屬性
# threading模塊包含的常用方法
import threading
print(threading.current_thread().name) #獲取當前線程對象
print(threading.active_count()) # 獲取目前活躍的線程數量
print(threading.enumerate()) # 獲取所有線程對象
t = Thread(name="aaa")
# t.join() # 主線程等待子線程執行完畢
print(t.name) # 線程名稱
print(t.is_alive()) # 是否存活
print(t.isDaemon()) # 是否為守護線程
守護線程
設置守護線程的語法與進程相同,相同的是也必須放在線程開啟前設置,否則拋出異常。
守護線程的特點:
守護線程會在被守護線程結束后立即結束
from threading import Thread
import time
def task():
print("start......")
time.sleep(5)
print("end......")
t = Thread(target=task)
# t.setDaemon(True)
t.daemon = True
t.start()
print("main over!")
疑惑:
from threading import Thread
import time
def task():
print("start....1")
time.sleep(3)
print("end......1")
def task2():
print("start....2")
time.sleep(4)
print("end......2")
t = Thread(target=task)
t.daemon = True
t.start()
t2 = Thread(target=task2)
t2.start()
print("main over!")
打印main over后主線程代碼執行完畢,但是守護線程t1並沒有立即結束,這是什么原因呢?
答:主線程會等待所有子線程執行完畢后結束
在上述例子中,一共有三個線程,主線程 ,t1,t2
雖然t1是守護線程 ,但是t2並不是所以主線程會等待t2執行結束才結束
順序是:守護線程 等待 主線程 等待 其余子線程
換句話說,守護線程會隨着所有非守護線程結束而結束。
線程鎖
互斥鎖
多線程的最主要特征之一是:同一進程中所有線程數據共享
一旦共享必然出現競爭問題。
a = 10
#lock = Lock()
def task():
global a
#lock.acquire()
b = a - 1
time.sleep(0.1)
a = b
#lock.release()
for i in range(10):
t = Thread(target=task)
t.start()
for t in threading.enumerate():
if t != threading.current_thread():
t.join()
print(a)
# 輸出 9
當多個線程要並發修改同一資源時,也需要加互斥鎖來保證數據安全。
同樣的一旦加鎖,就意味着串行,效率必然降低。
死鎖
現有兩把鎖l1和l2 用於表示盤子和筷子
兩個線程的目標是吃飯,要吃飯的前提是同時拿到筷子和盤子,但是兩個人的目標不同一個先拿筷子 ,一個先拿盤子最終造成死鎖
l1 = Lock()
l2 = Lock()
def task():
l1.acquire()
print(threading.current_thread().name,"拿到了筷子")
time.sleep(0.1)
l2.acquire()
print(threading.current_thread().name, "拿到了盤子")
print("吃飯")
l1.release()
l2.release()
def task2():
l2.acquire()
print(threading.current_thread().name, "拿到了盤子")
l1.acquire()
print(threading.current_thread().name,"拿到了筷子")
print("吃飯")
l2.release()
l1.release()
t1 = Thread(target=task)
t1.start()
t2 = Thread(target=task2)
t2.start()
共有兩把鎖,但是一人拿到了一把,並且互不釋放,相互等待,導致程序卡死,這就死鎖。
要發生死鎖只有兩種情況
1.有不止一把鎖,不同線程或進程分別拿到了不同的鎖不放
2.對同一把鎖執行了多次acquire
其中第二種情況我們可以通過可重入鎖來解決
遞歸鎖
Rlock 同一個線程可以多次執行acquire,釋放鎖時,有幾次acquire就要release幾次。
但是本質上同一個線程多次執行acquire時沒有任何意義的,其他線程必須等到RLock全部release之后才能訪問共享資源。
所以Rlock僅僅是幫你解決了代碼邏輯上的錯誤導致的死鎖,並不能解決多個鎖造成的死鎖問題
# 同一把RLock 多次acquire
#l1 = RLock()
#l2 = l1
# 不同的RLock 依然會鎖死
#l1 = RLock()
#l2 = RLock()
def task():
l1.acquire()
print(threading.current_thread().name,"拿到了筷子")
time.sleep(0.1)
l2.acquire()
print(threading.current_thread().name, "拿到了盤子")
print("吃飯")
l1.release()
l2.release()
def task2():
l2.acquire()
print(threading.current_thread().name, "拿到了盤子")
l1.acquire()
print(threading.current_thread().name,"拿到了筷子")
print("吃飯")
l2.release()
l1.release()
t1 = Thread(target=task)
t1.start()
t2 = Thread(target=task2)
t2.start()
忠告:在處理並發安全時 用完公共資源后一定要釋放鎖
信號量
Semaphore
信號量也是一種鎖,其特殊之處在於可以讓一個資源同時被多個線程共享,並控制最大的並發訪問線程數量。
如果把Lock比喻為家用洗手間,同一時間只能一個人使用。
那信號量就可以看做公共衛生間,同一時間可以有多個人同時使用。
from threading import Thread,Semaphore,current_thread
import time
s = Semaphore(3)
def task():
s.acquire()
print("%s running........" % current_thread())
time.sleep(1)
s.release()
for i in range(20):
Thread(target=task).start()