一 環境依賴:
golang 開發環境(version >= 1.2) 下源碼,配置環境變量,執行安裝腳本
gpm 依賴包管理器 ubantu: sudo apt-get intall gpm
二 NSQ安裝:
- git獲取源碼: mkdir -p $GOPATH/src/github.com/nsqio;cd $GOPATH/src/github.com/nsqio;git clone https://github.com/nsqio/nsq.git;cd nsq
- 安裝依賴包: gpm install
- 安裝NSQ: go install ./...
三 開啟NSQ:
- nsqd節點維護進程:nsqlookupd &
- nsqd節點進程:nsqd --lookupd-tcp-address=127.0.0.1:4160 &
- 消息產看進程:nsqadmin --lookupd-http-address=127.0.0.1:4161 &
ps:nsqlookupd與nsqadmin為輔助進程,可不使用直接用nsqd也可正常工作.
這里開啟的進程均用默認的端口
四 工具測試:
- curl -d 'hello world 1' 'http://127.0.0.1:4151/put?topic=test' // 產生一個topic為“test” 消息內容為“hello world”的消息
- nsq_to_file --topic=test --output-dir=./tmp --lookupd-http-address=127.0.0.1:4161 // 將topic為“test”的消息寫到./tmp目錄下的一個文件中
五 代碼測試:
1 package main 2 3 import ( 4 "fmt" 5 "time" 6 7 "github.com/nsqio/go-nsq" 8 ) 9 10 // nsq發布消息 11 func Producer() { 12 p, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig()) // 新建生產者 13 if err != nil { 14 panic(err) 15 } 16 17 if err := p.Publish("test", []byte("hello NSQ!!!")); err != nil { // 發布消息 18 panic(err) 19 } 20 } 21 22 // nsq訂閱消息 23 type ConsumerT struct{} 24 25 func (*ConsumerT) HandleMessage(msg *nsq.Message) error { 26 fmt.Println(string(msg.Body)) 27 return nil 28 } 29 30 func Consumer() { 31 c, err := nsq.NewConsumer("test", "test-channel", nsq.NewConfig()) // 新建一個消費者 32 if err != nil { 33 panic(err) 34 } 35 c.AddHandler(&ConsumerT{}) // 添加消息處理 36 if err := c.ConnectToNSQD("127.0.0.1:4150"); err != nil { // 建立連接 37 panic(err) 38 } 39 } 40 // 主函數 41 func main() { 42 Producer() 43 Consumer() 44 time.Sleep(time.Second * 3) 45 } 46 // 運行將會打印: hello NSQ!!!
六 使用總結:
單機使用條件,同步發布消息速度也非常快(10w/s),發布消息端基本無需再做緩存封裝。接收端的消息處理應耗時盡量的短,避免消息積累,當消息積累到NSQ的緩存的數量會將多余的消息寫到文件,此時也會減緩發送端的發送速度,
因此,對接收端可使用channel和go routine做簡單封裝處理。若某topic消息產生太快太多也可將其單獨使用一個nsqd處理,避免消息積累影響其它消息投遞與接收。
