消息隊列是架構級解耦方案,常用於流量削峰、應用解耦、異步處理
消息隊列之NSQ
NSQ是目前比較流行的一個分布式的消息隊列,本文主要介紹了NSQ及Go語言如何操作NSQ。
NSQ介紹
NSQ是Go語言編寫的一個開源的實時分布式內存消息隊列,其性能十分優異。 NSQ的優勢有以下優勢:
NSQ提倡分布式和分散的拓撲,沒有單點故障,支持容錯和高可用性,並提供可靠的消息交付保證 NSQ支持橫向擴展,沒有任何集中式代理。 NSQ易於配置和部署,並且內置了管理界面。
安裝
官方下載頁面根據自己的平台下載並解壓即可。
啟動nsqd: nsqd -lookupd-tcp-address=127.0.0.1:4160 [nsqd] 2019/07/18 16:02:50.968403 INFO: nsqd v1.1.0 (built w/go1.10.3) [nsqd] 2019/07/18 16:02:51.013659 INFO: ID: 826 [nsqd] 2019/07/18 16:02:51.014577 INFO: NSQ: persisting topic/channel metadata to nsqd.dat [nsqd] 2019/07/18 16:02:51.035655 INFO: HTTP: listening on [::]:4151 [nsqd] 2019/07/18 16:02:51.035655 INFO: LOOKUP(127.0.0.1:4160): adding peer [nsqd] 2019/07/18 16:02:51.038262 INFO: LOOKUP connecting to 127.0.0.1:4160 [nsqd] 2019/07/18 16:02:51.035655 INFO: TCP: listening on [::]:4150 啟動nsqd: nsqlookupd [nsqlookupd] 2019/07/18 15:59:34.219793 INFO: nsqlookupd v1.1.0 (built w/go1.10.3) [nsqlookupd] 2019/07/18 15:59:34.279192 INFO: TCP: listening on [::]:4160 [nsqlookupd] 2019/07/18 15:59:34.279192 INFO: HTTP: listening on [::]:4161 運行nsqadmin管理: nsqadmin -lookupd-http-address=127.0.0.1:4161 [nsqadmin] 2019/07/18 15:59:54.169512 INFO: nsqadmin v1.1.0 (built w/go1.10.3) [nsqadmin] 2019/07/18 15:59:54.213611 INFO: HTTP: listening on [::]:4171 創建topic http://127.0.0.1:4171/lookup topic_demo4
Go操作NSQ

// nsq_producer/main.go package main import ( "bufio" "fmt" "os" "strings" "github.com/nsqio/go-nsq" ) // NSQ Producer Demo var producer *nsq.Producer // 初始化生產者 func initProducer(str string) (err error) { config := nsq.NewConfig() producer, err = nsq.NewProducer(str, config) if err != nil { fmt.Printf("create producer failed, err:%v\n", err) return err } return nil } func main() { nsqAddress := "127.0.0.1:4150" // nsqd的地址 err := initProducer(nsqAddress) if err != nil { fmt.Printf("init producer failed, err:%v\n", err) return } reader := bufio.NewReader(os.Stdin) // 從標准輸入讀取 for { data, err := reader.ReadString('\n') if err != nil { fmt.Printf("read string from stdin failed, err:%v\n", err) continue } data = strings.TrimSpace(data) // 去掉輸入內容前后的空格 if strings.ToUpper(data) == "Q" { // 輸入Q退出 break } // 向 'topic_demo' publish 數據 err = producer.Publish("topic_demo", []byte(data)) if err != nil { fmt.Printf("publish msg to nsq failed, err:%v\n", err) continue } } }

// nsq_consumer/main.go package main import ( "fmt" "os" "os/signal" "syscall" "time" "github.com/nsqio/go-nsq" ) // NSQ Consumer Demo // MyHandler 是一個消費者類型 type MyHandler struct { Title string } // HandleMessage 是需要實現的處理消息的方法 func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) { fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body)) return } // 初始化消費者 func initConsumer(topic string, channel string, address string) (err error) { config := nsq.NewConfig() config.LookupdPollInterval = 15 * time.Second // 15秒查詢一次有沒有新的nsqd節點加入進來 c, err := nsq.NewConsumer(topic, channel, config) if err != nil { fmt.Printf("create consumer failed, err:%v\n", err) return } // 創建一個結構體對象 consumer := &MyHandler{ Title: "鳴人一號", } c.AddHandler(consumer) // if err := c.ConnectToNSQD(address); err != nil { // 直接連NSQD if err := c.ConnectToNSQLookupd(address); err != nil { // 通過lookupd查詢 return err } return nil } func main() { err := initConsumer("topic_demo", "xxx", "127.0.0.1:4161") if err != nil { fmt.Printf("init consumer failed, err:%v\n", err) return } c := make(chan os.Signal) // 定義一個信號的通道 signal.Notify(c, syscall.SIGINT) // 轉發鍵盤中斷信號到c <-c // 阻塞 }