高性能消息隊列NSQ


前言

最近我再網上尋找使用golang實現的mq,因為我知道golang一般實現的應用部署起來很方便,所以我就找到了一個叫做nsq的mq,其實它並不能完全稱為隊列,但是它的輕量和性能的高效,讓我真的大開眼界。

如果你有興趣,我覺得也可以了解一下:
網上有人翻譯了國外的一篇文章:
我們是如何使用NSQ處理7500億消息的

 

安裝和部署

官網提供

如果你有能力的話直接閱讀官方的說明進行操作就可以了
https://nsq.io/overview/quick_start.html

如果看不懂我還找到了中文翻譯過的:

http://wiki.jikexueyuan.com/project/nsq-guide/

簡單部署

下面是我使用的最快部署測試方式,使用服務器環境centos7.4,防火牆開放端口4160,4161,4151
4171
1、在下載頁面下載對應版本(可能有的時候需要科學上網)
https://nsq.io/deployment/installing.html
這里使用linux版本
nsq-1.1.0.linux-amd64.go1.10.3.tar.gz

2、將包上傳至服務器后解壓;
tar xvf nsq-1.1.0.linux-amd64.go1.10.3.tar.gz

3、進入bin目錄 cd nsq-1.1.0.linux-amd64.go1.10.3/bin

4、后台啟動三個服務
nohup ./nsqlookupd > /dev/null 2>&1 &
nohup ./nsqd --lookupd-tcp-address=127.0.0.1:4160 > /dev/null 2>&1 &
nohup ./nsqadmin --lookupd-http-address=127.0.0.1:4161 > /dev/null 2>&1 &

 

簡單使用

1、使用
curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'
會創建一個test主題,並發送一個hello world消息

2、外部通過:http://127.0.0.1:4171/
進行訪問可以看到NSQ的管理界面,非常的簡潔
其中127.0.0.1為服務器IP

3、使用
./nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
消費test中剛才的消息,並輸出到服務器/tmp目錄中

 

特性

官方給出的文檔給出了很多特性的說明,針對於一個MQ來說,我認為下面幾個特性是你必須知道的:

  • 默認一開始消息不是持久化的
    nsq采用的方式時內存+硬盤的模式,當內存到達一定程度時就會將數據持久化到硬盤
    1、如果將 --mem-queue-size 設置為 0,所有的消息將會存儲到磁盤。
    2、但是即使服務器重啟也會將當時在內存中的消息持久化

  • 消息是沒有順序的
    這一點很關鍵,由於nsq使用內存+磁盤的模式,而且還有requeue的操作,所以發送消息的順序和接收的順序可能不一樣

  • 官方不推薦使用客戶端發消息
    官方提供相應的客戶端發送消息,但是HTTP可能更方便一些

  • 沒有復制
    nsq節點相對獨立,節點與節點之間沒有復制或者集群的關系。

  • 沒有鑒權相關模塊
    當前release版本的nsq沒有鑒權模塊,只有版本v0.2.29+高於這個的才有

  • 幾個小點
    topic名稱有長度限制,命名建議用下划線連接;
    消息體大小有限制;

優缺點

優點:
1、部署極其方便,沒有任何環境依賴,直接啟動就行
2、輕量沒有過多的配置參數,只需要簡單的配置就可以直接使用
3、性能高
4、消息不存在丟失的情況

缺點:
1、消息無順序
2、節點之間沒有消息復制
3、沒有鑒權

 

多節點部署

基本概念

nsqd:基本的節點
nsqlookupd:匯總節點信息,提供查詢和管理topic等服務
nsqadmin:管理端展示UI界面,能有一個web頁面去查看和操作

結構


最簡單的多節點部署可以是這樣的一個結構

部署步驟和命令

PS:后台啟動使用nohup即可,下面只是為了說明啟動方式和命令參數

第一步需要啟動nsqlookupd
./nsqlookupd
默認占用4161和4160兩個端口
使用-http-address和-tcp-address可以修改

第二步啟動兩個nsqd
./nsqd -lookupd-tcp-address=192.168.1.102:4160 -broadcast-address=192.168.1.103 -data-path="/temp/nsq"
其中
-lookupd-tcp-address為上面nsqlookupd的IP和tcp的端口4160
-broadcast-address我填寫的是自己的IP,這個IP官網上寫的是會注冊到nsqlookupd
-data-path為消息持久化的位置

第三步啟動nsqadmin
./nsqadmin -lookupd-http-address=192.168.4.102:4161
同樣需要指定-lookupd-http-address但是這次是http的端口也就是4161因為admin通過http請求來查詢相關信息

后續擴展

上面只是最簡單的兩個節點的部署,如果后續想擴展就會如下

其中nginx是可以不需要的,你可以果斷選擇同時向多個節點發送消息,或者當消息沒有處理的時候重新進行發送,因為這樣也是nsq設計之初的考慮。你也可以根據自己的需要設計你自己的架構。

 

客戶端

官方提供了很多語言接入的客戶端 https://nsq.io/clients/client_libraries.html
針對消息生產者的客戶端,官方還推薦直接使用post請求發送消息,如:
curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'
表示向test主題發送hello world這個消息

下面介紹兩種客戶端,一種是golang的客戶端,一種是java的客戶端

Golang的客戶端

其中192.168.4.102:4150為發送消息的地址,消費者里面寫的也是相同的地址就可以了。

生產者:

package main

import (
    "github.com/nsqio/go-nsq"
    "time"
)

func main() {
    for i := 0 ; i < 10; i++  {
        sendMessage()
    }
    time.Sleep(time.Second * 10)
}

func sendMessage() {
    url := "192.168.4.102:4150"
    producer, err := nsq.NewProducer(url, nsq.NewConfig())
    if err != nil {
        panic(err)
    }
    err = producer.Publish("test", []byte("hello world"))
    if err != nil {
        panic(err)
    }
    producer.Stop()
}

 

 

消費者:

package main

import (
    "fmt"
    "github.com/nsqio/go-nsq"
    "sync"
)

func main() {
    testNSQ()
}

type NSQHandler struct {
}

func (this *NSQHandler) HandleMessage(msg *nsq.Message) error {
    fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
    return nil
}

func testNSQ() {
    url := "192.168.4.102:4150"
    
    waiter := sync.WaitGroup{}
    waiter.Add(1)

    go func() {
        defer waiter.Done()
        config:=nsq.NewConfig()
        config.MaxInFlight=9

        for i := 0; i<10; i++ {
            consumer, err := nsq.NewConsumer("test", "struggle", config)
            if nil != err {
                fmt.Println("err", err)
                return
            }

            consumer.AddHandler(&NSQHandler{})
            err = consumer.ConnectToNSQD(url)
            if nil != err {
                fmt.Println("err", err)
                return
            }
        }
        select{}
    }()

    waiter.Wait()
}

 

 

 

Java的客戶端

說實話java的客戶端確實用的人比較少,因為我看到實際在github上面的星星和關注就比較少,所以客戶端多多少少都存在一些問題。nsq-j和JavaNSQClient是官方排的考前的客戶端。
這里說一下nsq-j
https://github.com/sproutsocial/nsq-j

生產者

Publisher publisher = new Publisher("192.168.4.102:4150");
System.out.print(publisher);

byte[] data = "Hello nsq".getBytes();
publisher.publish("example_topic", data);
publisher.publish("example_topic", data);

// 注意這里需要這樣關閉,不然的話就阻塞住了
publisher.getClient().stop();

 

 

消費者

public class PubExample {

    public static void handleData(byte[] data) {
        System.out.println("Received:" + new String(data));
    }

    public static void main(String[] args) {
        Subscriber subscriber = new Subscriber("192.168.4.102:4161");
        subscriber.subscribe("test", "struggle", PubExample::handleData);
    }
}

 

 

需要注意的是其中192.168.4.102:4161這個是nsqlookupd的http地址和端口和生產者是不一樣的

java客戶端是根據nsqlookupd來找到對應消費端口

所以啟動nsqlookupd的時候需要注意,啟動nsqd需要加上參數--broadcast-address
如:./nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=192.168.4.102

這樣java消費者才能找到對應的地址否則會出現
ERROR com.sproutsocial.nsq.Subscription - error connecting to:localhost.localdomain:4150
java.net.UnknownHostException: localhost.localdomain
這樣類似的錯誤

 

我建議的客戶端

官方也說了,發送消息其實不建議使用客戶端,而建議使用http請求,所以我自己是使用okhttp進行消息的發送,案例如下:

OkHttpClient client = new OkHttpClient();

MediaType mediaType = MediaType.parse("application/json");
RequestBody body = RequestBody.create(mediaType, "{"code": 1}");

Request request = new Request.Builder()
.url("http://192.168.4.102:4151/pub?topic=test")
.post(body)
.addHeader("Content-Type", "application/json")
.build();

Response response = client.newCall(request).execute();
System.out.println(response);

當然這里沒有對client進行配置,這就涉及okhttp了,這里不再贅述

至於消費端還是使用nsq-j的

 

總結

使用下來我們可以看到,nsq為了提供性能在一些方面是做出了妥協的,我們可以總結出下面幾個方面供大家參考:
1、暫時nsq的鑒權功能在高版本才支持,但是高版本沒有release所以建議nsq在內網環境下使用,或者在一些安全的端口使用,避免被攻擊
2、部署節點在3個以上,nsq已經對於消息丟失做了很多的考慮,基本上不會出現丟失的情況,在你考慮冪等性的情況下,同時部署多個節點有利於消息進行處理
3、如果對消息順序有要求的情況下,nsq是不能使用的,因為nsq不能保證消息的順序
4、節點之間沒有消息復制,所以即使多個節點部署,萬一節點出現問題,還是有一段時間會出現消息無法接收到的情況,所以向多個節點同時發送消息也是一種解決方式
5、因為nsq拋棄了一些東西,那么所帶來的自然是方便,整體使用下來主要感受就是輕量,部署和配置都很方便,而且對於節點的監控能有界面

希望后續nsq能在幾個版本更新之后能給我們帶來更加牛逼的表現。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM