用Golang寫一個簡單的eventBus(Pub/Sub)


編寫一個簡單的EventBus

先放github地址
用go寫一個Pub/Sub比Java簡單多了,因為go有chan這機制。

總線(Bus)

管理所有專題(topic)和訂閱該專題的用戶。以map形式存儲。

這里加一把表級鎖。

type Bus struct {
	subNode map[string]*node
	rw      sync.RWMutex
}

節點(node)

node內管理着訂閱同一專題的用戶序列。
這里加了一把序列鎖,在Bus的表級鎖被舉起時,node的鎖不會使用,這樣能減小鎖粒度,提高並發度。

// node contains a slice of subs that subscribe same topic
type node struct {
	subs []Sub
	// Note node's rw will not be used when bus's rw is helded.
	rw sync.RWMutex
}

用戶(Sub)

Sub內有一個chan成員變量,每次有消息被發送到chan內時,由於chan的機制,Sub就能及時收到被推送的消息。

type Sub struct {
	out chan interface{}
}

訂閱

把sub添加到bus中map便利的topic對應的序列中。
注意這里用到了兩種鎖的上鎖解鎖時機。

func (b *Bus) Subscribe(topic string, sub Sub) {
	b.rw.Lock()
	if n, ok := b.subNode[topic]; ok {
		// found the node
		b.rw.Unlock()
		n.rw.Lock()
		defer n.rw.Unlock()
		n.subs = append(n.subs, sub)
	} else {
		defer b.rw.Unlock()
		n := NewNode()
		b.subNode[topic] = &n
	}
}

發布

邏輯就是遍歷用戶序列,把消息發送到sub的chan中

func (b *Bus) Publish(topic string, msg interface{}) error {
	b.rw.Lock()
	if n, ok := b.subNode[topic]; ok {
		// found the node
		b.rw.Unlock()
		n.rw.RLock()
		defer n.rw.RUnlock()
		// got the subs list and publish msg
		go func(subs []Sub, msg interface{}) {
			for _, sub := range subs {
				sub.receive(msg)
			}
		}(n.subs, msg)
		// successed return null
		return nil
	} else {
		// topic not exist
		return errors.New("topic not exist")
	}

}

使用

見倉庫內的test文件


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM