使用示例
NSQ的golang版客戶端:https://github.com/nsqio/go-nsq
提供了生產者和消費者的接口封裝。
doc.go文件描述的蠻清楚的:
生產者Producer
// 創建一個生產者實例
// 值得注意的是,Producer與Nsqd是一對一的關系,消息不會在Nsqd間傳輸
// 所以如果要實現多Nsqd實例負載
config := nsq.NewConfig()
producer, err := nsq.NewProducer("127.0.0.1:4150", config)
if err != nil {
log.Fatal(err)
}
messageBody := []byte("hello")
topicName := "topic"
// 以同步的方式推送消息
// 用戶也可以選擇其他的api方法以異步或批量的方式推送
err = p.Publish(topicName, messageBody)
if err != nil {
log.Fatal(err)
}
// 優雅退出
producer.Stop()
消費者Consumer
type myMessageHandler struct {} // 自定義結構體,用於實現nsq.Handler接口
// 由用戶實現的方法,用於實現nsq.Handler接口
// nsq.Handler接口只有一個方法,即'HandleMessage(message *Message) error'
// Consumer收到消息后,會回調此方法完成處理
func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
if len(m.Body) == 0 {
// 空消息返回nil,將自動向Nsqd響應FIN用於銷毀消息
return nil
}
err := processMessage(m.Body)
// 消息消費異常時可以返回err,將向Nsqd響應REQ。此時本條消息將會重新入隊繼續推送
return err
}
func main() {
// 創建Consumer實例
config := nsq.NewConfig()
consumer, err := nsq.NewConsumer("topic", "channel", config)
if err != nil {
log.Fatal(err)
}
// 指定自定義的nsq.Handler接口,收到的消息通過回調此接口來消費
consumer.AddHandler(&myMessageHandler{})
// 通過nsqlookupd來實現服務自動發現,會定時獲取新的注冊信息。
// 類似的還有:ConnectToNSQD, ConnectToNSQDs, ConnectToNSQLookupds.
err = consumer.ConnectToNSQLookupd("localhost:4161")
if err != nil {
log.Fatal(err)
}
// 優雅退出
consumer.Stop()
}
配置參數
無論是生產者還是消費者,創建時均需要提供一個Config結構體,nsq.NewConfig()方法可以創建並生成一個有默認參數的Config。
Config源碼見config.go。
注意:
- 只能通過nsq.NewConfig()方法來創建Config實例,該方法提供Config的默認初始化和驗證句柄。否則使用時可能會拋Panic
- 可以調用Config.Set()方法來修改參數值。注意config實例傳遞給Producer或Consumer時是深度拷貝的,所以傳遞之后修改原有的config實例是無效的。
- 參數有默認值和可設置的范圍限制,具體可參考下表。
下面是所有可選參數的介紹:
下表僅供參考,不同版本的代碼可能會有改動,請查看自己的config.go文件
| 參數 | 參數名 | 描述 | 類型 | 默認值 | 最小值 | 最大值 |
|---|---|---|---|---|---|---|
| DialTimeout | dial_timeout | 創建連接的超時時間 | time.Duration | 1s | ||
| ReadTimeout | read_timeout | 讀超時時間 | time.Duration | 60s | 100ms | 5m |
| WriteTimeout | write_timeout | 寫超時時間 | time.Duration | 1s | 100ms | 5m |
| LocalAddr | local_addr | 本機地址 | net.Addr | 系統自動分配 | ||
| LookupdPollInterval | lookupd_poll_interval | lookup間隔 | time.Duration | 60s | 100ms | 5m |
| LookupdPollJitter | lookupd_poll_jitter | 啟動lookupdLoop前的隨機抖動系數 | float64 | 0.3 | 0 | 1 |
| MaxRequeueDelay | max_requeue_delay | 允許的DPUB指令最大延時 | time.Duration | 15m | 0 | 60m |
| DefaultRequeueDelay | default_requeue_delay | 默認的DPUB延時 | time.Duration | 90s | 0 | 60m |
| BackoffStrategy | backoff_strategy | 出錯時的退避策略相關 | BackoffStrategy | ExponentialStrategy | ||
| MaxBackoffDuration | max_backoff_duration | 出錯時的退避策略相關 | time.Duration | 2m | 0 | 60m |
| BackoffMultiplier | backoff_multiplier | 出錯時的退避策略相關 | time.Duration | 1s | 0 | 60m |
| MaxAttempts | max_attempts | 最大的消費次數,超過的消息將丟棄 | uint16 | 5 | 0 | 65535 |
| LowRdyIdleTimeout | low_rdy_idle_timeout | 超過這個時間未收到消息則重置RDY | time.Duration | 10s | 1s | 5m |
| LowRdyTimeout | low_rdy_timeout | 超過這個時間未發送RDY則重置RDY | time.Duration | 30s | 1s | 5m |
| RDYRedistributeInterval | rdy_redistribute_interval | rdyLoop()檢查重置RDY條件的間隔 | time.Duration | 5s | 1ms | 5s |
| ClientID | client_id | string | short hostname | |||
| Hostname | hostname | string | ||||
| UserAgent | user_agent | string | ||||
| HeartbeatInterval | heartbeat_interval | time.Duration | 30s | |||
| SampleRate | sample_rate | int32 | 0 | 0 | 99 | |
| TlsV1 | tls_v1 | bool | false | |||
| TlsConfig | tls_config | *tls.Config | ||||
| Deflate | deflate | bool | false | |||
| DeflateLevel | deflate_level | int | 6 | 1 | 9 | |
| Snappy | snappy | bool | false | |||
| OutputBufferSize | output_buffer_size | int64 | 16384 | |||
| OutputBufferTimeout | output_buffer_timeout | time.Duration | 250ms | |||
| MaxInFlight | max_in_flight | 允許的最大的處理中的消息數 | int | 1 | 0 | |
| MsgTimeout | msg_timeout | 消息超時時間,超過這個時間Nsqd會將消息重新入隊 | time.Duration | 0 | 0 | |
| AuthSecret | auth_secret | string |
生產者Producer
總結
Producer源碼見producer.go。
- Producer支持並發,底層通過一個channel將消息發送給一個獨立的協程來最終發布給Nsqd,從而解決並發沖突。
- 一個Producer只支持一個Nsqd。如果要支持集群負載均衡,需要自己實現。
Producer提供了6個方法用於發布消息:
- Publish():阻塞發布1條消息。底層調用"PUB"指令
- PublishAsync():非阻塞發布1條消息。相比Publish(),多了一個額外的doneChan參數,通過此chan來異步接收發布結果。
- MultiPublish():阻塞發布多條消息。底層調用"MPUB"指令
- MultiPublishAsync():非阻塞發布多條消息。通過doneChan來異步接收發布結果。
- DeferredPublish():阻塞發布1條帶延時的消息。相比Publish(),多了一個delay參數來指定延時多久才推送給消費者。底層調用"DPUB"指令
- DeferredPublishAsync():非阻塞發布1條帶延時的消息。通過doneChan來異步接收發布結果。
Producer結構體
每一個Producer實例,均是一個結構體:
type Producer struct {
id int64 // 用於打印日志時標示實例。由instCount全局變量控制,從0開始,每創建一個Producer或Consumer時+1
addr string // 連接的Nsqd的地址
conn producerConn // 連接實例
config Config // 配置參數
logger []logger
logLvl LogLevel
logGuard sync.RWMutex
responseChan chan []byte // conn收到Nsqd生產成功的響應后,通過此chan告知router(),router再通知到生產消息的線程
errorChan chan []byte // conn收到Nsqd錯誤信息的響應后,通過此chan告知router(),router再通知到生產消息的線程
closeChan chan int // conn斷開時通過此chan告知router()結束
transactionChan chan *ProducerTransaction // 生產過程將消息推送到這個chan,再異步接收成功的結果
transactions []*ProducerTransaction // 一個先入先出隊列,用於router協程處理消息寫入結果
state int32 // 連接狀態,初始狀態/連接斷開/已連接
concurrentProducers int32 // 統計正在等待發往transactionChan的消息數,Producer在退出前會將這些消息置為ErrNotConnected
stopFlag int32
exitChan chan int
wg sync.WaitGroup // 用於等待router()協程退出
guard sync.Mutex // Producer全局鎖
}
Producer發布消息的方法介紹
Producer提供了6個方法用於發布消息:
- Publish():阻塞發布1條消息。底層調用"PUB"指令
- PublishAsync():非阻塞發布1條消息。相比Publish(),多了一個額外的doneChan參數,通過此chan來異步接收發布結果。
- MultiPublish():阻塞發布多條消息。底層調用"MPUB"指令
- MultiPublishAsync():非阻塞發布多條消息。通過doneChan來異步接收發布結果。
- DeferredPublish():阻塞發布1條帶延時的消息。相比Publish(),多了一個delay參數來指定延時多久才推送給消費者。底層調用"DPUB"指令
- DeferredPublishAsync():非阻塞發布1條帶延時的消息。通過doneChan來異步接收發布結果。
上述6個方法,最終均調用sendCommandAsync()方法來完成發送。
對於阻塞調用的3個方法,會先調用sendCommand(),創建一個臨時chan,再調用sendCommandAsync()。
- sendCommand()方法創建一個臨時的doneChan來接收發布結果。
- sendCommandAsync()負責將消息寫入Producer.transactionChan。下一章節的router協程負責接收並將消息發往Nsqd,並將發布結果通過doneChan返回。如果連接尚未創建,這里會自動重建連接。
func (w *Producer) sendCommand(cmd *Command) error {
doneChan := make(chan *ProducerTransaction) // 臨時的doneChan用於接收發布結果
err := w.sendCommandAsync(cmd, doneChan, nil) // 發送消息
if err != nil {
close(doneChan)
return err
}
t := <-doneChan // 等待發送結果
return t.Error
}
func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
args []interface{}) error {
// keep track of how many outstanding producers we're dealing with
// in order to later ensure that we clean them all up...
atomic.AddInt32(&w.concurrentProducers, 1)
defer atomic.AddInt32(&w.concurrentProducers, -1)
if atomic.LoadInt32(&w.state) != StateConnected {
err := w.connect() // 未連接時自動重建連接
if err != nil {
return err
}
}
t := &ProducerTransaction{
cmd: cmd,
doneChan: doneChan,
Args: args,
}
select {
case w.transactionChan <- t: // 將消息發送給router協程
case <-w.exitChan:
return ErrStopped
}
return nil
}
sendCommand()方法是Producer實現並發的核心,即並發的消息發布,最終都會寫入transactionChan channel,由router協程獨立處理,不存在並發沖突。
Producer主要的輔助方法介紹
自動創建連接
nsq.NewProducer()方法只創建Producer實例,連接會在后續自動管理。
每次發送指令Ping()或sendCommandAsync()時,如果尚未連接,會自動調用Producer.connect()方法。
// 創建連接,修改連接狀態,啟動router()協程
func (w *Producer) connect() error
router協程異步發送和接收消息響應
Producer通過起一個router協程異步發送和接收消息響應的方式來實現Producer並發寫入的問題。無論用戶有多少個線程在生產消息,最終都得調用sendCommandAsync()方法將消息寫入一個chan,並由router單協程處理,這就避免了並發沖突。
router協程在上文的Producer.connect()方法被啟動。
Router()方法起了個for循環持續監聽幾個chan,我們重點關注:
- transactionChan:所有消息最終均通過sendCommandAsync()方法寫入這個chan。router協程負責將從transactionChan收到的消息,寫入Producer.conn,並最終發送到Nsqd。
- responseChan:Nsqd每正確接收到一個消息,會響應一個確認幀回來。Producer.conn則通過此chan來告知router消息寫入成功。
- errorChan:同responseChan,收到錯誤信息時,通過此chan告知router。
無論是寫入成功還是有錯誤,router協程均調用popTransaction()方法來處理。這個方法有個細節,Nsqd並沒有告知寫入成功或失敗的消息是哪條,Producer又是怎么知道的呢?原理是底層使用的TCP通訊,同學們可以回想下TCP的特點,TCP是有序的。寫入消息的只有router一個協程,所以消息是按順序寫入的,恰恰Nsqd端也是單線程處理同一個生產者。所以router收到的響應,必然是針對transactions隊列中第1條消息的(這是一個用切片實現的先入先出隊列,router會在寫入conn的同時將消息寫入這個隊列)。
收到Nsqd的響應后,router將結束寫入ProducerTransaction.doneChan,用於通知消息的寫入協程。
func (w *Producer) router() {
for {
select {
case t := <-w.transactionChan: // 這是待發布的消息
w.transactions = append(w.transactions, t) // 先入先出隊列,用於處理消息發布結果
err := w.conn.WriteCommand(t.cmd) // 發布消息
if err != nil {
w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
w.close()
}
case data := <-w.responseChan: // 發布成功的響應
w.popTransaction(FrameTypeResponse, data) // 處理發布結果,將結果寫入doneChan
case data := <-w.errorChan: // 發布失敗的響應
w.popTransaction(FrameTypeError, data) // 處理發布結果,將結果寫入doneChan
case <-w.closeChan:
goto exit
case <-w.exitChan:
goto exit
}
}
exit:
w.transactionCleanup()
w.wg.Done()
w.log(LogLevelInfo, "exiting router")
}
func (w *Producer) popTransaction(frameType int32, data []byte) {
t := w.transactions[0]
w.transactions = w.transactions[1:] // 發布成功或失敗的消息,出隊
if frameType == FrameTypeError {
t.Error = ErrProtocol{string(data)} // 發布失敗的錯誤信息
}
t.finish() // 通知到doneChan
}
func (t *ProducerTransaction) finish() {
if t.doneChan != nil {
t.doneChan <- t
}
}
Ping
// Ping方法一般用於剛創建的Producer實例。自動connect()方法創建連接,並發送一條Nop指令,以確認連接是否正常
func (w *Producer) Ping() error
優雅退出
注意:主動退出Producer時,建議使用Stop()方法,用於結束正在等待發送的消息,同時結束router協程。否則兩者將可能一直阻塞
// Stop()方法用於優雅退出當前Producer。
// 正在等待發送的消息將被置為ErrNotConnected或ErrStopped
func (w *Producer) Stop()
消費者Consumer
總結
Producer源碼見producer.go。
- Producer支持並發,底層通過一個channel將消息發送給一個獨立的協程來最終發布給Nsqd,從而解決並發沖突。
- 一個Producer只支持一個Nsqd。如果要支持集群負載均衡,需要自己實現。
Producer提供了6個方法用於發布消息:
- Publish():阻塞發布1條消息。底層調用"PUB"指令
- PublishAsync():非阻塞發布1條消息。相比Publish(),多了一個額外的doneChan參數,通過此chan來異步接收發布結果。
- MultiPublish():阻塞發布多條消息。底層調用"MPUB"指令
- MultiPublishAsync():非阻塞發布多條消息。通過doneChan來異步接收發布結果。
- DeferredPublish():阻塞發布1條帶延時的消息。相比Publish(),多了一個delay參數來指定延時多久才推送給消費者。底層調用"DPUB"指令
- DeferredPublishAsync():非阻塞發布1條帶延時的消息。通過doneChan來異步接收發布結果。
Consumer源碼見consumer.go。
- Consumer支持並發,調用AddConcurrentHandlers()方法指定創建多個handlerLoop協程進行處理即可。
- ConnectToNSQD()和ConnectToNSQDs()方法可以指定1個或多個Nsqd創建連接,但不具備服務發現和動態調整。
- ConnectToNSQLookupd()和ConnectToNSQLookupds()方法可以指定1個或多個lookup創建連接,支持服務發現和定時動態調整。
Consumer結構體
每個Consumer實例對應於一個Consumer結構體:
type Consumer struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
messagesReceived uint64
messagesFinished uint64
messagesRequeued uint64
totalRdyCount int64
backoffDuration int64
backoffCounter int32
maxInFlight int32
mtx sync.RWMutex
logger []logger
logLvl LogLevel
logGuard sync.RWMutex
behaviorDelegate interface{}
id int64
topic string
channel string
config Config
rngMtx sync.Mutex
rng *rand.Rand
needRDYRedistributed int32
backoffMtx sync.Mutex
incomingMessages chan *Message
rdyRetryMtx sync.Mutex
rdyRetryTimers map[string]*time.Timer
pendingConnections map[string]*Conn
connections map[string]*Conn
nsqdTCPAddrs []string
// used at connection close to force a possible reconnect
lookupdRecheckChan chan int
lookupdHTTPAddrs []string
lookupdQueryIndex int
wg sync.WaitGroup
runningHandlers int32
stopFlag int32
connectedFlag int32
stopHandler sync.Once
exitHandler sync.Once
// read from this channel to block until consumer is cleanly stopped
StopChan chan int
exitChan chan int
}
注冊回調函數和消息消費
Consumer每收到一條消息,會調用我們指定的回調接口來消費消息。
這個回調接口如下:
// 返回nil表示消費成功,Consumer將向Nsqd發送FIN指令銷毀消息。
// 非nil表示消費失敗或需要重復消費,Consumer將向Nsqd發送REQ指令將消息重新入隊推送。
type Handler interface {
HandleMessage(message *Message) error
}
我們在啟動Consumer,需要先用一個結構體實現上述Handler接口,將調用AddHandler()方法將該結構體傳給Consumer:
/*自定義結構體示例*/
type myMessageHandler struct {} // 自定義結構體,用於實現nsq.Handler接口
func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
if len(m.Body) == 0 {
return nil
}
err := processMessage(m.Body)
return err
}
// 在啟動Consumer前調用:consumer.AddHandler(&myMessageHandler{})方法將結構體傳遞給Consumer
每調用一次consumer.AddHandler()方法會啟動一個handlerLoop協程用於循環接收消息:
// 必須在Consumer連接之前調用AddHandler()方法,否則會拋panic
// 每調用一次AddHandler()就創建一個handlerLoop協程,可以多次調用來創建多個協程
func (r *Consumer) AddHandler(handler Handler) {
r.AddConcurrentHandlers(handler, 1)
}
func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
if atomic.LoadInt32(&r.connectedFlag) == 1 {
panic("already connected") // 必須在Consumer連接之前調用AddHandler()方法,否則會拋panic
}
atomic.AddInt32(&r.runningHandlers, int32(concurrency))
for i := 0; i < concurrency; i++ {
go r.handlerLoop(handler) // 每調用一次AddHandler()就創建一個handlerLoop協程,可以多次調用來創建多個協程
}
}
如果消費速度較慢,可以在連接之前直接調用AddConcurrentHandlers()以指定創建多個協程來並發處理
每個handlerLoop協程都在監聽incomingMessages,收到消息則調用Handler消費:
func (r *Consumer) handlerLoop(handler Handler) {
r.log(LogLevelDebug, "starting Handler")
for {
message, ok := <-r.incomingMessages // 可以創建多個協程並發監聽這個channel
if !ok {
goto exit
}
if r.shouldFailMessage(message, handler) {
message.Finish()
continue
}
err := handler.HandleMessage(message)
if err != nil {
r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID)
if !message.IsAutoResponseDisabled() {
message.Requeue(-1)
}
continue
}
if !message.IsAutoResponseDisabled() {
message.Finish()
}
}
exit:
r.log(LogLevelDebug, "stopping Handler")
if atomic.AddInt32(&r.runningHandlers, -1) == 0 {
r.exit()
}
}
創建連接
Consumer提供了2個方法用於創建連接:
- ConnectToNSQD():指定一個Nsqd連接。推薦使用lookupd以便自動服務發現。
- ConnectToNSQDs():指定多個Nsqd連接。推薦使用lookupd以便自動服務發現。
注意,直接連接Nsqd的方法不推薦使用,建議使用下一章節的lookup服務發現,以便自動發現和管理。
ConnectToNSQDs()方法就是個for循環,調用多次ConnectToNSQD()。
ConnectToNSQD()方法比較長,關鍵的點是,如果未注冊回調方法,會報錯;置狀態為已連接;新建一個連接,建立連接時會發送"IDENTIFY"指令通告此連接的相關參數;向Nsqd發送Sub指令開始訂閱;訂閱前連接暫時放在pendingConnections中,訂閱成功后從pendingConnections移到connections中。
ConnectToNSQD()最終對connections中所有連接執行maybeUpdateRDY()方法用於調整RDY計數。
func (r *Consumer) ConnectToNSQD(addr string) error {
if atomic.LoadInt32(&r.stopFlag) == 1 {
return errors.New("consumer stopped")
}
if atomic.LoadInt32(&r.runningHandlers) == 0 { // 如果未注冊回調方法,會報錯
return errors.New("no handlers")
}
atomic.StoreInt32(&r.connectedFlag, 1)
conn := NewConn(addr, &r.config, &consumerConnDelegate{r})
conn.SetLoggerLevel(r.getLogLevel())
format := fmt.Sprintf("%3d [%s/%s] (%%s)", r.id, r.topic, r.channel)
for index := range r.logger {
conn.SetLoggerForLevel(r.logger[index], LogLevel(index), format)
}
r.mtx.Lock()
_, pendingOk := r.pendingConnections[addr]
_, ok := r.connections[addr]
if ok || pendingOk {
r.mtx.Unlock()
return ErrAlreadyConnected
}
r.pendingConnections[addr] = conn
if idx := indexOf(addr, r.nsqdTCPAddrs); idx == -1 {
r.nsqdTCPAddrs = append(r.nsqdTCPAddrs, addr)
}
r.mtx.Unlock()
r.log(LogLevelInfo, "(%s) connecting to nsqd", addr)
cleanupConnection := func() {
r.mtx.Lock()
delete(r.pendingConnections, addr)
r.mtx.Unlock()
conn.Close()
}
resp, err := conn.Connect()
if err != nil {
cleanupConnection()
return err
}
if resp != nil {
if resp.MaxRdyCount < int64(r.getMaxInFlight()) {
r.log(LogLevelWarning,
"(%s) max RDY count %d < consumer max in flight %d, truncation possible",
conn.String(), resp.MaxRdyCount, r.getMaxInFlight())
}
}
cmd := Subscribe(r.topic, r.channel)
err = conn.WriteCommand(cmd)
if err != nil {
cleanupConnection()
return fmt.Errorf("[%s] failed to subscribe to %s:%s - %s",
conn, r.topic, r.channel, err.Error())
}
r.mtx.Lock()
delete(r.pendingConnections, addr)
r.connections[addr] = conn
r.mtx.Unlock()
// pre-emptive signal to existing connections to lower their RDY count
for _, c := range r.conns() {
r.maybeUpdateRDY(c)
}
return nil
}
maybeUpdateRDY()方法調用perConnMaxInFlight()獲取當前連接的最大允許RDY計數,最大允許RDY計數為配置參數maxInFlight / 當前連接數。其中maxInFligh如果未主動配置的話默認為1,那么所有連接的RDY均為1。之后再調用updateRDY()方法來調整RDY計數。最終每個連接會向Nsqd發送RDY指令調整計數。
整段代碼調用較長,也沒太多可分析的點,所以代碼就不貼了。
maybeUpdateRDY()只是初調整,還有一個由NewConsumer()啟動的rdyLoop()協程會定時根據每個連接的狀態進一步調整,調整原則是將長時間未收到消息的連接的RDY置為最小值1
講了這么多,那么消息是怎么接收並傳遞過來的呢?
前面的handlerLoop協程,監聽incomingMessages通道。所以我們全局搜incomingMessages通道,發現Consumer.onConnMessage()方法會向這個通道寫入消息。而這個方法由consumerConnDelegate.OnMessage()調用。進一步定位到每個連接的Conn.readLoop()方法。
Conn.readLoop()方法的邏輯比較簡單,就是一個for循環阻塞等待Nsqd推送消息,解析后如果是一條消息,則調用consumerConnDelegate.OnMessage()將消息寫入到incomingMessages通道。
func (c *Conn) readLoop() {
for {
frameType, data, err := ReadUnpackedResponse(c) // 阻塞等待Nsqd推送消息,並完成消息解析
switch frameType {
case FrameTypeResponse:
c.delegate.OnResponse(c, data)
case FrameTypeMessage:
msg, err := DecodeMessage(data)
if err != nil {
c.log(LogLevelError, "IO error - %s", err)
c.delegate.OnIOError(c, err)
goto exit
}
msg.Delegate = delegate
msg.NSQDAddress = c.String()
atomic.AddInt64(&c.messagesInFlight, 1)
atomic.StoreInt64(&c.lastMsgTimestamp, time.Now().UnixNano())
c.delegate.OnMessage(c, msg) // 將消息寫入incomingMessages通道
case FrameTypeError:
c.log(LogLevelError, "protocol error - %s", data)
c.delegate.OnError(c, data)
default:
c.log(LogLevelError, "IO error - %s", err)
c.delegate.OnIOError(c, fmt.Errorf("unknown frame type %d", frameType))
}
}
}
服務發現和自動連接管理
Consumer提供了2個方法用於服務發現:
- ConnectToNSQLookupd():通過lookupd服務發現,再自動創建連接。支持定時更新服務發現。
- ConnectToNSQLookupds():通過多個lookupd服務發現,再自動創建連接。支持定時更新服務發現。
ConnectToNSQLookupds()方法就是為每個lookupd調用一次ConnectToNSQLookupd()。
我們來關注ConnectToNSQLookupd()做了啥。ConnectToNSQLookupd()也要求必須先注冊回調。對於第1個lookup,創建lookupdLoop協程完成自動的服務發現。后續的lookup只需要加入到lookupdHTTPAddrs中,由lookupdLoop協程來協調。
func (r *Consumer) ConnectToNSQLookupd(addr string) error {
if atomic.LoadInt32(&r.stopFlag) == 1 {
return errors.New("consumer stopped")
}
if atomic.LoadInt32(&r.runningHandlers) == 0 {
return errors.New("no handlers") // 同ConnectToNSQD(),必須先注冊回調
}
if err := validatedLookupAddr(addr); err != nil {
return err
}
atomic.StoreInt32(&r.connectedFlag, 1) // 置連接位
r.mtx.Lock()
for _, x := range r.lookupdHTTPAddrs {
if x == addr {
r.mtx.Unlock()
return nil
}
}
r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, addr) // 將lookupd的addr加入到lookupdHTTPAddrs切片
numLookupd := len(r.lookupdHTTPAddrs)
r.mtx.Unlock()
// 只有第1個loopupd才會創建lookupdLoop協程。
if numLookupd == 1 {
r.queryLookupd() // 執行一次服務發現,里面最多重復3次,如果都失敗了,也不影響程序繼續運行
r.wg.Add(1)
go r.lookupdLoop() // 創建lookupdLoop協程完成自動的服務發現
}
return nil
}
我們重點看下lookupdLoop協程的工作,這是服務發現和自動管理的關鍵:
lookupdLoop協程啟動時先用jitter變量執行了一段隨機延時,用於防止多個消費者監聽同一Topic且同時重啟的情況。由r.config.LookupdPollInterval變量創建了一個定時器,這是定時服務發現的間隔,查看config.go可知這個參數最小10ms,最大5m,默認為60s。
接着lookupdLoop協程進入for循環,主要做兩件事,一是通過ticker.C定時做一次服務發現,二是通過r.lookupdRecheckChan做一次服務發現,這個通道是在連接異常斷開時會觸發。
func (r *Consumer) lookupdLoop() {
// add some jitter so that multiple consumers discovering the same topic,
// when restarted at the same time, dont all connect at once.
r.rngMtx.Lock()
jitter := time.Duration(int64(r.rng.Float64() *
r.config.LookupdPollJitter * float64(r.config.LookupdPollInterval)))
r.rngMtx.Unlock()
var ticker *time.Ticker
select {
case <-time.After(jitter):
case <-r.exitChan:
goto exit
}
ticker = time.NewTicker(r.config.LookupdPollInterval)
for {
select {
case <-ticker.C:
r.queryLookupd()
case <-r.lookupdRecheckChan:
r.queryLookupd()
case <-r.exitChan:
goto exit
}
}
exit:
if ticker != nil {
ticker.Stop()
}
r.log(LogLevelInfo, "exiting lookupdLoop")
r.wg.Done()
}
具體的服務發現由queryLookupd()方法完成,該方法每次執行調用nextLookupdEndpoint()方法取下個lookup並通過http的方式發送lookup指令,參數包含需要消費的Topic。最多請求3個lookup,有1次成功即停止,如果3次都失敗就退出等待下一次服務發現。lookup返回lookupResp結構體,內容為包含請求的Topic的Nsqd相關信息列表:
type lookupResp struct {
Channels []string `json:"channels"`
Producers []*peerInfo `json:"producers"`
Timestamp int64 `json:"timestamp"`
}
服務發現信息包含該lookup的channel列表和生產者列表。
然后對每一個生產者的Nsqd調用ConnectToNSQD()方法,該方法會完成去重,已經連接的Nsqd不會重復連接。
性能和故障分析
Consumer
消費者Consumer客戶端對並發和負載較友好,可以通過以下手段來增強並發和負載:
- 同時連接多個Nsqd。注意Nsqd之間不共享數據,需要生產者端將消息寫入不同的Nsqd。
- 調用AddConcurrentHandlers來啟用多個消息處理協程,增大消息消費的能力。
- 合適配置參數增加網絡吞吐量。注意參數配置需要配套合理,比如如果將Flight消息數提高很高,但消費能力不行,就有可能出現Nsqd端大量消息超時的問題。可以提升網絡吞吐量的參數主要有這幾個:
- MaxInFlight:該參數控制Nsqd端允許的已發送但尚未收到FIN幀的消息數,增大該值可以增大網絡中正在發送的消息數。
- OutputBufferSize:增大Nsqd端發送緩沖區的大小。
Producer
一條一條發布消息較慢,如果可以,建議用MPUB相關的接口批量發布。
但一個Nsqd處理能力有限,有時我們需要集群來負載。遺憾的是,Producer與Nsqd是一對一的關系。可以有以下思路優化:
- 不同的Topic發布到不同的Nsqd上。
- 將消息分散發布到多個Nsqd上,需要自行實現。
重復消費
即使只發布一份的消息,在極端情況下也存在重復消費的可能,當某條消息超時(回顧:NSQ源碼剖析(一):NSQD主要結構方法和消息生產消費過程 ),消息會重新入隊。入隊之后如果消費者端因網絡或性能問題現在才完成消費,此時FIN指令將被響應"E_FIN_FAILED"。消息還在隊列中等待第二次推送。
消費順序
Nsqd不保證消息推送順序,這與延時/超時/磁盤隊列等設計有關,這里不展開討論,請回顧:NSQ源碼剖析(一):NSQD主要結構方法和消息生產消費過程
單點問題
存在兩種單點問題,一是異常退出的Nsqd會丟失掉尚在內存的Msg;二是與宕機的Nsqd相連的Producer將無法工作。
宕機時丟失內存Msg
可以有兩種思路進行規避:
- 降低內存channel的大小,甚至是0,讓所有數據都壓入磁盤隊列。這種方式會降低性能,同時磁盤隊列本身也有內存緩沖,存在丟失可能。
- 消息副本。生產者發布消息時,創建副本發布到不同的Nsqd上。這要求消費者有能力識別出重復消費的情況,或者實現消費冪等。
Nsqd宕機導致Producer不工作
前面介紹過Producer與Nsqd是一對一的關系,一旦Nsqd宕機,與之相連的Producer也將無法工作。這種情況可以自行實現異常檢測和故障轉移,比如檢測到宕機時連接到另一台Nsqd實例;或者實現一個自動的負載均衡機制。
無論使用哪種方式,都要求Consumer端支持服務發現,這點上官方api就支持,使用ConnectToNSQLookupd()或ConnectToNSQLookupds()即可。
由於時間因素,本來打算實現一個Producer的負載均衡算法的,現在來不及。有興趣的同學可以嘗試下,或者關注我的github,歡迎參與進來討論和研究。
