nsq的源碼比較簡單,值得一讀,特別是golang開發人員,下面重點介紹nsqd,nsqd是nsq的核心,其他的都是輔助工具,看完這篇文章希望你能對消息隊列的原理和實現有一定的了解。
nsqd是一個守護進程,負責接收,排隊,投遞消息給客戶端,並不保證消息的嚴格順序,nsqd默認監聽一個tcp端口 (4150) 和一個http端口 (4151) 以及一個可選的https端口
對訂閱了同一個topic的同一個channel的消費者使用負載均衡策略,其實就是多個協程消費同一個channel
只要channel存在,即使沒有該channel的消費者,也會將生產者的message緩存到隊列(內存隊列和磁盤隊列)中,當有新的消費者產生后,就開始消費隊列中的所有消息
保證隊列中的 message 至少會被消費一次(在進程意外退出的時候這點都保證不了),並不能保證成功消費一次,即使 nsqd退出,也會將隊列中的消息暫存磁盤上(進程退出的時候會將緩存中的消息存到磁盤上,意外情況如掉電就不行了,緩存中的消息就沒有機會存盤而丟失,在實戰中一般不會使用緩存隊列即內存buffer為0,全部使用磁盤隊列)
限定內存占用,能夠配置nsqd中每個channel隊列在內存中緩存的message數量,一旦channel的buffer寫滿,就將message寫到磁盤中,這點使用golang select的優先級功能,default優先級最低
topic,channel 一旦建立,將會一直存在,要及時在管理台或者用代碼清除無效的 topic 和 channel,避免資源的浪費,每個topic和channel都有獨立的協程處理自身的消息,默認的buffer和其他的一些信息
nsq消息沒有備份,一旦出現進程意外情況退出,可能會出現消息丟失,如沒有消費成功的消息,寫入文件但沒有真正落盤的消息,這種意外情況很難杜絕,像意外退出這種情況kafka,redis等都會遇到這樣的問題,最后都會采用一個折中的策略,定時將數據落盤
//原文:https://www.cnblogs.com/hlxs/p/11445103.html 作者:啊漢
type Topic struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms messageCount uint64 //消息總數量 messageBytes uint64 //消息總長度 sync.RWMutex name string //topic name channelMap map[string]*Channel //保存topic下面的所有channel backend BackendQueue //磁盤隊列 memoryMsgChan chan *Message //內存隊列 startChan chan int exitChan chan int channelUpdateChan chan int waitGroup util.WaitGroupWrapper exitFlag int32 //退出標記 idFactory *guidFactory //生成msg id的工廠 ephemeral bool //是否臨時topic deleteCallback func(*Topic) //刪除topic方法指針 deleter sync.Once paused int32 //暫停標記,1暫停, 0正常 pauseChan chan int ctx *context }
Topic創建
nsqd用map[string]*Topic來保存所有topic,producter在發消息的時候回指定topic,nsqd在收到消息后會判斷topic是否存在,不存在就會自動創建,每創建一個新的topic就會啟動一個協程,用於處理topic相關的消息,如將內存/磁盤中的消息復制給topic中的每個channel、channel數量變化、channel暫停、topic退出
消息結構
// Command represents a command from a client to an NSQ daemon
//原文:https://www.cnblogs.com/hlxs/p/11445103.html 作者:啊漢
type Command struct { Name []byte //命令名稱,可選:IDENTIFY、FIN、RDY、REQ、PUB、MPUB、DPUB、NOP、TOUCH、SUB、CLS、AUTH Params [][]byte //不同的命令做不同解析,涉及到topic的,Params[0]為topic name Body []byte //消息內容 } // WriteTo implements the WriterTo interface and // serializes the Command to the supplied Writer. // // It is suggested that the target Writer is buffered // to avoid performing many system calls. func (c *Command) WriteTo(w io.Writer) (int64, error) { var total int64 var buf [4]byte n, err := w.Write(c.Name) //命名名稱,nsqd根據這個名稱執行相關功能 total += int64(n) if err != nil { return total, err } for _, param := range c.Params { n, err := w.Write(byteSpace) //空格 total += int64(n) if err != nil { return total, err } n, err = w.Write(param) //參數 total += int64(n) if err != nil { return total, err } } n, err = w.Write(byteNewLine) //空行\n total += int64(n) if err != nil { return total, err } //消息內容 if c.Body != nil { bufs := buf[:] binary.BigEndian.PutUint32(bufs, uint32(len(c.Body))) n, err := w.Write(bufs) //消息長度4字節 total += int64(n) if err != nil { return total, err } n, err = w.Write(c.Body) //消息內容 total += int64(n) if err != nil { return total, err } } return total, nil }
nsqd收到這個結構做解析,就能知道命令名稱(干什么),topic name,消息內容等,不同的命令,命令參數不一樣
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) { if bytes.Equal(params[0], []byte("IDENTIFY")) { return p.IDENTIFY(client, params) } err := enforceTLSPolicy(client, p, params[0]) if err != nil { return nil, err } switch { case bytes.Equal(params[0], []byte("FIN")): return p.FIN(client, params) case bytes.Equal(params[0], []byte("RDY")): return p.RDY(client, params) case bytes.Equal(params[0], []byte("REQ")): return p.REQ(client, params) case bytes.Equal(params[0], []byte("PUB")): return p.PUB(client, params) case bytes.Equal(params[0], []byte("MPUB")): return p.MPUB(client, params) case bytes.Equal(params[0], []byte("DPUB")): return p.DPUB(client, params) case bytes.Equal(params[0], []byte("NOP")): return p.NOP(client, params) case bytes.Equal(params[0], []byte("TOUCH")): return p.TOUCH(client, params) case bytes.Equal(params[0], []byte("SUB")): return p.SUB(client, params) case bytes.Equal(params[0], []byte("CLS")): return p.CLS(client, params) case bytes.Equal(params[0], []byte("AUTH")): return p.AUTH(client, params) } return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0])) }
Topic收到消息
nsqd收到上面這個結構,解析之后,就會執行相關功能,我們以PUB命令為例:
1:讀到空行處,能拿到命令名稱和參數,命令名稱=PUB,命令參數為topicName
2:檢查topicName是否有效
3:獲取消息內容長度,讀取4個字節
4:分配對應內容長度空間,讀取對應長度字節存入
5:獲取topicName信息,沒有就創建
6:構造消息結構體nsqd.Message,自動生成消息id
7:將消息提交給對應的topic,Topic.PutMessage
8:將消息寫入topic對應的內存消息通道,內存消息通道默認大小為10000,如通道滿了則寫入磁盤
Topic中的消息分發給channel
在創建topic的時候回啟動一個協程處理各種消息,其中就包括消費topic中的消息,topic只是將消息投遞到其中的每個channel中,如topic下面有10個channel,則要復制9個nsqd.Message,每個channel一個nsqd.Message,但是消息id和消息內容是一樣的,消息內容並不會被復制,topic收到消息將消息分發給channel就完事了,消息怎么發給消費者,由channel負責
type Channel struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms requeueCount uint64 //重新入隊數量 messageCount uint64 //消息數量 timeoutCount uint64 //超時數量,已經消費,但沒有反饋結果,會重新加入隊列,messageCount不會自增 sync.RWMutex topicName string //topic name name string //channel name ctx *context backend BackendQueue //將消息寫入磁盤的隊列,維護磁盤消息的讀寫 memoryMsgChan chan *Message //內存消息隊列,通道buffer默認10000 exitFlag int32 //退出標記,1表示退出,0沒有退出 exitMutex sync.RWMutex // state tracking clients map[int64]Consumer //連接到這個topic-channel的所有client paused int32 //暫停標記,0不暫停,1暫停,暫停就不會往這個channel中copy消息 ephemeral bool //臨時channel標記,臨時channel不會存到文件中 deleteCallback func(*Channel) //用於從topic中刪除channel deleter sync.Once // Stats tracking e2eProcessingLatencyStream *quantile.Quantile // TODO: these can be DRYd up deferredMessages map[MessageID]*pqueue.Item //延遲消息map,方便查找 deferredPQ pqueue.PriorityQueue //延遲消息隊列 deferredMutex sync.Mutex inFlightMessages map[MessageID]*Message //消費中的消息map,方便查找 inFlightPQ inFlightPqueue //消費中的消息隊列 inFlightMutex sync.Mutex }
client訂閱topic消息
訂閱發送的還是Command這個結構,只不過訂閱沒有消息內容而已,指定topic和channel就行,如果topic和channel不存在都會自動創建,client和server建立的是tcp長連接,server會啟動兩個協程,一個用於發消息,一個用於接收消息,建立連接后,channel會把client加入它的map[int64]Consumer中,key為clientId,當topic收到消息后,會分發給channel,channel通過發消息的協程發給client
channel將消息推給消費者
channel中的消息存在兩個地方:內存通道和磁盤隊列,topic將消息分發給channel時,通過go的select將消息分發給內存通道或是磁盤隊列,由於select的default分支優先級比case低,所以只要內存通道沒滿,就會往內存通道中寫,否則就寫入磁盤,
diskqueue.diskQueue維護着磁盤數據的讀寫,每個非臨時的topic和channel都有這樣一個字段。
發消息的協程就會一直讀內存通道和磁盤隊列中的數據,將消息發給client
nsq消息類型有三種如下:
// frame types const ( FrameTypeResponse int32 = 0 //響應 FrameTypeError int32 = 1 //錯誤 FrameTypeMessage int32 = 2 //消息 )
消息發送給client之后,也不知道消息到底有沒有消費成功,有可能client收到消息之后就崩潰了,所以消息發給client之后,需要client給server發一個FIN消息告訴server,這個消息我消費成功,所以在將消息發送給client之后,消息出了內存隊列/磁盤隊列,進入了一個新的隊列,叫飛行隊列,表示這個消息正在運輸消費中,為了維護在消費中的消息,nsq使用了兩個數據結構:
type inFlightPqueue []*Message inFlightPQ inFlightPqueue //按照超時時間排序的最小堆 inFlightMessages map[MessageID]*Message //保存消息
消息發送給client之后,同時會將消息存入inFlightPQ和inFlightMessages中,inFlightPQ中的消息都設置了超時時間默認是1分鍾,如果1分鍾后還沒有收到client發過來的FIN消息,會將消息重新加入待消費隊列,讓client重新消費,目的是想保證每個消息至少被消費一次,由於消息可保存在內存中,進程可能隨時掛掉並不能保證每個消息都至少被消費一次,如果不用內存隊列,完全使用磁盤隊列,當進程意外崩掉的時候,消息是否丟失要看磁盤隊列的具體實現,完全使用磁盤隊列性能差點,安全性更高
inFlightMessages就是為了方便通過消息id查找消息,收到client發送過來的FIN消息時就會將消息從inFlightPQ和inFlightMessages中刪除,表示這個消息已經消費成功,數據也就被扔掉了
延遲消息
發延遲消息和發普通消息的區別是producter在生成延遲消息的時候指定了延遲時間,單位毫秒,命令:DPUB
延遲消息存在內存中,並沒有存到磁盤中,延遲消息要是存在磁盤中,實現起來還是比較復雜
延遲消息同樣使用了一個隊列和一個map,結構如下:
type Item struct { Value interface{} //*Message Priority int64 //執行的時間戳,單位毫秒 Index int //隊列索引 } type PriorityQueue []*Item deferredPQ pqueue.PriorityQueue deferredMessages map[MessageID]*pqueue.Item deferredPQ和inFlightPQ一樣,是按照時間排序的最小堆
那么nsq是怎么判斷消息超時,延遲消息的執行時間到了呢?
nsq有一個專門的協程來處理這兩種情況,實現也很簡單,就是每100毫秒檢查一次,看是否有超時的消息,延遲消息是否執行時間是否到了,如果消息超時,則重新將消息加入待消費隊列,每次將消息發送給client的時候,重試次數都會加一,即Message.Attempts++
延遲消息執行時間要是到了,就會當做一個普通的消息加入待消費隊列,后面的流程都是一樣的,默認最大延遲時間為1小時,所有的默認值在進程啟動時都是可重新指定的
nsqd啟動過程
1:加載啟動參數
啟動參數定義了結構nsqd.Options,並初始化好了默認值,在進程啟動的時候可以指定對應的值,通過反射將這些參數賦給nsqd.Options,通過nsqd.Options就能方便的使用各個參數
2:加載topic和channel並啟動
在nsqd啟動的時候會加載配置文件nsqd.dat,驗證topic和channel名稱格式是否有效,然后啟動所有topic,該暫停的就暫停,當topic和channel發生變更的時候回將所有信息重新保存到nsqd.dat中,如新增/刪除/暫停/啟動topic和channel會保存文件
topic和channel保存到文件中的結構
type meta struct { Topics []struct { Name string `json:"name"` Paused bool `json:"paused"` Channels []struct { Name string `json:"name"` Paused bool `json:"paused"` } `json:"channels"` } `json:"topics"` }
3:啟動tcp/http/https服務
nsq可以通過tcp和http通過服務,http和https提供的服務是一樣,區別在於協議本身,當client通過tcp和server建立連接后,server會啟動兩個協程,一個用於發消息,一個用於收消息
tcp提供的服務如下:
服務命令 | 服務描述 |
INENTIFY | 認證 |
FIN | 消費完成 |
RDY | 指定可同時處理的消息數量 |
REQ | 消息重新加入隊列 |
PUB | 發布單條消息 |
MPUB | 發布多條消息 |
DPUB | 發布單條延遲消息 |
NOP | 不做任何處理 |
TOUCH | 重新設置消息處理超時時間 |
SUB | 訂閱,訂閱后才能消費消息 |
CLS | 關閉停止消費 |
AUTH | 授權 |
client和server建立連接后,client通過命令INENTIFY將認證信息發給服務端,如果server在啟動的時候指定了授權地址,server就會告訴client你需要認證,client就會通過命令AUTH將秘鑰發給server,server去授權地址進行驗證,驗證通過后,就可以進行正常的消息發布和訂閱了
http和https提供服務如下:
服務名稱 |
發布單條/多條消息 |
topic新增/刪除/情況topic中消息/暫停/啟動 |
channel新增/刪除/情況topic中消息/暫停/啟動 |
nsq狀態信息 |
ping |
啟動參數查詢和修改 |
tcp服務能發布和消費消息,http/https則只能發布消息,發布消息最后調的是同一個接口
端口信息
協議名稱 | 默認端口 |
tcp | 4150 |
http | 4151 |
https | 4152 |
心跳
心跳默認30秒,在認證(INENTIFY)的時候client可以指定心跳時間間隔,server按照心跳給client發消息,消息內容:_heartbeat_,如果發送失敗,發送消息的協程就會退出,這樣server就不在給client發消息了,server如果從client讀消息失敗,接收消息的協程就會退出,關閉和client的連接,從channel中將client移除,這樣就不在收client發來的消息,server中也就沒有client的任何信息了
consumer和producter連着nsqd的同一個端口,為什么consumer能消費消息,而producter卻不會呢?
nsq是個基於發布和訂閱的消息隊列,只有訂閱了才能消費消息,consumer和producter雖然連着同一個端口,consumer在建立連接后,會發送SUB命令,告訴server我要訂閱,而producter並沒有,consumer在發送SUB命令后還會發送RDY命令告訴server能同時處理消息的個數,當rdyCount=0時,server也不會給consumer推消息,所以SUB和RDY這兩個命令缺一不可
nsq消息文件的存取
nsq可以將消息存在內存中或是文件中,存在內存的好處就是速度快,確定就是一旦進程退出消息就丟失了,所以在實戰中消息都會寫到磁盤文件,雖然慢點但不容易丟消息
封裝消息存取文件的實現在
github.com/nsqio/go-diskqueue/diskqueue.go中
topic收到消息后,可以將消息存在內存中或是文件中,當內存channel寫滿之后就會寫入文件,當我們把channel的buffer設置成0后,所有的消息就會寫文件
每個topic都會啟動一個協程將其收到的消息復制給其下面的每個channel,channel在將消息推送給consumer,channel收到topic發過來(函數調用)的消息,可將消息存入內存或是文件
消息寫入內存,topic下面的channel其實是共享一份數據,因為數據都是自讀的,而寫入文件卻是每個channel都有一組文件並將消息吸入,真正做到了讀時復制,每個topic和channel都會實例化一個diskQueue,其結構如下
// diskQueue implements a filesystem backed FIFO queue
//原文:https://www.cnblogs.com/hlxs/p/11445103.html 作者:啊漢
type diskQueue struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms // run-time state (also persisted to disk) readPos int64 //已經讀的位置 writePos int64 //已經寫的位置 readFileNum int64 //正在讀的文件編號 writeFileNum int64 //正在寫的文件編號 depth int64 //沒有消費的消息數量 sync.RWMutex // instantiation time metadata name string // topicName 或者 topicName + ":" + channelName dataPath string //存消息文件的目錄 maxBytesPerFile int64 // currently this cannot change once created minMsgSize int32 //消息最小值 maxMsgSize int32 //消息最大值 syncEvery int64 // number of writes per fsync syncTimeout time.Duration // duration of time per fsync exitFlag int32 //退出標記 needSync bool //強制將文件緩沖區的數據寫入磁盤 // keeps track of the position where we have read // (but not yet sent over readChan) nextReadPos int64 //下次讀的位置 nextReadFileNum int64 //下次讀的文件編號 readFile *os.File //正在讀的文件 writeFile *os.File //正在寫的文件 reader *bufio.Reader //讀緩沖區,默認4K writeBuf bytes.Buffer //寫緩沖區 // exposed via ReadChan() readChan chan []byte //讀channel // internal channels writeChan chan []byte //寫channel writeResponseChan chan error //寫結果通知 emptyChan chan int //刪除所有文件channel emptyResponseChan chan error //刪除通知channel exitChan chan int //退出channel exitSyncChan chan int //退出命令同步等待channel logf AppLogFunc //寫日志 }
文件名命名:目錄 + topicName:channelName + .diskqueue.000001.dat
func (d *diskQueue) fileName(fileNum int64) string { return fmt.Sprintf(path.Join(d.dataPath, "%s.diskqueue.%06d.dat"), d.name, fileNum) }
diskQueue在實例化的時候回初始化相關的屬性,當文件大小大於指定文件的最大值時,文件編號writeFileNum就會自增1,新來的消息就會寫入新的文件
按順序讀寫文件,每個消息寫文件的格式是:消息長度(4字節) + 消息內容,這樣讀消息也就很容易了,先讀4字節,知道消息的長度,接着讀消息內容,下一個消息也是這樣讀,當下一個消息讀的位置大於文件的最大值時說明這個文件讀完了,可以從下一個文件開始寫了,
寫文件是同步的,寫完之后直接反饋消息是否寫入成功,由於文件系統的緩存原因,系統並不是把消息馬上寫入磁盤,而是寫入了文件的緩沖區,所以需要定時的將文件緩沖區的內容寫入磁盤,nsq使用了兩個策略將文件緩沖區的內容寫入磁盤。兩個策略同時進行
1:默認每2500條消息強制將文件緩存內容寫入磁盤
2:默認每兩秒強制將文件緩存內容寫入磁盤
在將消息強制寫入磁盤的同時,也會將隊列當前狀態寫入另一個文件,若程序退出,下次啟動后就能正常進行文件的讀寫,寫入內容包括:
1:剩余消息數量
2:正在讀的文件編號
3:讀文件偏移量
4:正在寫的文件編號
5:寫文件偏移量
磁盤文件的刪除,如果一個文件中的消息全部被消費了,那這個文件將被刪除
斷開重連
斷開后如果不能自動重連,那就是死都不知道怎么死的,所以nsq是有斷開重連功能的
server短發現斷開后,不會自動重連,鬼知道你是不是主動斷開,所以server發現斷開了,就將client的相關信息完全刪除,就像client從沒有出現過
client斷開后會自動重連,client分consumer和producer
consumer自動重連:consumer作為消費者就是讀,所以當讀失敗的時候,consumer會關閉讀寫功能,就斷開連接,當consumer收到的所有消息處理完成后,就會自動重連,注意寫失敗並不會自動重連
producer自動重連:producer作為生產者就是寫,所以當寫失敗的時候,producer按照狀態來決定是否重連,如果發現狀態為非連接狀態就連接,收到斷開是不會重連的,在寫失敗的時候才會重連
參考資料
未完。。。