NSQ
NSQ介紹
NSQ是Go語言編寫的一個開源的實時分布式內存消息隊列,其性能十分優異。 NSQ的優勢有以下優勢:
- NSQ提倡分布式和分散的拓撲,沒有單點故障,支持容錯和高可用性,並提供可靠的消息交付保證
- NSQ支持橫向擴展,沒有任何集中式代理。
- NSQ易於配置和部署,並且內置了管理界面。
NSQ的應用場景
通常來說,消息隊列都適用以下場景。
異步處理
參照下圖利用消息隊列把業務流程中的非關鍵流程異步化,從而顯著降低業務請求的響應時間。
應用解耦
通過使用消息隊列將不同的業務邏輯解耦,降低系統間的耦合,提高系統的健壯性。后續有其他業務要使用訂單數據可直接訂閱消息隊列,提高系統的靈活性。![nsq應用場景1]
流量削峰
類似秒殺(大秒)等場景下,某一時間可能會產生大量的請求,使用消息隊列能夠為后端處理請求提供一定的緩沖區,保證后端服務的穩定性。
安裝
官方下載頁面根據自己的平台下載並解壓即可。
NSQ組件
nsqd
nsqd是一個守護進程,它接收、排隊並向客戶端發送消息。
啟動nsqd
,指定-broadcast-address=127.0.0.1
來配置廣播地址
./nsqd -broadcast-address=127.0.0.1
如果是在搭配nsqlookupd
使用的模式下需要還指定nsqlookupd
地址:
./nsqd -broadcast-address=127.0.0.1 -lookupd-tcp-address=127.0.0.1:4160
如果是部署了多個nsqlookupd
節點的集群,那還可以指定多個-lookupd-tcp-address
。
nsqdq
相關配置項如下:
-auth-http-address value
<addr>:<port> to query auth server (may be given multiple times)
-broadcast-address string
address that will be registered with lookupd (defaults to the OS hostname) (default "PROSNAKES.local")
-config string
path to config file
-data-path string
path to store disk-backed messages
-deflate
enable deflate feature negotiation (client compression) (default true)
-e2e-processing-latency-percentile value
message processing time percentiles (as float (0, 1.0]) to track (can be specified multiple times or comma separated '1.0,0.99,0.95', default none)
-e2e-processing-latency-window-time duration
calculate end to end latency quantiles for this duration of time (ie: 60s would only show quantile calculations from the past 60 seconds) (default 10m0s)
-http-address string
<addr>:<port> to listen on for HTTP clients (default "0.0.0.0:4151")
-http-client-connect-timeout duration
timeout for HTTP connect (default 2s)
-http-client-request-timeout duration
timeout for HTTP request (default 5s)
-https-address string
<addr>:<port> to listen on for HTTPS clients (default "0.0.0.0:4152")
-log-prefix string
log message prefix (default "[nsqd] ")
-lookupd-tcp-address value
lookupd TCP address (may be given multiple times)
-max-body-size int
maximum size of a single command body (default 5242880)
-max-bytes-per-file int
number of bytes per diskqueue file before rolling (default 104857600)
-max-deflate-level int
max deflate compression level a client can negotiate (> values == > nsqd CPU usage) (default 6)
-max-heartbeat-interval duration
maximum client configurable duration of time between client heartbeats (default 1m0s)
-max-msg-size int
maximum size of a single message in bytes (default 1048576)
-max-msg-timeout duration
maximum duration before a message will timeout (default 15m0s)
-max-output-buffer-size int
maximum client configurable size (in bytes) for a client output buffer (default 65536)
-max-output-buffer-timeout duration
maximum client configurable duration of time between flushing to a client (default 1s)
-max-rdy-count int
maximum RDY count for a client (default 2500)
-max-req-timeout duration
maximum requeuing timeout for a message (default 1h0m0s)
-mem-queue-size int
number of messages to keep in memory (per topic/channel) (default 10000)
-msg-timeout string
duration to wait before auto-requeing a message (default "1m0s")
-node-id int
unique part for message IDs, (int) in range [0,1024) (default is hash of hostname) (default 616)
-snappy
enable snappy feature negotiation (client compression) (default true)
-statsd-address string
UDP <addr>:<port> of a statsd daemon for pushing stats
-statsd-interval string
duration between pushing to statsd (default "1m0s")
-statsd-mem-stats
toggle sending memory and GC stats to statsd (default true)
-statsd-prefix string
prefix used for keys sent to statsd (%s for host replacement) (default "nsq.%s")
-sync-every int
number of messages per diskqueue fsync (default 2500)
-sync-timeout duration
duration of time per diskqueue fsync (default 2s)
-tcp-address string
<addr>:<port> to listen on for TCP clients (default "0.0.0.0:4150")
-tls-cert string
path to certificate file
-tls-client-auth-policy string
client certificate auth policy ('require' or 'require-verify')
-tls-key string
path to key file
-tls-min-version value
minimum SSL/TLS version acceptable ('ssl3.0', 'tls1.0', 'tls1.1', or 'tls1.2') (default 769)
-tls-required
require TLS for client connections (true, false, tcp-https)
-tls-root-ca-file string
path to certificate authority file
-verbose
enable verbose logging
-version
print version string
-worker-id
do NOT use this, use --node-id
nsqlookupd
nsqlookupd是維護所有nsqd狀態、提供服務發現的守護進程。它能為消費者查找特定topic
下的nsqd提供了運行時的自動發現服務。 它不維持持久狀態,也不需要與任何其他nsqlookupd實例協調以滿足查詢。因此根據你系統的冗余要求盡可能多地部署nsqlookupd
節點。它們小豪的資源很少,可以與其他服務共存。我們的建議是為每個數據中心運行至少3個集群。
nsqlookupd
相關配置項如下:
-broadcast-address string
address of this lookupd node, (default to the OS hostname) (default "PROSNAKES.local")
-config string
path to config file
-http-address string
<addr>:<port> to listen on for HTTP clients (default "0.0.0.0:4161")
-inactive-producer-timeout duration
duration of time a producer will remain in the active list since its last ping (default 5m0s)
-log-prefix string
log message prefix (default "[nsqlookupd] ")
-tcp-address string
<addr>:<port> to listen on for TCP clients (default "0.0.0.0:4160")
-tombstone-lifetime duration
duration of time a producer will remain tombstoned if registration remains (default 45s)
-verbose
enable verbose logging
-version
print version string
nsqadmin
一個實時監控集群狀態、執行各種管理任務的Web管理平台。 啟動nsqadmin
,指定nsqlookupd
地址:
./nsqadmin -lookupd-http-address=127.0.0.1:4161
我們可以使用瀏覽器打開http://127.0.0.1:4171/
訪問如下管理界面。
nsqadmin
相關的配置項如下:
-allow-config-from-cidr string
A CIDR from which to allow HTTP requests to the /config endpoint (default "127.0.0.1/8")
-config string
path to config file
-graphite-url string
graphite HTTP address
-http-address string
<addr>:<port> to listen on for HTTP clients (default "0.0.0.0:4171")
-http-client-connect-timeout duration
timeout for HTTP connect (default 2s)
-http-client-request-timeout duration
timeout for HTTP request (default 5s)
-http-client-tls-cert string
path to certificate file for the HTTP client
-http-client-tls-insecure-skip-verify
configure the HTTP client to skip verification of TLS certificates
-http-client-tls-key string
path to key file for the HTTP client
-http-client-tls-root-ca-file string
path to CA file for the HTTP client
-log-prefix string
log message prefix (default "[nsqadmin] ")
-lookupd-http-address value
lookupd HTTP address (may be given multiple times)
-notification-http-endpoint string
HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent
-nsqd-http-address value
nsqd HTTP address (may be given multiple times)
-proxy-graphite
proxy HTTP requests to graphite
-statsd-counter-format string
The counter stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string. (default "stats.counters.%s.count")
-statsd-gauge-format string
The gauge stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string. (default "stats.gauges.%s")
-statsd-interval duration
time interval nsqd is configured to push to statsd (must match nsqd) (default 1m0s)
-statsd-prefix string
prefix used for keys sent to statsd (%s for host replacement, must match nsqd) (default "nsq.%s")
-version
print version string
NSQ架構
NSQ工作模式
Topic和Channel
每個nsqd實例旨在一次處理多個數據流。這些數據流稱為“topics”
,一個topic
具有1個或多個“channels”
。每個channel
都會收到topic
所有消息的副本,實際上下游的服務是通過對應的channel
來消費topic
消息。
topic
和channel
不是預先配置的。topic
在首次使用時創建,方法是將其發布到指定topic
,或者訂閱指定topic
上的channel
。channel
是通過訂閱指定的channel
在第一次使用時創建的。
topic
和channel
都相互獨立地緩沖數據,防止緩慢的消費者導致其他chennel
的積壓(同樣適用於topic
級別)。
channel
可以並且通常會連接多個客戶端。假設所有連接的客戶端都處於准備接收消息的狀態,則每條消息將被傳遞到隨機客戶端。例如:
總而言之,消息是從
topic -> channel
(每個channel接收該topic的所有消息的副本)多播的,但是從channel -> consumers
均勻分布(每個消費者接收該channel的一部分消息)。
NSQ接收和發送消息流程
NSQ特性
- 消息默認不持久化,可以配置成持久化模式。nsq采用的方式時內存+硬盤的模式,當內存到達一定程度時就會將數據持久化到硬盤。
- 如果將
--mem-queue-size
設置為0,所有的消息將會存儲到磁盤。 - 服務器重啟時也會將當時在內存中的消息持久化。
- 如果將
- 每條消息至少傳遞一次。
- 消息不保證有序。
Go操作NSQ
官方提供了Go語言版的客戶端:go-nsq,更多客戶端支持請查看CLIENT LIBRARIES。
安裝
go get -u github.com/nsqio/go-nsq
生產者
一個簡單的生產者示例代碼如下:
// nsq_producer/main.go
package main
import (
"bufio"
"fmt"
"os"
"strings"
"github.com/nsqio/go-nsq"
)
// NSQ Producer Demo
var producer *nsq.Producer
// 初始化生產者
func initProducer(str string) (err error) {
config := nsq.NewConfig()
producer, err = nsq.NewProducer(str, config)
if err != nil {
fmt.Printf("create producer failed, err:%v\n", err)
return err
}
return nil
}
func main() {
nsqAddress := "127.0.0.1:4150"
err := initProducer(nsqAddress)
if err != nil {
fmt.Printf("init producer failed, err:%v\n", err)
return
}
reader := bufio.NewReader(os.Stdin) // 從標准輸入讀取
for {
data, err := reader.ReadString('\n')
if err != nil {
fmt.Printf("read string from stdin failed, err:%v\n", err)
continue
}
data = strings.TrimSpace(data)
if strings.ToUpper(data) == "Q" { // 輸入Q退出
break
}
// 向 'topic_demo' publish 數據
err = producer.Publish("topic_demo", []byte(data))
if err != nil {
fmt.Printf("publish msg to nsq failed, err:%v\n", err)
continue
}
}
}
將上面的代碼編譯執行,然后在終端輸入兩條數據123
和456
:
$ ./nsq_producer
123
2018/10/22 18:41:20 INF 1 (127.0.0.1:4150) connecting to nsqd
456
使用瀏覽器打開http://127.0.0.1:4171/
可以查看到類似下面的頁面: 在下面這個頁面能看到當前的topic
信息:
點擊頁面上的topic_demo
就能進入一個展示更多詳細信息的頁面,在這個頁面上我們可以查看和管理topic
,同時能夠看到目前在LWZMBP:4151 (127.0.01:4151)
這個nsqd
上有2條message。又因為沒有消費者接入所以暫時沒有創建channel
。![nsqadmin界面2]
在/nodes
這個頁面我們能夠很方便的查看當前接入lookupd
的nsqd
節點。
這個/counter
頁面顯示了處理的消息數量,因為我們沒有接入消費者,所以處理的消息數量為0。
在/lookup
界面支持創建topic
和channel
。
消費者
一個簡單的消費者示例代碼如下:
// nsq_consumer/main.go
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/nsqio/go-nsq"
)
// NSQ Consumer Demo
// MyHandler 是一個消費者類型
type MyHandler struct {
Title string
}
// HandleMessage 是需要實現的處理消息的方法
func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) {
fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body))
return
}
// 初始化消費者
func initConsumer(topic string, channel string, address string) (err error) {
config := nsq.NewConfig()
config.LookupdPollInterval = 15 * time.Second
c, err := nsq.NewConsumer(topic, channel, config)
if err != nil {
fmt.Printf("create consumer failed, err:%v\n", err)
return
}
consumer := &MyHandler{
Title: "沙河1號",
}
c.AddHandler(consumer)
// if err := c.ConnectToNSQD(address); err != nil { // 直接連NSQD
if err := c.ConnectToNSQLookupd(address); err != nil { // 通過lookupd查詢
return err
}
return nil
}
func main() {
err := initConsumer("topic_demo", "first", "127.0.0.1:4161")
if err != nil {
fmt.Printf("init consumer failed, err:%v\n", err)
return
}
c := make(chan os.Signal) // 定義一個信號的通道
signal.Notify(c, syscall.SIGINT) // 轉發鍵盤中斷信號到c
<-c // 阻塞
}
將上面的代碼保存之后編譯執行,就能夠獲取之前我們publish的兩條消息了:
$ ./nsq_consumer
2018/10/22 18:49:06 INF 1 [topic_demo/first] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=topic_demo
2018/10/22 18:49:06 INF 1 [topic_demo/first] (127.0.0.1:4150) connecting to nsqd
沙河1號 recv from 127.0.0.1:4150, msg:123
沙河1號 recv from 127.0.0.1:4150, msg:456
同時在nsqadmin的/counter
頁面查看到處理的數據數量為2。
關於go-nsq
的更多內容請閱讀[go-nsq的官方文檔](