剖析nsq消息隊列(三) 消息傳輸的可靠性和持久化[一]


剖析nsq消息隊列-目錄

上兩篇帖子主要說了一下nsq的拓撲結構,如何進行故障處理和橫向擴展,保證了客戶端和服務端的長連接,連接保持了,就要傳輸數據了,nsq如何保證消息被訂閱者消費,如何保證消息不丟失,就是今天要闡述的內容。

nsq topic、channel、和消費我客戶端的結構如上圖,一個topic下有多個channel每個channel可以被多個客戶端訂閱。
消息處理的大概流程:當一個消息被nsq接收后,傳給相應的topic,topic把消息傳遞給所有的channel ,channel根據算法選擇一個訂閱客戶端,把消息發送給客戶端進行處理。
看上去這個流程是沒有問題的,我們來思考幾個問題

  • 網絡傳輸的不確定性,比如超時;客戶端處理消息時崩潰等,消息如何重傳;
  • 如何標識消息被客戶端成功處理完畢;
  • 消息的持久化,nsq服務端重新啟動時消息不丟失;

服務端對發送中的消息處理邏輯

之前的帖子說過客戶端和服務端進行連接后,會啟動一個gorouting來發送信息給客戶端

	go p.messagePump(client, messagePumpStartedChan)

然后會監聽客戶端發過來的命令client.Reader.ReadSlice('\n')
服務端會定時檢查client端的連接狀態,讀取客戶端發過來的各種命令,發送心跳等。每一個連接最終的目的就是監聽channel的消息,發送給客戶端進行消費。
當有消息發送給訂閱客戶端的時候,當然選擇哪個client也是有無則的,這個以后講,

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
	// ...
    for {
		// ...
		case b := <-backendMsgChan:
			if sampleRate > 0 && rand.Int31n(100) > sampleRate {
				continue
			}

			msg, err := decodeMessage(b)
			if err != nil {
				p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
				continue
			}
			msg.Attempts++

			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg)
			if err != nil {
				goto exit
			}
			flushed = false
		case msg := <-memoryMsgChan:
			if sampleRate > 0 && rand.Int31n(100) > sampleRate {
				continue
			}
			msg.Attempts++

			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg)
			if err != nil {
				goto exit
			}
			flushed = false
		case <-client.ExitChan:
			goto exit
		}
	}

// ...
}
        

看一下這個方法調用subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout),在發送給客戶端之前,把這個消息設置為在飛翔中,

// pushInFlightMessage atomically adds a message to the in-flight dictionary
func (c *Channel) pushInFlightMessage(msg *Message) error {
	c.inFlightMutex.Lock()
	_, ok := c.inFlightMessages[msg.ID]
	if ok {
		c.inFlightMutex.Unlock()
		return errors.New("ID already in flight")
	}
	c.inFlightMessages[msg.ID] = msg
	c.inFlightMutex.Unlock()
	return nil
}

然后發送給客戶端進行處理。
在發送中的數據,存在的各種不確定性,nsq的處理方式是:對發送給客戶端信息設置為在飛翔中,如果在如果處理成功就把這個消息從飛翔中的狀態中去掉,如果在規定的時間內沒有收到客戶端的反饋,則認為這個消息超時,然后重新歸隊,兩次進行處理。所以無論是哪種特殊情況,nsq統一認為消息為超時。

服務端處理超時消息

nsq對超時消息的處理,借鑒了redis的過期算法,但也不太一樣redis的更復雜一些,因為redis是單線程的,還要處理占用cpu時間等等,nsq因為gorouting的存在要很簡單很多。
簡單來說,就是在nsq啟動的時候啟動協程去處理channel的過期數據

func (n *NSQD) Main() error {
	// ...
	// 啟動協程去處理channel的過期數據    
	n.waitGroup.Wrap(n.queueScanLoop)
	n.waitGroup.Wrap(n.lookupLoop)
	if n.getOpts().StatsdAddress != "" {
		n.waitGroup.Wrap(n.statsdLoop)
	}

	err := <-exitCh
	return err
}

當然不是每一個channel啟動一個協程來處理過期數據,而是有一些規定,我們看一下一些默認值,然后再展開講算法

	return &Options{
		// ...

		HTTPClientConnectTimeout: 2 * time.Second,
		HTTPClientRequestTimeout: 5 * time.Second,
		// 內存最大隊列數
		MemQueueSize:    10000,
		MaxBytesPerFile: 100 * 1024 * 1024,
		SyncEvery:       2500,
		SyncTimeout:     2 * time.Second,

		// 掃描channel的時間間隔
		QueueScanInterval:        100 * time.Millisecond,
		// 刷新掃描的時間間隔        
		QueueScanRefreshInterval: 5 * time.Second,
		QueueScanSelectionCount:  20,
		// 最大的掃描池數量        
		QueueScanWorkerPoolMax:   4,
		// 標識百分比        
		QueueScanDirtyPercent:    0.25,
		// 消息超時
		MsgTimeout:    60 * time.Second,
		MaxMsgTimeout: 15 * time.Minute,
		MaxMsgSize:    1024 * 1024,
		MaxBodySize:   5 * 1024 * 1024,
		MaxReqTimeout: 1 * time.Hour,
		ClientTimeout: 60 * time.Second,

		// ...
	}

這些參數都可以在啟動nsq的時候根據自己需要來指定,我們主要說一下這幾個:

  • QueueScanWorkerPoolMax就是最大協程數,默認是4,這個數是掃描所有channel的最大協程數,當然channel的數量小於這個參數的話,就調整協程的數量,以最小的為准,比如channel的數量為2個,而默認的是4個,那就調掃描的數量為2
  • QueueScanSelectionCount 每次掃描最大的channel數量,默認是20,如果channel的數量小於這個值,則以channel的數量為准。
  • QueueScanDirtyPercent 標識臟數據 channel的百分比,默認為0.25,eg: channel數量為10,則一次最多掃描10個,查看每個channel是否有過期的數據,如果有,則標記為這個channel是有臟數據的,如果有臟數據的channel的數量 占這次掃描的10個channel的比例超過這個百分比,則直接再次進行掃描一次,而不用等到下一次時間點。
  • QueueScanInterval 掃描channel的時間間隔,默認的是每100毫秒掃描一次。
  • QueueScanRefreshInterval 刷新掃描的時間間隔 目前的處理方式是調整channel的協程數量。
    這也就是nsq處理過期數據的算法,總結一下就是,使用協程定時去掃描隨機的channel里是否有過期數據。
func (n *NSQD) queueScanLoop() {
	workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
	responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
	closeCh := make(chan int)

	workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
	refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)

	channels := n.channels()
	n.resizePool(len(channels), workCh, responseCh, closeCh)

	for {
		select {
		case <-workTicker.C:
			if len(channels) == 0 {
				continue
			}
		case <-refreshTicker.C:
			channels = n.channels()
			n.resizePool(len(channels), workCh, responseCh, closeCh)
			continue
		case <-n.exitChan:
			goto exit
		}

		num := n.getOpts().QueueScanSelectionCount
		if num > len(channels) {
			num = len(channels)
		}

	loop:
		// 隨機channel    
		for _, i := range util.UniqRands(num, len(channels)) {
			workCh <- channels[i]
		}

		numDirty := 0
		for i := 0; i < num; i++ {
			if <-responseCh {
				numDirty++
			}
		}

		if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
			goto loop
		}
	}

exit:
	n.logf(LOG_INFO, "QUEUESCAN: closing")
	close(closeCh)
	workTicker.Stop()
	refreshTicker.Stop()
}

在掃描channel的時候,如果發現有過期數據后,會重新放回到隊列,進行重發操作。

func (c *Channel) processInFlightQueue(t int64) bool {
	// ...
	for {
		c.inFlightMutex.Lock()
		msg, _ := c.inFlightPQ.PeekAndShift(t)
		c.inFlightMutex.Unlock()

		if msg == nil {
			goto exit
		}
		dirty = true

		_, err := c.popInFlightMessage(msg.clientID, msg.ID)
		if err != nil {
			goto exit
		}
		atomic.AddUint64(&c.timeoutCount, 1)
		c.RLock()
		client, ok := c.clients[msg.clientID]
		c.RUnlock()
		if ok {
			client.TimedOutMessage()
		}
		//重新放回隊列進行消費處理。      
		c.put(msg)
	}

exit:
	return dirty
}

客戶端對消息的處理和響應

之前的帖子中的例子中有說過,客戶端要消費消息,需要實現接口

type Handler interface {
	HandleMessage(message *Message) error
}

在服務端發送消息給客戶端后,如果在處理業務邏輯時,如果發生錯誤則給服務器發送Requeue命令告訴服務器,重新發送消息進處理。如果處理成功,則發送Finish命令

func (r *Consumer) handlerLoop(handler Handler) {
	r.log(LogLevelDebug, "starting Handler")

	for {
		message, ok := <-r.incomingMessages
		if !ok {
			goto exit
		}

		if r.shouldFailMessage(message, handler) {
			message.Finish()
			continue
		}

		err := handler.HandleMessage(message)
		if err != nil {
			r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID)
			if !message.IsAutoResponseDisabled() {
				message.Requeue(-1)
			}
			continue
		}

		if !message.IsAutoResponseDisabled() {
			message.Finish()
		}
	}

exit:
	r.log(LogLevelDebug, "stopping Handler")
	if atomic.AddInt32(&r.runningHandlers, -1) == 0 {
		r.exit()
	}
}

服務端收到命令后,對飛翔中的消息進行處理,如果成功則去掉,如果是Requeue則執行歸隊和重發操作,或者進行defer隊列處理。

消息的持久化

默認的情況下,只有內存隊列不足時MemQueueSize:10000時,才會把數據保存到文件內進行持久到硬盤。

	select {
	case c.memoryMsgChan <- m:
	default:
		b := bufferPoolGet()
		err := writeMessageToBackend(b, m, c.backend)
		bufferPoolPut(b)
		c.ctx.nsqd.SetHealth(err)
		if err != nil {
			c.ctx.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s",
				c.name, err)
			return err
		}
	}
	return nil

如果將 --mem-queue-size 設置為 0,所有的消息將會存儲到磁盤。我們不用擔心消息會丟失,nsq 內部機制保證在程序關閉時將隊列中的數據持久化到硬盤,重啟后就會恢復。
nsq自己開發了一個庫go-diskqueue來持久會消息到內存。這個庫的代碼量不多,理解起來也不難,代碼邏輯我想下一篇再講。
看一下保存在硬盤后的樣子:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM