生產者消費者模型分析
操作系統中的經典模型,由若干個消費者和生產者,消費者消耗系統資源,生產者創造系統資源,資源的數量要保持在一個合理范圍(小於數量上限,大約0)。而消費者和生產者是通過並發或並行方式訪問系統資源的,需要保持資源的原子操作。
其實就是生產者線程增加資源數,如果資源數大於最大值則生產者線程掛起等待,當收到消費者線程的通知后繼續生產。
消費者線程減少資源數,如果資源數為0,則消費者線程掛起,等待生產者通知后繼續生產。
將該模型提煉成偽代碼如下:
func consume(){ Lock() if count <= 0 掛起等待(解鎖,並等待資源數大於0) 收到系統通知資源數大約0,搶占加鎖 count-- 如果當前資源數由最大值變少則通知生產者生產 ULock() } func produce(){ Lock() if count >= 最大值 掛起等待(解鎖,並等待資源數小於最大值) 收到系統通知資源小於最大值,搶占加鎖 count++ 如果當前資源數由最小值0增加則通知消費者可以消耗 ULock() }
consume()消耗資源,produce()生產資源,之前實現過C版本的該模型
http://www.limerence2017.com/2017/08/08/pthreadwait/
C方式實現的是搶占式的,線程切換開銷較大。下面給出golang協程方式的實現。
先實現資源的互斥訪問
對於資源的互斥訪問,其他語言提供了線程鎖,golang也有線程鎖,當然可以通過channel實現,這里我給出加鎖訪問資源的方式,因為channel內部也是通過加鎖實現的,而且我習慣用channel做協程通信,對於共享資源的控制習慣用鎖來控制,也比較高效。
先定義幾個全局變量
const ( PRODUCER_MAX = 5 CONSUMER_MAX = 2 PRODUCT_MAX = 20 ) var productcount = 0 var lock sync.Mutex var wgrp sync.WaitGroup
productcount為資源的數量,需要互斥處理。
wgrp主要是主協程用來等待其他協程退出。
PRODUCT_MAX 表示資源的上限,達到該值,生產者停止生產。
PRODUCER_MAX 表示生產者協程數量
CONSUMER_MAX 表示消費者協程數量
我們實現生產者代碼
//生產者 func Produce(index int, wgrp *sync.WaitGroup) { defer func() { if err := recover(); err != nil { fmt.Println("Producer ", index, " panic") } wgrp.Done() }() for { time.Sleep(time.Second) lock.Lock() fmt.Println("Producer ", index, " begin produce") if productcount >= PRODUCT_MAX { fmt.Println("Products are full") lock.Unlock() return } productcount++ fmt.Println("Products count is ", productcount) lock.Unlock() } }
defer 的匿名函數主要是用來回收資源,不是重點
for循環內部生產者循環增加資源,為保證productcount的互斥訪問,我們加了鎖。
當productcount達到上限后解鎖並返回,否則就增加數量,然后釋放鎖。
同樣的道理我們實現了消費者
func Consume(index int, wgrp *sync.WaitGroup) { defer func() { if err := recover(); err != nil { fmt.Println("Consumer ", index, " panic") } wgrp.Done() }() for { time.Sleep(time.Second) lock.Lock() fmt.Println("Consumer ", index, " begin consume") if productcount <= 0 { fmt.Println("Products are empty") lock.Unlock() return } productcount-- fmt.Println("Products count is ", productcount) lock.Unlock() } }
消費者加鎖減少productcount數量,當productcount為0,則解鎖並返回。
然后我們實現主函數
func main() { wgrp.Add(PRODUCER_MAX + CONSUMER_MAX) for i := 0; i < PRODUCER_MAX; i++ { go Produce(i, &wgrp) } for i := 0; i < CONSUMER_MAX; i++ { go Consume(i, &wgrp) } wgrp.Wait() }
我們創建了若干生產者和消費者,主協程通過wgrp等待其他協程退出。
我們看下效果
可以看出並發的訪問實現了,但是並沒有實現條件等待和控制,比如當數量上限后其他生產者也可以訪問。
接下來我們實現的是當數量上限是生產者掛起等待,直到消費者通知其生產。數量為0時消費者掛起,
等待生產者激活。也就是條件等待和異步協同。
實現條件等待和異步協同
協程之間的同步和等待可以使用channel,我們增加了兩個全局非緩沖channel
var produce_wait chan struct{} var consume_wait chan struct{}
produce_wait 用來控制生產者阻塞等待
consume_wait 用來控制消費者阻塞等待
我們修改下生產者
//生產者 func Produce(index int, wgrp *sync.WaitGroup) { defer func() { if err := recover(); err != nil { fmt.Println("Producer ", index, " panic") } wgrp.Done() }() for { time.Sleep(time.Second) lock.Lock() fmt.Println("Producer ", index, " begin produce") if productcount >= PRODUCT_MAX { fmt.Println("Products are full") lock.Unlock() //產品滿了,生產者wait <-produce_wait continue } lastcount := productcount productcount++ fmt.Println("Products count is ", productcount) lock.Unlock() //產品數由0到1,激活消費者 if lastcount == 0 { var consumActive struct{} consume_wait <- consumActive } } }
在18行增加了<-produce_wait,這樣生產者會掛起,等待消費者向produce_wait寫入,從而得到激活。
另外26行增加了判斷,當資源數由0到1時,激活消費者。
同樣消費者實現類似
//消費者 func Consume(index int, wgrp *sync.WaitGroup) { defer func() { if err := recover(); err != nil { fmt.Println("Consumer ", index, " panic") } wgrp.Done() }() for { time.Sleep(time.Second) lock.Lock() fmt.Println("Consumer ", index, " begin consume") if productcount <= 0 { fmt.Println("Products are empty") lock.Unlock() //產品空了,消費者等待 <-consume_wait continue } lastcount := productcount productcount-- fmt.Println("Products count is ", productcount) lock.Unlock() //產品數由PRODUCT_MAX變少,激活生產者 if lastcount == PRODUCT_MAX { var productActive struct{} produce_wait <- productActive } } }
這里我們要有並發的思想,考慮這樣一個場景,當前產品數達到上限,Produce運行完16行,剛剛解鎖,還沒來得及運行18行掛起,
Consume搶占到鎖正常運行消耗資源,運行到28行,優先對produce_wait寫入,此時該消費者掛起,生產者收到信號后,
他們都會繼續執行。
我們完善下main函數
func main() { wgrp.Add(PRODUCER_MAX + CONSUMER_MAX) produce_wait = make(chan struct{}) consume_wait = make(chan struct{}) for i := 0; i < CONSUMER_MAX; i++ { go Consume(i, &wgrp) } for i := 0; i < PRODUCER_MAX; i++ { go Produce(i, &wgrp) } wgrp.Wait() }
執行golang的鎖檢測並運行
go run -race main.go
可以看到是可以正常運行的
我們繼續用並發思想分析,我們實現了基本功能,但是有個瑕疵,我們的生產者協程較多,比如生產者協程1判斷生產上限在18行掛起,其他生產者如果搶占鎖后進入生產判斷數量上限,也會在18行掛起,由於我們的produce_wait是非緩沖的,那么當消費者來激活時,只有一個生產者被激活,另一個一直掛着,等到消費者激活才能繼續生產。這么做在一定程度限制了生產者,我們可以通過引入兩個bool變量通知其他協程睡眠,避免此問題。
增加bool變量實現休眠
我們可以引入兩個bool變量
var stopProduce = false var stopConsume = false
當資源達到上限或下限時,掛起單個協程,通過這兩個變量休眠同類協程。
由於golang沒有提供給我們休眠的api,我們就讓同類型的協程sleep一會,這樣也是可以提高模型並發的。
改進的生產者
//生產者 func Produce(index int, wgrp *sync.WaitGroup) { defer func() { if err := recover(); err != nil { fmt.Println("Producer ", index, " panic") } wgrp.Done() }() for { time.Sleep(time.Second) lock.Lock() if stopProduce { fmt.Println("Producer ", index, " stop produce, sleep 5 seconds") lock.Unlock() time.Sleep(time.Second * 5) continue } fmt.Println("Producer ", index, " begin produce") if productcount >= PRODUCT_MAX { fmt.Println("Products are full") stopProduce = true lock.Unlock() //產品滿了,生產者wait <-produce_wait lock.Lock() stopProduce = false lock.Unlock() continue } productcount++ fmt.Println("Products count is ", productcount) if stopConsume { var consumActive struct{} consume_wait <- consumActive } lock.Unlock() } }
我們在22行設置了stopProduce為true,然后在25行掛起了該協程,其他生產者協程發現stopProduce為true,則睡眠5秒。
此辦法保證了資源數臨界值后僅有單個協程掛起,不會影響到其他同類協程。
同樣實現消費者,這里不做贅述。
考慮這樣一個場景,如果在生產者設置bool解鎖后,其他消費者搶占鎖后為了激活生產者,優先寫入信道produce_wait,
此時生產者還沒有從produce_wait讀取,也不會有問題,畢竟生產者遲早要讀取。
接下來我們測試下
可以看到當生產者1生產數上限后,其他生產者會進入休眠。當消費者激活后,生產者繼續生產,其他生產者休眠后同樣可以生產。
提高了並發效率。
源碼下載
完整版源碼地址
https://github.com/secondtonone1/golang-/tree/master/producerconsumer