go【第十二篇】消息隊列


消息隊列是架構級解耦方案,常用於流量削峰、應用解耦、異步處理

消息隊列之NSQ

NSQ是目前比較流行的一個分布式的消息隊列,本文主要介紹了NSQ及Go語言如何操作NSQ。

NSQ介紹

NSQ是Go語言編寫的一個開源的實時分布式內存消息隊列,其性能十分優異。 NSQ的優勢有以下優勢:

NSQ提倡分布式和分散的拓撲,沒有單點故障,支持容錯和高可用性,並提供可靠的消息交付保證
NSQ支持橫向擴展,沒有任何集中式代理。
NSQ易於配置和部署,並且內置了管理界面。

安裝

官方下載頁面根據自己的平台下載並解壓即可。

啟動nsqd:	nsqd -lookupd-tcp-address=127.0.0.1:4160
	[nsqd] 2019/07/18 16:02:50.968403 INFO: nsqd v1.1.0 (built w/go1.10.3)
	[nsqd] 2019/07/18 16:02:51.013659 INFO: ID: 826
	[nsqd] 2019/07/18 16:02:51.014577 INFO: NSQ: persisting topic/channel metadata to nsqd.dat
	[nsqd] 2019/07/18 16:02:51.035655 INFO: HTTP: listening on [::]:4151
	[nsqd] 2019/07/18 16:02:51.035655 INFO: LOOKUP(127.0.0.1:4160): adding peer
	[nsqd] 2019/07/18 16:02:51.038262 INFO: LOOKUP connecting to 127.0.0.1:4160
	[nsqd] 2019/07/18 16:02:51.035655 INFO: TCP: listening on [::]:4150

啟動nsqd:	nsqlookupd
	[nsqlookupd] 2019/07/18 15:59:34.219793 INFO: nsqlookupd v1.1.0 (built w/go1.10.3)
	[nsqlookupd] 2019/07/18 15:59:34.279192 INFO: TCP: listening on [::]:4160
	[nsqlookupd] 2019/07/18 15:59:34.279192 INFO: HTTP: listening on [::]:4161
	
運行nsqadmin管理:	nsqadmin -lookupd-http-address=127.0.0.1:4161
	[nsqadmin] 2019/07/18 15:59:54.169512 INFO: nsqadmin v1.1.0 (built w/go1.10.3)
	[nsqadmin] 2019/07/18 15:59:54.213611 INFO: HTTP: listening on [::]:4171
	
創建topic
	http://127.0.0.1:4171/lookup
	topic_demo4
	
	
	

Go操作NSQ

// nsq_producer/main.go
package main

import (
    "bufio"
    "fmt"
    "os"
    "strings"

    "github.com/nsqio/go-nsq"
)

// NSQ Producer Demo

var producer *nsq.Producer

// 初始化生產者
func initProducer(str string) (err error) {
    config := nsq.NewConfig()
    producer, err = nsq.NewProducer(str, config)
    if err != nil {
        fmt.Printf("create producer failed, err:%v\n", err)
        return err
    }
    return nil
}

func main() {
    nsqAddress := "127.0.0.1:4150" // nsqd的地址
    err := initProducer(nsqAddress)
    if err != nil {
        fmt.Printf("init producer failed, err:%v\n", err)
        return
    }

    reader := bufio.NewReader(os.Stdin) // 從標准輸入讀取
    for {
        data, err := reader.ReadString('\n')
        if err != nil {
            fmt.Printf("read string from stdin failed, err:%v\n", err)
            continue
        }
        data = strings.TrimSpace(data) // 去掉輸入內容前后的空格
        if strings.ToUpper(data) == "Q" { // 輸入Q退出
            break
        }
        // 向 'topic_demo' publish 數據
        err = producer.Publish("topic_demo", []byte(data))
        if err != nil {
            fmt.Printf("publish msg to nsq failed, err:%v\n", err)
            continue
        }
    }
}
生產者
// nsq_consumer/main.go
package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/nsqio/go-nsq"
)

// NSQ Consumer Demo

// MyHandler 是一個消費者類型
type MyHandler struct {
    Title string
}

// HandleMessage 是需要實現的處理消息的方法
func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) {
    fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body))
    return
}

// 初始化消費者
func initConsumer(topic string, channel string, address string) (err error) {
    config := nsq.NewConfig()
    config.LookupdPollInterval = 15 * time.Second // 15秒查詢一次有沒有新的nsqd節點加入進來
    c, err := nsq.NewConsumer(topic, channel, config)
    if err != nil {
        fmt.Printf("create consumer failed, err:%v\n", err)
        return
    }
    // 創建一個結構體對象
    consumer := &MyHandler{
        Title: "鳴人一號",
    }
    c.AddHandler(consumer)

    // if err := c.ConnectToNSQD(address); err != nil { // 直接連NSQD
    if err := c.ConnectToNSQLookupd(address); err != nil { // 通過lookupd查詢
        return err
    }
    return nil

}

func main() {
    err := initConsumer("topic_demo", "xxx", "127.0.0.1:4161")
    if err != nil {
        fmt.Printf("init consumer failed, err:%v\n", err)
        return
    }
    c := make(chan os.Signal)        // 定義一個信號的通道
    signal.Notify(c, syscall.SIGINT) // 轉發鍵盤中斷信號到c
    <-c                              // 阻塞
}
消費者

 

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM