golang實現生產者消費者模型


生產者消費者模型分析

操作系統中的經典模型,由若干個消費者和生產者,消費者消耗系統資源,生產者創造系統資源,資源的數量要保持在一個合理范圍(小於數量上限,大約0)。而消費者和生產者是通過並發或並行方式訪問系統資源的,需要保持資源的原子操作。
其實就是生產者線程增加資源數,如果資源數大於最大值則生產者線程掛起等待,當收到消費者線程的通知后繼續生產。
消費者線程減少資源數,如果資源數為0,則消費者線程掛起,等待生產者通知后繼續生產。
將該模型提煉成偽代碼如下:

func consume(){
    Lock()
    if count <= 0
        掛起等待(解鎖,並等待資源數大於0)
        收到系統通知資源數大約0,搶占加鎖
    count--
    如果當前資源數由最大值變少則通知生產者生產
    ULock()
}

func produce(){
    Lock()
    if count >= 最大值
        掛起等待(解鎖,並等待資源數小於最大值)
        收到系統通知資源小於最大值,搶占加鎖
    count++
    如果當前資源數由最小值0增加則通知消費者可以消耗
    ULock()
}

  

consume()消耗資源,produce()生產資源,之前實現過C版本的該模型 

http://www.limerence2017.com/2017/08/08/pthreadwait/
C方式實現的是搶占式的,線程切換開銷較大。下面給出golang協程方式的實現。

先實現資源的互斥訪問

對於資源的互斥訪問,其他語言提供了線程鎖,golang也有線程鎖,當然可以通過channel實現,這里我給出加鎖訪問資源的方式,因為channel內部也是通過加鎖實現的,而且我習慣用channel做協程通信,對於共享資源的控制習慣用鎖來控制,也比較高效。

先定義幾個全局變量

const (
	PRODUCER_MAX = 5
	CONSUMER_MAX = 2
	PRODUCT_MAX  = 20
)

var productcount = 0
var lock sync.Mutex
var wgrp sync.WaitGroup

  

productcount為資源的數量,需要互斥處理。 

wgrp主要是主協程用來等待其他協程退出。
PRODUCT_MAX 表示資源的上限,達到該值,生產者停止生產。
PRODUCER_MAX 表示生產者協程數量
CONSUMER_MAX 表示消費者協程數量
我們實現生產者代碼

//生產者
func Produce(index int, wgrp *sync.WaitGroup) {
	defer func() {
		if err := recover(); err != nil {
			fmt.Println("Producer ", index, " panic")
		}
		wgrp.Done()
	}()

	for {
		time.Sleep(time.Second)
		lock.Lock()
		fmt.Println("Producer ", index, " begin produce")
		if productcount >= PRODUCT_MAX {
			fmt.Println("Products are full")
			lock.Unlock()
			return
		}
		productcount++
		fmt.Println("Products count is ", productcount)
		lock.Unlock()
	}
}

defer 的匿名函數主要是用來回收資源,不是重點 

for循環內部生產者循環增加資源,為保證productcount的互斥訪問,我們加了鎖。
當productcount達到上限后解鎖並返回,否則就增加數量,然后釋放鎖。
同樣的道理我們實現了消費者

func Consume(index int, wgrp *sync.WaitGroup) {
	defer func() {
		if err := recover(); err != nil {
			fmt.Println("Consumer ", index, " panic")
		}
		wgrp.Done()
	}()

	for {
		time.Sleep(time.Second)
		lock.Lock()
		fmt.Println("Consumer ", index, " begin consume")
		if productcount <= 0 {
			fmt.Println("Products are empty")
			lock.Unlock()
			return
		}
		productcount--
		fmt.Println("Products count is ", productcount)
		lock.Unlock()
	}
}

  

消費者加鎖減少productcount數量,當productcount為0,則解鎖並返回。 

然后我們實現主函數

func main() {
	wgrp.Add(PRODUCER_MAX + CONSUMER_MAX)
	for i := 0; i < PRODUCER_MAX; i++ {
		go Produce(i, &wgrp)
	}

	for i := 0; i < CONSUMER_MAX; i++ {
		go Consume(i, &wgrp)
	}
	wgrp.Wait()
}


我們創建了若干生產者和消費者,主協程通過wgrp等待其他協程退出。 

我們看下效果
1.jpg
可以看出並發的訪問實現了,但是並沒有實現條件等待和控制,比如當數量上限后其他生產者也可以訪問。
接下來我們實現的是當數量上限是生產者掛起等待,直到消費者通知其生產。數量為0時消費者掛起,
等待生產者激活。也就是條件等待和異步協同。

實現條件等待和異步協同

協程之間的同步和等待可以使用channel,我們增加了兩個全局非緩沖channel

var produce_wait chan struct{}
var consume_wait chan struct{}

produce_wait 用來控制生產者阻塞等待 

consume_wait 用來控制消費者阻塞等待

我們修改下生產者

//生產者
func Produce(index int, wgrp *sync.WaitGroup) {
	defer func() {
		if err := recover(); err != nil {
			fmt.Println("Producer ", index, " panic")
		}
		wgrp.Done()
	}()

	for {
		time.Sleep(time.Second)
		lock.Lock()
		fmt.Println("Producer ", index, " begin produce")
		if productcount >= PRODUCT_MAX {
			fmt.Println("Products are full")
			lock.Unlock()
			//產品滿了,生產者wait
			<-produce_wait
			continue
		}
		lastcount := productcount
		productcount++
		fmt.Println("Products count is ", productcount)
		lock.Unlock()
		//產品數由0到1,激活消費者
		if lastcount == 0 {
			var consumActive struct{}
			consume_wait <- consumActive
		}

	}
}

在18行增加了<-produce_wait,這樣生產者會掛起,等待消費者向produce_wait寫入,從而得到激活。 

另外26行增加了判斷,當資源數由0到1時,激活消費者。
同樣消費者實現類似

//消費者
func Consume(index int, wgrp *sync.WaitGroup) {
	defer func() {
		if err := recover(); err != nil {
			fmt.Println("Consumer ", index, " panic")
		}
		wgrp.Done()
	}()

	for {
		time.Sleep(time.Second)
		lock.Lock()
		fmt.Println("Consumer ", index, " begin consume")
		if productcount <= 0 {
			fmt.Println("Products are empty")
			lock.Unlock()
			//產品空了,消費者等待
			<-consume_wait
			continue
		}
		lastcount := productcount
		productcount--
		fmt.Println("Products count is ", productcount)
		lock.Unlock()
		//產品數由PRODUCT_MAX變少,激活生產者
		if lastcount == PRODUCT_MAX {
			var productActive struct{}
			produce_wait <- productActive
		}

	}
}

這里我們要有並發的思想,考慮這樣一個場景,當前產品數達到上限,Produce運行完16行,剛剛解鎖,還沒來得及運行18行掛起, 

Consume搶占到鎖正常運行消耗資源,運行到28行,優先對produce_wait寫入,此時該消費者掛起,生產者收到信號后,
他們都會繼續執行。
我們完善下main函數

func main() {
	wgrp.Add(PRODUCER_MAX + CONSUMER_MAX)
	produce_wait = make(chan struct{})
	consume_wait = make(chan struct{})
	for i := 0; i < CONSUMER_MAX; i++ {
		go Consume(i, &wgrp)
	}
	for i := 0; i < PRODUCER_MAX; i++ {
		go Produce(i, &wgrp)
	}

	wgrp.Wait()
}

執行golang的鎖檢測並運行 

go run -race main.go
可以看到是可以正常運行的
2.jpg
我們繼續用並發思想分析,我們實現了基本功能,但是有個瑕疵,我們的生產者協程較多,比如生產者協程1判斷生產上限在18行掛起,其他生產者如果搶占鎖后進入生產判斷數量上限,也會在18行掛起,由於我們的produce_wait是非緩沖的,那么當消費者來激活時,只有一個生產者被激活,另一個一直掛着,等到消費者激活才能繼續生產。這么做在一定程度限制了生產者,我們可以通過引入兩個bool變量通知其他協程睡眠,避免此問題。

增加bool變量實現休眠

我們可以引入兩個bool變量

var stopProduce = false
var stopConsume = false

當資源達到上限或下限時,掛起單個協程,通過這兩個變量休眠同類協程。 

由於golang沒有提供給我們休眠的api,我們就讓同類型的協程sleep一會,這樣也是可以提高模型並發的。
改進的生產者

//生產者
func Produce(index int, wgrp *sync.WaitGroup) {
	defer func() {
		if err := recover(); err != nil {
			fmt.Println("Producer ", index, " panic")
		}
		wgrp.Done()
	}()

	for {
		time.Sleep(time.Second)
		lock.Lock()
		if stopProduce {
			fmt.Println("Producer ", index, " stop produce, sleep 5 seconds")
			lock.Unlock()
			time.Sleep(time.Second * 5)
			continue
		}
		fmt.Println("Producer ", index, " begin produce")
		if productcount >= PRODUCT_MAX {
			fmt.Println("Products are full")
			stopProduce = true
			lock.Unlock()
			//產品滿了,生產者wait
			<-produce_wait
			lock.Lock()
			stopProduce = false
			lock.Unlock()
			continue
		}
		productcount++
		fmt.Println("Products count is ", productcount)
		if stopConsume {
			var consumActive struct{}
			consume_wait <- consumActive
		}
		lock.Unlock()
	}
}

我們在22行設置了stopProduce為true,然后在25行掛起了該協程,其他生產者協程發現stopProduce為true,則睡眠5秒。 

此辦法保證了資源數臨界值后僅有單個協程掛起,不會影響到其他同類協程。
同樣實現消費者,這里不做贅述。
考慮這樣一個場景,如果在生產者設置bool解鎖后,其他消費者搶占鎖后為了激活生產者,優先寫入信道produce_wait,
此時生產者還沒有從produce_wait讀取,也不會有問題,畢竟生產者遲早要讀取。
接下來我們測試下
3.jpg
可以看到當生產者1生產數上限后,其他生產者會進入休眠。當消費者激活后,生產者繼續生產,其他生產者休眠后同樣可以生產。
提高了並發效率。

源碼下載

完整版源碼地址
https://github.com/secondtonone1/golang-/tree/master/producerconsumer

感謝關注公眾號

wxgzh.jpg


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM