簡介
NSQ是1個分布式(distributed)、可擴展(scalable)、配置簡單(Ops Friendly)、可集成(integrated)、實時( realtime )的消息傳遞平台。即消息中間件。
可以將原本耦合、同步執行的程序 解耦成 生產端+ 消息隊列+消費端模型的異步程序,加上分布式的生產者和消費者架構就可以在一定程度上支撐大並發。
NSQ是go語言開發的消息隊列,它的設計目標是為在多台計算機上運行的松散服務提供一個現代化的基礎設施骨架。
官方和第三方為NSQ開發了眾多客戶端功能庫,如官方提供的基於HTTP的nsqd、Go客戶端go-nsq、Python客戶端pynsq、基於Node.js的JavaScript客戶端nsqjs、異步C客戶端libnsq、Java客戶端nsq-java以及基於各種語言的眾多第三方客戶端功能庫。
組成

NSQ主要是由3個進程組成的:
- nsqd是一個接收、排隊、然后轉發消息到客戶端的進程。
- nsqlookupd 管理拓撲信息並提供最終一致性的發現服務。
- nsqadmin用於實時查看集群的統計數據(並且執行各種各樣的管理任務)
Topic 和 Channel
每個nsqd實例旨在一次處理多個數據流。這些數據流稱為“topics”,
1個topic具有1個或N個“channels”,所有的channels都會收到topic消息的副本,實際上topic是通過channel來消費它產生的消息的。
如果說topic是工廠,channel就是各種不同的銷售渠道,那么生產者就是工人們,消費者就是來自許多不同渠道的客戶們。
topic 由生產者端指定,channel由消費者端指定
topic和channel都相互獨立地緩沖數據,防止緩慢的消費者導致其他chennel的積壓(同樣適用於topic級別)。
channel可以並且通常會連接多個客戶端。假設所有連接的客戶端都處於准備接收消息的狀態,則每條消息將被傳遞到隨機客戶端。例如:

部署
建議使用 docker 部署,插拔方便,不影響主機器。
創建一個文件夾,如 mkdir docker-compose-test , 在其中創建如下 docker-compose.yml 文件。
docker-compose.yml 如下:
version: '3'
services:
nsqlookupd:
image: nsqio/nsq
command: /nsqlookupd
# networks: # 本地部署可以刪除地址綁定這三行,下同
# nsqnetwork:
# ipv4_address: 172.20.0.2
ports:
- "9910:4160" # Tcp端口 廣播
- "9911:4161" # Http端口 客戶端發現和管理操作
nsqd:
image: nsqio/nsq
command: /nsqd --lookupd-tcp-address=nsqlookupd:4160
depends_on:
- nsqlookupd
# networks:
# nsqnetwork:
# ipv4_address: 172.20.0.3
ports:
- "9912:4150" # tcp服務客戶端
- "9913:4151" # 提供http端口
nsqadmin:
image: nsqio/nsq
command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
depends_on:
- nsqlookupd
# networks:
# nsqnetwork:
# ipv4_address: 172.20.0.4
ports:
- "9914:4171"
保存退出后,使用如下命令啟動
docker-compose up
后台啟動則可以 使用命令
docker-compose up -d
訪問本地網址 127.0.0.1:9914 即可看到頁面板。
代碼使用
相關客戶端支持可參看此鏈接
因為業務相關,我們一般采用的是 HTTP 的方式生產消息。使用方式如下:
curl -d "<message>" http://127.0.0.1:9913/pub?topic=name
采用上述命令生產一個消息給 nsq 后,可在 nsq 面板(127.0.0.1:9914)看到這條消息。

主頁面
點擊,即可進入內部查看具體信息。包括 NSQD 節點,具體的channel,隊列中的消息數,連接數等。

nsqadmin

channel 。上圖最下一行提示沒有客戶端(消費者)連接這個 channel。啟動客戶端即可。
客戶端測試代碼:
# nsq_consume_yezi.py
import nsq
def handler(message):
print(message)
print(message.body)
return True
r = nsq.Reader(message_handler=handler, nsqd_tcp_addresses=['127.0.0.1:9912'], topic='yezi_topic', channel='ceshi01', lookupd_poll_interval=15)
nsq.run()
使用 python 啟動此代碼,

面板也顯示了客戶端的連接。

其他
列出所有的 NSQD 節點:

消息的統計:

lookup主機的列表:

