前言
這周姐姐入職了新公司,老板想探探他的底,看了一眼他的簡歷,呦呵,精通kafka,這小姑娘有兩下子,既然這樣,那你寫一個消息隊列吧。因為要用go語言寫,這可給姐姐愁壞了。趕緊來求助我,我這么堅貞不屈一人,在姐姐的軟磨硬泡下還是答應他了,所以接下來我就手把手教姐姐怎么寫一個消息隊列。下面我們就來看一看我是怎么寫的吧~~~。
本代碼已上傳到我的github:
有需要的小伙伴,可自行下載,順便給個小星星吧~~~
什么是消息隊列
姐姐真是把我愁壞了,自己寫的精通kafka
,竟然不知道什么是消息隊列,於是,一向好脾氣的我開始給姐姐講一講什么是消息隊列。
消息隊列,我們一般稱它為MQ(Message Queue)
,兩個單詞的結合,這兩個英文單詞想必大家都應該知道吧,其實最熟悉的還是Queue
吧,即隊列。隊列是一種先進先出的數據結構,隊列的使用還是比較普遍的,但是已經有隊列了,怎么還需要MQ
呢?
我:問你呢,姐姐,知道嗎?為什么還需要
MQ
?姐姐:快點講,想挨打呀?
我:噗。。。 算我多嘴,哼~~~
欠欠的我開始了接下來的耐心講解......
舉一個簡單的例子,假設現在我們要做一個系統,該登陸系統需要在用戶登陸成功后,發送封郵件到用戶郵箱進行提醒,需求還是很簡單的,我們先開看一看沒有MQ
,我們該怎么實現呢?畫一個時序圖來看一看:
看這個圖,郵件發送在請求登陸時進行,當密碼驗證成功后,就發送郵件,然后返回登陸成功。這樣是可以的,但是他是有缺陷的。這讓我們的登陸操作變得復雜了,每次請求登陸都需要進行郵件發送,如果這里出現錯誤,整個登陸請求也出現了錯誤,導致登陸不成功;還有一個問題,本來我們登陸請求調用接口僅僅需要100ms,因為中間要做一次發送郵件的等待,那么調用一次登陸接口的時間就要增長,這就是問題所在,一封郵件他的優先級 不是很高的,用戶也不需要實時收到這封郵件,所以這時,就體現了消息隊列的重要性了,我們用消息隊列進行改進一下。
這里我們將發送郵件請求放到Mq
中,這樣我們就能提高用戶體驗的吞吐量,這個很重要,顧客就是上帝嘛,畢竟也沒有人喜歡用一個很慢很慢的app。
這里只是舉了MQ
眾多應用中的其中一個,即異步應用,MQ
還在系統解藕、削峰/限流中有着重要應用,這兩個我就不具體講解了,原理都一樣,好好思考一下,你們都能懂得。
channel
好啦,姐姐終於知道什么是消息隊列了,但是現在還是沒法進行消息隊列開發的,因為還差一個知識點,即go語言中的channel
。這個很重要,我們還需要靠這個來開發我們的消息隊列呢。
因篇幅有限,這里不詳細介紹channel
,只介紹基本使用方法。
什么是channel
Goroutine 和 Channel 是 Go 語言並發編程的兩大基石。Goroutine 用於執行並發任務,Channel 用於 goroutine 之間的同步、通信。Go提倡使用通信的方法代替共享內存,當一個Goroutine需要和其他Goroutine資源共享時,Channel就會在他們之間架起一座橋梁,並提供確保安全同步的機制。channel
本質上其實還是一個隊列,遵循FIFO原則。具體規則如下:
- 先從 Channel 讀取數據的 Goroutine 會先接收到數據;
- 先向 Channel 發送數據的 Goroutine 會得到先發送數據的權利;
創建通道
創建通道需要用到關鍵字 make ,格式如下:
通道實例 := make(chan 數據類型)
- 數據類型:通道內傳輸的元素類型。
- 通道實例:通過make創建的通道句柄。
無緩沖通道的使用
Go語言中無緩沖的通道(unbuffered channel)是指在接收前沒有能力保存任何值的通道。這種類型的通道要求發送 goroutine 和接收 goroutine 同時准備好,才能完成發送和接收操作。
無緩沖通道的定義方式如下:
通道實例 := make(chan 通道類型)
- 通道類型:和無緩沖通道用法一致,影響通道發送和接收的數據類型。
- 緩沖大小:0
- 通道實例:被創建出的通道實例。
寫個例子來幫助大家理解一下吧:
package main
import (
"sync"
"time"
)
func main() {
c := make(chan string)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
c <- `Golang夢工廠`
}()
go func() {
defer wg.Done()
time.Sleep(time.Second * 1)
println(`Message: `+ <-c)
}()
wg.Wait()
}
帶緩沖的通道的使用
Go語言中有緩沖的通道(buffered channel)是一種在被接收前能存儲一個或者多個值的通道。這種類型的通道並不強制要求 goroutine 之間必須同時完成發送和接收。通道會阻塞發送和接收動作的條件也會不同。只有在通道中沒有要接收的值時,接收動作才會阻塞。只有在通道沒有可用緩沖區容納被發送的值時,發送動作才會阻塞。
有緩沖通道的定義方式如下:
通道實例 := make(chan 通道類型, 緩沖大小)
- 通道類型:和無緩沖通道用法一致,影響通道發送和接收的數據類型。
- 緩沖大小:決定通道最多可以保存的元素數量。
- 通道實例:被創建出的通道實例。
來寫一個例子講解一下:
package main
import (
"sync"
"time"
)
func main() {
c := make(chan string, 2)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
c <- `Golang夢工廠`
c <- `asong`
}()
go func() {
defer wg.Done()
time.Sleep(time.Second * 1)
println(`公眾號: `+ <-c)
println(`作者: `+ <-c)
}()
wg.Wait()
}
好啦,通道的概念就介紹到這里了,如果需要,下一篇我出一個channel
詳細講解的文章。
消息隊列編碼實現
准備篇
終於開始進入主題了,姐姐都聽的快要睡着了,我轟隆一嗓子,立馬精神,但是呢,asong也是挨了一頓小電炮,代價慘痛呀,嗚嗚嗚............
在開始編寫代碼編寫直接,我需要構思我們的整個代碼架構,這才是正確的編碼方式。我們先來定義一個接口,把我們需要實現的方法先列出來,后期對每一個代碼進行實現就可以了。因此可以列出如下方法:
type Broker interface {
publish(topic string, msg interface{}) error
subscribe(topic string) (<-chan interface{}, error)
unsubscribe(topic string, sub <-chan interface{}) error
close()
broadcast(msg interface{}, subscribers []chan interface{})
setConditions(capacity int)
}
publish
:進行消息的推送,有兩個參數即topic
、msg
,分別是訂閱的主題、要傳遞的消息subscribe
:消息的訂閱,傳入訂閱的主題,即可完成訂閱,並返回對應的channel
通道用來接收數據unsubscribe
:取消訂閱,傳入訂閱的主題和對應的通道close
:這個的作用就是很明顯了,就是用來關閉消息隊列的broadCast
:這個屬於內部方法,作用是進行廣播,對推送的消息進行廣播,保證每一個訂閱者都可以收到setConditions
:這里是用來設置條件,條件就是消息隊列的容量,這樣我們就可以控制消息隊列的大小了
細心的你們有沒有發現什么問題,這些代碼我都定義的是內部方法,也就是包外不可用。為什么這么做呢,因為這里屬於代理要做的事情,我們還需要在封裝一層,也就是客戶端能直接調用的方法,這樣才符合軟件架構。因此可以寫出如下代碼:
package mq
type Client struct {
bro *BrokerImpl
}
func NewClient() *Client {
return &Client{
bro: NewBroker(),
}
}
func (c *Client)SetConditions(capacity int) {
c.bro.setConditions(capacity)
}
func (c *Client)Publish(topic string, msg interface{}) error{
return c.bro.publish(topic,msg)
}
func (c *Client)Subscribe(topic string) (<-chan interface{}, error){
return c.bro.subscribe(topic)
}
func (c *Client)Unsubscribe(topic string, sub <-chan interface{}) error {
return c.bro.unsubscribe(topic,sub)
}
func (c *Client)Close() {
c.bro.close()
}
func (c *Client)GetPayLoad(sub <-chan interface{}) interface{}{
for val:= range sub{
if val != nil{
return val
}
}
return nil
}
上面只是准好了代碼結構,但是消息隊列實現的結構我們還沒有設計,現在我們就來設計一下。
type BrokerImpl struct {
exit chan bool
capacity int
topics map[string][]chan interface{} // key: topic value : queue
sync.RWMutex // 同步鎖
}
exit
:也是一個通道,這個用來做關閉消息隊列用的capacity
:即用來設置消息隊列的容量topics
:這里使用一個map結構,key即是topic
,其值則是一個切片,chan
類型,這里這么做的原因是我們一個topic可以有多個訂閱者,所以一個訂閱者對應着一個通道sync.RWMutex
:讀寫鎖,這里是為了防止並發情況下,數據的推送出現錯誤,所以采用加鎖的方式進行保證
好啦,現在我們已經准備的很充分啦,開始接下來方法填充之旅吧~~~
Publish
和broadcast
這里兩個合在一起講的原因是braodcast
是屬於publish
里的。這里的思路很簡單,我們只需要把傳入的數據進行廣播即可了,下面我們來看代碼實現:
func (b *BrokerImpl) publish(topic string, pub interface{}) error {
select {
case <-b.exit:
return errors.New("broker closed")
default:
}
b.RLock()
subscribers, ok := b.topics[topic]
b.RUnlock()
if !ok {
return nil
}
b.broadcast(pub, subscribers)
return nil
}
func (b *BrokerImpl) broadcast(msg interface{}, subscribers []chan interface{}) {
count := len(subscribers)
concurrency := 1
switch {
case count > 1000:
concurrency = 3
case count > 100:
concurrency = 2
default:
concurrency = 1
}
pub := func(start int) {
for j := start; j < count; j += concurrency {
select {
case subscribers[j] <- msg:
case <-time.After(time.Millisecond * 5):
case <-b.exit:
return
}
}
}
for i := 0; i < concurrency; i++ {
go pub(i)
}
}
publish
方法中沒有什么好講的,這里主要說一下broadcast
的實現:
這里主要對數據進行廣播,所以數據推送出去就可以了,沒必要一直等着他推送成功,所以這里我們我們采用goroutine
。在推送的時候,當推送失敗時,我們也不能一直等待呀,所以這里我們加了一個超時機制,超過5毫秒就停止推送,接着進行下面的推送。
可能你們會有疑惑,上面怎么還有一個switch
選項呀,干什么用的呢?考慮這樣一個問題,當有大量的訂閱者時,,比如10000個,我們一個for循環去做消息的推送,那推送一次就會耗費很多時間,並且不同的消費者之間也會產生延時,,所以采用這種方法進行分解可以降低一定的時間。
subscribe
和 unsubScribe
我們先來看代碼:
func (b *BrokerImpl) subscribe(topic string) (<-chan interface{}, error) {
select {
case <-b.exit:
return nil, errors.New("broker closed")
default:
}
ch := make(chan interface{}, b.capacity)
b.Lock()
b.topics[topic] = append(b.topics[topic], ch)
b.Unlock()
return ch, nil
}
func (b *BrokerImpl) unsubscribe(topic string, sub <-chan interface{}) error {
select {
case <-b.exit:
return errors.New("broker closed")
default:
}
b.RLock()
subscribers, ok := b.topics[topic]
b.RUnlock()
if !ok {
return nil
}
// delete subscriber
var newSubs []chan interface{}
for _, subscriber := range subscribers {
if subscriber == sub {
continue
}
newSubs = append(newSubs, subscriber)
}
b.Lock()
b.topics[topic] = newSubs
b.Unlock()
return nil
}
這里其實就很簡單了:
subscribe
:這里的實現則是為訂閱的主題創建一個channel
,然后將訂閱者加入到對應的topic
中就可以了,並且返回一個接收channel
。unsubScribe
:這里實現的思路就是將我們剛才添加的channel
刪除就可以了。
close
func (b *BrokerImpl) close() {
select {
case <-b.exit:
return
default:
close(b.exit)
b.Lock()
b.topics = make(map[string][]chan interface{})
b.Unlock()
}
return
}
這里就是為了關閉整個消息隊列,這句代碼b.topics = make(map[string][]chan interface{})
比較重要,這里主要是為了保證下一次使用該消息隊列不發生沖突。
setConditions
GetPayLoad
還差最后兩個方法,一個是設置我們的消息隊列容量,另一個是封裝一個方法來獲取我們訂閱的消息:
func (b *BrokerImpl)setConditions(capacity int) {
b.capacity = capacity
}
func (c *Client)GetPayLoad(sub <-chan interface{}) interface{}{
for val:= range sub{
if val != nil{
return val
}
}
return nil
}
測試
好啦,代碼這么快就被寫完了,接下來我們進行測試一下吧。
單元測試
正式測試之前,我們還是需要先進行一下單元測試,養成好的習慣,只有先自測了,才能有底氣說我的代碼沒問題,要不直接跑程序,會出現很多bug
的。
這里我們測試方法如下:我們向不同的topic
發送不同的信息,當訂閱者收到消息后,就行取消訂閱。
func TestClient(t *testing.T) {
b := NewClient()
b.SetConditions(100)
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
topic := fmt.Sprintf("Golang夢工廠%d", i)
payload := fmt.Sprintf("asong%d", i)
ch, err := b.Subscribe(topic)
if err != nil {
t.Fatal(err)
}
wg.Add(1)
go func() {
e := b.GetPayLoad(ch)
if e != payload {
t.Fatalf("%s expected %s but get %s", topic, payload, e)
}
if err := b.Unsubscribe(topic, ch); err != nil {
t.Fatal(err)
}
wg.Done()
}()
if err := b.Publish(topic, payload); err != nil {
t.Fatal(err)
}
}
wg.Wait()
}
測試通過,沒問題,接下來我們在寫幾個方法測試一下
測試
這里分為兩種方式測試
測試一:使用一個定時器,向一個主題定時推送消息.
// 一個topic 測試
func OnceTopic() {
m := mq.NewClient()
m.SetConditions(10)
ch,err :=m.Subscribe(topic)
if err != nil{
fmt.Println("subscribe failed")
return
}
go OncePub(m)
OnceSub(ch,m)
defer m.Close()
}
// 定時推送
func OncePub(c *mq.Client) {
t := time.NewTicker(10 * time.Second)
defer t.Stop()
for {
select {
case <- t.C:
err := c.Publish(topic,"asong真帥")
if err != nil{
fmt.Println("pub message failed")
}
default:
}
}
}
// 接受訂閱消息
func OnceSub(m <-chan interface{},c *mq.Client) {
for {
val := c.GetPayLoad(m)
fmt.Printf("get message is %s\n",val)
}
}
測試二:使用一個定時器,定時向多個主題發送消息:
//多個topic測試
func ManyTopic() {
m := mq.NewClient()
defer m.Close()
m.SetConditions(10)
top := ""
for i:=0;i<10;i++{
top = fmt.Sprintf("Golang夢工廠_%02d",i)
go Sub(m,top)
}
ManyPub(m)
}
func ManyPub(c *mq.Client) {
t := time.NewTicker(10 * time.Second)
defer t.Stop()
for {
select {
case <- t.C:
for i:= 0;i<10;i++{
//多個topic 推送不同的消息
top := fmt.Sprintf("Golang夢工廠_%02d",i)
payload := fmt.Sprintf("asong真帥_%02d",i)
err := c.Publish(top,payload)
if err != nil{
fmt.Println("pub message failed")
}
}
default:
}
}
}
func Sub(c *mq.Client,top string) {
ch,err := c.Subscribe(top)
if err != nil{
fmt.Printf("sub top:%s failed\n",top)
}
for {
val := c.GetPayLoad(ch)
if val != nil{
fmt.Printf("%s get message is %s\n",top,val)
}
}
}
總結
終於幫助姐姐解決了這個問題,姐姐開心死了,給我一頓親,啊不對,是一頓誇,誇的人家都不好意思了。
這一篇你學會了嗎?沒學會不要緊,趕快去把源代碼下載下來,好好通讀一下,很好理解的~~~。
其實這一篇是為了接下來的kafka學習打基礎的,學好了這一篇,接下來學習的kafka就會容易很多啦~~~
github地址:https://github.com/asong2020/Golang_Dream/tree/master/code_demo/queue
如果能給一個小星星就好了~~~
結尾給大家發一個小福利吧,最近我在看[微服務架構設計模式]這一本書,講的很好,自己也收集了一本PDF,有需要的小伙可以到自行下載。獲取方式:關注公眾號:[Golang夢工廠],后台回復:[微服務],即可獲取。
我翻譯了一份GIN中文文檔,會定期進行維護,有需要的小伙伴后台回復[gin]即可下載。
我是asong,一名普普通通的程序猿,讓我一起慢慢變強吧。我自己建了一個golang
交流群,有需要的小伙伴加我vx
,我拉你入群。歡迎各位的關注,我們下期見~~~
推薦往期文章: