剖析nsq消息隊列(四) 消息的負載處理


剖析nsq消息隊列-目錄
實際應用中,一部分服務集群可能會同時訂閱同一個topic,並且處於同一個channel下。當nsqd有消息需要發送給訂閱客戶端去處理時,發給哪個客戶端是需要考慮的,也就是我要說的消息的負載。

如果不考慮負載情況,把隨機的把消息發送到某一個客服端去處理消息,如果機器的性能不同,可能發生的情況就是某一個或幾個客戶端處理速度慢,但還有大量新的消息需要處理,其他的客戶端處於空閑狀態。理想的狀態是,找到當前相對空閑的客戶端去處理消息。

nsq的處理方式是客戶端主動向nsqd報告自已的可處理消息數量(也就是RDY命令)。nsqd根據每個連接的客戶端的可處理消息的狀態來隨機把消息發送到可用的客戶端,來進行消息處理

如下圖所示:

客戶端更新RDY

從第一篇帖子的例子中我們就有配置consumer的config

	config := nsq.NewConfig()
	config.MaxInFlight = 1000
	config.MaxBackoffDuration = 5 * time.Second
	config.DialTimeout = 10 * time.Second

MaxInFlight 來設置最大的處理中的消息數量,會根據這個變量計算在是否更新RDY
初始化的時候 客戶端會向連接的nsqd服務端來發送updateRDY來設置最大處理數,

func (r *Consumer) maybeUpdateRDY(conn *Conn) {
	inBackoff := r.inBackoff()
	inBackoffTimeout := r.inBackoffTimeout()
	if inBackoff || inBackoffTimeout {
		r.log(LogLevelDebug, "(%s) skip sending RDY inBackoff:%v || inBackoffTimeout:%v",
			conn, inBackoff, inBackoffTimeout)
		return
	}

	remain := conn.RDY()
	lastRdyCount := conn.LastRDY()
	count := r.perConnMaxInFlight()

	// refill when at 1, or at 25%, or if connections have changed and we're imbalanced
	if remain <= 1 || remain < (lastRdyCount/4) || (count > 0 && count < remain) {
		r.log(LogLevelDebug, "(%s) sending RDY %d (%d remain from last RDY %d)",
			conn, count, remain, lastRdyCount)
		r.updateRDY(conn, count)
	} else {
		r.log(LogLevelDebug, "(%s) skip sending RDY %d (%d remain out of last RDY %d)",
			conn, count, remain, lastRdyCount)
	}
}

當剩余的可用處理數量remain 小於等於1,或者小於最后一次設置的可用數量lastRdyCount的1/4時,或者可用連接平均的maxInFlight大於0並且小於remain時,則更新RDY狀態

當有多個nsqd時,會把最大的消息進行平均計算:

// perConnMaxInFlight calculates the per-connection max-in-flight count.
//
// This may change dynamically based on the number of connections to nsqd the Consumer
// is responsible for.
func (r *Consumer) perConnMaxInFlight() int64 {
	b := float64(r.getMaxInFlight())
	s := b / float64(len(r.conns()))
	return int64(math.Min(math.Max(1, s), b))
}

當有消息從nsqd發送過來后也會調用maybeUpdateRDY方法,計算是否需要發送RDY命令

func (r *Consumer) onConnMessage(c *Conn, msg *Message) {
	atomic.AddInt64(&r.totalRdyCount, -1)
	atomic.AddUint64(&r.messagesReceived, 1)
	r.incomingMessages <- msg
	r.maybeUpdateRDY(c)
}

上面就是主要的處理邏輯,但還有一些邏輯,就是當消息處理發生錯誤時,nsq有自己的退避算法backoff也會更新RDY 簡單來說就是當發現有處理錯誤時,來進行重試和指數退避,在退避期間RDY會為0,重試時會先放嘗試RDY為1看有沒有錯誤,如果沒有錯誤則全部放開,這個算法這篇帖子我就不詳細說了。

服務端nsqd選擇客戶端進行發送消息

同時訂閱同一topic的客戶端(comsumer)有很多個,每個客戶端根據自己的配置或狀態發送RDY命令到nsqd表明自己能處理多少消息量
nsqd服務端會檢查每個客戶端的的狀態是否可以發送消息。也就是IsReadyForMessages方法,判斷inFlightCount是否大於readyCount,如果大於或者等於就不再給客戶端發送數據,等待Ready后才會再給客戶端發送數據

func (c *clientV2) IsReadyForMessages() bool {
	if c.Channel.IsPaused() {
		return false
	}

	readyCount := atomic.LoadInt64(&c.ReadyCount)
	inFlightCount := atomic.LoadInt64(&c.InFlightCount)

	c.ctx.nsqd.logf(LOG_DEBUG, "[%s] state rdy: %4d inflt: %4d", c, readyCount, inFlightCount)

	if inFlightCount >= readyCount || readyCount <= 0 {
		return false
	}

	return true

每一次發送消息inFlightCount會+1並保存到發送中的隊列中,當客戶端發送FIN會-1在之前的帖子中有說過。

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
	// ...
	for {
		// 檢查訂閱狀態和消息是否可處理狀態	
		if subChannel == nil || !client.IsReadyForMessages() {
			// the client is not ready to receive messages...
			memoryMsgChan = nil
			backendMsgChan = nil
			flusherChan = nil
			// ...
			flushed = true
		} else if flushed {
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			flusherChan = nil
		} else {
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			flusherChan = outputBufferTicker.C
		}

		select {
		case <-flusherChan:
			// ...
		// 消息處理			
		case b := <-backendMsgChan:
			client.SendingMessage()
			// ...
		case msg := <-memoryMsgChan:
			client.SendingMessage()		
			//...
		}
	}
// ...
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM