NATS Streaming
NATS Streaming是一個以NATS為驅動的數據流系統且它的源碼也是由Golang語言編寫的。其中NATS Streaming服務是一個可執行的文件名為:nats-streaming-server。NATS Streaming與底層NATS服務平台無縫嵌入、擴展和互動。NATS Streaming服務作為開源軟件在MIT許可下載,Apcera也積極的在維護和支持NATS Streaming 服務。
以下為NATS Streaming與NATS服務整體關系結構圖:
功能
除了NATS平台核心的特性外,NATS Streaming提供了以下一些功能特性
增強消息協議
NATS Streaming使用谷歌協議緩沖區實現自己的增強型消息格式。這些消息通過二進制數據流在NATS核心平台進行傳播,因此不需要改變NATS的基本協議。NATS Streaming信息包含以下字段:
序列 - 一個全局順序序列號為主題的通道
主題 - 是NATS Streaming 交付對象
答復內容 - 對應"reply-to"對應的對象內容
數據 - 真是數據內容
時間戳 - 接收的時間戳,單位是納秒
重復發送 - 標志這條數據是否需要服務再次發送
CRC32 - 一個循環冗余數據校驗選項,在數據存儲和數據通訊領域里,為了保證數據的正確性所采用的檢錯手段,這里使用的是 IEEE CRC32 算法
消息/事件的持久性
NATS Streaming提供了可配置的消息持久化,持久目的地可以為內存或者文件。另外,對應的存儲子系統使用了一個公共接口允許我們開發自己自定義實現來持久化對應的消息
至少一次的發送
NATS Streaming提供了發布者和服務器之間的消息確認(發布操作) 和訂閱者和服務器之間的消息確認(確認消息發送)。其中消息被保存在服務器端內存或者輔助存儲(或其他外部存儲器)用來為需要重新接受消息的訂閱者進行重發消息。
發布者發送速率限定
NATS Streaming提供了一個連接選項叫 MaxPubAcksInFlight,它能有效的限制一個發布者可能隨意的在任何時候發送的未被確認的消息。當達到這個配置的最大數量時,異步發送調用接口將會被阻塞,直到未確認消息降到指定數量之下。
每個訂閱者的速率匹配/限制
NATS Streaming運行指定的訂閱中設置一個參數為 MaxInFlight,它用來指定已確認但未消費的最大數據量,當達到這個限制時,NATS Streaming 將暫停發送消息給訂閱者,直到未確認的數據量小於設定的量為止
以主題重發的歷史數據
新訂閱的可以在已經存儲起來的訂閱的主題頻道指定起始位置消息流。通過使用這個選項,消息就可以開始發送傳遞了:
1. 訂閱的主題存儲的最早的信息
2. 與當前訂閱主題之前的最近存儲的數據,這通常被認為是 "最后的值" 或 "初值" 對應的緩存
3. 一個以納秒為基准的 日期/時間
4. 一個歷史的起始位置相對當前服務的 日期/時間,例如:最后30秒
5. 一個特定的消息序列號
持久訂閱
訂閱也可以指定一個“持久化的名稱”可以在客戶端重啟時不受影響。持久訂閱會使得對應服務跟蹤客戶端最后確認消息的序列號和持久名稱。當這個客戶端重啟或者重新訂閱的時候,使用相同的客戶端ID 和 持久化的名稱,對應的服務將會從最早的未被確認的消息處恢復
安裝 NATS Streaming 服務
提供了多種安裝方式
1. 從github上下載對應版本的二進制安裝包進行安裝
2. 直接使用 go 命令進行一鍵式安裝
3. 如果是Docker環境下可以通過Docker hub上進行安裝
4. windows 和 macOS上也有對應的安裝方式,具體的請查看官網 install
我這里使用的是第二種方式即直接通過go命令進行安裝
$ go get github.com/nats-io/nats-streaming-server
啟動NATS Streaming服務
1. 直接啟動:
$ nats-streaming-server
如果看到以下內容說明啟動成功了:
[12260] 2017/04/07 17:15:13.735047 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.4.0 [12260] 2017/04/07 17:15:13.735131 [INF] STREAM: ServerID: P717ypV9bkgOlj0z4wNVl5 [12260] 2017/04/07 17:15:13.737291 [INF] Starting nats-server version 0.9.6 [12260] 2017/04/07 17:15:13.737307 [INF] Listening for client connections on 0.0.0.0:4222 [12260] 2017/04/07 17:15:13.738248 [INF] Server is ready [12260] 2017/04/07 17:15:14.024368 [INF] STREAM: Message store is MEMORY [12260] 2017/04/07 17:15:14.024401 [INF] STREAM: --------- Store Limits --------- [12260] 2017/04/07 17:15:14.024414 [INF] STREAM: Channels: 100 * [12260] 2017/04/07 17:15:14.024417 [INF] STREAM: -------- channels limits ------- [12260] 2017/04/07 17:15:14.024421 [INF] STREAM: Subscriptions: 1000 * [12260] 2017/04/07 17:15:14.024425 [INF] STREAM: Messages : 1000000 * [12260] 2017/04/07 17:15:14.024442 [INF] STREAM: Bytes : 976.56 MB * [12260] 2017/04/07 17:15:14.024446 [INF] STREAM: Age : unlimited * [12260] 2017/04/07 17:15:14.024449 [INF] STREAM: --------------------------------
2. 啟動NATS Streaming服務並開啟NATS監控
$ nats-streaming-server -m 8222
注意:
NATS Streaming 主題名稱是不支持通配符的,在NATS中主題名是可以使用 * 和 > 的,這里不支持。
其他的大部分和NATS一致,因為它就是基於NATS實現的
NATS Streaming使用
1. 首先進入已經下載好的 nats-streaming-server 對應的路徑,並啟動
$ cd $GOPATH/src/github.com/nats-io/nats-streaming-server $ go run nats-streaming-server.go
2. 運行發布者(publisher)客戶端並且發送消息到指定的主題上
$ cd $GOPATH/src/github.com/nats-io/go-nats-streaming/examples $ go run stan-pub.go foo "msg one" Published [foo] : 'msg one' $ go run stan-pub.go foo "msg two" Published [foo] : 'msg two' $ go run stan-pub.go foo "msg three" Published [foo] : 'msg three'
3. 運行訂閱者(subscriber)客戶端,使用 -all 參數來接受由發布者發布對應主題上的所有消息
$ go run stan-sub.go --all -c test-cluster -id myID foo Connected to nats://localhost:4222 clusterID: [test-cluster] clientID: [myID] subscribing with DeliverAllAvailable Listening on [foo], clientID=[myID], qgroup=[] durable=[] [#1] Received on [foo]: 'sequence:1 subject:"foo" data:"msg one" timestamp:1465962202884478817 ' [#2] Received on [foo]: 'sequence:2 subject:"foo" data:"msg two" timestamp:1465962208545003897 ' [#3] Received on [foo]: 'sequence:3 subject:"foo" data:"msg three" timestamp:1465962215567601196
以上可以看到使用參數 -all 可以獲取到所有發布的數據,其實還有一些其他的參數供我們使用,具體如下所示:
--seq <seqno> 從指定的序列號開始 --all 接受所有發送的數據 --last 從上一次最后一次發送的那條數據開始 --since <duration> 從當前往前的時間段開始接收(例如:1s, 1hr, 具體可以參看:https://golang.org/pkg/time/#ParseDuration) --durable <name> 永久訂閱者的名稱 --unsubscribe 退出時解除永久訂閱
如果想要使用 GoLang 語言自己代碼開發的話,可以參看我的下一篇博客:NATS_13:NATS Streaming案例講解