RabbitMQ 入門 (Go) - 4. 使用 Fanout Exchange 做服務發現(上)


到目前為止,我們項目的結果大致如下:

 

 

  • 傳感器生成的模擬數據(包含傳感器名稱、數據、時間戳)是通過傳感器在運行時動態創建的 Queue 來發送的。這些 Queue 很難直接被發現。

  • 為了解決這個問題,我創建了另一個消息,它包含各傳感器的 Queue 的路由 key,這個消息是在一個“眾所周知”的 Queue 上發布的,所以協調器就可以得到傳感器的路由信息。

  • 傳感器的數據是發布在默認的 Direct     Exchange 上,也就是說只有一個消費者可以收到這個消息,這就是我們想要的效果。具體的,無論有多少個協調器,RabbitMQ 會保證只有一個協調器會收到信息,並且只會收到一次。

  • 然后,用於發現傳感器的路徑確有不同的需求,如果存在多個協調器,那么當傳感器上線的時候,所有的協調器都必須得知,所以就不能使用 Direct     Exchange 了。這時使用 Fanout Exchange 就比較合理了,Fanout Exchange 將會同時通知所有附加在 Exchange 上面的 Queue,也就是把傳感器的路由信息發送給所有在線的協調器。

  • 但是這也有其他問題:如果沒有接收者監聽,那么這些路由信息不會保留,這個問題稍后再解決,我們先把發布路由信息的 Exchange 從 Direct 改為 Fanout。

 

使用 Fanout Exchange 發布傳感器路由信息

目前,在傳感器項目中,我們使用默認的 Direct Exchange 來發布傳感器路由消息:

 

 

看一下管理控制台,可以看到 RabbitMQ 還提供了一個 Fanout Exchange 也就是 amq.fanout: 

 

 

 

 

修改代碼,暫時改用 amq.fanout 來發布傳感器路由信息:

 

 

 

  1. 首先,刪除第 38 行的代碼,它原是用來創建一個 Queue 以便協調程序可以接收到傳感器的路由信息。現在,這個工作將由 Exchange 的消費者們來完成,它們會創建自己的 Queue 來監聽這個 Exchange。

  2. 第 43 行,把路由 Key 改為 “”,因為 Fanout Exchange 不需要使用該 Key 來決定消息發往哪里,它會把消息進行復制並發送到每個綁定到它的 Queue 上面。

  3. 最后,第 42 行,把 exchange 這個參數改為 amq.fanout。

 

運行 sensors 項目查看效果

 

 

 

打開控制台:

 

 

 

可以看到 amq.fanout 確實有數據了,盡管現在的消息傳遞速率為 0。

 

點進去:

 

 

 

可以看到一個路由信息,但是因為沒有任何 Queue 綁定到這個 Exchange,這個消息就丟失了,因為消息無處可發。

  

重建協調器

在最早幾節內容中,我做了一個非常簡單的協調器程序,它可以簡單的發布和接收消息。為了配合我們的應用場景,我們需要建立一個更健壯一些的協調器。它的主要職責是:通過消息代理(RabbitMQ)與傳感器進行交互。

不過首先,為了代碼復用,我對現有的項目結構進行調整:

 

 

 

我把項目的外層目錄名從 sensors 改為 demo,然后在里面建立sensors 文件夾,把 main.go 移動到 sensors 里面,並改名為 sensor.go。

 

然后建立 coordinator 文件夾,在里面建立 queuelistener.go 文件,內容較多,我分為三個圖展示:

 

 

 

  1. 第 15 行,建立 QueueListener struct,它里面包含發現傳感器數值 Queue 的邏輯,接收它們的消息,並把它們在一個事件聚合器里面翻譯成事件。不過目前它主要聚焦獲取消息這項工作,所以它有三個字段:

    1. 到 RabbitMQ 的連接

    2. 在該連接上的 Channel

    3. 一個 Map,當作注冊表,里面存放着這個協調器所監聽的源,使用 Map 可以防止將同一個傳感器注冊兩次,而當傳感器下線的時候可以通過這個 Map 來關閉監聽(這個我就不實現了)

  2. 第 21 行,建立一個構造函數,它可以返回一個 *QueueListener

 

 

 第 31 行創建一個方法 ListenForNewSource:

  1. 它可以讓 QueueListener 發現新的傳感器,在這里創建 Queue 的時候,我們不關心 Queue 的名稱,所以 name 參數為“”,這樣的話 RabbitMQ 會為它創建一個唯一的名稱。

  2. 但是當 Queue 被創建時,它會默認綁定到 Direct Exchange。而在之前,我剛把代碼修改為讓傳感器通過 amq.fanout Exchange 來發布它們的信息,所以我們需要把這個 Queue 重新綁定到那個上面。這里就使用 Channel 上的 QueueBind 方法來實現(第 33 行)。

  3. QueueBind 方法參數:

    1. 第一個參數是剛剛創建的 Queue 的名稱,這就是要綁定的 Queue

    2. 第二個參數是路由 Key,由於 Fanout Exchange 會忽略這個參數,所以這里寫“”

    3. 第三個參數是要綁定的 Exchange 的名稱,也就是 amq.fanout

    4. 第四個參數,如果把 noWait 設置為 true,那么萬一綁定不成功,就會把 Channel 關閉。這里我把它設為 false,因為我知道 Exchange 和 Queue 都會存在,如果失敗,那么會關閉 Channel 並發生錯誤。

    5. 第五個參數不需要,設為 nil

  4. 第 40 行,設置消息的接收,返回 Go Channel,這里的參數需要用到 Queue 的名稱

  5. 第 49 行,通過 for range 來處理通過 Go Channel 發過來的消息。如果接收到消息,表示有新的傳感器上線了。

  6. 第 50 行,在有傳感器上線后,通過 Consume 方法和 msg.Body(也就是傳感器的名稱),來讀取傳感器的模擬數據。記得我們把傳感器的模擬數據發布到了默認的 Direct Exchange 上面,所以每次只會把消息傳遞給一個接收者,這意味着,當我注冊了多個協調器的時候,它們將共享到這些 Queue 的訪問,當這些發生的時候,RabbitMQ 將會輪流傳送給每一個注冊的接收者。這也就允許我們對協調器進行橫向擴展,而且不影響整個系統其余的部分。

  7. 第 59 行,判斷傳感器是否在該協調器中注冊,如果沒有,那就進行注冊。

  8. 第 62 行,使用 goroutine 來調用 AddListener 方法,該方法代碼如下:

 

 

  1. 這個方法將會監聽 Go Channel 中的消息

  2. 在里面使用 for range 來等待 Go Channel 傳送消息

  3. 在這里,我們把二進制數據轉化為我們可以在程序里使用的數據,也就是     SensorMessage 類型

  4. 然后暫時先打印即可

 

建立協調器的 main

在 coordinator 目錄下建立 exec 文件夾,目的是創建 main package,在里面創建 main.go 代碼如下:

 

 

  1. 第 9 行,我們創建一個 QueueListener

  2. 第 10 行,使用 goroutine 讓他進行監聽,防止阻塞主線程

  3. 第 12-13 行的目的就是讓程序一直存活,防止 goroutine 停止運行。

最后 sensor.go 里面有一處代碼需要修改,在 main 函數的 for 循環里面,每次使用 encoder 的時候都需要 重新創建一個,所以我添加了 63 行的代碼:

 

 

運行 

我們運行一下試試,注意:一定要先運行 coordinator 項目,然后再運行 sensors 項目,否則會有問題。 下面左側是 coordinator,右側是 sensors:

 

 

可以看到 coordinator(協調器)可以讀取到傳感器的數據了。 

這里我們使用了一個最簡單最基本的機制來做傳感器 Queue 的發現。

 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM