與Channel相關的代碼主要位於nsqd/channel.go
, nsqd/nsqd.go
中。
Channel與Topic的關系
Channel是消費者訂閱特定Topic的一種抽象。對於發往Topic的消息,nsqd向該Topic下的所有Channel投遞消息,而同一個Channel只投遞一次,Channel下如果存在多個消費者,則隨機選擇一個消費者做投遞。這種投遞方式可以被用作消費者負載均衡。
Channel從屬於特定Topic,可以認為是Topic的下一級。在同一個Topic之下可以有零個或多個Channel。
和Topic一樣,Channel同樣有永久和臨時之分,永久的Channel只能通過顯式刪除銷毀,臨時的Channel在最后一個消費者斷開連接的時候被銷毀。
與服務於生產者的Topic不同,Channel直接面向消費者。
在代碼上Channel和Topic有許多相似之處,對於和Topic相同或者相似的部分,以下不再贅述,可以參考Topic相關博文。
Channel的創建
Channel和Topic在創建的時候都會初始化結構,初始化backend,創建消息循環,不同的是Channel在創建時多了給e2eProcessingLatencyStream
賦值的以及initPQ
部分。
其中e2eProcessingLatencyStream
主要用於統計消息投遞的延遲等,將在以后的博文中敘述。
initPQ
函數創建了兩個字典inFlightMessages
、deferredMessages
和兩個隊列inFlightPQ
、deferredPQ
。在nsq中inFlight指的是正在投遞但還沒確認投遞成功的消息,defferred指的是投遞失敗,等待重新投遞的消息。initPQ
創建的字典和隊列主要用於索引和存放這兩類消息。其中兩個字典使用消息ID作索引。
inFlightPQ
使用newInFlightPqueue
初始化,InFlightPqueue
位於nsqd\in_flight_pqueue.go
。nsqd\in_flight_pqueue.go
是nsq實現的一個優先級隊列,提供了常用的隊列操作,值得學習。
deferredPQ
使用pqueue.New
初始化,pqueue
位於nsqd\pqueue.go
,也是一個優先級隊列。
待投遞消息進入Channel
在分析Topic時提到,消息進入Topic的消息循環后會被投遞到該Topic下所有的Channel,由Channel的PutMessage
函數進行處理。
PutMessage
判斷當前Channel是否已經被銷毀,若未銷毀,則調用put
函數進行處理,最后,自增消息計數器。
Channel的put
函數與Topic的同名函數相似,可以參考Topic。
Channel對消息的處理
進入Channel的消息在messagePump
函數中處理,該函數也與Topic的同名函數相似:消息都從memory和backend兩個來源接收,然后解碼消息后處理。與Topic不同的是,channel在投遞消息前,會自增msg.Attempts
,該變量用於保存投遞嘗試的次數。
在消息投遞前會將bufferedCount
置為1,在投遞后置為0。該變量在Depth
函數中被調用。
Deepth
函數返回內存,磁盤以及正在投遞的消息數量之和,也就是尚未投遞成功的消息數。
messagePump
函數在投遞消息時將消息送入clientMsgChan
,隨后被nsqd\protocol_v2.go
的messagePump
函數處理。
在protocolV2的messagePump
函數中,消息被通過投送到相應消費者。投遞時首先調用Channel的StartInFlightTimeout
函數
該函數填充消息的消費者ID、投送時間、優先級,然后調用pushInFlightMessage
函數將消息放入inFlightMessages
字典中。最后調用addToInFlightPQ
將消息放入inFlightPQ
隊列中。
至此,消息投遞流程完成,接下來需要等待消費者對投送結果的反饋。消費者通過發送FIN
、REQ
、TOUCH
來回復對消息的處理結果。
關於TCP protocol相關的內容,在后續博文分析。以下只分析與Channel相關的部分。
消息投送結果處理
消息投送成功的處理
消費者發送FIN
,表明消息已經被接收並正確處理。
FIN消息在與Channel相關的部分交由FinishMessage
處理。最后調用addToInFlightPQ
將消息放入inFlightPQ
隊列中。FinishMessage
分別調用popInFlightMessage
和removeFromInFlightPQ
將消息從inFlightMessages
和inFlightPQ
中刪除。最后,統計該消息的投遞情況。
消息投送失敗的處理
客戶端發送REQ
,表明消息投遞失敗,需要再次被投遞。
Channel在RequeueMessage
函數對消息投遞失敗進行處理。該函數將消息從inFlightMessages
和inFlightPQ
中刪除,隨后進行重新投遞。
發送REQ
時有一個附加參數timeout,該值為0時表示立即重新投遞,大於0時表示等待timeout時間之后投遞。
立即投遞使用doRequeue
函數,該函數簡單地調用put
函數重新進行消息的投遞,並自增requeueCount
,該變量在統計消息投遞情況時用到。
如果timeout大於0,則調用StartDeferredTimeout
進行延遲投遞。首先計算延遲投遞的時間點,然后調用pushDeferredMessage
將消息加入deferredMessage
字典,最后將消息放入deferredPQ
隊列。延遲投遞的消息會被專門的worker掃描並在延遲投遞的時間點后進行投遞。
需要注意的是,立即重新投遞的消息不會進入deferredPQ
隊列。
消息的超時值的重置
消費者發送TOUCH
,表明該消息的超時值需要被重置。
這個過程比較簡單,從inFlightPQ
中取出消息,設置新的超時值后重新放入隊列,新的超時值由當前時間、客戶端通過IDENTIFY
設置的超時值、配置中允許的最大超時值MaxMsgTimeout
共同決定。
消息的超時和延遲投遞
消息超時和延遲投遞的處理流程層次比較多:
首先是在nsqd\nsqd.go
中啟動的用於定時掃描的goroutine。該goroutine執行queueScanLoop
函數
該函數使用若干個worker來掃描並處理當前在投遞中以及等待重新投遞的消息。worker的個數由配置和當前Channel數量共同決定。
首先,初始化3個gochannel:workCh、responseCh、closeCh,分別控制worker的輸入、輸出和銷毀。
然后獲取當前的Channel集合,並且調用resizePool
函數來啟動指定數量的worker。
最后進入掃描的循環。在循環中,等待兩個定時器,workTicker
和refreshTicker
,定時時間分別由由配置中的QueueScanInterval
和QueueScanRefreshInterval
決定。這種由等待定時器觸發的循環避免了函數持續的執行影響性能,而Golang的特性使得這種機制在寫法上非常簡潔。
workTicker
定時器觸發掃描流程。
nsqd采用了Redis的probabilistic expiration算法來進行掃描。首先從所有Channel中隨機選取部分Channel,然后遍歷被選取的Channel,投到workerChan
中,並且等待反饋結果,結果有兩種,dirty和非dirty,如果dirty的比例超過配置中設定的QueueScanDirtyPercent
,那么不進入休眠,繼續掃描,如果比例較低,則重新等待定時器觸發下一輪掃描。這種機制可以在保證處理延時較低的情況下減少對CPU資源的浪費。refreshTicker
定時器觸發更新Channel列表流程。
這個流程比較簡單,先獲取一次Channel列表,
再調用resizePool
重新分配worker。
接下來再看看resizePool
的實現。
這個部分比較簡單。注意一點,當需要的worker數量超過之前分配的數量時,通過向closeCh
投遞消息使多余的worker銷毀,如果需要的數量比之前的多,則通過queueScanWorker
創建新的worker。
queueScanWorker
接收workCh
發來的消息,處理,並且通過responseCh
反饋消息。收到closeCh
時則關閉。由於所有worker都監聽相同的closeCh
,所以當向closeCh
發送消息時,隨機關閉一個worker。且由於workCh
和closeCh
的監聽是串行的,所以不存在任務處理到一半時被關閉的可能。這也是nsq中優雅關閉gochannel的的一個例子。
worker處理兩件事:
一是處理inFlight消息
processInFlightQueue
取出inFlightPQ
頂部的消息,如果當前消息已經超時,則將消息從隊列中移除,並返回消息。由於隊列是優先級隊列,所以如果processInFlightQueue
取出的消息為空,則不需要再往后取了,直接返回false表示當前非dirty狀態。如果取到了消息,則說明該消息投遞超時,需要把消息傳入doRequeue
立即重新投遞。
二是處理deferred消息
該處理流程與處理inFlight基本相同,不再詳述。
其他操作
Channel中還有些其他函數如Exiting
、Delete
、Close
、exit
、Empty
、flush
、Pause
、UnPause
、doPause
等與Topic中很接近,不再詳述。
AddClient
和RemoveClient
將在分析Client時討論。
總結
Topic/Channel是發布/訂閱模型的一種實現。Topic對應於發布,Channel對應於訂閱。消費者通過在Topic下生成不同的Channel來接收來自該Topic的消息。通過生成相同的Channel來實現消費者負載均衡。
Channel本身在投遞消息給消費者時維護兩個隊列,一個是inFlight隊列,該隊列存儲正在投遞,但還沒被標記為投遞成功的消息。另一個是deferred隊列,用來存儲需要被延時投遞的消息。
inFlight隊列中消息可能因為投遞超時而失敗,deferred隊列中的消息需要在到達指定時間后進行重新投遞。如果為兩個隊列中的每個消息都分別指定定時器,無疑是非常消耗資源的。因此nsq采用定時掃描隊列的做法。
在掃描時采用多個worker分別處理。這種類似多線程的處理方式提高了處理效率。nsq在掃描策略上使用了Redis的probabilistic expiration算法,同時動態調整worker的數量,這些優化平衡了效率和資源占用。