消息中間件NSQ入門


參考:

https://www.cnblogs.com/smartrui/p/12549562.html

https://blog.csdn.net/lldouble/article/details/82023125

https://blog.csdn.net/riba2534/article/details/106977687/

 

 

 

 

 

帶你入門Go的消息隊列NSQ

以前看到過NSQ這個東西,也一直沒去看。今天剛好有時間就搭建了下,簡單嘗試了下這個Go語言下的消息隊列NSQ,我這里簡要記錄下。

其實,NSQ國內用的是比較少的,我這里也是算了解這么個東西吧 ,稍微看下源碼,學到東西而已。

NSQ簡介

NSQ是一個基於Go語言的分布式實時消息平台, 它具有分布式、去中心化的拓撲結構,支持無限水平擴展。無單點故障、故障容錯、高可用性以及能夠保證消息的可靠傳遞的特征。另外,NSQ非常容易配置和部署, 且支持眾多的消息協議。支持多種客戶端,協議簡單,如果有興趣可參照協議自已實現一個也可以。

NSQ是一個基於Go語言的開源的分布式實時消息平台,他的代碼托管在GitHub上。

NSQ可用於大規模系統的實時消息服務,它的設計目標是為在分布式環境下提供一個強大的去除中心化的分布式服務架構,可以每天處理數以億計的實時消息。NSQ的優點是無單點故障、故障容錯、高可用性和信息傳遞的高可靠性。NSQ安裝部署簡單,容易水平擴展,目前已有很多公司都是采用其作為自身企業內部的實時消息服務。而且它的靈活性很強,支持很多種協議。官方直接提供了拆箱可用的Go庫和Python庫。好,其他的就不廢話了,隨便搜索下NSQ相關的博文或者去NSQ的官方網站了解更詳細的信息吧。

 

NSQ的幾個組件

  1. nsqd:一個負責接收、排隊、轉發消息到客戶端的守護進程
  2. nsqlookupd:管理拓撲信息, 用於收集nsqd上報的topic和channel,並提供最終一致性的發現服務的守護進程
  3. nsqadmin:一套Web用戶界面,可實時查看集群的統計數據和執行相應的管理任務
  4. utilities:基礎功能、數據流處理工具,如nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq

相關網址

NSQ官網:NSQ官網

GitHub: Github

NSQ官方客戶端:NSQ官方客戶端

NSQ文檔:NSQ文檔

NSQ協議:NSQ協議

NSQ安裝

NSQ的安裝方式有好幾種,可以通過二進制、源碼、Docker、Brew等方式安裝

二進制安裝,可以到安裝地址 上面下載對應平台的Release包,然后解壓就行了。

如果是Mac電腦,直接用Brew安裝

brew install nsq

如果是Docker的安裝,就參照下上面那個網址吧,按照步驟操作就行了,我沒有用的這個安裝方式。

我是用的源碼的安裝方式,因為二進制的那個放在S3上面,下起來好慢,於是直接把Github的源代碼下載來,這里也有一個好處就是可以看源碼來跟蹤學習。還方便些。

下載后的目錄結構如下所示:

NSQ 運行

如果用源碼運行,而不是Make后將可執行文件放到bin目錄這種,那么下載后解決完所有的依賴包后,cd 進入到 nsqio/nsq/apps/nsqd目錄后,可以執行 go run ./ 或 go run main.go options.go 否則會報如下錯誤

nsqio/nsq/apps/nsqd/main.go:44:13: undefined: nsqdFlagSet nsqio/nsq/apps/nsqd/main.go:54:10: undefined: config 

其實進入到apps目錄執行,最終還是會到 nsqio/nsq/nsqd這個下面去執行業務處理代碼的,apps這里僅僅是用go-srv這個包進行了一層服務包裝而已,變成守護和一些入口參數等。

$ go run ./ [nsqd] 2020/03/22 00:55:27.597911 INFO: nsqd v1.2.1-alpha (built w/go1.11.2) [nsqd] 2020/03/22 00:55:27.597980 INFO: ID: 809 [nsqd] 2020/03/22 00:55:27.598396 INFO: TOPIC(test): created [nsqd] 2020/03/22 00:55:27.598449 INFO: TOPIC(test): new channel(test) [nsqd] 2020/03/22 00:55:27.598535 INFO: TOPIC(test): new channel(lc) [nsqd] 2020/03/22 00:55:27.598545 INFO: NSQ: persisting topic/channel metadata to nsqd.dat [nsqd] 2020/03/22 00:55:27.599714 INFO: TCP: listening on [::]:4150 [nsqd] 2020/03/22 00:55:27.599806 INFO: HTTP: listening on [::]:4151 

看到上面的提示,表示啟動成功了,它會分別開放TCP和HTTP的端口,4150,4151可以通過配置或flag參數的方式更改, 同時它也支持TLS/SSL.

HTTP測試

啟動nsqd后,可以用http來測試發送一條消息,可使用CURL來操作。

$ curl -d '這是一條測試消息' 'http://127.0.0.1:4151/pub?topic=test&channel=lc' OK 

NSQ消息模式

我們知道消息一般有推和拉模式,NSQ的消息模式為推的方式,這種模式可以保證消息的及時性,當有消息時可以及時推送出去。但是要根椐客戶端的消耗能力和節奏去控制,NSQ是通過更改RDY的值來實現的。當沒有消息時為0, 服務端推送消息后,客戶端比如調用 updateRDY()這個方法改成3, 那么服務端推送時,就會根椐這個值做流控了。

發送消息是通過連接的TCP發出去的,先發到Topic下面,再轉到Channel下面,最后從通道 memoryMsgChan 中取出msg,然后發出。

github.com/nsqio/nsq/nsqd/protocol_v2.go func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { var err error var memoryMsgChan chan *Message var backendMsgChan <-chan []byte var subChannel *Channel // NOTE: `flusherChan` is used to bound message latency for // the pathological case of a channel on a low volume topic // with >1 clients having >1 RDY counts var flusherChan <-chan time.Time var sampleRate int32 subEventChan := client.SubEventChan identifyEventChan := client.IdentifyEventChan outputBufferTicker := time.NewTicker(client.OutputBufferTimeout) heartbeatTicker := time.NewTicker(client.HeartbeatInterval) heartbeatChan := heartbeatTicker.C msgTimeout := client.MsgTimeout ... ... case msg := <-memoryMsgChan: if sampleRate > 0 && rand.Int31n(100) > sampleRate { continue } msg.Attempts++ subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) client.SendingMessage() err = p.SendMessage(client, msg) if err != nil { goto exit } flushed = false case <-client.ExitChan: goto exit } 

NSQ還支持延時消息的發送,比如訂單在30分鍾未支付做無效處理等場景,延時使用的是heap包的優級先隊列,實現了里面的一些方法。通過判斷當前時間和延時時間做對比,然后從延時隊列里面彈出消息再發送到channel中,后續流程和普通消息一樣,我看網上有 人碰到過說延時消息會有並發問題,最后還用的Redis的ZSET實現的,所以不確定這個延時的靠不靠譜,要求不高的倒是可以試試。

curl -d '這是一條延遲消息' 'http://127.0.0.1:4151/pub?topic=test&channel=lc&defer=3000'

defer參數,單位:毫秒

NSQ消費

消費消息時,channel類似於kafka里面的消費組的概念,比如同一個channel。那么只會被一個實例消費,不會多個實例都能消費到那條消息,所以可用於消息的負載均衡, 我看到網上有人有疑惑就是他指定topic,然后再用不同的channel去消費,說怎么能收到其它channel的消息,不能直接過濾消息,其實channel不是用來過濾的。

NSQ發送的消息可以確保至少被一個消費者消費,它的消費級別為至少消費一次,為了確保消息消費,如果客戶端超時、重新放入隊列或重連等,重復消費是不可避免的,所以客戶端業務流程一定要做消息的冪等處理。

客戶端回復FIN 或者 REQ 表示成功或者重發。如果客戶端未能及時發送,則NSQ將重復發送消息給該客戶端。

另外,NSQ不像 Kafka,我們是能到消息的有序的,但NSQ不行,客戶端收到的消費為無序的。雖然每條消息有一個時間戳,但如果對順序有要求的,那就要注意了。所以,NSQ更適合處理數據量大但是彼此間沒有順序關系的消息

NSQ的Go客戶端

NSQ是支持多種形式的客戶端的,像HTTP或客戶端庫來操作,而且官方其實還建議使用HTTP的方式,HTTP的方式,直接發GET或POST請求就行了。

這里Go的話,可使用go-nsq這個庫,地址為:go-nsq :go-nsq

go get https://github.com/nsqio/go-nsq 

發送消息

package main import ( "errors" "fmt" "github.com/nsqio/go-nsq" "time" ) func main() { var ( p1 *producer p2 *producer ) p1 = &producer{} p2 = &producer{} p1.producer,_ = InitProducer("127.0.0.1:4150") p2.producer,_ = InitProducer("127.0.0.1:4150") defer p1.producer.Stop() defer p2.producer.Stop() //p1.publish("test","hello!!!") p1.deferredPublish("test", 10 * time.Second,"這是一條延遲消息?") fmt.Println("done") } type producer struct { producer *nsq.Producer } func(p *producer) publish(topic string,message string) (err error){ if message == "" { return errors.New("message is empty") } if err = p.producer.Publish(topic,[]byte(message)); err != nil { fmt.Println(err) return err } return nil } // 延遲消息 func(p *producer) deferredPublish(topic string,delay time.Duration, message string) (err error){ if message == "" { return errors.New("message is empty") } if err = p.producer.DeferredPublish(topic,delay, []byte(message)); err != nil { fmt.Println(err) return err } return nil } func InitProducer(addr string) (p *nsq.Producer,err error){ var ( config *nsq.Config ) config = nsq.NewConfig() if p, err = nsq.NewProducer(addr, config); err != nil { return nil, err } return p, nil } 

消費消息

package main import ( "encoding/json" "fmt" "github.com/nsqio/go-nsq" ) //nsqio消費測試 type MyTestHandler struct { q *nsq.Consumer messageReceive int } func (h *MyTestHandler) HandleMessage(message *nsq.Message) error { type Data struct { } var ( data *Data err error ) data = &Data{} if err = json.Unmarshal(message.Body, data) ;err != nil { fmt.Printf("Id:%s, Msg:%s \n", message.ID, string(message.Body)) err = nil } message.Finish() return nil } func initConsuemr(topic string, channel string) { var ( config *nsq.Config h *MyTestHandler err error ) h = &MyTestHandler{ } config = nsq.NewConfig() if h.q, err = nsq.NewConsumer(topic, channel, config); err != nil { fmt.Println(err) return } h.q.AddHandler(h) if err = h.q.ConnectToNSQD("127.0.0.1:4150"); err != nil { fmt.Println(err) } //<-h.q.StopChan fmt.Println("stop") return } func main() { initConsuemr("test","test") initConsuemr("test","lc") select{} } 

總的來說,NSQ的消費是有保障的,能保證消息的可靠性。可用多個 nsqd和nsqlookupd做分布式集群等,使用Go的channel能夠高並發消費,高吞吐量,而且,部署方面也簡單。
不過,給我的感覺還是不如Kafka和RocketMQ這些專業的消息隊列,不過在某些場景下還是夠用的。這個就得根椐自已的情況去取舍了,畢竟,沒有好的架構,只有合適的架構。

 

 

 

 

 

 

NSQ基本概念

核心概念

在討論NSQ如何在實踐中使用前,先理解NSQ隊列的架構原理是非常值得的。它的設計很簡單,可以通過幾個核心概念來理解。

Topic ——一個topic就是程序發布消息的一個邏輯鍵,當程序第一次發布消息時就會創建topic。

Channels ——channel組與消費者相關,是消費者之間的負載均衡,channel在某種意義上來說是一個“隊列”。每當一個發布者發送一條消息到一個topic,消息會被復制到所有消費者連接的channel上,消費者通過這個特殊的channel讀取消息,實際上,在消費者第一次訂閱時就會創建channel。

Channel會將消息進行排列,如果沒有消費者讀取消息,消息首先會在內存中排隊,當量太大時就會被保存到磁盤中。

Message s——消息構成了我們數據流的中堅力量,消費者可以選擇結束消息,表明它們正在被正常處理,或者重新將他們排隊待到后面再進行處理。每個消息包含傳遞嘗試的次數,當消息傳遞超過一定的閥值次數時,我們應該放棄這些消息,或者作為額外消息進行處理。

NSQ在操作期間同樣運行着兩個程序:

Nsqd ——nsqd守護進程是NSQ的核心部分,它是一個單獨的監聽某個端口進來的消息的二進制程序。每個nsqd節點都獨立運行,不共享任何狀態。當一個節點啟動時,它向一組nsqlookupd節點進行注冊操作,並將保存在此節點上的topic和channel進行廣播。

客戶端可以發布消息到nsqd守護進程上,或者從nsqd守護進程上讀取消息。通常,消息發布者會向一個單一的local nsqd發布消息,消費者從連接了的一組nsqd節點的topic上遠程讀取消息。如果你不關心動態添加節點功能,你可以直接運行standalone模式。

Nsqlookupd ——nsqlookupd服務器像consul或etcd那樣工作,只是它被設計得沒有協調和強一致性能力。每個nsqlookupd都作為nsqd節點注冊信息的短暫數據存儲區。消費者連接這些節點去檢測需要從哪個nsqd節點上讀取消息。

消息的生命周期

讓我們觀察一個關於nsq如何在實際中工作的更為詳細的例子。

NSQ推薦通過他們相應的nsqd實例使用協同定位發布者,這意味着即使面對網絡分區,消息也會被保存在本地,直到它們被一個消費者讀取。更重要的是,發布者不必去發現其他的nsqd節點,他們總是可以向本地實例發布消息。

首先,一個發布者向它的本地nsqd發送消息,要做到這點,首先要先打開一個連接,然后發送一個包含topic和消息主體的發布命令,在這種情況下,我們將消息發布到事件topic上以分散到我們不同的worker中。

事件topic會復制這些消息並且在每一個連接topic的channel上進行排隊,在我們的案例中,有三個channel,它們其中之一作為檔案channel。消費者會獲取這些消息並且上傳到S3。

每個channel的消息都會進行排隊,直到一個worker把他們消費,如果此隊列超出了內存限制,消息將會被寫入到磁盤中。

Nsqd節點首先會向nsqlookup廣播他們的位置信息,一旦它們注冊成功,worker將會從nsqlookup服務器節點上發現所有包含事件topic的nsqd節點。

然后每個worker向每個nsqd主機進行訂閱操作,用於表明worker已經准備好接受消息了。這里我們不需要一個完整的連通圖,但我們必須要保證每個單獨的nsqd實例擁有足夠的消費者去消費它們的消息,否則channel會被隊列堆着。

從客戶端庫代碼中抽取一部分,這里是一個關於如何處理我們的消息的一段代碼:

 

 

如果因為某些原因第三方發生故障了,我們可以處理這些故障,在這個代碼片中,我們有三種處理邏輯:

1、如果超過了某個嘗試次數閥值,我們就將消息丟棄。

2、如果消息已經被處理成功了,我們就結束消息。

3、如果發生了錯誤,我們將需要傳遞的消息重新進行排隊。

正如你所看到的,NSQ隊列的行為既簡單又明確。

在我們的案例中,我們在丟棄消息之前將容忍MAX_DELIVERY_ATTEMPTS * BACKOFF_TIME分鍾的故障。

在Segment系統中,我們統計消息嘗試的次數、消息丟棄數、消息重新排隊數等等,然后結束某些消息以保證我們有一個好的服務質量。如果消息丟棄數超過了我們設置的閥值,我們將在任何時候對服務發出警報。

在實踐中

在生產環境中,我們幾乎在我們所有的實例中運行nsqd守護程序,發布者之間協同定位。NSQ在實際生產中運行良好有幾個原因:

簡單的協議 ——如果你的隊列已經有了一個很好的客戶端庫,這個不是一個很大的問題,但如果你現在的客戶端庫存在bug或者過時了,一個簡單的協議就能體現出優勢了。

NSQ有一個快速的二進制協議,通過短短的幾天工作量就可以很簡單地實現這些協議,我們還自己創建了我們的純JS驅動(當時只存在coffeescript驅動),這個純JS驅動運行的很穩定可靠。

運行簡單 ——NSQ沒有復雜的水印設置或JVM級別的配置,相反,你可以配置保存到內存中的消息的數量和消息最大值,如果隊列被消息填滿了,消息會被保存到磁盤上。

分布式 ——因為NSQ沒有在守護程序之間共享信息,所以它從一開始就是為了分布式操作而生。個別的機器可以隨便宕機隨便啟動而不會影響到系統的其余部分,消息發布者可以在本地發布,即使面對網絡分區。

這種“分布式優先”的設計理念意味着NSQ基本上可以永遠不斷地擴展,需要更高的吞吐量?那就添加更多的nsqd吧。

唯一的共享狀態就是保存在lookup節點上,甚至它們不需要全局視圖,配置某些nsqd注冊到某些lookup節點上這是很簡單的配置,唯一關鍵的地方就是消費者可以通過lookup節點獲取所有完整的節點集。

清晰的故障事件——NSQ在組件內建立了一套明確關於可能導致故障的的故障權衡機制,這對消息傳遞和恢復都有意義。

我是最少意外原則的堅定信仰者,尤其是當它涉及到分布式系統時。系統發生故障,我們接收它,但我們不可能會指望系統以意外的形式發生故障,你最終會忽略這些故障案例,因為你甚至都不打算考慮它們為什么會發生。

雖然它們可能不像Kafka系統那樣提供嚴格的保證級別,但NSQ簡單的操作使故障情況非常明顯。

UNIX-y工具 ——NSQ是一個很好的通用型工具,所以NSQ附帶了很多實用的程序,這些程序是多用途和可組合的。

除了TCP協議,NSQ提供一個簡單的CURL的HTTP接口用於維護操作,它從CLI附帶了二進制文件管道,用tail跟蹤隊列的尾部,從一個隊列使用管道到另外一個隊列,還有HTTP發布訂閱。

甚至還有一個用於監控和暫停隊列的管理面板,包括一個動態的計數器在上面。

丟失了什么?

正如我所提到的,簡單並不是沒有折衷:

沒有復制 ——不像其他的隊列組件,NSQ並沒有提供任何形式的復制和集群,也正是這點讓它能夠如此簡單地運行,但它確實對於一些高保證性高可靠性的消息發布沒有足夠的保證。

我們可以通過降低文件同步的時間來部分避免,只需通過一個標志配置,通過EBS支持我們的隊列。但是這樣仍然存在一個消息被發布后馬上死亡,丟失了有效的寫入的情況。

基本消息路由 ——在NSQ中,topic和channel幾乎是你所有能獲得到的東西,沒有關於路由和基於key的親和力的觀念。我們很樂意為各種用例提供支持,無論是根據條件去篩選消息,還是根據條件路由到某些節點上。取而代之的是,我們最終建立了路由worker,它們處於隊列之間,扮演一個聰明的直通濾波器。

沒有嚴格的順序 ——雖然Kafka由一個有序的日志構成,但NSQ不是。消息可以在任何時間以任何順序進入隊列。在我們使用的案例中,這通常沒有關系,因為所有的數據都被加上了時間戳,但它並不適合需要嚴格順序的情況。

無數據重復刪除功能 ——Aphyr已經在他的文章中廣泛探討了基於超時系統的危險性。NSQ同樣也調入了這個陷阱,它使用了心跳檢測機制去測試消費者是否存活還是死亡。我們之前已經寫過關於很多原因會導致我們的worker無法完成心跳檢測,所以在worker中必須有一個單獨的步驟確保冪等性。

簡單的工作原理

正如你所看到的,后面看到的所有好處的基本核心就是簡單性,NSQ是一個簡單的隊列,這意味着它很容易進行故障推理和很容易發現bug。消費者可以自行處理故障事件而不會影響系統剩下的其余部分。

事實上,簡單性是我們決定使用NSQ的首要因素,這方便與我們的許多其他軟件一起維護,通過引入隊列使我們得到了堪稱完美的表現,通過隊列甚至讓我們增加了幾個數量級的吞吐量。

今天,我們面臨一個更加復雜的未來,我們越來越多的worker需要一套嚴格可靠性和順序性保障,這已經超過了NSQ提供的簡單功能。

我們計划在其他基礎設施中用Kafka替換NSQ,在生產上從JVM中運行可以獲取更多的好處。關於Kafka我們有一個明確的權衡,我們自己必須肩負起更多負責的運營。另一方面,它擁有一個可復制的、有序的日志可以提供給我們更好的服務。

但對於其他適合NSQ的worker,它為我們服務的相當好,我們期待着繼續鞏固它的堅實的基礎。

 

 

 

 

 

 

NSQ使用入門

簡介

消息隊列是進程間通信或同一進程不同線程間進行通信一種方式,可以將服務異步化,對流量進行整形,削峰填谷,是高並發、大數據場景下不可或缺的中間件;使得消息生產者和消費者解耦,方便系統模塊化設計。

NSQ是一個基於Go語言的分布式實時消息平台,它基於MIT開源協議發布,由bitly公司開源出來的一款簡單易用的消息中間件。 官方和第三方還為NSQ開發了眾多客戶端功能庫,如官方提供的基於HTTP的nsqd、Go客戶端go-nsq、Python客戶端pynsq、基於Node.js的JavaScript客戶端nsqjs、異步C客戶端libnsq、Java客戶端nsq-java以及基於各種語言的眾多第三方客戶端功能庫。

特性

  1. Distributed NSQ提供了分布式的,去中心化,且沒有單點故障的拓撲結構,穩定的消息傳輸發布保障,能夠具有高容錯和HA(高可用)特性。

  2. Scalable易於擴展NSQ支持水平擴展,沒有中心化的brokers。內置的發現服務簡化了在集群中增加節點。同時支持pub-sub和load-balanced 的消息分發。

  3. Ops FriendlyNSQ非常容易配置和部署,生來就綁定了一個管理界面。二進制包沒有運行時依賴。官方有Docker image。

  4. Integrated高度集成官方的 Go 和 Python庫都有提供。而且為大多數語言提供了庫。

  5. 消息不持久化,全在內存。可配置–mem-queue-size來超出閾值的消息寫到硬盤

  6. 保證at least once投遞。nsqd沒掛的前提下,由於超時,連接斷開,重新入隊等原因,消息可能多次投遞,client自己保證消費消息的操作是具有冪等性的。

  7. 值得注意的是,重要的是 nsqd 和 nsqlookupd 守護進程被設計成獨立運行,沒有相互之間的溝通或協調。

概念

Topic :一個topic就是程序發布消息的一個邏輯鍵,當程序第一次發布消息時就會創建topic。

Channels :channel與消費者相關,是消費者之間的負載均衡,channel在某種意義上來說是一個“隊列”。每當一個發布者發送一條消息到一個topic,消息會被復制到所有消費者連接的channel上,消費者通過這個特殊的channel讀取消息,實際上,在消費者第一次訂閱時就會創建channel。Channel會將消息進行排列,如果沒有消費者讀取消息,消息首先會在內存中排隊,當量太大時就會被保存到磁盤中

Messages:消息構成了我們數據流的中堅力量,消費者可以選擇結束消息,表明它們正在被正常處理,或者重新將他們排隊待到后面再進行處理。每個消息包含傳遞嘗試的次數,當消息傳遞超過一定的閥值次數時,我們應該放棄這些消息,或者作為額外消息進行處理。

nsqd:nsqd 是一個守護進程,負責接收,排隊,投遞消息給客戶端。它可以獨立運行,不過通常它是由 nsqlookupd 實例所在集群配置的(它在這能聲明 topics 和 channels,以便大家能找到)。

nsqlookupd:nsqlookupd 是守護進程負責管理拓撲信息。客戶端通過查詢 nsqlookupd 來發現指定話題(topic)的生產者,並且 nsqd 節點廣播話題(topic)和通道(channel)信息。有兩個接口:TCP 接口,nsqd 用它來廣播。HTTP 接口,客戶端用它來發現和管理。注:是消費者查詢去哪里取消息,而不是生產者查詢生產消息到哪里去

每個nsqd具有與nsqlookupd的長期TCP連接,在該連接上它定期推送其狀態。此數據用於通知nsqlookupd將為消費者提供哪些nsqd地址。對於消費者,將公開HTTP /查找端點以進行輪詢。

對於nsqlookupd,通過運行多個實例來實現高可用性。它們不直接相互通信,數據被認為最終是一致的。消費者輪詢所有已配置的nsqlookupd實例並將響應聯合起來。陳舊,不可訪問或其他故障節點不會使系統停止運行。

nsqadmin:nsqadmin 是一套 WEB UI,用來匯集集群的實時統計,並執行不同的管理任務。 常用工具類:

nsq_to _file:消費指定的話題(topic)/通道(channel),並寫到文件中,有選擇的滾動和/或壓縮文件。

nsq_to _http:消費指定的話題(topic)/通道(channel)和執行 HTTP requests (GET/POST) 到指定的端點。

nsq_to _nsq:消費者指定的話題/通道和重發布消息到目的地 nsqd 通過 TCP。

拓撲結構

NSQ推薦通過他們相應的nsqd實例使用協同定位發布者,這意味着即使面對網絡分區,消息也會被保存在本地,直到它們被一個消費者讀取。更重要的是,發布者不必去發現其他的nsqd節點,他們總是可以向本地實例發布消息。

在這里插入圖片描述

首先,一個發布者向它的本地nsqd發送消息,要做到這點,首先要先打開一個連接,然后發送一個包含topic和消息主體的發布命令,在這種情況下,我們將消息發布到事件topic上以分散到我們不同的worker中。 事件topic會復制這些消息並且在每一個連接topic的channel上進行排隊,在我們的案例中,有三個channel,它們其中之一作為檔案channel。消費者會獲取這些消息並且上傳到S3。

在這里插入圖片描述

每個channel的消息都會進行排隊,直到一個worker把他們消費,如果此隊列超出了內存限制,消息將會被寫入到磁盤中。Nsqd節點首先會向nsqlookup廣播他們的位置信息,一旦它們注冊成功,worker將會從nsqlookup服務器節點上發現所有包含事件topic的nsqd節點。

在這里插入圖片描述
然后每個worker向每個nsqd主機進行訂閱操作,用於表明worker已經准備好接受消息了。這里我們不需要一個完整的連通圖,但我們必須要保證每個單獨的nsqd實例擁有足夠的消費者去消費它們的消息,否則channel會被隊列堆着。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM