NATS_08:NATS客戶端Go語言手動編寫


NATS客戶端
    一個NATS客戶端是基於NATS服務端來說既可以是一個生產數據的也可以是消費數據的。生產數據的叫生產者英文為 publishers,消費數據的叫消費者英文為 subscribers,其中消費者既可以是同步的也可以是異步的。NATS客戶端與NATS服務端是通過點對點的方式進行連接通信的,客戶端是不需要知道彼此的位置就可以相互通信的。

  目前Apcera也積極的為我們維護和提供了多個其他語言的客戶端,我們可以直接下載使用。當然,我們也可以自己去寫相關的客戶端代碼。

Go語言版的客戶端

  接下來我們就用 Go 語言來自己寫客戶端實現與 NATS 服務進行通信

 1. 異步的訂閱者subscriber

  手動編寫一個名為 async-sub.go 的源碼,它是一個最基本的帶有 debugging 日志的客戶端代碼,具體代碼如下所示:

package main

import (
    "runtime"
    "log"
    "github.com/nats-io/go-nats"
)

func main() {
    // create server connection
    natsConnection, _ := nats.Connect(nats.DefaultURL)
    log.Println("connected to " + nats.DefaultURL)

    // subscribe to subject
    log.Printf("subscribing to subject 'foo' \n")
    natsConnection.Subscribe("foo", func(msg *nats.Msg) {
        //handle the message
        log.Printf("received message '%s\n", string(msg.Data) + "'")
    })
    
    // keep the connection alive
    runtime.Goexit()
}

解釋

  1. import packages

    我們需要引入有關 nats 相關服務包;而對於訂閱者subscriber還必須引入go包中的runtime;我們還需要使用go中的log包,用於客戶端的日志信息打印,方便我們隨時查看問題。

  2. 創建連接

    NATS服務端口默認是運行在4222,客戶端采用默認連接方式創建一個連接。其中nats.DefaultURL->nats://localhost:4222

  3. 訂閱主題

    客戶端訂閱主題為“foo“的NATS消息。訂閱者的方法會返回收到消息的有效信息。

  4. 消息處理

    訂閱者實現異步消息處理程序來處理消息。在這個應用案例中,客戶端只是將每個收到的消息進行日志的打印。沒有顯式的去編寫消息處理程序代碼,訂閱是同步的要求是需要額外客戶端代碼來處理消息(參見下面的同步用戶的例子)。

  5. 保持連接一直為激活狀體啊

    這個 runtime.Goexit() 是保證在主程序執行完之后客戶端程序一直為激活狀態,換句話說,客戶端並不會單一接收到消息后就終止運行了。

  6. 測試

    運行我們上面寫到代碼,如果你是在編譯器中編寫的代碼,那么可以直接在編譯器上運行;如果是記事本類進行編寫的,則需要在終端運行:

    go run async-sub.go

    最終打印的結果為:

  2017/04/05 14:54:57 connected to nats://localhost:4222
  2017/04/05 14:54:57 subscribing to subject 'foo' 

 2. 簡單發布者publisher

  這個客戶端 pub-simple.go 在主題 "foo" 上發布一個簡單的NATS消息為“Hello NATS“,那么訂閱者客戶端 async-sub.go 就應該會收到這條消息

package main

import (
    "log"
    "github.com/nats-io/go-nats"
    "time"
)

func main() {
    // create server connection and defer close
    natsConnection, _ := nats.Connect(nats.DefaultURL)
    defer natsConnection.Close()
    log.Println("connected to " + nats.DefaultURL)

    // publish messge on subject by name foo
    subject := "foo"
    natsConnection.Publish(subject, []byte("Hello NATS"))
    log.Printf("published message on subject " + subject)

    time.Sleep(30 * time.Second)
}

  這里對應訂閱主題是可以支持簡單正則相關的定義,所以我們可以指定很多規則。這段代碼和上面訂閱者的代碼類似,這里就不多做冗余講解了,直接將運行結果貼出來:

2017/04/05 16:00:03 connected to nats://localhost:4222
2017/04/05 16:00:03 subscribing to subject 'foo' 
2017/04/05 16:00:12 received message 'Hello NATS'

  除了一個簡單的產生者發送消息(使用字符串的方式)案例,我們還可以使用內置的 msg 結構體,具體案例如下所示:

package main

import (
    "log"
    "github.com/nats-io/go-nats"
)

func main() {
    // create server connection and defer close
    natsConnection, _ := nats.Connect(nats.DefaultURL)
    defer natsConnection.Close()
    log.Println("connected to " + nats.DefaultURL)

    // msg structure
    msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World")}
    natsConnection.PublishMsg(msg)

    log.Println("published msg.Subject = " + msg.Subject, " | msg.Data = " + string(msg.Data))
}

  運行結果和上面的類似。

 3. 同步訂閱者客戶端

  同步客戶並沒有實現與用戶的消息處理程序。相反,接收訂閱的客戶端負責實現代碼來處理消息。此時客戶被阻塞無法接受更多的信息,直到客戶端處理完返回消息。

  舉例如下

sub, err := natsConnection.SubscribeSync("foo")
m, err := sub.NextMsg(timeout)

 4. 權限驗證

  我們在啟動NATS服務的時候想要設定一定的權限控制,這樣當客戶端連接的時候必須需要驗證才能訪問連接。目前具體的有兩種方式,一種是配置文件配置,另外一種就是在啟動時加上相應的權限內容。這里說一下命令行啟動時如何開啟權限驗證:

gnatsd -DV -m 8222 -user foo -pass bar

  其中 -DV 是 logging 的配置項,目的是為了更加詳細的觀察每一個客戶端的連接詳情,具體其他的可以參看我上一篇博客。這里只講用到的:

  -D, --debug                   Enable debugging output

  -V,  --trace                    Trace the raw protocol

  -DV                               Debug and trace

  -user          User required for connections

  -pass          Password required for connections

  啟動信息打印如下:

[4037] 2017/04/05 17:20:47.015397 [INF] Starting nats-server version 0.9.6
[4037] 2017/04/05 17:20:47.015478 [DBG] Go build version go1.8
[4037] 2017/04/05 17:20:47.015489 [INF] Starting http monitor on 0.0.0.0:8222
[4037] 2017/04/05 17:20:47.015589 [INF] Listening for client connections on 0.0.0.0:4222
[4037] 2017/04/05 17:20:47.015624 [DBG] Server id is S1wbxzcmrpcNKURHXNEfvc
[4037] 2017/04/05 17:20:47.015629 [INF] Server is ready

  如果我還直接用上面沒有改動的 async-sub.go 代碼直接運行,觀察 NATS 服務器日志信息打印如下:

[4037] 2017/04/05 17:23:41.289881 [DBG] ::1:52074 - cid:1 - Client connection created
[4037] 2017/04/05 17:23:41.290715 [TRC] ::1:52074 - cid:1 - ->> [CONNECT {"verbose":false,"pedantic":false,"tls_required":false,"name":"","lang":"go","version":"1.2.2","protocol":1}]
[4037] 2017/04/05 17:23:41.290805 [ERR] ::1:52074 - cid:1 - Authorization Error
[4037] 2017/04/05 17:23:41.290817 [TRC] ::1:52074 - cid:1 - <<- [-ERR Authorization Violation]
[4037] 2017/04/05 17:23:41.290839 [DBG] ::1:52074 - cid:1 - Client connection closed

  那么接下來我們就需要改動上面已經寫過的客戶端 訂閱者 和 發布者 的代碼

  1. async-sub.go 代碼修改,其中修改的部分用紅色字體標注

package main

import (
    "runtime"
    "log"
    "github.com/nats-io/go-nats"
)

func main() {
    // create server connection
    natsConnection, _ := nats.Connect("nats://foo:bar@localhost:4222")
    log.Println("connected to " + nats.DefaultURL)

    // subscribe to subject
    log.Printf("subscribing to subject 'foo' \n")
    natsConnection.Subscribe("foo", func(msg *nats.Msg) {
        //handle the message
        log.Printf("received message '%s\n", string(msg.Data) + "'")
    })

    // keep the connection alive
    runtime.Goexit()
}

  此時服務器后端日志信息打印如下內容:

[4037] 2017/04/05 17:31:13.681407 [TRC] ::1:52104 - cid:2 - ->> [PING]
[4037] 2017/04/05 17:31:13.681411 [TRC] ::1:52104 - cid:2 - <<- [PONG]
[4037] 2017/04/05 17:31:13.681608 [TRC] ::1:52104 - cid:2 - ->> [SUB foo  1]

  2. pub-simple.go 代碼修改,其中修改的部分用紅色字體標注

package main

import (
    "log"
    "github.com/nats-io/go-nats"
)

func main() {
    // create server connection and defer close
    natsConnectionString := "nats://foo:bar@localhost:4222"
    natsConnection, _ := nats.Connect(natsConnectionString)
    defer natsConnection.Close()
    log.Println("connected to " + nats.DefaultURL)

    // msg structure
    msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World")}
    natsConnection.PublishMsg(msg)

    log.Println("published msg.Subject = " + msg.Subject, " | msg.Data = " + string(msg.Data))
}

  此時服務器終端打印日志信息如下:

[4037] 2017/04/05 17:33:13.687547 [DBG] ::1:52104 - cid:2 - Client Ping Timer
[4037] 2017/04/05 17:33:13.687584 [TRC] ::1:52104 - cid:2 - <<- [PING]
[4037] 2017/04/05 17:33:13.687651 [TRC] ::1:52104 - cid:2 - ->> [PING]
[4037] 2017/04/05 17:33:13.687660 [TRC] ::1:52104 - cid:2 - <<- [PONG]
[4037] 2017/04/05 17:33:13.687854 [TRC] ::1:52104 - cid:2 - ->> [PONG]
[4037] 2017/04/05 17:35:13.691474 [DBG] ::1:52104 - cid:2 - Client Ping Timer
[4037] 2017/04/05 17:35:13.691504 [TRC] ::1:52104 - cid:2 - <<- [PING]
[4037] 2017/04/05 17:35:13.691589 [TRC] ::1:52104 - cid:2 - ->> [PING]
[4037] 2017/04/05 17:35:13.691599 [TRC] ::1:52104 - cid:2 - <<- [PONG]
[4037] 2017/04/05 17:35:13.691658 [TRC] ::1:52104 - cid:2 - ->> [PONG]
[4037] 2017/04/05 17:35:28.933556 [DBG] ::1:52117 - cid:3 - Client connection created
[4037] 2017/04/05 17:35:28.933944 [TRC] ::1:52117 - cid:3 - ->> [CONNECT {"verbose":false,"pedantic":false,"user":"foo","pass":"bar","tls_required":false,"name":"","lang":"go","version":"1.2.2","protocol":1}]
[4037] 2017/04/05 17:35:28.933986 [TRC] ::1:52117 - cid:3 - ->> [PING]
[4037] 2017/04/05 17:35:28.933992 [TRC] ::1:52117 - cid:3 - <<- [PONG]
[4037] 2017/04/05 17:35:28.934227 [TRC] ::1:52117 - cid:3 - ->> [PUB foo bar 11]
[4037] 2017/04/05 17:35:28.934240 [TRC] ::1:52117 - cid:3 - ->> MSG_PAYLOAD: [Hello World]
[4037] 2017/04/05 17:35:28.934270 [TRC] ::1:52104 - cid:2 - <<- [MSG foo 1 bar 11]
[4037] 2017/04/05 17:35:28.935859 [DBG] ::1:52117 - cid:3 - Client connection closed

  從以上結果可以看出,前10行日志是訂閱者自發的ping-pong操作,檢測服務是否可以ping通,說白了就是心跳檢測。后面開始有發布者開始創建連接,創建成功后會打印發布的消息日志,最后關閉響應的連接。

 

  以上就是使用 Go 語言模擬客戶端 發布者/訂閱者的實現


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM