nsq源碼閱讀筆記之nsqd(四)——Channel


Channel相關的代碼主要位於nsqd/channel.gonsqd/nsqd.go中。

Channel與Topic的關系

Channel是消費者訂閱特定Topic的一種抽象。對於發往Topic的消息,nsqd向該Topic下的所有Channel投遞消息,而同一個Channel只投遞一次,Channel下如果存在多個消費者,則隨機選擇一個消費者做投遞。這種投遞方式可以被用作消費者負載均衡。

Channel從屬於特定Topic,可以認為是Topic的下一級。在同一個Topic之下可以有零個或多個Channel。 
和Topic一樣,Channel同樣有永久和臨時之分,永久的Channel只能通過顯式刪除銷毀,臨時的Channel在最后一個消費者斷開連接的時候被銷毀。

與服務於生產者的Topic不同,Channel直接面向消費者。

在代碼上Channel和Topic有許多相似之處,對於和Topic相同或者相似的部分,以下不再贅述,可以參考Topic相關博文。

Channel的創建

Channel和Topic在創建的時候都會初始化結構,初始化backend,創建消息循環,不同的是Channel在創建時多了給e2eProcessingLatencyStream賦值的以及initPQ部分。

其中e2eProcessingLatencyStream主要用於統計消息投遞的延遲等,將在以后的博文中敘述。

initPQ函數創建了兩個字典inFlightMessagesdeferredMessages和兩個隊列inFlightPQdeferredPQ。在nsq中inFlight指的是正在投遞但還沒確認投遞成功的消息,defferred指的是投遞失敗,等待重新投遞的消息。initPQ創建的字典和隊列主要用於索引和存放這兩類消息。其中兩個字典使用消息ID作索引。

inFlightPQ使用newInFlightPqueue初始化,InFlightPqueue位於nsqd\in_flight_pqueue.gonsqd\in_flight_pqueue.go是nsq實現的一個優先級隊列,提供了常用的隊列操作,值得學習。

deferredPQ使用pqueue.New初始化,pqueue位於nsqd\pqueue.go,也是一個優先級隊列。

待投遞消息進入Channel

在分析Topic時提到,消息進入Topic的消息循環后會被投遞到該Topic下所有的Channel,由Channel的PutMessage函數進行處理。

PutMessage判斷當前Channel是否已經被銷毀,若未銷毀,則調用put函數進行處理,最后,自增消息計數器。

Channel的put函數與Topic的同名函數相似,可以參考Topic。

Channel對消息的處理

進入Channel的消息在messagePump函數中處理,該函數也與Topic的同名函數相似:消息都從memory和backend兩個來源接收,然后解碼消息后處理。與Topic不同的是,channel在投遞消息前,會自增msg.Attempts,該變量用於保存投遞嘗試的次數。

在消息投遞前會將bufferedCount置為1,在投遞后置為0。該變量在Depth函數中被調用。

Deepth函數返回內存,磁盤以及正在投遞的消息數量之和,也就是尚未投遞成功的消息數。

messagePump函數在投遞消息時將消息送入clientMsgChan,隨后被nsqd\protocol_v2.gomessagePump函數處理。

在protocolV2的messagePump函數中,消息被通過投送到相應消費者。投遞時首先調用Channel的StartInFlightTimeout函數

該函數填充消息的消費者ID、投送時間、優先級,然后調用pushInFlightMessage函數將消息放入inFlightMessages字典中。最后調用addToInFlightPQ將消息放入inFlightPQ隊列中。

至此,消息投遞流程完成,接下來需要等待消費者對投送結果的反饋。消費者通過發送FINREQTOUCH來回復對消息的處理結果。

關於TCP protocol相關的內容,在后續博文分析。以下只分析與Channel相關的部分。

消息投送結果處理

消息投送成功的處理

消費者發送FIN,表明消息已經被接收並正確處理。

FIN消息在與Channel相關的部分交由FinishMessage處理。最后調用addToInFlightPQ將消息放入inFlightPQ隊列中。FinishMessage分別調用popInFlightMessageremoveFromInFlightPQ將消息從inFlightMessagesinFlightPQ中刪除。最后,統計該消息的投遞情況。

消息投送失敗的處理

客戶端發送REQ,表明消息投遞失敗,需要再次被投遞。

Channel在RequeueMessage函數對消息投遞失敗進行處理。該函數將消息從inFlightMessagesinFlightPQ中刪除,隨后進行重新投遞。

發送REQ時有一個附加參數timeout,該值為0時表示立即重新投遞,大於0時表示等待timeout時間之后投遞。

立即投遞使用doRequeue函數,該函數簡單地調用put函數重新進行消息的投遞,並自增requeueCount,該變量在統計消息投遞情況時用到。

如果timeout大於0,則調用StartDeferredTimeout進行延遲投遞。首先計算延遲投遞的時間點,然后調用pushDeferredMessage將消息加入deferredMessage字典,最后將消息放入deferredPQ隊列。延遲投遞的消息會被專門的worker掃描並在延遲投遞的時間點后進行投遞。 
需要注意的是,立即重新投遞的消息不會進入deferredPQ隊列。

消息的超時值的重置

消費者發送TOUCH,表明該消息的超時值需要被重置。

這個過程比較簡單,從inFlightPQ中取出消息,設置新的超時值后重新放入隊列,新的超時值由當前時間、客戶端通過IDENTIFY設置的超時值、配置中允許的最大超時值MaxMsgTimeout共同決定。

消息的超時和延遲投遞

消息超時和延遲投遞的處理流程層次比較多:

首先是在nsqd\nsqd.go中啟動的用於定時掃描的goroutine。該goroutine執行queueScanLoop函數

該函數使用若干個worker來掃描並處理當前在投遞中以及等待重新投遞的消息。worker的個數由配置和當前Channel數量共同決定。

首先,初始化3個gochannel:workCh、responseCh、closeCh,分別控制worker的輸入、輸出和銷毀。

然后獲取當前的Channel集合,並且調用resizePool函數來啟動指定數量的worker。

最后進入掃描的循環。在循環中,等待兩個定時器,workTickerrefreshTicker,定時時間分別由由配置中的QueueScanIntervalQueueScanRefreshInterval決定。這種由等待定時器觸發的循環避免了函數持續的執行影響性能,而Golang的特性使得這種機制在寫法上非常簡潔。

  1. workTicker定時器觸發掃描流程。 
    nsqd采用了Redis的probabilistic expiration算法來進行掃描。首先從所有Channel中隨機選取部分Channel,然后遍歷被選取的Channel,投到workerChan中,並且等待反饋結果,結果有兩種,dirty和非dirty,如果dirty的比例超過配置中設定的QueueScanDirtyPercent,那么不進入休眠,繼續掃描,如果比例較低,則重新等待定時器觸發下一輪掃描。這種機制可以在保證處理延時較低的情況下減少對CPU資源的浪費。
  2. refreshTicker定時器觸發更新Channel列表流程。 
    這個流程比較簡單,先獲取一次Channel列表, 
    再調用resizePool重新分配worker。

接下來再看看resizePool的實現。

這個部分比較簡單。注意一點,當需要的worker數量超過之前分配的數量時,通過向closeCh投遞消息使多余的worker銷毀,如果需要的數量比之前的多,則通過queueScanWorker創建新的worker。

queueScanWorker接收workCh發來的消息,處理,並且通過responseCh反饋消息。收到closeCh時則關閉。由於所有worker都監聽相同的closeCh,所以當向closeCh發送消息時,隨機關閉一個worker。且由於workChcloseCh的監聽是串行的,所以不存在任務處理到一半時被關閉的可能。這也是nsq中優雅關閉gochannel的的一個例子。

worker處理兩件事:

一是處理inFlight消息

processInFlightQueue取出inFlightPQ頂部的消息,如果當前消息已經超時,則將消息從隊列中移除,並返回消息。由於隊列是優先級隊列,所以如果processInFlightQueue取出的消息為空,則不需要再往后取了,直接返回false表示當前非dirty狀態。如果取到了消息,則說明該消息投遞超時,需要把消息傳入doRequeue立即重新投遞。

二是處理deferred消息

該處理流程與處理inFlight基本相同,不再詳述。

其他操作

Channel中還有些其他函數如ExitingDeleteCloseexitEmptyflushPauseUnPausedoPause 
等與Topic中很接近,不再詳述。

AddClientRemoveClient將在分析Client時討論。

總結

Topic/Channel是發布/訂閱模型的一種實現。Topic對應於發布,Channel對應於訂閱。消費者通過在Topic下生成不同的Channel來接收來自該Topic的消息。通過生成相同的Channel來實現消費者負載均衡。

Channel本身在投遞消息給消費者時維護兩個隊列,一個是inFlight隊列,該隊列存儲正在投遞,但還沒被標記為投遞成功的消息。另一個是deferred隊列,用來存儲需要被延時投遞的消息。

inFlight隊列中消息可能因為投遞超時而失敗,deferred隊列中的消息需要在到達指定時間后進行重新投遞。如果為兩個隊列中的每個消息都分別指定定時器,無疑是非常消耗資源的。因此nsq采用定時掃描隊列的做法。 
在掃描時采用多個worker分別處理。這種類似多線程的處理方式提高了處理效率。nsq在掃描策略上使用了Redis的probabilistic expiration算法,同時動態調整worker的數量,這些優化平衡了效率和資源占用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM