到目前為止,我們項目的結果大致如下:
-
傳感器生成的模擬數據(包含傳感器名稱、數據、時間戳)是通過傳感器在運行時動態創建的 Queue 來發送的。這些 Queue 很難直接被發現。
-
為了解決這個問題,我創建了另一個消息,它包含各傳感器的 Queue 的路由 key,這個消息是在一個“眾所周知”的 Queue 上發布的,所以協調器就可以得到傳感器的路由信息。
-
傳感器的數據是發布在默認的 Direct Exchange 上,也就是說只有一個消費者可以收到這個消息,這就是我們想要的效果。具體的,無論有多少個協調器,RabbitMQ 會保證只有一個協調器會收到信息,並且只會收到一次。
-
然后,用於發現傳感器的路徑確有不同的需求,如果存在多個協調器,那么當傳感器上線的時候,所有的協調器都必須得知,所以就不能使用 Direct Exchange 了。這時使用 Fanout Exchange 就比較合理了,Fanout Exchange 將會同時通知所有附加在 Exchange 上面的 Queue,也就是把傳感器的路由信息發送給所有在線的協調器。
-
但是這也有其他問題:如果沒有接收者監聽,那么這些路由信息不會保留,這個問題稍后再解決,我們先把發布路由信息的 Exchange 從 Direct 改為 Fanout。
使用 Fanout Exchange 發布傳感器路由信息
目前,在傳感器項目中,我們使用默認的 Direct Exchange 來發布傳感器路由消息:
看一下管理控制台,可以看到 RabbitMQ 還提供了一個 Fanout Exchange 也就是 amq.fanout:
修改代碼,暫時改用 amq.fanout 來發布傳感器路由信息:
-
首先,刪除第 38 行的代碼,它原是用來創建一個 Queue 以便協調程序可以接收到傳感器的路由信息。現在,這個工作將由 Exchange 的消費者們來完成,它們會創建自己的 Queue 來監聽這個 Exchange。
-
第 43 行,把路由 Key 改為 “”,因為 Fanout Exchange 不需要使用該 Key 來決定消息發往哪里,它會把消息進行復制並發送到每個綁定到它的 Queue 上面。
-
最后,第 42 行,把 exchange 這個參數改為 amq.fanout。
運行 sensors 項目查看效果
打開控制台:
可以看到 amq.fanout 確實有數據了,盡管現在的消息傳遞速率為 0。
點進去:
可以看到一個路由信息,但是因為沒有任何 Queue 綁定到這個 Exchange,這個消息就丟失了,因為消息無處可發。
重建協調器
在最早幾節內容中,我做了一個非常簡單的協調器程序,它可以簡單的發布和接收消息。為了配合我們的應用場景,我們需要建立一個更健壯一些的協調器。它的主要職責是:通過消息代理(RabbitMQ)與傳感器進行交互。
不過首先,為了代碼復用,我對現有的項目結構進行調整:
我把項目的外層目錄名從 sensors 改為 demo,然后在里面建立sensors 文件夾,把 main.go 移動到 sensors 里面,並改名為 sensor.go。
然后建立 coordinator 文件夾,在里面建立 queuelistener.go 文件,內容較多,我分為三個圖展示:
-
第 15 行,建立 QueueListener struct,它里面包含發現傳感器數值 Queue 的邏輯,接收它們的消息,並把它們在一個事件聚合器里面翻譯成事件。不過目前它主要聚焦獲取消息這項工作,所以它有三個字段:
-
到 RabbitMQ 的連接
-
在該連接上的 Channel
-
一個 Map,當作注冊表,里面存放着這個協調器所監聽的源,使用 Map 可以防止將同一個傳感器注冊兩次,而當傳感器下線的時候可以通過這個 Map 來關閉監聽(這個我就不實現了)
-
第 21 行,建立一個構造函數,它可以返回一個 *QueueListener
第 31 行創建一個方法 ListenForNewSource:
-
它可以讓 QueueListener 發現新的傳感器,在這里創建 Queue 的時候,我們不關心 Queue 的名稱,所以 name 參數為“”,這樣的話 RabbitMQ 會為它創建一個唯一的名稱。
-
但是當 Queue 被創建時,它會默認綁定到 Direct Exchange。而在之前,我剛把代碼修改為讓傳感器通過 amq.fanout Exchange 來發布它們的信息,所以我們需要把這個 Queue 重新綁定到那個上面。這里就使用 Channel 上的 QueueBind 方法來實現(第 33 行)。
-
QueueBind 方法參數:
-
第一個參數是剛剛創建的 Queue 的名稱,這就是要綁定的 Queue
-
第二個參數是路由 Key,由於 Fanout Exchange 會忽略這個參數,所以這里寫“”
-
第三個參數是要綁定的 Exchange 的名稱,也就是 amq.fanout
-
第四個參數,如果把 noWait 設置為 true,那么萬一綁定不成功,就會把 Channel 關閉。這里我把它設為 false,因為我知道 Exchange 和 Queue 都會存在,如果失敗,那么會關閉 Channel 並發生錯誤。
-
第五個參數不需要,設為 nil
-
第 40 行,設置消息的接收,返回 Go Channel,這里的參數需要用到 Queue 的名稱
-
第 49 行,通過 for range 來處理通過 Go Channel 發過來的消息。如果接收到消息,表示有新的傳感器上線了。
-
第 50 行,在有傳感器上線后,通過 Consume 方法和 msg.Body(也就是傳感器的名稱),來讀取傳感器的模擬數據。記得我們把傳感器的模擬數據發布到了默認的 Direct Exchange 上面,所以每次只會把消息傳遞給一個接收者,這意味着,當我注冊了多個協調器的時候,它們將共享到這些 Queue 的訪問,當這些發生的時候,RabbitMQ 將會輪流傳送給每一個注冊的接收者。這也就允許我們對協調器進行橫向擴展,而且不影響整個系統其余的部分。
-
第 59 行,判斷傳感器是否在該協調器中注冊,如果沒有,那就進行注冊。
-
第 62 行,使用 goroutine 來調用 AddListener 方法,該方法代碼如下:
-
這個方法將會監聽 Go Channel 中的消息
-
在里面使用 for range 來等待 Go Channel 傳送消息
-
在這里,我們把二進制數據轉化為我們可以在程序里使用的數據,也就是 SensorMessage 類型
-
然后暫時先打印即可
建立協調器的 main
在 coordinator 目錄下建立 exec 文件夾,目的是創建 main package,在里面創建 main.go 代碼如下:
-
第 9 行,我們創建一個 QueueListener
-
第 10 行,使用 goroutine 讓他進行監聽,防止阻塞主線程
-
第 12-13 行的目的就是讓程序一直存活,防止 goroutine 停止運行。
最后 sensor.go 里面有一處代碼需要修改,在 main 函數的 for 循環里面,每次使用 encoder 的時候都需要 重新創建一個,所以我添加了 63 行的代碼:
運行
我們運行一下試試,注意:一定要先運行 coordinator 項目,然后再運行 sensors 項目,否則會有問題。 下面左側是 coordinator,右側是 sensors:
可以看到 coordinator(協調器)可以讀取到傳感器的數據了。
這里我們使用了一個最簡單最基本的機制來做傳感器 Queue 的發現。