使用go-channel實現消息隊列


前言

這周姐姐入職了新公司,老板想探探他的底,看了一眼他的簡歷,呦呵,精通kafka,這小姑娘有兩下子,既然這樣,那你寫一個消息隊列吧。因為要用go語言寫,這可給姐姐愁壞了。趕緊來求助我,我這么堅貞不屈一人,在姐姐的軟磨硬泡下還是答應他了,所以接下來我就手把手教姐姐怎么寫一個消息隊列。下面我們就來看一看我是怎么寫的吧~~~。

本代碼已上傳到我的github:

有需要的小伙伴,可自行下載,順便給個小星星吧~~~

什么是消息隊列

姐姐真是把我愁壞了,自己寫的精通kafka,竟然不知道什么是消息隊列,於是,一向好脾氣的我開始給姐姐講一講什么是消息隊列。

消息隊列,我們一般稱它為MQ(Message Queue),兩個單詞的結合,這兩個英文單詞想必大家都應該知道吧,其實最熟悉的還是Queue吧,即隊列。隊列是一種先進先出的數據結構,隊列的使用還是比較普遍的,但是已經有隊列了,怎么還需要MQ呢?

我:問你呢,姐姐,知道嗎?為什么還需要MQ

姐姐:快點講,想挨打呀?

我:噗。。。 算我多嘴,哼~~~

欠欠的我開始了接下來的耐心講解......

舉一個簡單的例子,假設現在我們要做一個系統,該登陸系統需要在用戶登陸成功后,發送封郵件到用戶郵箱進行提醒,需求還是很簡單的,我們先開看一看沒有MQ,我們該怎么實現呢?畫一個時序圖來看一看:

看這個圖,郵件發送在請求登陸時進行,當密碼驗證成功后,就發送郵件,然后返回登陸成功。這樣是可以的,但是他是有缺陷的。這讓我們的登陸操作變得復雜了,每次請求登陸都需要進行郵件發送,如果這里出現錯誤,整個登陸請求也出現了錯誤,導致登陸不成功;還有一個問題,本來我們登陸請求調用接口僅僅需要100ms,因為中間要做一次發送郵件的等待,那么調用一次登陸接口的時間就要增長,這就是問題所在,一封郵件他的優先級 不是很高的,用戶也不需要實時收到這封郵件,所以這時,就體現了消息隊列的重要性了,我們用消息隊列進行改進一下。

這里我們將發送郵件請求放到Mq中,這樣我們就能提高用戶體驗的吞吐量,這個很重要,顧客就是上帝嘛,畢竟也沒有人喜歡用一個很慢很慢的app。

這里只是舉了MQ眾多應用中的其中一個,即異步應用,MQ還在系統解藕、削峰/限流中有着重要應用,這兩個我就不具體講解了,原理都一樣,好好思考一下,你們都能懂得。

channel

好啦,姐姐終於知道什么是消息隊列了,但是現在還是沒法進行消息隊列開發的,因為還差一個知識點,即go語言中的channel。這個很重要,我們還需要靠這個來開發我們的消息隊列呢。

因篇幅有限,這里不詳細介紹channel,只介紹基本使用方法。

什么是channel

Goroutine 和 Channel 是 Go 語言並發編程的兩大基石。Goroutine 用於執行並發任務,Channel 用於 goroutine 之間的同步、通信。Go提倡使用通信的方法代替共享內存,當一個Goroutine需要和其他Goroutine資源共享時,Channel就會在他們之間架起一座橋梁,並提供確保安全同步的機制。channel本質上其實還是一個隊列,遵循FIFO原則。具體規則如下:

  • 先從 Channel 讀取數據的 Goroutine 會先接收到數據;
  • 先向 Channel 發送數據的 Goroutine 會得到先發送數據的權利;

創建通道

創建通道需要用到關鍵字 make ,格式如下:

通道實例 := make(chan 數據類型)
  • 數據類型:通道內傳輸的元素類型。
  • 通道實例:通過make創建的通道句柄。

無緩沖通道的使用

Go語言中無緩沖的通道(unbuffered channel)是指在接收前沒有能力保存任何值的通道。這種類型的通道要求發送 goroutine 和接收 goroutine 同時准備好,才能完成發送和接收操作。

無緩沖通道的定義方式如下:

通道實例 := make(chan 通道類型)
  • 通道類型:和無緩沖通道用法一致,影響通道發送和接收的數據類型。
  • 緩沖大小:0
  • 通道實例:被創建出的通道實例。

寫個例子來幫助大家理解一下吧:

package main

import (
    "sync"
    "time"
)

func main() {
    c := make(chan string)

    var wg sync.WaitGroup
    wg.Add(2)

    go func() {
        defer wg.Done()
        c <- `Golang夢工廠`
    }()

    go func() {
        defer wg.Done()

        time.Sleep(time.Second * 1)
        println(`Message: `+ <-c)
    }()

    wg.Wait()
}

帶緩沖的通道的使用

Go語言中有緩沖的通道(buffered channel)是一種在被接收前能存儲一個或者多個值的通道。這種類型的通道並不強制要求 goroutine 之間必須同時完成發送和接收。通道會阻塞發送和接收動作的條件也會不同。只有在通道中沒有要接收的值時,接收動作才會阻塞。只有在通道沒有可用緩沖區容納被發送的值時,發送動作才會阻塞。

有緩沖通道的定義方式如下:

通道實例 := make(chan 通道類型, 緩沖大小)
  • 通道類型:和無緩沖通道用法一致,影響通道發送和接收的數據類型。
  • 緩沖大小:決定通道最多可以保存的元素數量。
  • 通道實例:被創建出的通道實例。

來寫一個例子講解一下:

package main

import (
    "sync"
    "time"
)

func main() {
    c := make(chan string, 2)

    var wg sync.WaitGroup
    wg.Add(2)

    go func() {
        defer wg.Done()

        c <- `Golang夢工廠`
        c <- `asong`
    }()

    go func() {
        defer wg.Done()

        time.Sleep(time.Second * 1)
        println(`公眾號: `+ <-c)
        println(`作者: `+ <-c)
    }()

    wg.Wait()
}

好啦,通道的概念就介紹到這里了,如果需要,下一篇我出一個channel詳細講解的文章。

消息隊列編碼實現

准備篇

終於開始進入主題了,姐姐都聽的快要睡着了,我轟隆一嗓子,立馬精神,但是呢,asong也是挨了一頓小電炮,代價慘痛呀,嗚嗚嗚............

在開始編寫代碼編寫直接,我需要構思我們的整個代碼架構,這才是正確的編碼方式。我們先來定義一個接口,把我們需要實現的方法先列出來,后期對每一個代碼進行實現就可以了。因此可以列出如下方法:

type Broker interface {
 publish(topic string, msg interface{}) error
 subscribe(topic string) (<-chan interface{}, error)
 unsubscribe(topic string, sub <-chan interface{}) error
 close()
 broadcast(msg interface{}, subscribers []chan interface{})
 setConditions(capacity int)
}
  • publish:進行消息的推送,有兩個參數即topicmsg,分別是訂閱的主題、要傳遞的消息
  • subscribe:消息的訂閱,傳入訂閱的主題,即可完成訂閱,並返回對應的channel通道用來接收數據
  • unsubscribe:取消訂閱,傳入訂閱的主題和對應的通道
  • close:這個的作用就是很明顯了,就是用來關閉消息隊列的
  • broadCast:這個屬於內部方法,作用是進行廣播,對推送的消息進行廣播,保證每一個訂閱者都可以收到
  • setConditions:這里是用來設置條件,條件就是消息隊列的容量,這樣我們就可以控制消息隊列的大小了

細心的你們有沒有發現什么問題,這些代碼我都定義的是內部方法,也就是包外不可用。為什么這么做呢,因為這里屬於代理要做的事情,我們還需要在封裝一層,也就是客戶端能直接調用的方法,這樣才符合軟件架構。因此可以寫出如下代碼:

package mq


type Client struct {
 bro *BrokerImpl
}

func NewClient() *Client {
 return &Client{
  bro: NewBroker(),
 }
}

func (c *Client)SetConditions(capacity int)  {
 c.bro.setConditions(capacity)
}

func (c *Client)Publish(topic string, msg interface{}) error{
 return c.bro.publish(topic,msg)
}

func (c *Client)Subscribe(topic string) (<-chan interface{}, error){
 return c.bro.subscribe(topic)
}

func (c *Client)Unsubscribe(topic string, sub <-chan interface{}) error {
 return c.bro.unsubscribe(topic,sub)
}

func (c *Client)Close()  {
  c.bro.close()
}

func (c *Client)GetPayLoad(sub <-chan interface{})  interface{}{
 for val:= range sub{
  if val != nil{
   return val
  }
 }
 return nil
}

上面只是准好了代碼結構,但是消息隊列實現的結構我們還沒有設計,現在我們就來設計一下。

type BrokerImpl struct {
 exit chan bool
 capacity int

 topics map[string][]chan interface{} // key: topic  value : queue
 sync.RWMutex // 同步鎖
}
  • exit:也是一個通道,這個用來做關閉消息隊列用的
  • capacity:即用來設置消息隊列的容量
  • topics:這里使用一個map結構,key即是topic,其值則是一個切片,chan類型,這里這么做的原因是我們一個topic可以有多個訂閱者,所以一個訂閱者對應着一個通道
  • sync.RWMutex:讀寫鎖,這里是為了防止並發情況下,數據的推送出現錯誤,所以采用加鎖的方式進行保證

好啦,現在我們已經准備的很充分啦,開始接下來方法填充之旅吧~~~

Publishbroadcast

這里兩個合在一起講的原因是braodcast是屬於publish里的。這里的思路很簡單,我們只需要把傳入的數據進行廣播即可了,下面我們來看代碼實現:

func (b *BrokerImpl) publish(topic string, pub interface{}) error {
 select {
 case <-b.exit:
  return errors.New("broker closed")
 default:
 }

 b.RLock()
 subscribers, ok := b.topics[topic]
 b.RUnlock()
 if !ok {
  return nil
 }

 b.broadcast(pub, subscribers)
 return nil
}


func (b *BrokerImpl) broadcast(msg interface{}, subscribers []chan interface{}) {
 count := len(subscribers)
 concurrency := 1

 switch {
 case count > 1000:
  concurrency = 3
 case count > 100:
  concurrency = 2
 default:
  concurrency = 1
 }
 pub := func(start int) {
  for j := start; j < count; j += concurrency {
   select {
   case subscribers[j] <- msg:
   case <-time.After(time.Millisecond * 5):
   case <-b.exit:
    return
   }
  }
 }
 for i := 0; i < concurrency; i++ {
  go pub(i)
 }
}

publish方法中沒有什么好講的,這里主要說一下broadcast的實現:

這里主要對數據進行廣播,所以數據推送出去就可以了,沒必要一直等着他推送成功,所以這里我們我們采用goroutine。在推送的時候,當推送失敗時,我們也不能一直等待呀,所以這里我們加了一個超時機制,超過5毫秒就停止推送,接着進行下面的推送。

可能你們會有疑惑,上面怎么還有一個switch選項呀,干什么用的呢?考慮這樣一個問題,當有大量的訂閱者時,,比如10000個,我們一個for循環去做消息的推送,那推送一次就會耗費很多時間,並且不同的消費者之間也會產生延時,,所以采用這種方法進行分解可以降低一定的時間。

subscribeunsubScribe

我們先來看代碼:

func (b *BrokerImpl) subscribe(topic string) (<-chan interface{}, error) {
 select {
 case <-b.exit:
  return nil, errors.New("broker closed")
 default:
 }

 ch := make(chan interface{}, b.capacity)
 b.Lock()
 b.topics[topic] = append(b.topics[topic], ch)
 b.Unlock()
 return ch, nil
}
func (b *BrokerImpl) unsubscribe(topic string, sub <-chan interface{}) error {
 select {
 case <-b.exit:
  return errors.New("broker closed")
 default:
 }

 b.RLock()
 subscribers, ok := b.topics[topic]
 b.RUnlock()

 if !ok {
  return nil
 }
 // delete subscriber
 var newSubs []chan interface{}
 for _, subscriber := range subscribers {
  if subscriber == sub {
   continue
  }
  newSubs = append(newSubs, subscriber)
 }

 b.Lock()
 b.topics[topic] = newSubs
 b.Unlock()
 return nil
}

這里其實就很簡單了:

  • subscribe:這里的實現則是為訂閱的主題創建一個channel,然后將訂閱者加入到對應的topic中就可以了,並且返回一個接收channel
  • unsubScribe:這里實現的思路就是將我們剛才添加的channel刪除就可以了。

close

func (b *BrokerImpl) close()  {
 select {
 case <-b.exit:
  return
 default:
  close(b.exit)
  b.Lock()
  b.topics = make(map[string][]chan interface{})
  b.Unlock()
 }
 return
}

這里就是為了關閉整個消息隊列,這句代碼b.topics = make(map[string][]chan interface{})比較重要,這里主要是為了保證下一次使用該消息隊列不發生沖突。

setConditions GetPayLoad

還差最后兩個方法,一個是設置我們的消息隊列容量,另一個是封裝一個方法來獲取我們訂閱的消息:

func (b *BrokerImpl)setConditions(capacity int)  {
 b.capacity = capacity
}
func (c *Client)GetPayLoad(sub <-chan interface{})  interface{}{
 for val:= range sub{
  if val != nil{
   return val
  }
 }
 return nil
}

測試

好啦,代碼這么快就被寫完了,接下來我們進行測試一下吧。

單元測試

正式測試之前,我們還是需要先進行一下單元測試,養成好的習慣,只有先自測了,才能有底氣說我的代碼沒問題,要不直接跑程序,會出現很多bug的。

這里我們測試方法如下:我們向不同的topic發送不同的信息,當訂閱者收到消息后,就行取消訂閱。

func TestClient(t *testing.T) {
 b := NewClient()
 b.SetConditions(100)
 var wg sync.WaitGroup

 for i := 0; i < 100; i++ {
  topic := fmt.Sprintf("Golang夢工廠%d", i)
  payload := fmt.Sprintf("asong%d", i)

  ch, err := b.Subscribe(topic)
  if err != nil {
   t.Fatal(err)
  }

  wg.Add(1)
  go func() {
   e := b.GetPayLoad(ch)
   if e != payload {
    t.Fatalf("%s expected %s but get %s", topic, payload, e)
   }
   if err := b.Unsubscribe(topic, ch); err != nil {
    t.Fatal(err)
   }
   wg.Done()
  }()

  if err := b.Publish(topic, payload); err != nil {
   t.Fatal(err)
  }
 }

 wg.Wait()
}

測試通過,沒問題,接下來我們在寫幾個方法測試一下

測試

這里分為兩種方式測試

測試一:使用一個定時器,向一個主題定時推送消息.

// 一個topic 測試
func OnceTopic()  {
 m := mq.NewClient()
 m.SetConditions(10)
 ch,err :=m.Subscribe(topic)
 if err != nil{
  fmt.Println("subscribe failed")
  return
 }
 go OncePub(m)
 OnceSub(ch,m)
 defer m.Close()
}

// 定時推送
func OncePub(c *mq.Client)  {
 t := time.NewTicker(10 * time.Second)
 defer t.Stop()
 for  {
  select {
  case <- t.C:
   err := c.Publish(topic,"asong真帥")
   if err != nil{
    fmt.Println("pub message failed")
   }
  default:

  }
 }
}

// 接受訂閱消息
func OnceSub(m <-chan interface{},c *mq.Client)  {
 for  {
  val := c.GetPayLoad(m)
  fmt.Printf("get message is %s\n",val)
 }
}

測試二:使用一個定時器,定時向多個主題發送消息:

//多個topic測試
func ManyTopic()  {
 m := mq.NewClient()
 defer m.Close()
 m.SetConditions(10)
 top := ""
 for i:=0;i<10;i++{
  top = fmt.Sprintf("Golang夢工廠_%02d",i)
  go Sub(m,top)
 }
 ManyPub(m)
}

func ManyPub(c *mq.Client)  {
 t := time.NewTicker(10 * time.Second)
 defer t.Stop()
 for  {
  select {
  case <- t.C:
   for i:= 0;i<10;i++{
    //多個topic 推送不同的消息
    top := fmt.Sprintf("Golang夢工廠_%02d",i)
    payload := fmt.Sprintf("asong真帥_%02d",i)
    err := c.Publish(top,payload)
    if err != nil{
     fmt.Println("pub message failed")
    }
   }
  default:

  }
 }
}

func Sub(c *mq.Client,top string)  {
 ch,err := c.Subscribe(top)
 if err != nil{
  fmt.Printf("sub top:%s failed\n",top)
 }
 for  {
  val := c.GetPayLoad(ch)
  if val != nil{
   fmt.Printf("%s get message is %s\n",top,val)
  }
 }
}

總結

終於幫助姐姐解決了這個問題,姐姐開心死了,給我一頓親,啊不對,是一頓誇,誇的人家都不好意思了。

這一篇你學會了嗎?沒學會不要緊,趕快去把源代碼下載下來,好好通讀一下,很好理解的~~~。

其實這一篇是為了接下來的kafka學習打基礎的,學好了這一篇,接下來學習的kafka就會容易很多啦~~~

github地址:https://github.com/asong2020/Golang_Dream/tree/master/code_demo/queue

如果能給一個小星星就好了~~~

結尾給大家發一個小福利吧,最近我在看[微服務架構設計模式]這一本書,講的很好,自己也收集了一本PDF,有需要的小伙可以到自行下載。獲取方式:關注公眾號:[Golang夢工廠],后台回復:[微服務],即可獲取。

我翻譯了一份GIN中文文檔,會定期進行維護,有需要的小伙伴后台回復[gin]即可下載。

我是asong,一名普普通通的程序猿,讓我一起慢慢變強吧。我自己建了一個golang交流群,有需要的小伙伴加我vx,我拉你入群。歡迎各位的關注,我們下期見~~~

推薦往期文章:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM