NSQ源碼剖析(一):NSQD主要結構方法和消息生產消費過程


1 概述

NSQ包含3個組件:

  • nsqd:每個nsq實例運行一個nsqd進程,負責接收生產者消息、向nsqlookupd注冊、向消費者推送消息
  • nsqlookupd:集群注冊中心,可以有多個,負責接收nsqd的注冊信息,向消費者提供服務發現
  • nsqadmin:用於監控和管理的web ui

生產者將消息寫入到指定的主題Topic,同一個Topic下則可以關聯多個管道Channel,每個Channel都會傳輸對應Topic的完整副本。
消費者則訂閱Channel的消息。於是多個消費者訂閱不同的Channel的話,他們各自都能拿到完整的消息副本;但如果多個消費者訂閱同一個Channel,則是共享的,即消息會隨機發送給其中一個消費者。

接下來我們來分析下nsq的源碼:

nsq各組件均使用上述代碼倉庫,通過apps目錄下的不同的main包啟動。比如nsqd的main函數在apps/nsqd目錄下,其他類同。

本文檔主要分析nsqd的主要結構體和方法,及消息生產和消費的過程。主要以TCP api為例來分析,HTTP/HTTPS的api類同。

2 主要結構體及方法

2.1 NSQD

nsqd/nsqd.go文件,NSQD是主實例,一個nsqd進程創建一個nsqd結構體實例,並通過此結構體的Main()方法啟動所有的服務。

type NSQD struct {
	clientIDSequence int64 // 遞增的客戶端ID,每個客戶端連接均從這里取一個遞增后的ID作為唯一標識

	sync.RWMutex

	opts atomic.Value // 參數選項,真實類型是apps/nsqd/option.go:Options結構體

	dl        *dirlock.DirLock
	isLoading int32
	errValue  atomic.Value
	startTime time.Time

	topicMap map[string]*Topic	// 保存當前所有的topic

	clientLock sync.RWMutex
	clients    map[int64]Client

	lookupPeers atomic.Value

	tcpServer     *tcpServer
	tcpListener   net.Listener
	httpListener  net.Listener
	httpsListener net.Listener
	tlsConfig     *tls.Config

	poolSize int				// 當前工作協程組的協程數量

	notifyChan           chan interface{}
	optsNotificationChan chan struct{}
	exitChan             chan int
	waitGroup            util.WaitGroupWrapper

	ci *clusterinfo.ClusterInfo
}

主要方法

/*
	程序啟動時調用本方法,執行下面的動作:
		- 啟動TCP/HTTP/HTTPS服務
		- 啟動工作協程組:NSQD.queueScanLoop
		- 啟動服務注冊:NSQD.lookupLoop
*/
func (n *NSQD) Main() error

// 負責管理工作協程組的數量,每調用一次NSQD.queueScanWorker()方法啟動一個工作協程
func (n *NSQD) queueScanLoop()

// 由queueScanLoop()調用,負責啟動工作協程組並動態調整協程數量。工作協程的數量為當前的channel數 * 0.25
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int)

// 這是具體的工作協程,監聽workCh,對收到的待處理Channel做兩個動作,一是將超時的消息重新入隊;二是將到時間的延時消息入隊
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int)

/*
	lookupLoop()方法與nsqlookupd建立連接,負責向nsqlookupd注冊topic,並定時發送心跳包
*/
func (n *NSQD) lookupLoop()

2.2 tcpServer

nsqd/tcp.go文件,tcpServer通過Handle()方法接收TCP請求。
tcpServer是nsqd結構的成員,全局也就只有一個實例,但在protocol包的TCPServer方法中,每創建一個新的連接,均會調用一次tcpServer.Handle()

type tcpServer struct {
	ctx   *context
	conns sync.Map
}

主要方法

/*
	p.nsqd.Main()啟動protocol.TCPServer(),這個方法里會為每個客戶端連接創建一個新協程,協程執行tcpServer.Handle()方法
	本方法首先對新連接讀取4字節校驗版本,新連接必須首先發送4字節"  V2"。
	然后阻塞調用nsqd.protocolV2.IOLoop()處理客戶端接下來的請求。
*/
func (p *tcpServer) Handle(clientConn net.Conn)

2.3 protocolV2

nsqd/protocol_v2.go文件,protocolV2負責處理過來的具體的用戶請求。
每個連接均會創建一個獨立的protocolV2實例(由tcpServer.Handle()創建)

type protocolV2 struct {
	ctx *context
}

主要方法

/*
	tcpServer.Handle()阻塞調用本方法
	1. 啟用一個獨立協程向消費者推送消息protocolV2.messagePump()
	2. for循環接收並處理客戶端請求protocolV2.Exec()
*/
func (p *protocolV2) IOLoop(conn net.Conn) error

// 組裝消息並調用protocolV2.Send()發送給消費者
func (p *protocolV2) SendMessage(client *clientV2, msg *Message) error

// 向客戶端發送數據幀
func (p *protocolV2) Send(client *clientV2, frameType int32, data []byte) error

// 解析客戶端請求的指令,調用對應的指令方法
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error)

// 負責向消費者推送消息
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool)

// 下面這組方法是NSQD支持的指令對應的處理方法
func (p *protocolV2) IDENTIFY(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) AUTH(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) RDY(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) REQ(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) CLS(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) NOP(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error)
func (p *protocolV2) TOUCH(client *clientV2, params [][]byte) ([]byte, error)

2.4 clientV2

nsqd/client_v2.go文件,保存每個客戶端的連接信息。
clientV2實例由protocolV2.IOLoop()創建,每個連接均有一個獨立的實例。

type clientV2 struct {
	// 64bit atomic vars need to be first for proper alignment on 32bit platforms
	ReadyCount    int64
	InFlightCount int64
	MessageCount  uint64
	FinishCount   uint64
	RequeueCount  uint64

	pubCounts map[string]uint64

	writeLock sync.RWMutex
	metaLock  sync.RWMutex

	ID        int64
	ctx       *context
	UserAgent string

	// original connection
	net.Conn

	// connections based on negotiated features
	tlsConn     *tls.Conn
	flateWriter *flate.Writer

	// reading/writing interfaces
	Reader *bufio.Reader
	Writer *bufio.Writer

	OutputBufferSize    int
	OutputBufferTimeout time.Duration

	HeartbeatInterval time.Duration

	MsgTimeout time.Duration

	State          int32
	ConnectTime    time.Time
	Channel        *Channel
	ReadyStateChan chan int
	ExitChan       chan int

	ClientID string
	Hostname string

	SampleRate int32

	IdentifyEventChan chan identifyEvent
	SubEventChan      chan *Channel

	TLS     int32
	Snappy  int32
	Deflate int32

	// re-usable buffer for reading the 4-byte lengths off the wire
	lenBuf   [4]byte
	lenSlice []byte

	AuthSecret string
	AuthState  *auth.State
}

2.5 Topic

nsqd/topic.go文件,對應於每個topic實例,由系統啟動時創建或者發布消息/消費消息時自動創建。

type Topic struct {
	messageCount uint64	// 累計消息數
	messageBytes uint64	// 累計消息體的字節數

	sync.RWMutex

	name              string				// topic名,生產和消費時需要指定此名稱
	channelMap        map[string]*Channel	// 保存每個channel name和channel指針的映射
	backend           BackendQueue			// 磁盤隊列,當內存memoryMsgChan滿時,寫入硬盤隊列
	memoryMsgChan     chan *Message			// 消息優先存入這個內存chan
	startChan         chan int
	exitChan          chan int
	channelUpdateChan chan int
	waitGroup         util.WaitGroupWrapper
	exitFlag          int32
	idFactory         *guidFactory

	ephemeral      bool
	deleteCallback func(*Topic)
	deleter        sync.Once

	paused    int32
	pauseChan chan int

	ctx *context
}

主要方法

/*
	下面兩個方法負責將消息寫入topic,底層均調用topic.put()方法
	1. topic.memoryMsgChan未滿時,優先寫入內存memoryMsgChan
	2. 否則,寫入磁盤topic.backend
*/
func (t *Topic) PutMessage(m *Message) error
func (t *Topic) PutMessages(msgs []*Message) error

/*
	NewTopic創建新的topic時會為每個topic啟動一個獨立線程來處理消息推送,即messagePump()
	此方法循環隨機從內存memoryMsgChan和磁盤隊列backend中取消息寫入到topic下每一個chnnel中
*/
func (t *Topic) messagePump()

2.6 channel

nsqd/channel.go文件,對應於每個channel實例

type Channel struct {
	requeueCount uint64
	messageCount uint64
	timeoutCount uint64

	sync.RWMutex

	topicName string
	name      string
	ctx       *context

	backend BackendQueue		// 磁盤隊列,當內存memoryMsgChan滿時,寫入硬盤隊列

	memoryMsgChan chan *Message	// 消息優先存入這個內存chan
	exitFlag      int32
	exitMutex     sync.RWMutex

	// state tracking
	clients        map[int64]Consumer
	paused         int32
	ephemeral      bool
	deleteCallback func(*Channel)
	deleter        sync.Once

	// Stats tracking
	e2eProcessingLatencyStream *quantile.Quantile

	// TODO: these can be DRYd up
	deferredMessages map[MessageID]*pqueue.Item	// 保存尚未到時間的延遲消費消息
	deferredPQ       pqueue.PriorityQueue		// 保存尚未到時間的延遲消費消息,最小堆
	deferredMutex    sync.Mutex
	inFlightMessages map[MessageID]*Message		// 保存已推送尚未收到FIN的消息
	inFlightPQ       inFlightPqueue				// 保存已推送尚未收到FIN的消息,最小堆
	inFlightMutex    sync.Mutex
}

主要方法

/*
	將消息寫入channel,邏輯與topic的一致,內存未滿則優先寫內存chan,否則寫入磁盤隊列
*/
func (c *Channel) PutMessage(m *Message) error
func (c *Channel) put(m *Message) error

// 消費超時相關
func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error
func (c *Channel) pushInFlightMessage(msg *Message) error
func (c *Channel) popInFlightMessage(clientID int64, id MessageID) (*Message, error)
func (c *Channel) addToInFlightPQ(msg *Message)
func (c *Channel) removeFromInFlightPQ(msg *Message)
func (c *Channel) processInFlightQueue(t int64) bool

// 延時消費相關
func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error
func (c *Channel) pushDeferredMessage(item *pqueue.Item) error
func (c *Channel) popDeferredMessage(id MessageID) (*pqueue.Item, error)
func (c *Channel) addToDeferredPQ(item *pqueue.Item)
func (c *Channel) processDeferredQueue(t int64) bool

3 啟動過程

nsqd的main函數在apps/nsqd/main.go文件。
啟動時調用了一個第三方包svc,主要作用是攔截syscall.SIGINT/syscall.SIGTERM這兩個信號,最終還是調用了main.go下的3個方法:

  • program.Init():windows下特殊操作
  • program.Start():加載參數和配置文件、加載上一次保存的Topic信息並完成初始化、創建nsqd並調用p.nsqd.Main()啟動
  • program.Stop():退出處理

p.nsqd.Main()的邏輯也很簡單,代碼不貼了,依次啟動了TCP服務、HTTP服務、HTTPS服務這3個服務。除此之外,還啟動了以下兩個協程:

  • queueScanLoop:消息延時/超時處理
  • lookupLoop:服務注冊

TCPServer
protocol包的TCPServer的核心代碼就是下面這幾行,循環等待客戶端連接,並為每個連接創建一個獨立的協程:

func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
	for {
		// 等待生產者或消費者連接
		clientConn, err := listener.Accept()

		// 每創建一個連接wg +1
		wg.Add(1)
		go func() {
			// 每個連接均啟動一個獨立的協程來接收處理請求
			handler.Handle(clientConn)
			wg.Done()
		}()
	}

	// 等待所有協程退出
	wg.Wait()

	return nil
}

TCPServer的核心是為每個連接啟動的協程處理方法handler.Handle(clientConn),實際調用的是下面這個方法,連接建立時先讀取4字節,必須是" V2",然后啟動prot.IOLoop(clientConn)處理接下來的客戶端請求:

func (p *tcpServer) Handle(clientConn net.Conn) {
	// 無論是生產者還是消費者,建立連接時,必須先發送4字節的"  V2"進行版本校驗
	buf := make([]byte, 4)
	_, err := io.ReadFull(clientConn, buf)
	protocolMagic := string(buf)

	var prot protocol.Protocol
	switch protocolMagic {
	case "  V2":
		prot = &protocolV2{ctx: p.ctx}
	default:
		protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
		clientConn.Close()
		p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
			clientConn.RemoteAddr(), protocolMagic)
		return
	}

	// 版本校驗通過,保存連接信息,key-是ADDR,value是當前連接指針
	p.conns.Store(clientConn.RemoteAddr(), clientConn)

	// 啟動
	err = prot.IOLoop(clientConn)
	if err != nil {
		p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
	}

	p.conns.Delete(clientConn.RemoteAddr())
}

4 消費和生產過程

4.1 消息生產

生產者pub消息時,消息會首先寫入對應topic的隊列(內存優先,內存滿了寫磁盤),topic的messagePump()方法再將消息拷貝給每個channel。
每個channel均各執一份完整的消息。

1.消息寫入topic
消息生產由生產者調用PUB/MPUB/DPUB這類指令實現,底層都是調用topic.PutMessage(msg),進一步調用topic.put(msg):

func (t *Topic) put(m *Message) error {
	select {
	case t.memoryMsgChan <- m:	// 優先寫入內存memoryMsgChan
	default:					// 當內存case失敗即memoryMsgChan滿時,走default,將msg以字節形式寫入磁盤隊列topic.backend
		b := bufferPoolGet()
		err := writeMessageToBackend(b, m, t.backend)
		bufferPoolPut(b)
		t.ctx.nsqd.SetHealth(err)
		if err != nil {
			t.ctx.nsqd.logf(LOG_ERROR,
				"TOPIC(%s) ERROR: failed to write message to backend - %s",
				t.name, err)
			return err
		}
	}
	return nil
}

消息寫入topic的邏輯比較簡單,優先寫memoryMsgChan,如果memoryMsgChan滿了,則寫入磁盤隊列topic.backend。
這里留個思考題:NSQ是否支持不寫內存,全部寫磁盤隊列?

2.topic將消息復制給每個channel
第二章介紹結構體和方法時,介紹了topic結構體的messagePump()方法,正是這個方法將第1步寫入的消息復制給每個channel的:

func (t *Topic) messagePump() {
	/* 准備工作有代碼我們略過 */
	// 主消息處理循環
	for {
		select {
		case msg = <-memoryMsgChan:
		case buf = <-backendChan:
			msg, err = decodeMessage(buf)
			if err != nil {
				t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
				continue
			}
		case <-t.channelUpdateChan:
			chans = chans[:0]
			t.RLock()
			for _, c := range t.channelMap {
				chans = append(chans, c)
			}
			t.RUnlock()
			if len(chans) == 0 || t.IsPaused() {
				memoryMsgChan = nil
				backendChan = nil
			} else {
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		case <-t.pauseChan:
			if len(chans) == 0 || t.IsPaused() {
				memoryMsgChan = nil
				backendChan = nil
			} else {
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		case <-t.exitChan:
			goto exit
		}

		for i, channel := range chans {
			chanMsg := msg
			/* channel消費消息時,需要處理延時/超時等問題,所以這里復制了消息,給每個channel傳遞的是獨立的消息實例 */
			if i > 0 {
				chanMsg = NewMessage(msg.ID, msg.Body)
				chanMsg.Timestamp = msg.Timestamp
				chanMsg.deferred = msg.deferred
			}
			if chanMsg.deferred != 0 {
				channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
				continue
			}
			err := channel.PutMessage(chanMsg)
			if err != nil {
				t.ctx.nsqd.logf(LOG_ERROR,
					"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
					t.name, msg.ID, channel.name, err)
			}
		}
	}
}

topic.messagePump()方法代碼還蠻長的,前面是些准備工作,主要就是后面的for循環。其中for循環中select的前兩項,memoryMsgChan來源於topic.memoryMsgChan,而backendChan則是topic.backend.ReadChan(),分別對應於內存和磁盤隊列。注意只有這兩個case會往下傳遞消息,其他的case處理退出和更新機制的,會continue或exit外層的for循環。
雖然通道channel是有序的,但select的case具有隨機性,這就決定了每輪循環讀的是內存還是磁盤是隨機的,消息的消費順序是不可控的。
select語句獲取的消息,交給第2層for循環處理,邏輯比較簡單,遍歷每一個chan,調用channel.PutMessage()寫入。由於每個channel對應於不同的消費者,有不同的延時/超時和消費機制,所以這里拷貝了message實例。

4.2 消息消費

每個連接均會啟動一個運行protocolV2.messagePump()方法的協程,這個協程負責監聽channel的消息隊列並向客戶端推送消息。客戶端只有觸發SUB指令之后,才會將channel傳遞給protocolV2.messagePump(),這之后消費推送才會正式開啟。
啟動消息推送
前面講Tcpserver時有提到,客戶端創建連接時,會調用tcpserver.Handle(),里面再調用protocolV2.IOLoop()。protocolV2.IOLoop()方法開頭有下面這行:

	go p.messagePump(client, messagePumpStartedChan)

這行創建了一個獨立線程,調用的protocolV2.messagePump()負責向消費者推送消息。
有個小細節是無論是生產者還是消費者,都會創建這個協程,protocolV2.messagePump()創建后並不會立即推送消息,而是需要調用SUB指令,以protocolV2.SUB()方法為例,方法末尾有這么一行:

	client.SubEventChan <- channel

將當前消費者訂閱的channel傳入client.SubEventChan,這個會由protocolV2.messagePump()接收,這個方法核心是下面這個for循環(限於篇幅,我省略了大量無關代碼):

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
	for {
		if subChannel == nil || !client.IsReadyForMessages() {
			// the client is not ready to receive messages...
			memoryMsgChan = nil
			backendMsgChan = nil
			flusherChan = nil
			// force flush
			client.writeLock.Lock()
			err = client.Flush()
			client.writeLock.Unlock()
			if err != nil {
				goto exit
			}
			flushed = true
		} else if flushed {
			// last iteration we flushed...
			// do not select on the flusher ticker channel
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			flusherChan = nil
		}
		
		select {
		case subChannel = <-subEventChan:
			// you can't SUB anymore
			subEventChan = nil
		case b := <-backendMsgChan:
			if sampleRate > 0 && rand.Int31n(100) > sampleRate {
				continue
			}

			msg, err := decodeMessage(b)
			if err != nil {
				p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
				continue
			}
			msg.Attempts++

			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg)
			if err != nil {
				goto exit
			}
			flushed = false
		case msg := <-memoryMsgChan:
			if sampleRate > 0 && rand.Int31n(100) > sampleRate {
				continue
			}
			msg.Attempts++

			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg)
			if err != nil {
				goto exit
			}
			flushed = false
		case <-client.ExitChan:
			goto exit
		}
	}
}

客戶端建立連接初始,subChannel為空,循環一直走第1個if語句。直到客戶端調用SUB指令,select語句執行"case subChannel = <-subEventChan:",此時subChannel非空,接下來backendMsgChan和memoryMsgChan被賦值,此后開始推送消息:

  • 消息會隨機從內存和磁盤隊列取,因為如果內存和磁盤都有數據,select是隨機的
  • 消息通過protocolV2.SendMessage()推送給消費者

當多個消費者訂閱同一個channel時情況會如何?
上面我們提到消費者發起SUB指令訂閱消息,protocolV2.SUB()會將chan傳給protocolV2.messagePump(),即這一行“client.SubEventChan <- channel”,那么我們來看下這個channel變量怎么來的:

func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
	...
	
	topic := p.ctx.nsqd.GetTopic(topicName)
	channel = topic.GetChannel(channelName)
	
	...
	client.SubEventChan <- channel

SUB方法包含多種邏輯:

  • 當channel不存在時,topic.GetChannel()方法自動創建並與這個消費者綁定
  • 當channel存在,比如事先通過http-api創建好了,但沒有消費者訂閱,則當前消費者獨立綁定這個channel
  • 當channel存在,且已經有消費者訂閱了,topic.GetChannel()方法依然會返回這個channel,這時就有多個消費者同時訂閱了這個channel,大家共用一個通道chan變量

由於是多個消費者共用一個通道chan變量,每個消費者都有一個for select在循環監聽這個通道,根據chan變量的特性,消費會隨機發送給一位消費者,且一條消息只會推送給一個消費者。

消費超時處理
protocolV2.messagePump()方法,無論是“case b := <-backendMsgChan:”還是“case msg := <-memoryMsgChan:”,在向消費者推送消息前都調用了下面這行代碼:

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
	subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) // 省略其他代碼
}

func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
	msg.pri = now.Add(timeout).UnixNano() // pri成員保存本消息超時時間
	err := c.pushInFlightMessage(msg)
	c.addToInFlightPQ(msg)
}

channel.StartInFlightTimeout()將消息保存到channel的inFlightMessages和inFlightPQ隊列中,這兩個緩存是用來處理消費超時的。
值得注意的一個小細節是c.addToInFlightPQ(msg)將msg壓入最小堆時,將msg在數組的偏移量保存到了msg.index成員中(最小堆底層是數組實現)

我們先簡單看下FIN指令會做啥:

func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error) {
	err = client.Channel.FinishMessage(client.ID, *id) // 省略其他代碼
}

func (c *Channel) FinishMessage(clientID int64, id MessageID) error {
	// 省略其他代碼
	msg, err := c.popInFlightMessage(clientID, id)

	c.removeFromInFlightPQ(msg)
}

FIN的動作比較簡單,主要就是調用channel.FinishMessage()方法把上面寫入超時緩存的msg給刪除掉。

FIN從inFlightMessages中刪除消息比較容易,這是個map,key是msg.id。客戶端發送FIN消息時附帶了msg.id。但如何從最小堆inFlightPQ中刪除對應的msg呢?前面提到在入堆時的一個細節,即保存了msg的偏移量,此時正好用上。通過msg.index直接定位到msg的位置並調整堆即可。

說了這么多,最小堆的作用是啥?別急,接下來我們看下超時邏輯:
超時邏輯由程序啟動時開啟的工作線程組來處理,即NSQD.queueScanLoop()方法:

func (n *NSQD) queueScanLoop() {
	n.resizePool(len(channels), workCh, responseCh, closeCh)

	for {
		select {
		case <-workTicker.C: // 定時觸發工作
			if len(channels) == 0 {
				continue
			}
		case <-refreshTicker.C: // 動態調整協程組的數量
			channels = n.channels()
			n.resizePool(len(channels), workCh, responseCh, closeCh)
			continue
		case <-n.exitChan:
			goto exit
		}

		num := n.getOpts().QueueScanSelectionCount
		if num > len(channels) {
			num = len(channels)
		}

	loop:
		for _, i := range util.UniqRands(num, len(channels)) {
			workCh <- channels[i] // 觸發協程組工作
		}

		numDirty := 0
		for i := 0; i < num; i++ {
			if <-responseCh {
				numDirty++
			}
		}

		if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
			goto loop
		}
	}
}

NSQD.queueScanLoop()方法主要有一個for循環,內層是一個select和一個loop循環。select中,第1個定時器case <-workTicker.C的作用是定時觸發工作,只有這個case會跳出select走到下面的loop。第2個定時器負責啟動工作協程組並動態調整協程數量,我們來看下第2個定時器調用的resizePool()方法:

func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	idealPoolSize := int(float64(num) * 0.25) // 協程數量設定為channel數的1/4
	if idealPoolSize < 1 {
		idealPoolSize = 1
	} else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
		idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
	}
	for {
		if idealPoolSize == n.poolSize {		// 當協程數量達到協程數量設定為channel數的1/4時,退出
			break
		} else if idealPoolSize < n.poolSize {	// 否則如果當前協程數大於目標值,則通過closeCh通知部分協程退出
			// contract
			closeCh <- 1
			n.poolSize--
		} else {								// 否則協程數不夠,則啟動新的協程
			// expand
			n.waitGroup.Wrap(func() {
				n.queueScanWorker(workCh, responseCh, closeCh)
			})
			n.poolSize++
		}
	}
}

resizePool()方法上面的注釋已經說的很清楚了,作用就是保持工作協程數量為當前channel數的1/4。

接下來我們看具體的工作邏輯,queueScanWorker()方法:

func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	for {
		select {
		case c := <-workCh:
			now := time.Now().UnixNano()
			dirty := false
			if c.processInFlightQueue(now) {
				dirty = true
			}
			if c.processDeferredQueue(now) {
				dirty = true
			}
			responseCh <- dirty
		case <-closeCh:
			return
		}
	}
}

queueScanWorker()方法的代碼很短,一是監聽closeCh的退出信號;二是監聽workCh的工作信號。workCh會將需要處理的channel傳入,然后調用processInFlightQueue()清理超時的消息,調用processDeferredQueue()清理到時間的延時消息:

func (c *Channel) processInFlightQueue(t int64) bool {
	dirty := false
	for {
		c.inFlightMutex.Lock()
		msg, _ := c.inFlightPQ.PeekAndShift(t)
		c.inFlightMutex.Unlock()

		if msg == nil {
			goto exit
		}
		dirty = true

		_, err := c.popInFlightMessage(msg.clientID, msg.ID)
		if err != nil {
			goto exit
		}
		atomic.AddUint64(&c.timeoutCount, 1)
		c.RLock()
		client, ok := c.clients[msg.clientID]
		c.RUnlock()
		if ok {
			client.TimedOutMessage()
		}
		c.put(msg)
	}

exit:
	return dirty
}

func (pq *inFlightPqueue) PeekAndShift(max int64) (*Message, int64) {
	if len(*pq) == 0 {
		return nil, 0
	}

	x := (*pq)[0]
	if x.pri > max {
		return nil, x.pri - max
	}
	pq.Pop()

	return x, 0
}

前面提到msg.pri成員保存本消息超時時間,所以PeekAndShift()返回的是最小堆里已經超時且超時時間最長的那條消息。processInFlightQueue()則將消息從超時隊列中刪,同時將消息重新put進channel。注意此時超時的消息put進channel后實際是排在隊尾的,消費順序將發生改變。
processInFlightQueue()方法如果存在超時消息,返回值dirty標識true。queueScanWorker()將dirty寫入responseCh。再往回看,queueScanLoop()方法統計了dirty的數量,超過一定比例會繼續執行loop,而不是等待下一次定時執行。

4.2 延遲消費

生產者調用DPUB發布的消息,可以指定延時多少再推送給消費者。
我們來看下DPUB的邏輯:

func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) {
	timeoutMs, err := protocol.ByteToBase10(params[2])
	timeoutDuration := time.Duration(timeoutMs) * time.Millisecond
	msg := NewMessage(topic.GenerateID(), messageBody)
	msg.deferred = timeoutDuration
	err = topic.PutMessage(msg)
}

從上面截取的PUB()方法代碼可以看出,DPUB的消息會將延時時間寫入msg.deferred成員。4.1章節第2部分介紹的Topic.messagePump()方法有下面這段:

func (t *Topic) messagePump() {
			if chanMsg.deferred != 0 {
				channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
				continue
			}
}

當chanMsg.deferred != 0時表示延時消息,此時不是直接調用putMessage()方法寫入channel,而是調用channel.PutMessageDeferred(chanMsg, chanMsg.deferred),消息被寫入了延時隊列Channel.deferredMessages和Channel.deferredPQ。之后的邏輯是在工作協程組NSQD.queueScanLoop()中被識別並put進channel,這與超時的處理邏輯是一樣的,不展開說。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM