前言
最近我再網上尋找使用golang實現的mq,因為我知道golang一般實現的應用部署起來很方便,所以我就找到了一個叫做nsq的mq,其實它並不能完全稱為隊列,但是它的輕量和性能的高效,讓我真的大開眼界。
如果你有興趣,我覺得也可以了解一下:
網上有人翻譯了國外的一篇文章:
我們是如何使用NSQ處理7500億消息的
安裝和部署
官網提供
如果你有能力的話直接閱讀官方的說明進行操作就可以了
https://nsq.io/overview/quick_start.html
如果看不懂我還找到了中文翻譯過的:
http://wiki.jikexueyuan.com/project/nsq-guide/
簡單部署
下面是我使用的最快部署測試方式,使用服務器環境centos7.4,防火牆開放端口4160,4161,4151
4171
1、在下載頁面下載對應版本(可能有的時候需要科學上網)
https://nsq.io/deployment/installing.html
這里使用linux版本
nsq-1.1.0.linux-amd64.go1.10.3.tar.gz
2、將包上傳至服務器后解壓;
tar xvf nsq-1.1.0.linux-amd64.go1.10.3.tar.gz
3、進入bin目錄 cd nsq-1.1.0.linux-amd64.go1.10.3/bin
4、后台啟動三個服務
nohup ./nsqlookupd > /dev/null 2>&1 &
nohup ./nsqd --lookupd-tcp-address=127.0.0.1:4160 > /dev/null 2>&1 &
nohup ./nsqadmin --lookupd-http-address=127.0.0.1:4161 > /dev/null 2>&1 &
簡單使用
1、使用
curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'
會創建一個test主題,並發送一個hello world消息
2、外部通過:http://127.0.0.1:4171/
進行訪問可以看到NSQ的管理界面,非常的簡潔
其中127.0.0.1為服務器IP

3、使用
./nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
消費test中剛才的消息,並輸出到服務器/tmp目錄中
特性
官方給出的文檔給出了很多特性的說明,針對於一個MQ來說,我認為下面幾個特性是你必須知道的:
-
默認一開始消息不是持久化的
nsq采用的方式時內存+硬盤的模式,當內存到達一定程度時就會將數據持久化到硬盤
1、如果將 --mem-queue-size 設置為 0,所有的消息將會存儲到磁盤。
2、但是即使服務器重啟也會將當時在內存中的消息持久化 -
消息是沒有順序的
這一點很關鍵,由於nsq使用內存+磁盤的模式,而且還有requeue的操作,所以發送消息的順序和接收的順序可能不一樣 -
官方不推薦使用客戶端發消息
官方提供相應的客戶端發送消息,但是HTTP可能更方便一些 -
沒有復制
nsq節點相對獨立,節點與節點之間沒有復制或者集群的關系。 -
沒有鑒權相關模塊
當前release版本的nsq沒有鑒權模塊,只有版本v0.2.29+高於這個的才有 -
幾個小點
topic名稱有長度限制,命名建議用下划線連接;
消息體大小有限制;
優缺點
優點:
1、部署極其方便,沒有任何環境依賴,直接啟動就行
2、輕量沒有過多的配置參數,只需要簡單的配置就可以直接使用
3、性能高
4、消息不存在丟失的情況
缺點:
1、消息無順序
2、節點之間沒有消息復制
3、沒有鑒權
多節點部署
基本概念
nsqd:基本的節點
nsqlookupd:匯總節點信息,提供查詢和管理topic等服務
nsqadmin:管理端展示UI界面,能有一個web頁面去查看和操作
結構

最簡單的多節點部署可以是這樣的一個結構
部署步驟和命令
PS:后台啟動使用nohup即可,下面只是為了說明啟動方式和命令參數
第一步需要啟動nsqlookupd
./nsqlookupd
默認占用4161和4160兩個端口
使用-http-address和-tcp-address可以修改
第二步啟動兩個nsqd
./nsqd -lookupd-tcp-address=192.168.1.102:4160 -broadcast-address=192.168.1.103 -data-path="/temp/nsq"
其中
-lookupd-tcp-address為上面nsqlookupd的IP和tcp的端口4160
-broadcast-address我填寫的是自己的IP,這個IP官網上寫的是會注冊到nsqlookupd
-data-path為消息持久化的位置
第三步啟動nsqadmin
./nsqadmin -lookupd-http-address=192.168.4.102:4161
同樣需要指定-lookupd-http-address但是這次是http的端口也就是4161因為admin通過http請求來查詢相關信息
后續擴展
上面只是最簡單的兩個節點的部署,如果后續想擴展就會如下

其中nginx是可以不需要的,你可以果斷選擇同時向多個節點發送消息,或者當消息沒有處理的時候重新進行發送,因為這樣也是nsq設計之初的考慮。你也可以根據自己的需要設計你自己的架構。
客戶端
官方提供了很多語言接入的客戶端 https://nsq.io/clients/client_libraries.html
針對消息生產者的客戶端,官方還推薦直接使用post請求發送消息,如:
curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'
表示向test主題發送hello world這個消息
下面介紹兩種客戶端,一種是golang的客戶端,一種是java的客戶端
Golang的客戶端
其中192.168.4.102:4150為發送消息的地址,消費者里面寫的也是相同的地址就可以了。
生產者:
package main import ( "github.com/nsqio/go-nsq" "time" ) func main() { for i := 0 ; i < 10; i++ { sendMessage() } time.Sleep(time.Second * 10) } func sendMessage() { url := "192.168.4.102:4150" producer, err := nsq.NewProducer(url, nsq.NewConfig()) if err != nil { panic(err) } err = producer.Publish("test", []byte("hello world")) if err != nil { panic(err) } producer.Stop() }
消費者:
package main import ( "fmt" "github.com/nsqio/go-nsq" "sync" ) func main() { testNSQ() } type NSQHandler struct { } func (this *NSQHandler) HandleMessage(msg *nsq.Message) error { fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body)) return nil } func testNSQ() { url := "192.168.4.102:4150" waiter := sync.WaitGroup{} waiter.Add(1) go func() { defer waiter.Done() config:=nsq.NewConfig() config.MaxInFlight=9 for i := 0; i<10; i++ { consumer, err := nsq.NewConsumer("test", "struggle", config) if nil != err { fmt.Println("err", err) return } consumer.AddHandler(&NSQHandler{}) err = consumer.ConnectToNSQD(url) if nil != err { fmt.Println("err", err) return } } select{} }() waiter.Wait() }
Java的客戶端
說實話java的客戶端確實用的人比較少,因為我看到實際在github上面的星星和關注就比較少,所以客戶端多多少少都存在一些問題。nsq-j和JavaNSQClient是官方排的考前的客戶端。
這里說一下nsq-j
https://github.com/sproutsocial/nsq-j
生產者
Publisher publisher = new Publisher("192.168.4.102:4150"); System.out.print(publisher); byte[] data = "Hello nsq".getBytes(); publisher.publish("example_topic", data); publisher.publish("example_topic", data); // 注意這里需要這樣關閉,不然的話就阻塞住了 publisher.getClient().stop();
消費者
public class PubExample { public static void handleData(byte[] data) { System.out.println("Received:" + new String(data)); } public static void main(String[] args) { Subscriber subscriber = new Subscriber("192.168.4.102:4161"); subscriber.subscribe("test", "struggle", PubExample::handleData); } }
需要注意的是其中192.168.4.102:4161這個是nsqlookupd的http地址和端口和生產者是不一樣的
java客戶端是根據nsqlookupd來找到對應消費端口
所以啟動nsqlookupd的時候需要注意,啟動nsqd需要加上參數--broadcast-address
如:./nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=192.168.4.102
這樣java消費者才能找到對應的地址否則會出現
ERROR com.sproutsocial.nsq.Subscription - error connecting to:localhost.localdomain:4150
java.net.UnknownHostException: localhost.localdomain
這樣類似的錯誤
我建議的客戶端
官方也說了,發送消息其實不建議使用客戶端,而建議使用http請求,所以我自己是使用okhttp進行消息的發送,案例如下:
OkHttpClient client = new OkHttpClient(); MediaType mediaType = MediaType.parse("application/json"); RequestBody body = RequestBody.create(mediaType, "{"code": 1}"); Request request = new Request.Builder() .url("http://192.168.4.102:4151/pub?topic=test") .post(body) .addHeader("Content-Type", "application/json") .build(); Response response = client.newCall(request).execute(); System.out.println(response);
當然這里沒有對client進行配置,這就涉及okhttp了,這里不再贅述
至於消費端還是使用nsq-j的
總結
使用下來我們可以看到,nsq為了提供性能在一些方面是做出了妥協的,我們可以總結出下面幾個方面供大家參考:
1、暫時nsq的鑒權功能在高版本才支持,但是高版本沒有release所以建議nsq在內網環境下使用,或者在一些安全的端口使用,避免被攻擊
2、部署節點在3個以上,nsq已經對於消息丟失做了很多的考慮,基本上不會出現丟失的情況,在你考慮冪等性的情況下,同時部署多個節點有利於消息進行處理
3、如果對消息順序有要求的情況下,nsq是不能使用的,因為nsq不能保證消息的順序
4、節點之間沒有消息復制,所以即使多個節點部署,萬一節點出現問題,還是有一段時間會出現消息無法接收到的情況,所以向多個節點同時發送消息也是一種解決方式
5、因為nsq拋棄了一些東西,那么所帶來的自然是方便,整體使用下來主要感受就是輕量,部署和配置都很方便,而且對於節點的監控能有界面
希望后續nsq能在幾個版本更新之后能給我們帶來更加牛逼的表現。