NATS客戶端
一個NATS客戶端是基於NATS服務端來說既可以是一個生產數據的也可以是消費數據的。生產數據的叫生產者英文為 publishers,消費數據的叫消費者英文為 subscribers,其中消費者既可以是同步的也可以是異步的。NATS客戶端與NATS服務端是通過點對點的方式進行連接通信的,客戶端是不需要知道彼此的位置就可以相互通信的。
目前Apcera也積極的為我們維護和提供了多個其他語言的客戶端,我們可以直接下載使用。當然,我們也可以自己去寫相關的客戶端代碼。
Go語言版的客戶端
接下來我們就用 Go 語言來自己寫客戶端實現與 NATS 服務進行通信
1. 異步的訂閱者subscriber
手動編寫一個名為 async-sub.go 的源碼,它是一個最基本的帶有 debugging 日志的客戶端代碼,具體代碼如下所示:
package main import ( "runtime" "log" "github.com/nats-io/go-nats" ) func main() { // create server connection natsConnection, _ := nats.Connect(nats.DefaultURL) log.Println("connected to " + nats.DefaultURL) // subscribe to subject log.Printf("subscribing to subject 'foo' \n") natsConnection.Subscribe("foo", func(msg *nats.Msg) { //handle the message log.Printf("received message '%s\n", string(msg.Data) + "'") }) // keep the connection alive runtime.Goexit() }
解釋
1. import packages
我們需要引入有關 nats 相關服務包;而對於訂閱者subscriber還必須引入go包中的runtime;我們還需要使用go中的log包,用於客戶端的日志信息打印,方便我們隨時查看問題。
2. 創建連接
NATS服務端口默認是運行在4222,客戶端采用默認連接方式創建一個連接。其中nats.DefaultURL->nats://localhost:4222
3. 訂閱主題
客戶端訂閱主題為“foo“的NATS消息。訂閱者的方法會返回收到消息的有效信息。
4. 消息處理
訂閱者實現異步消息處理程序來處理消息。在這個應用案例中,客戶端只是將每個收到的消息進行日志的打印。沒有顯式的去編寫消息處理程序代碼,訂閱是同步的要求是需要額外客戶端代碼來處理消息(參見下面的同步用戶的例子)。
5. 保持連接一直為激活狀體啊
這個 runtime.Goexit() 是保證在主程序執行完之后客戶端程序一直為激活狀態,換句話說,客戶端並不會單一接收到消息后就終止運行了。
6. 測試
運行我們上面寫到代碼,如果你是在編譯器中編寫的代碼,那么可以直接在編譯器上運行;如果是記事本類進行編寫的,則需要在終端運行:
go run async-sub.go
最終打印的結果為:
2017/04/05 14:54:57 connected to nats://localhost:4222 2017/04/05 14:54:57 subscribing to subject 'foo'
2. 簡單發布者publisher
這個客戶端 pub-simple.go 在主題 "foo" 上發布一個簡單的NATS消息為“Hello NATS“,那么訂閱者客戶端 async-sub.go 就應該會收到這條消息
package main import ( "log" "github.com/nats-io/go-nats" "time" ) func main() { // create server connection and defer close natsConnection, _ := nats.Connect(nats.DefaultURL) defer natsConnection.Close() log.Println("connected to " + nats.DefaultURL) // publish messge on subject by name foo subject := "foo" natsConnection.Publish(subject, []byte("Hello NATS")) log.Printf("published message on subject " + subject) time.Sleep(30 * time.Second) }
這里對應訂閱主題是可以支持簡單正則相關的定義,所以我們可以指定很多規則。這段代碼和上面訂閱者的代碼類似,這里就不多做冗余講解了,直接將運行結果貼出來:
2017/04/05 16:00:03 connected to nats://localhost:4222 2017/04/05 16:00:03 subscribing to subject 'foo' 2017/04/05 16:00:12 received message 'Hello NATS'
除了一個簡單的產生者發送消息(使用字符串的方式)案例,我們還可以使用內置的 msg 結構體,具體案例如下所示:
package main import ( "log" "github.com/nats-io/go-nats" ) func main() { // create server connection and defer close natsConnection, _ := nats.Connect(nats.DefaultURL) defer natsConnection.Close() log.Println("connected to " + nats.DefaultURL) // msg structure msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World")} natsConnection.PublishMsg(msg) log.Println("published msg.Subject = " + msg.Subject, " | msg.Data = " + string(msg.Data)) }
運行結果和上面的類似。
3. 同步訂閱者客戶端
同步客戶並沒有實現與用戶的消息處理程序。相反,接收訂閱的客戶端負責實現代碼來處理消息。此時客戶被阻塞無法接受更多的信息,直到客戶端處理完返回消息。
舉例如下
sub, err := natsConnection.SubscribeSync("foo") m, err := sub.NextMsg(timeout)
4. 權限驗證
我們在啟動NATS服務的時候想要設定一定的權限控制,這樣當客戶端連接的時候必須需要驗證才能訪問連接。目前具體的有兩種方式,一種是配置文件配置,另外一種就是在啟動時加上相應的權限內容。這里說一下命令行啟動時如何開啟權限驗證:
gnatsd -DV -m 8222 -user foo -pass bar
其中 -DV 是 logging 的配置項,目的是為了更加詳細的觀察每一個客戶端的連接詳情,具體其他的可以參看我上一篇博客。這里只講用到的:
-D, --debug Enable debugging output
-V, --trace Trace the raw protocol
-DV Debug and trace
-user User required for connections
-pass Password required for connections
啟動信息打印如下:
[4037] 2017/04/05 17:20:47.015397 [INF] Starting nats-server version 0.9.6 [4037] 2017/04/05 17:20:47.015478 [DBG] Go build version go1.8 [4037] 2017/04/05 17:20:47.015489 [INF] Starting http monitor on 0.0.0.0:8222 [4037] 2017/04/05 17:20:47.015589 [INF] Listening for client connections on 0.0.0.0:4222 [4037] 2017/04/05 17:20:47.015624 [DBG] Server id is S1wbxzcmrpcNKURHXNEfvc [4037] 2017/04/05 17:20:47.015629 [INF] Server is ready
如果我還直接用上面沒有改動的 async-sub.go 代碼直接運行,觀察 NATS 服務器日志信息打印如下:
[4037] 2017/04/05 17:23:41.289881 [DBG] ::1:52074 - cid:1 - Client connection created [4037] 2017/04/05 17:23:41.290715 [TRC] ::1:52074 - cid:1 - ->> [CONNECT {"verbose":false,"pedantic":false,"tls_required":false,"name":"","lang":"go","version":"1.2.2","protocol":1}] [4037] 2017/04/05 17:23:41.290805 [ERR] ::1:52074 - cid:1 - Authorization Error [4037] 2017/04/05 17:23:41.290817 [TRC] ::1:52074 - cid:1 - <<- [-ERR Authorization Violation] [4037] 2017/04/05 17:23:41.290839 [DBG] ::1:52074 - cid:1 - Client connection closed
那么接下來我們就需要改動上面已經寫過的客戶端 訂閱者 和 發布者 的代碼
1. async-sub.go 代碼修改,其中修改的部分用紅色字體標注
package main import ( "runtime" "log" "github.com/nats-io/go-nats" ) func main() { // create server connection natsConnection, _ := nats.Connect("nats://foo:bar@localhost:4222") log.Println("connected to " + nats.DefaultURL) // subscribe to subject log.Printf("subscribing to subject 'foo' \n") natsConnection.Subscribe("foo", func(msg *nats.Msg) { //handle the message log.Printf("received message '%s\n", string(msg.Data) + "'") }) // keep the connection alive runtime.Goexit() }
此時服務器后端日志信息打印如下內容:
[4037] 2017/04/05 17:31:13.681407 [TRC] ::1:52104 - cid:2 - ->> [PING] [4037] 2017/04/05 17:31:13.681411 [TRC] ::1:52104 - cid:2 - <<- [PONG] [4037] 2017/04/05 17:31:13.681608 [TRC] ::1:52104 - cid:2 - ->> [SUB foo 1]
2. pub-simple.go 代碼修改,其中修改的部分用紅色字體標注
package main import ( "log" "github.com/nats-io/go-nats" ) func main() { // create server connection and defer close natsConnectionString := "nats://foo:bar@localhost:4222" natsConnection, _ := nats.Connect(natsConnectionString) defer natsConnection.Close() log.Println("connected to " + nats.DefaultURL) // msg structure msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World")} natsConnection.PublishMsg(msg) log.Println("published msg.Subject = " + msg.Subject, " | msg.Data = " + string(msg.Data)) }
此時服務器終端打印日志信息如下:
[4037] 2017/04/05 17:33:13.687547 [DBG] ::1:52104 - cid:2 - Client Ping Timer [4037] 2017/04/05 17:33:13.687584 [TRC] ::1:52104 - cid:2 - <<- [PING] [4037] 2017/04/05 17:33:13.687651 [TRC] ::1:52104 - cid:2 - ->> [PING] [4037] 2017/04/05 17:33:13.687660 [TRC] ::1:52104 - cid:2 - <<- [PONG] [4037] 2017/04/05 17:33:13.687854 [TRC] ::1:52104 - cid:2 - ->> [PONG] [4037] 2017/04/05 17:35:13.691474 [DBG] ::1:52104 - cid:2 - Client Ping Timer [4037] 2017/04/05 17:35:13.691504 [TRC] ::1:52104 - cid:2 - <<- [PING] [4037] 2017/04/05 17:35:13.691589 [TRC] ::1:52104 - cid:2 - ->> [PING] [4037] 2017/04/05 17:35:13.691599 [TRC] ::1:52104 - cid:2 - <<- [PONG] [4037] 2017/04/05 17:35:13.691658 [TRC] ::1:52104 - cid:2 - ->> [PONG] [4037] 2017/04/05 17:35:28.933556 [DBG] ::1:52117 - cid:3 - Client connection created [4037] 2017/04/05 17:35:28.933944 [TRC] ::1:52117 - cid:3 - ->> [CONNECT {"verbose":false,"pedantic":false,"user":"foo","pass":"bar","tls_required":false,"name":"","lang":"go","version":"1.2.2","protocol":1}] [4037] 2017/04/05 17:35:28.933986 [TRC] ::1:52117 - cid:3 - ->> [PING] [4037] 2017/04/05 17:35:28.933992 [TRC] ::1:52117 - cid:3 - <<- [PONG] [4037] 2017/04/05 17:35:28.934227 [TRC] ::1:52117 - cid:3 - ->> [PUB foo bar 11] [4037] 2017/04/05 17:35:28.934240 [TRC] ::1:52117 - cid:3 - ->> MSG_PAYLOAD: [Hello World] [4037] 2017/04/05 17:35:28.934270 [TRC] ::1:52104 - cid:2 - <<- [MSG foo 1 bar 11] [4037] 2017/04/05 17:35:28.935859 [DBG] ::1:52117 - cid:3 - Client connection closed
從以上結果可以看出,前10行日志是訂閱者自發的ping-pong操作,檢測服務是否可以ping通,說白了就是心跳檢測。后面開始有發布者開始創建連接,創建成功后會打印發布的消息日志,最后關閉響應的連接。
以上就是使用 Go 語言模擬客戶端 發布者/訂閱者的實現
