作者:Gundy_
鏈接:https://www.jianshu.com/p/dc94f2099277
生產者消費者模型
並發編程中最常見的例子就是生產者消費者模式,該模式主要通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。簡單地說,就是生產者生產一些數據,然后放到成果隊列中,同時消費者從成果隊列中來取這些數據。這樣就讓生產消費變成了異步的兩個過程。當成果隊列中沒有數據時,消費者就進入飢餓的等待中;而當成果隊列中數據已滿時,生產者則面臨因產品擠壓導致CPU被剝奪的下崗問題。
// 生產者: 生成 factor 整數倍的序列
func Producer(factor int, out chan<- int) {
for i := 0; ; i++ {
out <- i*factor
}
}
// 消費者
func Consumer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
func main() {
ch := make(chan int, 64) // 成果隊列
go Producer(3, ch) // 生成 3 的倍數的序列
go Producer(5, ch) // 生成 5 的倍數的序列
go Consumer(ch) // 消費 生成的隊列
// 運行一定時間后退出
time.Sleep(5 * time.Second)
}
我們開啟了2個Producer生產流水線,分別用於生成3和5的倍數的序列。然后開啟1個Consumer消費者線程,打印獲取的結果。我們通過在main函數休眠一定的時間來讓生產者和消費者工作一定時間。正如前面一節說的,這種靠休眠方式是無法保證穩定的輸出結果的。
我們可以讓main函數保存阻塞狀態不退出,只有當用戶輸入Ctrl-C時才真正退出程序:
func main() {
ch := make(chan int, 64) // 成果隊列
go Producer(3, ch) // 生成 3 的倍數的序列
go Producer(5, ch) // 生成 5 的倍數的序列
go Consumer(ch) // 消費 生成的隊列
// Ctrl+C 退出
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
fmt.Printf("quit (%v)\n", <-sig)
}
我們這個例子中有2個生產者,並且2個生產者之間並無同步事件可參考,它們是並發的。因此,消費者輸出的結果序列的順序是不確定的,這並沒有問題,生產者和消費者依然可以相互配合工作。
發布訂閱模型
發布訂閱(publish-and-subscribe)模型通常被簡寫為pub/sub模型。在這個模型中,消息生產者成為發布者(publisher),而消息消費者則成為訂閱者(subscriber),生產者和消費者是M:N的關系。在傳統生產者和消費者模型中,是將消息發送到一個隊列中,而發布訂閱模型則是將消息發布給一個主題。
為此,我們構建了一個名為pubsub的發布訂閱模型支持包:
// Package pubsub implements a simple multi-topic pub-sub library.
package pubsub
import (
"sync"
"time"
)
type (
subscriber chan interface{} // 訂閱者為一個管道
topicFunc func(v interface{}) bool // 主題為一個過濾器
)
// 發布者對象
type Publisher struct {
m sync.RWMutex // 讀寫鎖
buffer int // 訂閱隊列的緩存大小
timeout time.Duration // 發布超時時間
subscribers map[subscriber]topicFunc // 訂閱者信息
}
// 構建一個發布者對象, 可以設置發布超時時間和緩存隊列的長度
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
return &Publisher{
buffer: buffer,
timeout: publishTimeout,
subscribers: make(map[subscriber]topicFunc),
}
}
// 添加一個新的訂閱者,訂閱全部主題
func (p *Publisher) Subscribe() chan interface{} {
return p.SubscribeTopic(nil)
}
// 添加一個新的訂閱者,訂閱過濾器篩選后的主題
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
ch := make(chan interface{}, p.buffer)
p.m.Lock()
p.subscribers[ch] = topic
p.m.Unlock()
return ch
}
// 退出訂閱
func (p *Publisher) Evict(sub chan interface{}) {
p.m.Lock()
defer p.m.Unlock()
delete(p.subscribers, sub)
close(sub)
}
// 發布一個主題
func (p *Publisher) Publish(v interface{}) {
p.m.RLock()
defer p.m.RUnlock()
var wg sync.WaitGroup
for sub, topic := range p.subscribers {
wg.Add(1)
go p.sendTopic(sub, topic, v, &wg)
}
wg.Wait()
}
// 關閉發布者對象,同時關閉所有的訂閱者管道。
func (p *Publisher) Close() {
p.m.Lock()
defer p.m.Unlock()
for sub := range p.subscribers {
delete(p.subscribers, sub)
close(sub)
}
}
// 發送主題,可以容忍一定的超時
func (p *Publisher) sendTopic(
sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup,
) {
defer wg.Done()
if topic != nil && !topic(v) {
return
}
select {
case sub <- v:
case <-time.After(p.timeout):
}
}
下面的例子中,有兩個訂閱者分別訂閱了全部主題和含有"golang"的主題:
import "path/to/pubsub"
func main() {
p := pubsub.NewPublisher(100*time.Millisecond, 10)
defer p.Close()
all := p.Subscribe()
golang := p.SubscribeTopic(func(v interface{}) bool {
if s, ok := v.(string); ok {
return strings.Contains(s, "golang")
}
return false
})
p.Publish("hello, world!")
p.Publish("hello, golang!")
go func() {
for msg := range all {
fmt.Println("all:", msg)
}
} ()
go func() {
for msg := range golang {
fmt.Println("golang:", msg)
}
} ()
// 運行一定時間后退出
time.Sleep(3 * time.Second)
}
在發布訂閱模型中,每條消息都會傳送給多個訂閱者。發布者通常不會知道、也不關心哪一個訂閱者正在接收主題消息。訂閱者和發布者可以在運行時動態添加,是一種松散的耦合關系,這使得系統的復雜性可以隨時間的推移而增長。在現實生活中,像天氣預報之類的應用就可以應用這個並發模式。
