NSQ源碼剖析(二):golang客戶端源碼及性能和故障分析


使用示例

NSQ的golang版客戶端:https://github.com/nsqio/go-nsq
提供了生產者和消費者的接口封裝。
doc.go文件描述的蠻清楚的:
生產者Producer

	// 創建一個生產者實例
	// 值得注意的是,Producer與Nsqd是一對一的關系,消息不會在Nsqd間傳輸
	// 所以如果要實現多Nsqd實例負載
	config := nsq.NewConfig()
	producer, err := nsq.NewProducer("127.0.0.1:4150", config)
	if err != nil {
		log.Fatal(err)
	}

	messageBody := []byte("hello")
	topicName := "topic"

	// 以同步的方式推送消息
	// 用戶也可以選擇其他的api方法以異步或批量的方式推送
	err = p.Publish(topicName, messageBody)
	if err != nil {
		log.Fatal(err)
	}

	// 優雅退出
	producer.Stop()

消費者Consumer

	type myMessageHandler struct {} // 自定義結構體,用於實現nsq.Handler接口

	// 由用戶實現的方法,用於實現nsq.Handler接口
	// nsq.Handler接口只有一個方法,即'HandleMessage(message *Message) error'
	// Consumer收到消息后,會回調此方法完成處理
	func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
		if len(m.Body) == 0 {
			// 空消息返回nil,將自動向Nsqd響應FIN用於銷毀消息
			return nil
		}

		err := processMessage(m.Body)

		// 消息消費異常時可以返回err,將向Nsqd響應REQ。此時本條消息將會重新入隊繼續推送
		return err
	}

	func main() {
		// 創建Consumer實例
		config := nsq.NewConfig()
		consumer, err := nsq.NewConsumer("topic", "channel", config)
		if err != nil {
			log.Fatal(err)
		}

		// 指定自定義的nsq.Handler接口,收到的消息通過回調此接口來消費
		consumer.AddHandler(&myMessageHandler{})

		// 通過nsqlookupd來實現服務自動發現,會定時獲取新的注冊信息。
		// 類似的還有:ConnectToNSQD, ConnectToNSQDs, ConnectToNSQLookupds.
		err = consumer.ConnectToNSQLookupd("localhost:4161")
		if err != nil {
			log.Fatal(err)
		}

		// 優雅退出
		consumer.Stop()
	}

配置參數

無論是生產者還是消費者,創建時均需要提供一個Config結構體,nsq.NewConfig()方法可以創建並生成一個有默認參數的Config。
Config源碼見config.go。

注意:

  • 只能通過nsq.NewConfig()方法來創建Config實例,該方法提供Config的默認初始化和驗證句柄。否則使用時可能會拋Panic
  • 可以調用Config.Set()方法來修改參數值。注意config實例傳遞給Producer或Consumer時是深度拷貝的,所以傳遞之后修改原有的config實例是無效的。
  • 參數有默認值和可設置的范圍限制,具體可參考下表。

下面是所有可選參數的介紹:
下表僅供參考,不同版本的代碼可能會有改動,請查看自己的config.go文件

參數 參數名 描述 類型 默認值 最小值 最大值
DialTimeout dial_timeout 創建連接的超時時間 time.Duration 1s
ReadTimeout read_timeout 讀超時時間 time.Duration 60s 100ms 5m
WriteTimeout write_timeout 寫超時時間 time.Duration 1s 100ms 5m
LocalAddr local_addr 本機地址 net.Addr 系統自動分配
LookupdPollInterval lookupd_poll_interval lookup間隔 time.Duration 60s 100ms 5m
LookupdPollJitter lookupd_poll_jitter 啟動lookupdLoop前的隨機抖動系數 float64 0.3 0 1
MaxRequeueDelay max_requeue_delay 允許的DPUB指令最大延時 time.Duration 15m 0 60m
DefaultRequeueDelay default_requeue_delay 默認的DPUB延時 time.Duration 90s 0 60m
BackoffStrategy backoff_strategy 出錯時的退避策略相關 BackoffStrategy ExponentialStrategy
MaxBackoffDuration max_backoff_duration 出錯時的退避策略相關 time.Duration 2m 0 60m
BackoffMultiplier backoff_multiplier 出錯時的退避策略相關 time.Duration 1s 0 60m
MaxAttempts max_attempts 最大的消費次數,超過的消息將丟棄 uint16 5 0 65535
LowRdyIdleTimeout low_rdy_idle_timeout 超過這個時間未收到消息則重置RDY time.Duration 10s 1s 5m
LowRdyTimeout low_rdy_timeout 超過這個時間未發送RDY則重置RDY time.Duration 30s 1s 5m
RDYRedistributeInterval rdy_redistribute_interval rdyLoop()檢查重置RDY條件的間隔 time.Duration 5s 1ms 5s
ClientID client_id string short hostname
Hostname hostname string
UserAgent user_agent string
HeartbeatInterval heartbeat_interval time.Duration 30s
SampleRate sample_rate int32 0 0 99
TlsV1 tls_v1 bool false
TlsConfig tls_config *tls.Config
Deflate deflate bool false
DeflateLevel deflate_level int 6 1 9
Snappy snappy bool false
OutputBufferSize output_buffer_size int64 16384
OutputBufferTimeout output_buffer_timeout time.Duration 250ms
MaxInFlight max_in_flight 允許的最大的處理中的消息數 int 1 0
MsgTimeout msg_timeout 消息超時時間,超過這個時間Nsqd會將消息重新入隊 time.Duration 0 0
AuthSecret auth_secret string

生產者Producer

總結

Producer源碼見producer.go。

  • Producer支持並發,底層通過一個channel將消息發送給一個獨立的協程來最終發布給Nsqd,從而解決並發沖突。
  • 一個Producer只支持一個Nsqd。如果要支持集群負載均衡,需要自己實現。

Producer提供了6個方法用於發布消息:

  • Publish():阻塞發布1條消息。底層調用"PUB"指令
  • PublishAsync():非阻塞發布1條消息。相比Publish(),多了一個額外的doneChan參數,通過此chan來異步接收發布結果。
  • MultiPublish():阻塞發布多條消息。底層調用"MPUB"指令
  • MultiPublishAsync():非阻塞發布多條消息。通過doneChan來異步接收發布結果。
  • DeferredPublish():阻塞發布1條帶延時的消息。相比Publish(),多了一個delay參數來指定延時多久才推送給消費者。底層調用"DPUB"指令
  • DeferredPublishAsync():非阻塞發布1條帶延時的消息。通過doneChan來異步接收發布結果。

Producer結構體

每一個Producer實例,均是一個結構體:

type Producer struct {
	id     int64			// 用於打印日志時標示實例。由instCount全局變量控制,從0開始,每創建一個Producer或Consumer時+1
	addr   string			// 連接的Nsqd的地址
	conn   producerConn		// 連接實例
	config Config			// 配置參數

	logger   []logger
	logLvl   LogLevel
	logGuard sync.RWMutex

	responseChan chan []byte	// conn收到Nsqd生產成功的響應后,通過此chan告知router(),router再通知到生產消息的線程
	errorChan    chan []byte	// conn收到Nsqd錯誤信息的響應后,通過此chan告知router(),router再通知到生產消息的線程
	closeChan    chan int		// conn斷開時通過此chan告知router()結束

	transactionChan chan *ProducerTransaction	// 生產過程將消息推送到這個chan,再異步接收成功的結果
	transactions    []*ProducerTransaction		// 一個先入先出隊列,用於router協程處理消息寫入結果
	state           int32						// 連接狀態,初始狀態/連接斷開/已連接

	concurrentProducers int32	// 統計正在等待發往transactionChan的消息數,Producer在退出前會將這些消息置為ErrNotConnected
	stopFlag            int32
	exitChan            chan int
	wg                  sync.WaitGroup	// 用於等待router()協程退出
	guard               sync.Mutex		// Producer全局鎖
}

Producer發布消息的方法介紹

Producer提供了6個方法用於發布消息:

  • Publish():阻塞發布1條消息。底層調用"PUB"指令
  • PublishAsync():非阻塞發布1條消息。相比Publish(),多了一個額外的doneChan參數,通過此chan來異步接收發布結果。
  • MultiPublish():阻塞發布多條消息。底層調用"MPUB"指令
  • MultiPublishAsync():非阻塞發布多條消息。通過doneChan來異步接收發布結果。
  • DeferredPublish():阻塞發布1條帶延時的消息。相比Publish(),多了一個delay參數來指定延時多久才推送給消費者。底層調用"DPUB"指令
  • DeferredPublishAsync():非阻塞發布1條帶延時的消息。通過doneChan來異步接收發布結果。

上述6個方法,最終均調用sendCommandAsync()方法來完成發送。
對於阻塞調用的3個方法,會先調用sendCommand(),創建一個臨時chan,再調用sendCommandAsync()。

  • sendCommand()方法創建一個臨時的doneChan來接收發布結果。
  • sendCommandAsync()負責將消息寫入Producer.transactionChan。下一章節的router協程負責接收並將消息發往Nsqd,並將發布結果通過doneChan返回。如果連接尚未創建,這里會自動重建連接。
func (w *Producer) sendCommand(cmd *Command) error {
	doneChan := make(chan *ProducerTransaction)		// 臨時的doneChan用於接收發布結果
	err := w.sendCommandAsync(cmd, doneChan, nil)	// 發送消息
	if err != nil {
		close(doneChan)
		return err
	}
	t := <-doneChan									// 等待發送結果
	return t.Error
}

func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
	args []interface{}) error {
	// keep track of how many outstanding producers we're dealing with
	// in order to later ensure that we clean them all up...
	atomic.AddInt32(&w.concurrentProducers, 1)
	defer atomic.AddInt32(&w.concurrentProducers, -1)

	if atomic.LoadInt32(&w.state) != StateConnected {
		err := w.connect()	// 未連接時自動重建連接
		if err != nil {
			return err
		}
	}

	t := &ProducerTransaction{
		cmd:      cmd,
		doneChan: doneChan,
		Args:     args,
	}

	select {
	case w.transactionChan <- t:	// 將消息發送給router協程
	case <-w.exitChan:
		return ErrStopped
	}

	return nil
}

sendCommand()方法是Producer實現並發的核心,即並發的消息發布,最終都會寫入transactionChan channel,由router協程獨立處理,不存在並發沖突。

Producer主要的輔助方法介紹

自動創建連接
nsq.NewProducer()方法只創建Producer實例,連接會在后續自動管理。
每次發送指令Ping()或sendCommandAsync()時,如果尚未連接,會自動調用Producer.connect()方法。

// 創建連接,修改連接狀態,啟動router()協程
func (w *Producer) connect() error

router協程異步發送和接收消息響應
Producer通過起一個router協程異步發送和接收消息響應的方式來實現Producer並發寫入的問題。無論用戶有多少個線程在生產消息,最終都得調用sendCommandAsync()方法將消息寫入一個chan,並由router單協程處理,這就避免了並發沖突。
router協程在上文的Producer.connect()方法被啟動。
Router()方法起了個for循環持續監聽幾個chan,我們重點關注:

  • transactionChan:所有消息最終均通過sendCommandAsync()方法寫入這個chan。router協程負責將從transactionChan收到的消息,寫入Producer.conn,並最終發送到Nsqd。
  • responseChan:Nsqd每正確接收到一個消息,會響應一個確認幀回來。Producer.conn則通過此chan來告知router消息寫入成功。
  • errorChan:同responseChan,收到錯誤信息時,通過此chan告知router。

無論是寫入成功還是有錯誤,router協程均調用popTransaction()方法來處理。這個方法有個細節,Nsqd並沒有告知寫入成功或失敗的消息是哪條,Producer又是怎么知道的呢?原理是底層使用的TCP通訊,同學們可以回想下TCP的特點,TCP是有序的。寫入消息的只有router一個協程,所以消息是按順序寫入的,恰恰Nsqd端也是單線程處理同一個生產者。所以router收到的響應,必然是針對transactions隊列中第1條消息的(這是一個用切片實現的先入先出隊列,router會在寫入conn的同時將消息寫入這個隊列)。

收到Nsqd的響應后,router將結束寫入ProducerTransaction.doneChan,用於通知消息的寫入協程。

func (w *Producer) router() {
	for {
		select {
		case t := <-w.transactionChan:	// 這是待發布的消息
			w.transactions = append(w.transactions, t)	// 先入先出隊列,用於處理消息發布結果
			err := w.conn.WriteCommand(t.cmd)			// 發布消息
			if err != nil {
				w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
				w.close()
			}
		case data := <-w.responseChan:	// 發布成功的響應
			w.popTransaction(FrameTypeResponse, data)	// 處理發布結果,將結果寫入doneChan
		case data := <-w.errorChan:		// 發布失敗的響應
			w.popTransaction(FrameTypeError, data)		// 處理發布結果,將結果寫入doneChan
		case <-w.closeChan:
			goto exit
		case <-w.exitChan:
			goto exit
		}
	}

exit:
	w.transactionCleanup()
	w.wg.Done()
	w.log(LogLevelInfo, "exiting router")
}

func (w *Producer) popTransaction(frameType int32, data []byte) {
	t := w.transactions[0]
	w.transactions = w.transactions[1:]		// 發布成功或失敗的消息,出隊
	if frameType == FrameTypeError {
		t.Error = ErrProtocol{string(data)}	// 發布失敗的錯誤信息
	}
	t.finish()								// 通知到doneChan
}

func (t *ProducerTransaction) finish() {
	if t.doneChan != nil {
		t.doneChan <- t
	}
}

Ping

// Ping方法一般用於剛創建的Producer實例。自動connect()方法創建連接,並發送一條Nop指令,以確認連接是否正常
func (w *Producer) Ping() error

優雅退出
注意:主動退出Producer時,建議使用Stop()方法,用於結束正在等待發送的消息,同時結束router協程。否則兩者將可能一直阻塞

// Stop()方法用於優雅退出當前Producer。
// 正在等待發送的消息將被置為ErrNotConnected或ErrStopped
func (w *Producer) Stop()

消費者Consumer

總結

Producer源碼見producer.go。

  • Producer支持並發,底層通過一個channel將消息發送給一個獨立的協程來最終發布給Nsqd,從而解決並發沖突。
  • 一個Producer只支持一個Nsqd。如果要支持集群負載均衡,需要自己實現。

Producer提供了6個方法用於發布消息:

  • Publish():阻塞發布1條消息。底層調用"PUB"指令
  • PublishAsync():非阻塞發布1條消息。相比Publish(),多了一個額外的doneChan參數,通過此chan來異步接收發布結果。
  • MultiPublish():阻塞發布多條消息。底層調用"MPUB"指令
  • MultiPublishAsync():非阻塞發布多條消息。通過doneChan來異步接收發布結果。
  • DeferredPublish():阻塞發布1條帶延時的消息。相比Publish(),多了一個delay參數來指定延時多久才推送給消費者。底層調用"DPUB"指令
  • DeferredPublishAsync():非阻塞發布1條帶延時的消息。通過doneChan來異步接收發布結果。

Consumer源碼見consumer.go。

  • Consumer支持並發,調用AddConcurrentHandlers()方法指定創建多個handlerLoop協程進行處理即可。
  • ConnectToNSQD()和ConnectToNSQDs()方法可以指定1個或多個Nsqd創建連接,但不具備服務發現和動態調整。
  • ConnectToNSQLookupd()和ConnectToNSQLookupds()方法可以指定1個或多個lookup創建連接,支持服務發現和定時動態調整。

Consumer結構體

每個Consumer實例對應於一個Consumer結構體:

type Consumer struct {
	// 64bit atomic vars need to be first for proper alignment on 32bit platforms
	messagesReceived uint64
	messagesFinished uint64
	messagesRequeued uint64
	totalRdyCount    int64
	backoffDuration  int64
	backoffCounter   int32
	maxInFlight      int32

	mtx sync.RWMutex

	logger   []logger
	logLvl   LogLevel
	logGuard sync.RWMutex

	behaviorDelegate interface{}

	id      int64
	topic   string
	channel string
	config  Config

	rngMtx sync.Mutex
	rng    *rand.Rand

	needRDYRedistributed int32

	backoffMtx sync.Mutex

	incomingMessages chan *Message

	rdyRetryMtx    sync.Mutex
	rdyRetryTimers map[string]*time.Timer

	pendingConnections map[string]*Conn
	connections        map[string]*Conn

	nsqdTCPAddrs []string

	// used at connection close to force a possible reconnect
	lookupdRecheckChan chan int
	lookupdHTTPAddrs   []string
	lookupdQueryIndex  int

	wg              sync.WaitGroup
	runningHandlers int32
	stopFlag        int32
	connectedFlag   int32
	stopHandler     sync.Once
	exitHandler     sync.Once

	// read from this channel to block until consumer is cleanly stopped
	StopChan chan int
	exitChan chan int
}

注冊回調函數和消息消費

Consumer每收到一條消息,會調用我們指定的回調接口來消費消息。
這個回調接口如下:

// 返回nil表示消費成功,Consumer將向Nsqd發送FIN指令銷毀消息。
// 非nil表示消費失敗或需要重復消費,Consumer將向Nsqd發送REQ指令將消息重新入隊推送。
type Handler interface {
	HandleMessage(message *Message) error
}

我們在啟動Consumer,需要先用一個結構體實現上述Handler接口,將調用AddHandler()方法將該結構體傳給Consumer:

/*自定義結構體示例*/
type myMessageHandler struct {} // 自定義結構體,用於實現nsq.Handler接口

func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
	if len(m.Body) == 0 {
		return nil
	}

	err := processMessage(m.Body)

	return err
}

// 在啟動Consumer前調用:consumer.AddHandler(&myMessageHandler{})方法將結構體傳遞給Consumer

每調用一次consumer.AddHandler()方法會啟動一個handlerLoop協程用於循環接收消息:

// 必須在Consumer連接之前調用AddHandler()方法,否則會拋panic
// 每調用一次AddHandler()就創建一個handlerLoop協程,可以多次調用來創建多個協程
func (r *Consumer) AddHandler(handler Handler) {
	r.AddConcurrentHandlers(handler, 1)
}

func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
	if atomic.LoadInt32(&r.connectedFlag) == 1 {
		panic("already connected") // 必須在Consumer連接之前調用AddHandler()方法,否則會拋panic
	}

	atomic.AddInt32(&r.runningHandlers, int32(concurrency))
	for i := 0; i < concurrency; i++ {
		go r.handlerLoop(handler) // 每調用一次AddHandler()就創建一個handlerLoop協程,可以多次調用來創建多個協程
	}
}

如果消費速度較慢,可以在連接之前直接調用AddConcurrentHandlers()以指定創建多個協程來並發處理

每個handlerLoop協程都在監聽incomingMessages,收到消息則調用Handler消費:

func (r *Consumer) handlerLoop(handler Handler) {
	r.log(LogLevelDebug, "starting Handler")

	for {
		message, ok := <-r.incomingMessages // 可以創建多個協程並發監聽這個channel
		if !ok {
			goto exit
		}

		if r.shouldFailMessage(message, handler) {
			message.Finish()
			continue
		}

		err := handler.HandleMessage(message)
		if err != nil {
			r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID)
			if !message.IsAutoResponseDisabled() {
				message.Requeue(-1)
			}
			continue
		}

		if !message.IsAutoResponseDisabled() {
			message.Finish()
		}
	}

exit:
	r.log(LogLevelDebug, "stopping Handler")
	if atomic.AddInt32(&r.runningHandlers, -1) == 0 {
		r.exit()
	}
}

創建連接

Consumer提供了2個方法用於創建連接:

  • ConnectToNSQD():指定一個Nsqd連接。推薦使用lookupd以便自動服務發現。
  • ConnectToNSQDs():指定多個Nsqd連接。推薦使用lookupd以便自動服務發現。

注意,直接連接Nsqd的方法不推薦使用,建議使用下一章節的lookup服務發現,以便自動發現和管理。

ConnectToNSQDs()方法就是個for循環,調用多次ConnectToNSQD()。
ConnectToNSQD()方法比較長,關鍵的點是,如果未注冊回調方法,會報錯;置狀態為已連接;新建一個連接,建立連接時會發送"IDENTIFY"指令通告此連接的相關參數;向Nsqd發送Sub指令開始訂閱;訂閱前連接暫時放在pendingConnections中,訂閱成功后從pendingConnections移到connections中。
ConnectToNSQD()最終對connections中所有連接執行maybeUpdateRDY()方法用於調整RDY計數。

func (r *Consumer) ConnectToNSQD(addr string) error {
	if atomic.LoadInt32(&r.stopFlag) == 1 {
		return errors.New("consumer stopped")
	}

	if atomic.LoadInt32(&r.runningHandlers) == 0 { // 如果未注冊回調方法,會報錯
		return errors.New("no handlers")
	}

	atomic.StoreInt32(&r.connectedFlag, 1)

	conn := NewConn(addr, &r.config, &consumerConnDelegate{r})
	conn.SetLoggerLevel(r.getLogLevel())
	format := fmt.Sprintf("%3d [%s/%s] (%%s)", r.id, r.topic, r.channel)
	for index := range r.logger {
		conn.SetLoggerForLevel(r.logger[index], LogLevel(index), format)
	}
	r.mtx.Lock()
	_, pendingOk := r.pendingConnections[addr]
	_, ok := r.connections[addr]
	if ok || pendingOk {
		r.mtx.Unlock()
		return ErrAlreadyConnected
	}
	r.pendingConnections[addr] = conn
	if idx := indexOf(addr, r.nsqdTCPAddrs); idx == -1 {
		r.nsqdTCPAddrs = append(r.nsqdTCPAddrs, addr)
	}
	r.mtx.Unlock()

	r.log(LogLevelInfo, "(%s) connecting to nsqd", addr)

	cleanupConnection := func() {
		r.mtx.Lock()
		delete(r.pendingConnections, addr)
		r.mtx.Unlock()
		conn.Close()
	}

	resp, err := conn.Connect()
	if err != nil {
		cleanupConnection()
		return err
	}

	if resp != nil {
		if resp.MaxRdyCount < int64(r.getMaxInFlight()) {
			r.log(LogLevelWarning,
				"(%s) max RDY count %d < consumer max in flight %d, truncation possible",
				conn.String(), resp.MaxRdyCount, r.getMaxInFlight())
		}
	}

	cmd := Subscribe(r.topic, r.channel)
	err = conn.WriteCommand(cmd)
	if err != nil {
		cleanupConnection()
		return fmt.Errorf("[%s] failed to subscribe to %s:%s - %s",
			conn, r.topic, r.channel, err.Error())
	}

	r.mtx.Lock()
	delete(r.pendingConnections, addr)
	r.connections[addr] = conn
	r.mtx.Unlock()

	// pre-emptive signal to existing connections to lower their RDY count
	for _, c := range r.conns() {
		r.maybeUpdateRDY(c)
	}

	return nil
}

maybeUpdateRDY()方法調用perConnMaxInFlight()獲取當前連接的最大允許RDY計數,最大允許RDY計數為配置參數maxInFlight / 當前連接數。其中maxInFligh如果未主動配置的話默認為1,那么所有連接的RDY均為1。之后再調用updateRDY()方法來調整RDY計數。最終每個連接會向Nsqd發送RDY指令調整計數。
整段代碼調用較長,也沒太多可分析的點,所以代碼就不貼了。
maybeUpdateRDY()只是初調整,還有一個由NewConsumer()啟動的rdyLoop()協程會定時根據每個連接的狀態進一步調整,調整原則是將長時間未收到消息的連接的RDY置為最小值1

講了這么多,那么消息是怎么接收並傳遞過來的呢?
前面的handlerLoop協程,監聽incomingMessages通道。所以我們全局搜incomingMessages通道,發現Consumer.onConnMessage()方法會向這個通道寫入消息。而這個方法由consumerConnDelegate.OnMessage()調用。進一步定位到每個連接的Conn.readLoop()方法。
Conn.readLoop()方法的邏輯比較簡單,就是一個for循環阻塞等待Nsqd推送消息,解析后如果是一條消息,則調用consumerConnDelegate.OnMessage()將消息寫入到incomingMessages通道。

func (c *Conn) readLoop() {
	for {
		frameType, data, err := ReadUnpackedResponse(c) // 阻塞等待Nsqd推送消息,並完成消息解析
		
		switch frameType {
		case FrameTypeResponse:
			c.delegate.OnResponse(c, data)
		case FrameTypeMessage:
			msg, err := DecodeMessage(data)
			if err != nil {
				c.log(LogLevelError, "IO error - %s", err)
				c.delegate.OnIOError(c, err)
				goto exit
			}
			msg.Delegate = delegate
			msg.NSQDAddress = c.String()

			atomic.AddInt64(&c.messagesInFlight, 1)
			atomic.StoreInt64(&c.lastMsgTimestamp, time.Now().UnixNano())

			c.delegate.OnMessage(c, msg)	// 將消息寫入incomingMessages通道
		case FrameTypeError:
			c.log(LogLevelError, "protocol error - %s", data)
			c.delegate.OnError(c, data)
		default:
			c.log(LogLevelError, "IO error - %s", err)
			c.delegate.OnIOError(c, fmt.Errorf("unknown frame type %d", frameType))
		}
	}
}

服務發現和自動連接管理

Consumer提供了2個方法用於服務發現:

  • ConnectToNSQLookupd():通過lookupd服務發現,再自動創建連接。支持定時更新服務發現。
  • ConnectToNSQLookupds():通過多個lookupd服務發現,再自動創建連接。支持定時更新服務發現。

ConnectToNSQLookupds()方法就是為每個lookupd調用一次ConnectToNSQLookupd()。
我們來關注ConnectToNSQLookupd()做了啥。ConnectToNSQLookupd()也要求必須先注冊回調。對於第1個lookup,創建lookupdLoop協程完成自動的服務發現。后續的lookup只需要加入到lookupdHTTPAddrs中,由lookupdLoop協程來協調。

func (r *Consumer) ConnectToNSQLookupd(addr string) error {
	if atomic.LoadInt32(&r.stopFlag) == 1 {
		return errors.New("consumer stopped")
	}
	if atomic.LoadInt32(&r.runningHandlers) == 0 {
		return errors.New("no handlers") // 同ConnectToNSQD(),必須先注冊回調
	}

	if err := validatedLookupAddr(addr); err != nil {
		return err
	}

	atomic.StoreInt32(&r.connectedFlag, 1) // 置連接位

	r.mtx.Lock()
	for _, x := range r.lookupdHTTPAddrs {
		if x == addr {
			r.mtx.Unlock()
			return nil
		}
	}
	r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, addr)	// 將lookupd的addr加入到lookupdHTTPAddrs切片
	numLookupd := len(r.lookupdHTTPAddrs)
	r.mtx.Unlock()

	// 只有第1個loopupd才會創建lookupdLoop協程。
	if numLookupd == 1 {
		r.queryLookupd()	// 執行一次服務發現,里面最多重復3次,如果都失敗了,也不影響程序繼續運行
		r.wg.Add(1)
		go r.lookupdLoop()	// 創建lookupdLoop協程完成自動的服務發現
	}

	return nil
}

我們重點看下lookupdLoop協程的工作,這是服務發現和自動管理的關鍵:
lookupdLoop協程啟動時先用jitter變量執行了一段隨機延時,用於防止多個消費者監聽同一Topic且同時重啟的情況。由r.config.LookupdPollInterval變量創建了一個定時器,這是定時服務發現的間隔,查看config.go可知這個參數最小10ms,最大5m,默認為60s。
接着lookupdLoop協程進入for循環,主要做兩件事,一是通過ticker.C定時做一次服務發現,二是通過r.lookupdRecheckChan做一次服務發現,這個通道是在連接異常斷開時會觸發。

func (r *Consumer) lookupdLoop() {
	// add some jitter so that multiple consumers discovering the same topic,
	// when restarted at the same time, dont all connect at once.
	r.rngMtx.Lock()
	jitter := time.Duration(int64(r.rng.Float64() *
		r.config.LookupdPollJitter * float64(r.config.LookupdPollInterval)))
	r.rngMtx.Unlock()
	var ticker *time.Ticker

	select {
	case <-time.After(jitter):
	case <-r.exitChan:
		goto exit
	}

	ticker = time.NewTicker(r.config.LookupdPollInterval)

	for {
		select {
		case <-ticker.C:
			r.queryLookupd()
		case <-r.lookupdRecheckChan:
			r.queryLookupd()
		case <-r.exitChan:
			goto exit
		}
	}

exit:
	if ticker != nil {
		ticker.Stop()
	}
	r.log(LogLevelInfo, "exiting lookupdLoop")
	r.wg.Done()
}

具體的服務發現由queryLookupd()方法完成,該方法每次執行調用nextLookupdEndpoint()方法取下個lookup並通過http的方式發送lookup指令,參數包含需要消費的Topic。最多請求3個lookup,有1次成功即停止,如果3次都失敗就退出等待下一次服務發現。lookup返回lookupResp結構體,內容為包含請求的Topic的Nsqd相關信息列表:

type lookupResp struct {
	Channels  []string    `json:"channels"`
	Producers []*peerInfo `json:"producers"`
	Timestamp int64       `json:"timestamp"`
}

服務發現信息包含該lookup的channel列表和生產者列表。
然后對每一個生產者的Nsqd調用ConnectToNSQD()方法,該方法會完成去重,已經連接的Nsqd不會重復連接。

性能和故障分析

Consumer

消費者Consumer客戶端對並發和負載較友好,可以通過以下手段來增強並發和負載:

  • 同時連接多個Nsqd。注意Nsqd之間不共享數據,需要生產者端將消息寫入不同的Nsqd。
  • 調用AddConcurrentHandlers來啟用多個消息處理協程,增大消息消費的能力。
  • 合適配置參數增加網絡吞吐量。注意參數配置需要配套合理,比如如果將Flight消息數提高很高,但消費能力不行,就有可能出現Nsqd端大量消息超時的問題。可以提升網絡吞吐量的參數主要有這幾個:
    • MaxInFlight:該參數控制Nsqd端允許的已發送但尚未收到FIN幀的消息數,增大該值可以增大網絡中正在發送的消息數。
    • OutputBufferSize:增大Nsqd端發送緩沖區的大小。

Producer

一條一條發布消息較慢,如果可以,建議用MPUB相關的接口批量發布。

但一個Nsqd處理能力有限,有時我們需要集群來負載。遺憾的是,Producer與Nsqd是一對一的關系。可以有以下思路優化:

  • 不同的Topic發布到不同的Nsqd上。
  • 將消息分散發布到多個Nsqd上,需要自行實現。

重復消費

即使只發布一份的消息,在極端情況下也存在重復消費的可能,當某條消息超時(回顧:NSQ源碼剖析(一):NSQD主要結構方法和消息生產消費過程 ),消息會重新入隊。入隊之后如果消費者端因網絡或性能問題現在才完成消費,此時FIN指令將被響應"E_FIN_FAILED"。消息還在隊列中等待第二次推送。

消費順序

Nsqd不保證消息推送順序,這與延時/超時/磁盤隊列等設計有關,這里不展開討論,請回顧:NSQ源碼剖析(一):NSQD主要結構方法和消息生產消費過程

單點問題

存在兩種單點問題,一是異常退出的Nsqd會丟失掉尚在內存的Msg;二是與宕機的Nsqd相連的Producer將無法工作。

宕機時丟失內存Msg
可以有兩種思路進行規避:

  • 降低內存channel的大小,甚至是0,讓所有數據都壓入磁盤隊列。這種方式會降低性能,同時磁盤隊列本身也有內存緩沖,存在丟失可能。
  • 消息副本。生產者發布消息時,創建副本發布到不同的Nsqd上。這要求消費者有能力識別出重復消費的情況,或者實現消費冪等。

Nsqd宕機導致Producer不工作
前面介紹過Producer與Nsqd是一對一的關系,一旦Nsqd宕機,與之相連的Producer也將無法工作。這種情況可以自行實現異常檢測和故障轉移,比如檢測到宕機時連接到另一台Nsqd實例;或者實現一個自動的負載均衡機制。
無論使用哪種方式,都要求Consumer端支持服務發現,這點上官方api就支持,使用ConnectToNSQLookupd()或ConnectToNSQLookupds()即可。

由於時間因素,本來打算實現一個Producer的負載均衡算法的,現在來不及。有興趣的同學可以嘗試下,或者關注我的github,歡迎參與進來討論和研究。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM