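Preface
A tightly coupled, synchronous program can be decoupled into an asynchronous one built from a producer, a message queue, and a consumer. Combined with distributed producers and consumers, this architecture can sustain a fairly high level of concurrency.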
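The decoupling described above can be sketched inside a single Go process, with a buffered channel standing in for the message queue. This is only an illustration of the model, not NSQ itself; the names process and runPipeline are made up for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// process stands in for the slow work that used to run inline with the caller.
func process(job int) int { return job * 2 }

// runPipeline enqueues jobs on a buffered queue and lets `workers` consumers
// drain it concurrently. It returns the sum of all results.
func runPipeline(jobs []int, workers int) int {
	queue := make(chan int, len(jobs))   // plays the role of the message queue
	results := make(chan int, len(jobs)) // collects consumer output

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range queue {
				results <- process(job)
			}
		}()
	}

	// Producer side: enqueue and move on without waiting for processing.
	for _, j := range jobs {
		queue <- j
	}
	close(queue)

	wg.Wait()
	close(results)

	sum := 0
	for r := range results {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(runPipeline([]int{1, 2, 3, 4}, 2)) // prints 20
}
```

With NSQ the queue lives in a separate nsqd process, so producers and consumers can run on different machines and be scaled independently.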
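NSQ is a message queue written in Go and ships as self-contained binaries, so deploying and configuring additional nsqd instances for horizontal scaling is comparatively simple.
If you know Go, you can also read the source code when you run into a particularly thorny problem.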
Introduction to NSQ
NSQ is a distributed, scalable, ops-friendly (simple to configure), easily integrated, realtime messaging platform.
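Topics and Channels
Each nsqd instance is designed to handle multiple streams of data at once. These streams are called "topics".
A topic has one or more "channels". Every channel receives a copy of each message published to the topic; in practice, a topic's messages are consumed through its channels.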
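Groundbreaking technology is often inspired by everyday life!
It is said that the German philosopher Leibniz drew on the yin-yang ideas attributed to Fuxi when he proposed binary arithmetic. Impressive, if true.
If a topic is a factory, its channels are the different sales channels, the producers are the factory workers, and the consumers are the customers who arrive through those channels.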
The topic is specified by the producer.
The channel is chosen by the consumer.
Topics and channels each buffer data independently, which prevents a slow consumer from causing a backlog on other channels (the same applies at the topic level).
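To make the independent buffering concrete, here is a minimal in-memory model. It is not NSQ code: the Topic and Channel types and their methods are invented for this sketch, and "chanCPU" is a hypothetical second channel. Each channel keeps its own queue, so draining one has no effect on the other.

```go
package main

import "fmt"

// Channel is a named consumer queue with its own buffer.
type Channel struct {
	queue []string
}

// Depth reports how many messages are waiting on this channel.
func (c *Channel) Depth() int { return len(c.queue) }

// Pop removes and returns the oldest message, or false if the queue is empty.
func (c *Channel) Pop() (string, bool) {
	if len(c.queue) == 0 {
		return "", false
	}
	msg := c.queue[0]
	c.queue = c.queue[1:]
	return msg, true
}

// Topic copies every published message into each of its channels.
type Topic struct {
	channels map[string]*Channel
}

func NewTopic() *Topic { return &Topic{channels: make(map[string]*Channel)} }

// Channel returns the named channel, creating it on first use.
func (t *Topic) Channel(name string) *Channel {
	c, ok := t.channels[name]
	if !ok {
		c = &Channel{}
		t.channels[name] = c
	}
	return c
}

// Publish appends a copy of msg to every channel's buffer.
func (t *Topic) Publish(msg string) {
	for _, c := range t.channels {
		c.queue = append(c.queue, msg)
	}
}

func main() {
	topic := NewTopic()
	fast := topic.Channel("chanMEM")
	slow := topic.Channel("chanCPU") // hypothetical channel whose consumer is stalled

	for i := 1; i <= 3; i++ {
		topic.Publish(fmt.Sprintf("msg-%d", i))
	}

	// The fast consumer drains its channel; the slow one reads nothing.
	for {
		if _, ok := fast.Pop(); !ok {
			break
		}
	}
	fmt.Println(fast.Depth(), slow.Depth()) // prints 0 3
}
```

The backlog stays on the slow channel only; the fast channel's depth returns to zero regardless.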
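A channel can, and usually does, have multiple clients connected. Assuming all connected clients are ready to receive messages, each message is delivered to one randomly chosen client.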
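The delivery rule within a single channel can be imitated with a plain Go channel shared by several goroutines: every message is received by exactly one of them, and which one is not predictable. This is an analogy only, and the deliver function is a name made up for the sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// deliver pushes msgs through one shared channel read by `clients` goroutines
// and returns how many messages each client received.
func deliver(msgs []string, clients int) []int {
	ch := make(chan string)
	counts := make([]int, clients)

	var wg sync.WaitGroup
	for i := 0; i < clients; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for range ch {
				counts[id]++ // each goroutine writes only its own slot
			}
		}(i)
	}

	for _, m := range msgs {
		ch <- m // received by exactly one waiting client
	}
	close(ch)
	wg.Wait()
	return counts
}

func main() {
	counts := deliver([]string{"m1", "m2", "m3", "m4", "m5", "m6"}, 3)
	total := 0
	for id, n := range counts {
		fmt.Printf("client %d received %d message(s)\n", id, n)
		total += n
	}
	fmt.Println("total delivered:", total)
}
```

The per-client counts vary from run to run, but they always add up to the number of messages sent, since no message is duplicated within a channel.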
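Deploying NSQ
Core components
NSQ has three core components: nsqlookupd (the directory service through which consumers find the nsqd instances that hold a topic), nsqd (the main daemon that receives, queues, and delivers messages), and nsqadmin (the web management UI). All three are in the bin directory of the release package.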
[root@zhanggen bin]# ls
nsqadmin  nsqlookupd  nsq_tail  nsq_to_http  to_nsq
nsqd  nsq_stat  nsq_to_file  nsq_to_nsq
[root@zhanggen bin]# pwd
/nsp/nsq-1.2.0.linux-amd64.go1.12.9/bin
[root@zhanggen bin]#
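Starting NSQ
1. Start nsqlookupd
-broadcast-address sets the address that this nsqlookupd node advertises for itself. The addresses it actually listens on are set with -tcp-address and -http-address.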
[root@zhanggen bin]# ./nsqlookupd -broadcast-address=0.0.0.0
[nsqlookupd] 2020/05/14 04:21:44.808337 INFO: nsqlookupd v1.2.0 (built w/go1.12.9)
[nsqlookupd] 2020/05/14 04:21:44.822982 INFO: HTTP: listening on [::]:4161
[nsqlookupd] 2020/05/14 04:21:44.823181 INFO: TCP: listening on [::]:4160
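As the log shows, nsqlookupd opens two ports: one for TCP (4160) and one for HTTP (4161).
The TCP service on port 4160 is where nsqd instances register; nsqlookupd uses it to track the heartbeat of every registered nsqd.
The HTTP service on port 4161 provides the web API used by nsqadmin and by consumers that discover nsqd through nsqlookupd.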
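As an illustration of what that HTTP API returns, the sketch below decodes a response of the kind nsqlookupd serves at /lookup?topic=<name> and extracts the nsqd TCP addresses. The sample payload is hand-written for this example rather than captured from a real server, and the struct covers only the fields the sketch needs; check the field names against your NSQ version.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net"
	"strconv"
)

// producerInfo holds the subset of per-nsqd fields used here.
type producerInfo struct {
	BroadcastAddress string `json:"broadcast_address"`
	TCPPort          int    `json:"tcp_port"`
	HTTPPort         int    `json:"http_port"`
}

// lookupResponse models the body returned for /lookup?topic=<name>.
type lookupResponse struct {
	Channels  []string       `json:"channels"`
	Producers []producerInfo `json:"producers"`
}

// nsqdTCPAddrs decodes a lookup response and returns host:port for each nsqd.
func nsqdTCPAddrs(body []byte) ([]string, error) {
	var resp lookupResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return nil, err
	}
	addrs := make([]string, 0, len(resp.Producers))
	for _, p := range resp.Producers {
		addrs = append(addrs, net.JoinHostPort(p.BroadcastAddress, strconv.Itoa(p.TCPPort)))
	}
	return addrs, nil
}

func main() {
	// Hand-written sample; a real body would come from an HTTP GET to
	// http://<lookupd-host>:4161/lookup?topic=CMDB
	sample := []byte(`{"channels":["chanMEM"],"producers":[{"broadcast_address":"192.168.56.133","tcp_port":4150,"http_port":4151,"version":"1.2.0"}]}`)

	addrs, err := nsqdTCPAddrs(sample)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(addrs) // prints [192.168.56.133:4150]
}
```

A client library performs this lookup periodically and then opens TCP connections to the addresses it finds.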
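2. Start nsqd
--lookupd-tcp-address=0.0.0.0:4160 tells this nsqd which nsqlookupd to register with. Here both run on the same host; when they run on different hosts, use the address of the nsqlookupd host instead.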
[root@zhanggen bin]# ./nsqd --lookupd-tcp-address=0.0.0.0:4160
[nsqd] 2020/05/14 04:39:42.068804 INFO: nsqd v1.2.0 (built w/go1.12.9)
[nsqd] 2020/05/14 04:39:42.069169 INFO: ID: 133
[nsqd] 2020/05/14 04:39:42.070362 INFO: NSQ: persisting topic/channel metadata to nsqd.dat
[nsqd] 2020/05/14 04:39:42.092537 INFO: LOOKUP(0.0.0.0:4160): adding peer
[nsqd] 2020/05/14 04:39:42.092573 INFO: LOOKUP connecting to 0.0.0.0:4160
[nsqd] 2020/05/14 04:39:42.093154 INFO: TCP: listening on [::]:4150
[nsqd] 2020/05/14 04:39:42.093236 INFO: HTTP: listening on [::]:4151
[nsqd] 2020/05/14 04:39:42.100185 INFO: LOOKUPD(0.0.0.0:4160): peer info {TCPPort:4160 HTTPPort:4161 Version:1.2.0 BroadcastAddress:0.0.0.0}
3. Start nsqadmin
[root@zhanggen bin]# ./nsqadmin --lookupd-http-address=0.0.0.0:4161
[nsqadmin] 2020/05/14 04:45:46.652464 INFO: nsqadmin v1.2.0 (built w/go1.12.9)
[nsqadmin] 2020/05/14 04:45:46.686502 INFO: HTTP: listening on [::]:4171
Startup succeeded
[root@zhanggen zhanggen]# netstat -antlp |grep nsq*
Active Internet connections (servers and established)
tcp        0      0 192.168.122.1:53        0.0.0.0:*               LISTEN      6337/dnsmasq
tcp        0      0 127.0.0.1:47706         127.0.0.1:4160          ESTABLISHED 26913/./nsqd
tcp6       0      0 :::4171                 :::*                    LISTEN      26998/./nsqadmin
tcp6       0      0 :::4150                 :::*                    LISTEN      26913/./nsqd
tcp6       0      0 :::4151                 :::*                    LISTEN      26913/./nsqd
tcp6       0      0 :::4160                 :::*                    LISTEN      26904/./nsqlookupd
tcp6       0      0 :::4161                 :::*                    LISTEN      26904/./nsqlookupd
tcp6       0      0 192.168.56.133:4161     192.168.56.1:8380       ESTABLISHED 26904/./nsqlookupd
tcp6       0      0 127.0.0.1:4160          127.0.0.1:47706         ESTABLISHED 26904/./nsqlookupd
tcp6       0      0 192.168.56.133:4161     192.168.56.1:8379       ESTABLISHED 26904/./nsqlookupd
tcp6       0      0 192.168.56.133:4161     192.168.56.1:8010       ESTABLISHED 26904/./nsqlookupd
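Beyond checking the listening ports, one way to smoke-test the setup is to publish a message through nsqd's HTTP interface on port 4151, which accepts a POST to /pub?topic=<name> with the message as the request body. The sketch below assumes nsqd is reachable at 127.0.0.1:4151; the topic name smoke_test and the helper names pubURL and publish are made up for the example.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"net/url"
	"time"
)

// pubURL builds the nsqd HTTP publish URL for a topic.
func pubURL(nsqdHTTPAddr, topic string) string {
	q := url.Values{}
	q.Set("topic", topic)
	return "http://" + nsqdHTTPAddr + "/pub?" + q.Encode()
}

// publish POSTs body as a single message to the given topic.
func publish(client *http.Client, nsqdHTTPAddr, topic string, body []byte) error {
	resp, err := client.Post(pubURL(nsqdHTTPAddr, topic), "application/octet-stream", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("nsqd returned %s", resp.Status)
	}
	return nil
}

func main() {
	client := &http.Client{Timeout: 2 * time.Second}
	// Assumed address and hypothetical topic name; adjust to your setup.
	if err := publish(client, "127.0.0.1:4151", "smoke_test", []byte("hello")); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Println("published")
}
```

If the publish succeeds, the topic should appear in the nsqadmin web UI on port 4171.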
Using NSQ
nsqd configuration options
-auth-http-address value
    <addr>:<port> to query auth server (may be given multiple times)
-broadcast-address string
    address that will be registered with lookupd (defaults to the OS hostname) (default "PROSNAKES.local")
-config string
    path to config file
-data-path string
    path to store disk-backed messages
-deflate
    enable deflate feature negotiation (client compression) (default true)
-e2e-processing-latency-percentile value
    message processing time percentiles (as float (0, 1.0]) to track (can be specified multiple times or comma separated '1.0,0.99,0.95', default none)
-e2e-processing-latency-window-time duration
    calculate end to end latency quantiles for this duration of time (ie: 60s would only show quantile calculations from the past 60 seconds) (default 10m0s)
-http-address string
    <addr>:<port> to listen on for HTTP clients (default "0.0.0.0:4151")
-http-client-connect-timeout duration
    timeout for HTTP connect (default 2s)
-http-client-request-timeout duration
    timeout for HTTP request (default 5s)
-https-address string
    <addr>:<port> to listen on for HTTPS clients (default "0.0.0.0:4152")
-log-prefix string
    log message prefix (default "[nsqd] ")
-lookupd-tcp-address value
    lookupd TCP address (may be given multiple times)
-max-body-size int
    maximum size of a single command body (default 5242880)
-max-bytes-per-file int
    number of bytes per diskqueue file before rolling (default 104857600)
-max-deflate-level int
    max deflate compression level a client can negotiate (> values == > nsqd CPU usage) (default 6)
-max-heartbeat-interval duration
    maximum client configurable duration of time between client heartbeats (default 1m0s)
-max-msg-size int
    maximum size of a single message in bytes (default 1048576)
-max-msg-timeout duration
    maximum duration before a message will timeout (default 15m0s)
-max-output-buffer-size int
    maximum client configurable size (in bytes) for a client output buffer (default 65536)
-max-output-buffer-timeout duration
    maximum client configurable duration of time between flushing to a client (default 1s)
-max-rdy-count int
    maximum RDY count for a client (default 2500)
-max-req-timeout duration
    maximum requeuing timeout for a message (default 1h0m0s)
-mem-queue-size int
    number of messages to keep in memory (per topic/channel) (default 10000)
-msg-timeout string
    duration to wait before auto-requeing a message (default "1m0s")
-node-id int
    unique part for message IDs, (int) in range [0,1024) (default is hash of hostname) (default 616)
-snappy
    enable snappy feature negotiation (client compression) (default true)
-statsd-address string
    UDP <addr>:<port> of a statsd daemon for pushing stats
-statsd-interval string
    duration between pushing to statsd (default "1m0s")
-statsd-mem-stats
    toggle sending memory and GC stats to statsd (default true)
-statsd-prefix string
    prefix used for keys sent to statsd (%s for host replacement) (default "nsq.%s")
-sync-every int
    number of messages per diskqueue fsync (default 2500)
-sync-timeout duration
    duration of time per diskqueue fsync (default 2s)
-tcp-address string
    <addr>:<port> to listen on for TCP clients (default "0.0.0.0:4150")
-tls-cert string
    path to certificate file
-tls-client-auth-policy string
    client certificate auth policy ('require' or 'require-verify')
-tls-key string
    path to key file
-tls-min-version value
    minimum SSL/TLS version acceptable ('ssl3.0', 'tls1.0', 'tls1.1', or 'tls1.2') (default 769)
-tls-required
    require TLS for client connections (true, false, tcp-https)
-tls-root-ca-file string
    path to certificate authority file
-verbose
    enable verbose logging
-version
    print version string
-worker-id
    do NOT use this, use --node-id
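Two of these options work together: -mem-queue-size bounds how many messages a topic or channel keeps in memory, and -data-path is where disk-backed messages are stored. The following is a simplified model of that idea, not nsqd's actual code: a bounded Go channel acts as the memory queue and a slice stands in for the disk-backed queue. The queue type and its methods are invented for the sketch.

```go
package main

import "fmt"

// queue models a bounded in-memory buffer with an overflow store.
type queue struct {
	mem     chan string // bounded buffer, the role played by -mem-queue-size
	backend []string    // stand-in for the disk-backed queue under -data-path
}

func newQueue(memSize int) *queue {
	return &queue{mem: make(chan string, memSize)}
}

// put keeps the message in memory when there is room and spills it to the
// backend otherwise, so the caller never blocks on a full memory buffer.
func (q *queue) put(msg string) {
	select {
	case q.mem <- msg:
	default:
		q.backend = append(q.backend, msg)
	}
}

func main() {
	q := newQueue(2)
	for _, m := range []string{"a", "b", "c", "d", "e"} {
		q.put(m)
	}
	fmt.Println(len(q.mem), len(q.backend)) // prints 2 3
}
```

A larger memory queue keeps more messages off disk at the cost of RAM.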
nsqlookupd configuration options
-broadcast-address string
    address of this lookupd node, (default to the OS hostname) (default "PROSNAKES.local")
-config string
    path to config file
-http-address string
    <addr>:<port> to listen on for HTTP clients (default "0.0.0.0:4161")
-inactive-producer-timeout duration
    duration of time a producer will remain in the active list since its last ping (default 5m0s)
-log-prefix string
    log message prefix (default "[nsqlookupd] ")
-tcp-address string
    <addr>:<port> to listen on for TCP clients (default "0.0.0.0:4160")
-tombstone-lifetime duration
    duration of time a producer will remain tombstoned if registration remains (default 45s)
-verbose
    enable verbose logging
-version
    print version string
nsqadmin configuration options
-allow-config-from-cidr string
    A CIDR from which to allow HTTP requests to the /config endpoint (default "127.0.0.1/8")
-config string
    path to config file
-graphite-url string
    graphite HTTP address
-http-address string
    <addr>:<port> to listen on for HTTP clients (default "0.0.0.0:4171")
-http-client-connect-timeout duration
    timeout for HTTP connect (default 2s)
-http-client-request-timeout duration
    timeout for HTTP request (default 5s)
-http-client-tls-cert string
    path to certificate file for the HTTP client
-http-client-tls-insecure-skip-verify
    configure the HTTP client to skip verification of TLS certificates
-http-client-tls-key string
    path to key file for the HTTP client
-http-client-tls-root-ca-file string
    path to CA file for the HTTP client
-log-prefix string
    log message prefix (default "[nsqadmin] ")
-lookupd-http-address value
    lookupd HTTP address (may be given multiple times)
-notification-http-endpoint string
    HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent
-nsqd-http-address value
    nsqd HTTP address (may be given multiple times)
-proxy-graphite
    proxy HTTP requests to graphite
-statsd-counter-format string
    The counter stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string. (default "stats.counters.%s.count")
-statsd-gauge-format string
    The gauge stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string. (default "stats.gauges.%s")
-statsd-interval duration
    time interval nsqd is configured to push to statsd (must match nsqd) (default 1m0s)
-statsd-prefix string
    prefix used for keys sent to statsd (%s for host replacement, must match nsqd) (default "nsq.%s")
-version
    print version string
go-nsq
The official Go client is go-nsq. For clients in other languages, see CLIENT LIBRARIES.
go get -u github.com/nsqio/go-nsq
Producer
package main

import (
	"encoding/json"
	"fmt"

	"github.com/nsqio/go-nsq"
)

var producer *nsq.Producer

// initProducer creates a producer for the given nsqd TCP address.
func initProducer(nsqdAddress string) (err error) {
	config := nsq.NewConfig()
	// Create the producer.
	producer, err = nsq.NewProducer(nsqdAddress, config)
	if err != nil {
		fmt.Println("failed to create producer:", err)
		return err
	}
	return nil
}

// memory is the payload that is published as JSON.
type memory struct {
	Total int `json:"total"`
	Free  int `json:"free"`
	Usage int `json:"usage"`
}

func main() {
	nsqAddress := "192.168.56.133:4150"
	if err := initProducer(nsqAddress); err != nil {
		return
	}
	// Close the producer cleanly before exiting.
	defer producer.Stop()

	m := memory{
		Total: 10000,
		Free:  100,
		Usage: 200,
	}
	b, err := json.Marshal(m)
	if err != nil {
		fmt.Println("JSON serialization failed:", err)
		return
	}
	// Publish the JSON body to the CMDB topic.
	if err = producer.Publish("CMDB", b); err != nil {
		fmt.Printf("publish msg to nsq failed, err:%v\n", err)
	}
}
Consumer
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/nsqio/go-nsq"
)

type memory struct {
	Name  string `json:"-"`
	Total int    `json:"total"`
	Free  int    `json:"free"`
	Usage int    `json:"usage"`
}

// HandleMessage is the method a message handler must implement.
func (m *memory) HandleMessage(memMsg *nsq.Message) (err error) {
	// Decode the message body.
	err = json.Unmarshal(memMsg.Body, m)
	if err != nil {
		fmt.Println("failed to parse JSON:", err)
		return err
	}
	fmt.Printf("title:%s from:%v, body:%#v\n", m.Name, memMsg.NSQDAddress, m)
	fmt.Println(m.Total, m.Usage, m.Free)
	return nil
}

func initConsumer(topic, channel, address string) (err error) {
	// Consumer configuration.
	config := nsq.NewConfig()
	// Poll nsqlookupd every 15 seconds (this is a polling interval, not a timeout).
	config.LookupdPollInterval = 15 * time.Second
	// Create the consumer.
	consumer, err := nsq.NewConsumer(topic, channel, config)
	if err != nil {
		fmt.Println("create consumer failed:", err)
		return err
	}
	// Create memory consumer no. 1 and register it as the handler.
	consumerMem := &memory{
		Name: "consumer 1",
	}
	consumer.AddHandler(consumerMem)
	// if err := consumer.ConnectToNSQD(address); err != nil { // connect to nsqd directly
	// Find nsqd through nsqlookupd and receive the data the producer published.
	if err := consumer.ConnectToNSQLookupd(address); err != nil {
		return err
	}
	return nil
}

func main() {
	// topic, channel, address
	err := initConsumer("CMDB", "chanMEM", "192.168.56.133:4161")
	if err != nil {
		fmt.Printf("init consumer failed, err:%v\n", err)
		return
	}
	// Block until interrupted instead of spinning in an empty loop.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan
}
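The producer and consumer each declare their own memory struct, and the consumer's version adds a Name field tagged `json:"-"`. The standalone sketch below, which does not involve NSQ at all, shows what travels over the wire and why the consumer's Name survives decoding. The helper names encode and decodeInto are made up for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// memory mirrors the consumer-side struct: Name is excluded from JSON.
type memory struct {
	Name  string `json:"-"`
	Total int    `json:"total"`
	Free  int    `json:"free"`
	Usage int    `json:"usage"`
}

// encode returns the JSON text that would be used as the message body.
func encode(m memory) (string, error) {
	b, err := json.Marshal(m)
	return string(b), err
}

// decodeInto fills m from payload, leaving fields tagged "-" untouched.
func decodeInto(m *memory, payload string) error {
	return json.Unmarshal([]byte(payload), m)
}

func main() {
	payload, err := encode(memory{Name: "not sent", Total: 10000, Free: 100, Usage: 200})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(payload) // prints {"total":10000,"free":100,"usage":200}

	consumer := &memory{Name: "consumer 1"}
	if err := decodeInto(consumer, payload); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(consumer.Name, consumer.Total) // prints consumer 1 10000
}
```

Because Name never appears in the payload, it can safely carry local state such as the handler's label.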