Topic
一個Topic是一個主題。一個系統中,我們可以將消息划成Topic,這樣,將不同的消息發送到不同的queue。
Queue
一個topic下,我們可以設置多個queue,每個queue就是我們平時所說的消息隊列;因為queue是完全從屬於某個特定的topic的,所以當我們要發送消息時,總是要指定該消息所屬的topic是什么。然后equeue就能知道該topic下有幾個queue了。但是到底發送到哪個queue呢?比如一個topic下有4個queue,那對於這個topic下的消息,發送時,到底該發送到哪個queue呢?那必定有個消息被路由的過程。目前equeue的做法是在發送一個消息時,需要用戶指定這個消息對應的topic以及一個用來路由的一個object類型的參數。equeue會根據topic得到所有的queue,然后根據該object參數通過hash code然后取模queue的個數最后得到要發送的queue的編號,從而知道該發送到哪個queue。這個路由消息的過程是在發送消息的這一方做的,也就是下面要說的producer。之所以不在消息服務器上做是因為這樣可以讓用戶自己決定該如何路由消息,具有更大的靈活性。
注意topic與queue的關系如下圖所示:

Producer
就是消息隊列的生產者。我們知道,消息隊列的本質就是實現了publish-subscribe的模式,即生產者-消費者模式。生產者生產消息,消費者消費消息。所以這里的Producer就是用來生產和發送消息的。
Consumer
就是消息隊列的消費者,一個消息可以有多個消費者。
Consumer Group
消費者分組,這可能對大家來說是一個新概念。之所以要搞出一個消費者分組,是為了實現下面要說的集群消費。一個消費者分組中包含了一些消費者,如果這些消費者是要集群消費,那這些消費者會平均消費該分組中的消息。
Broker
equeue中的broker負責消息的中轉,即接收producer發送過來的消息,然后持久化消息到磁盤,然后接收consumer發送過來的拉取消息的請求,然后根據請求拉取相應的消息給consumer。所以,broker可以理解為消息隊列服務器,提供消息的接收、存儲、拉取服務。可見,broker對於equeue來說是核心,它絕對不能掛,一旦掛了,那producer,consumer就無法實現publish-subscribe了。
集群消費
集群消費是指,一個consumer group下的consumer,平均消費topic下的queue。具體如何平均可以看一下下面的架構圖,這里先用文字簡單描述一下。假如一個topic下有4個queue,然后當前有一個consumer group,該分組下有4個consumer,那每個consumer就被分配到該topic下的一個queue,這樣就達到了平均消費topic下的queue的目的。如果consumer group下只有兩個consumer,那每個consumer就消費2個queue。如果有3個consumer,則第一個消費2個queue,后面兩個每個消費一個queue,從而達到盡量平均消費。所以,可以看出,我們應該盡量讓consumer group下的consumer的數目和topic的queue的數目一致或成倍數關系。這樣每個consumer消費的queue的數量總是一樣的,這樣每個consumer服務器的壓力才會差不多。當前前提是這個topic下的每個queue里的消息的數量總是差不多多的。這點我們可以對消息根據某個用戶自己定義的key來進行hash路由來保證。
廣播消費
廣播消費是指一個consumer只要訂閱了某個topic的消息,那它就會收到該topic下的所有queue里的消息,而不管這個consumer的group是什么。所以對於廣播消費來說,consumer group沒什么實際意義。consumer可以在實例化時,我們可以指定是集群消費還是廣播消費。
消費進度(offset)
消費進度是指,當一個consumer group里的consumer在消費某個queue里的消息時,equeue是通過記錄消費位置(offset)來知道當前消費到哪里了。以便該consumer重啟后繼續從該位置開始消費。比如一個topic有4個queue,一個consumer group有4個consumer,則每個consumer分配到一個queue,然后每個consumer分別消費自己的queue里的消息。equeue會分別記錄每個consumer對其queue的消費進度,從而保證每個consumer重啟后知道下次從哪里開始繼續消費。實際上,也許下次重啟后不是由該consumer消費該queue了,而是由group里的其他consumer消費了,這樣也沒關系,因為我們已經記錄了這個queue的消費位置了。所以可以看出,消費位置和consumer其實無關,消費位置完全是queue的一個屬性,用來記錄當前被消費到哪里了。另外一點很重要的是,一個topic可以被多個consumer group里的consumer訂閱。不同consumer group里的consumer即便是消費同一個topic下的同一個queue,那消費進度也是分開存儲的。也就是說,不同的consumer group內的consumer的消費完全隔離,彼此不受影響。還有一點就是,對於集群消費和廣播消費,消費進度持久化的地方是不同的,集群消費的消費進度是放在broker,也就是消息隊列服務器上的,而廣播消費的消費進度是存儲在consumer本地磁盤上的。之所以這樣設計是因為,對於集群消費,由於一個queue的消費者可能會更換,因為consumer group下的consumer數量可能會增加或減少,然后就會重新計算每個consumer該消費的queue是哪些,這個能理解的把?所以,當出現一個queue的consumer變動的時候,新的consumer如何知道該從哪里開始消費這個queue呢?如果這個queue的消費進度是存儲在前一個consumer服務器上的,那就很難拿到這個消費進度了,因為有可能那個服務器已經掛了,或者下架了,都有可能。而因為broker對於所有的consumer總是在服務的,所以,在集群消費的情況下,被訂閱的topic的queue的消費位置是存儲在broker上的,存儲的時候按照不同的consumer group做隔離,以確保不同的consumer group下的consumer的消費進度互補影響。然后,對於廣播消費,由於不會出現一個queue的consumer會變動的情況,所以我們沒必要讓broker來保存消費位置,所以是保存在consumer自己的服務器上。
broker配置文件說明
lstorePathRootDir,數據存放的根目錄
lstorePathCommitLog,commitlog存放的路徑
lmapedFileSizeCommitLog = 1024 * 1024 * 1024,每個commitlog大小,默認為1G
lmapedFileSizeConsumeQueue
= 300000 * 20,消費隊列文件大小,默認為存儲30W條消息,每條消息20個字節,詳細參考ConsumeQueue
lflushIntervalCommitLog = 1000,commit log刷盤間隔,默認1秒
lflushCommitLogTimed =
false,是否定時刷盤,默認為實時刷盤,詳細請參考CommitLog
lflushIntervalConsumeQueue
= 1000,消費隊列刷盤間隔,默認為1秒
lfileReservedTime = 72,文件保留時間(單位小時),默認為3天
ldeleteWhen = “04”,何時觸發刪除文件,默認凌晨4點刪除文件
lBrokerRole brokerRole =
BrokerRole.ASYNC_MASTER,broker的角色:異步復制的master,同步雙寫的master,slave
lflushDiskType =
FlushDiskType.ASYNC_FLUSH,刷盤:同步,異步
lsyncFlushTimeout同步刷盤超時時間,默認為5秒
l13 messageDelayLevel定時消息級別,默認為1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m
9m 10m 20m 30m 1h 2h分別對應1~18級
同一個訂閱組內不同Consumer實例訂閱不同topic消費混亂問題分析說明
兩個應用的消費者使用相同的consumer group,但消費不同的topic,造成兩個應用各有一半消息丟失。原因:offset是共用的,offset只與groupname有關。如下官方文檔截圖:

作者:jackcooper
鏈接:https://www.jianshu.com/p/cc108aeb08ac
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權並注明出處。