編寫一個簡單的EventBus
先放github地址
用go寫一個Pub/Sub比Java簡單多了,因為go有chan這機制。
總線(Bus)
管理所有專題(topic)和訂閱該專題的用戶。以map形式存儲。
這里加一把表級鎖。
type Bus struct {
subNode map[string]*node
rw sync.RWMutex
}
節點(node)
node內管理着訂閱同一專題的用戶序列。
這里加了一把序列鎖,在Bus的表級鎖被舉起時,node的鎖不會使用,這樣能減小鎖粒度,提高並發度。
// node contains a slice of subs that subscribe same topic
type node struct {
subs []Sub
// Note node's rw will not be used when bus's rw is helded.
rw sync.RWMutex
}
用戶(Sub)
Sub內有一個chan成員變量,每次有消息被發送到chan內時,由於chan的機制,Sub就能及時收到被推送的消息。
type Sub struct {
out chan interface{}
}
訂閱
把sub添加到bus中map便利的topic對應的序列中。
注意這里用到了兩種鎖的上鎖解鎖時機。
func (b *Bus) Subscribe(topic string, sub Sub) {
b.rw.Lock()
if n, ok := b.subNode[topic]; ok {
// found the node
b.rw.Unlock()
n.rw.Lock()
defer n.rw.Unlock()
n.subs = append(n.subs, sub)
} else {
defer b.rw.Unlock()
n := NewNode()
b.subNode[topic] = &n
}
}
發布
邏輯就是遍歷用戶序列,把消息發送到sub的chan中
func (b *Bus) Publish(topic string, msg interface{}) error {
b.rw.Lock()
if n, ok := b.subNode[topic]; ok {
// found the node
b.rw.Unlock()
n.rw.RLock()
defer n.rw.RUnlock()
// got the subs list and publish msg
go func(subs []Sub, msg interface{}) {
for _, sub := range subs {
sub.receive(msg)
}
}(n.subs, msg)
// successed return null
return nil
} else {
// topic not exist
return errors.New("topic not exist")
}
}
使用
見倉庫內的test文件
