NSQ源碼剖析之nsqd


NSQ簡介

NSQ 是實時的分布式消息處理平台,其設計的目的是用來大規模地處理每天數以十億計級別的消息。NSQ 具有分布式和去中心化拓撲結構,該結構具有無單點故障、故障容錯、高可用性以及能夠保證消息的可靠傳遞的特征,是一個成熟的、已在大規模生成環境下應用的產品。

NSQ 由 3 個守護進程組成: 
nsqd 是接收、保存和傳送消息到客戶端的守護進程。 
nsqlookupd 是管理的拓撲信息,維護着所有nsqd的狀態,並提供了最終一致發現服務的守護進程。 
nsqadmin 是一個 Web UI 來實時監控集群和執行各種管理任務。 
NSQ結構圖

這篇文章介紹主要介紹nsqd的實現。

Topic與Channel

Topic與Channel是NSQ中重要的兩個概念。 
生產者將消息寫到Topic中,一個Topic下可以有多個Channel,每個Channel都是Topic的完整副本。 
消費者從Channel處訂閱消息,如果有多個消費者訂閱同一個Channel,Channel中的消息將被傳遞到一個隨機的消費者。

圖片標題

要理解Topic Channel中各種chan的作用,關鍵是要理解golang中如何在並發環境下如何操作一個結構體(多個goroutine同時操作topic),與C/C++多線程操作同一個結構體時加鎖(mutex,rwmutex)不同,go語言中一般是為這個結構體(topic,channel)開啟一個主goroutine(messagePump函數),所有對該結構體的改變的操作都應是該主goroutine完成的,也就不存在並發的問題了,其它goroutine如果想要改變這個結構體則應該向結構體提供的chan中發送消息(msgchan)或者通知(exitchan,updatechan),主goroutine會一直監聽所有的chan,當有消息或者通知到來時做相應的處理。

數據的持久化

了解數據的持久化之前,我們先來看兩個問題? 
1. 往Topic中寫入消息就是將消息發送到Topic.memoryMsgChan中,但是memoryMsgChan是一個固定內存大小的內存隊列,如果隊列滿了怎么辦呢?會阻塞嗎? 
2. 如果消息都存放在memoryMsgChan這個內存隊列中,程序退出了消息就全部丟失了嗎?

NSQ是如何解決的呢,nsq在創建Topic、Channel的時候都會創建一個DiskQueue,DiskQueue負責向磁盤文件中寫入消息、從磁盤文件中讀取消息,是NSQ實現數據持久化的最重要結構。 
以Topic為例,如果向Topic.memoryMsgChan寫入消息但是memoryMsgChan已滿時,nsq會將消息寫到topic.DiskQueue中,DiskQueue會負責將消息內存同步到磁盤上。 
如果從Topic.memoryMsgChan中讀取消息時,但是memoryMsgChan並沒有消息時,就從topic.DiskQueue中取出同步到磁盤文件中的消息。

我們看到topic.backend(diskQueue)負責將消息寫到磁盤並從磁盤中讀取消息,diskQueue提供了兩個chan供外部使用:readChan與writeChan。 
我們來看下diskQueue實現中的幾個要點。

  1. diskQueue在創建時會開啟一個goroutine,從磁盤文件中讀取消息寫到readChan中,外部goroutine可以從readChan中獲取消息;隨時監聽writeChan,當有消息時從wirtechan中取出消息,寫到本地磁盤文件。
  2. diskQueue既要提供文件的讀服務又要提供文件的寫服務,所以要記錄下文件的讀位置(readIndex),寫位置(writeIndex)。每次從文件中讀取消息時使用file.Seek(readindex)定位到文件讀位置然后讀取消息信息,每次往文件中寫入消息時都要file.Seek(writeIndex)定位到寫位置再將消息寫入。
  3. readIndex,writeIndex很重要,程序退出時要將這些信息(meta data)寫到另外的磁盤文件(元信息文件)中,程序啟動時首先讀取元信息文件,在根據元信息文件中的readIndex writeIndex操作存儲信息的文件。
  4. 由於操作系統層也有緩存,調用file.Write()寫入的信息,也可能只是存在緩存中並沒有同步到磁盤,需要顯示調用file.sync()才可以強制要求操作系統把緩存同步到磁盤。可以通過指定創建diskQueue時傳入的syncEvery,syncTimeout來控制調用file.sync()的頻率。syncTimeout是指每隔syncTimeout秒調用一次file.sync(),syncEvery是指每當寫入syncEvery個消息后調用一次file.sync()。這兩個參數都可以在啟動nsqd程序時通過命令行指定。

網絡架構

nsq是一個可靠的、高性能的服務端網絡程序,通過閱讀nsqd的源碼來學習如何搭建一個可靠的網絡服務端程序。

客戶端已成功的與服務器建立鏈接了,每一個客戶端建立連接后,nsqd都會創建一個Client接口體,該結構體內保存一些client的狀態信息。 
每一個Client都會有兩個goroutine,一個goroutine負責讀取客戶端主動發送的各種命令,解析命令,處理命令並將處理結果回復給客戶端。 
另一個goutine負責定時發送心跳信息給客戶端,如果客戶端訂閱某個channel的話則將channel中的將消息通過網絡發送給客戶端。

如果服務端不需要主動推送大量消息給客戶端,一個連接只需要開一個goroutine處理請求並發送回復就可以了,這是最簡單的方式。開啟兩個goroutine操作同一個conn的話就需要注意加鎖了。

我們來看下NSQ中幾個比較重要的命令:

  • NOP 心跳回復,沒有實際意義
  • PUB 發布一個消息到 話題(topic) 
    PUB <topic_name>\n [ 四字節消息的大小 ][ 消息的內容 ]
  • SUB 訂閱話題(topic) /通道(channel) 
    SUB <topic_name> <channel_name>\n
  • RDY 更新 RDY 狀態 (表示客戶端已經准備好接收N 消息) 
    RDY <count>\n
  • FIN 完成一個消息 (表示成功處理) 
    FIN <message_id>\n

生產者產生消息的過程比較簡單,就是一個PUB命令,先讀取四字節的消息大小,然后根據消息大小讀取消息內容,然后將內容寫到topic.MessageChan中。 
我們重點來看下消費者是如何從nsq中讀取消息的。 
1. 消費者首先需要發送SUB命令,告訴nsqd它想訂閱哪個Channel,然后nsqd將該Client與Channel建立對應關系。 
2. 消費者發送RDY命令,告訴服務端它以准備好接受count個消息,服務端則向消費者發送count個消息,如果消費者想繼續接受消息就需要不斷發送RDY命令告訴服務端自己准備好接受消息(類似TCP協議中滑動窗口的概念,消費者並不是按照順序一個個的消費消息,NSQD最多可以同時count個消息給消費者,每推送給消費者一個消息count數目減一,當消費者處理完消息回復FIN指令時count+1)。



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM