簡介:
RocketMQ作為一款純java、分布式、隊列模型的開源消息中間件,支持事務消息、順序消息、批量消息、定時消息、消息回溯等。
發展歷程:
1. Metaq(Metamorphosis) 1.x
由開源社區killme2008維護,開源社區非常活躍。https://github.com/killme2008/Metamorphosis
2. Metaq 2.x
於2012年10月份上線,在淘寶內部被廣泛使用。
3. RocketMQ 3.x
Metaq 3.0發布時,產品名稱改為RocketMQ。基於公司內部開源共建原則,RocketMQ項目只維護核心功能,且去除了所有其他運行時的依賴,核心功能最簡化。每個BU的個性化需求都在RocketMQ項目之上進行深度定制。RocketMQ向其他BU提供的僅僅是jar包,例如要定制一個Broker,那么只需要依賴rocketmq-broker這個jar包即可,可通過API進行交互,如果定制client,則依賴rocketmq-client這個jar包,對其提供的api進行再封裝。
特點:
- 支持發布/訂閱(Pub/Sub)和點對點(P2P)消息模型
- 在一個隊列中可靠的先進先出(FIFO)和嚴格的順序傳遞 (RocketMQ可以保證嚴格的消息順序,而ActiveMQ無法保證)
- 支持拉(pull)和推(push)兩種消息模式 (Push好理解,比如在消費者端設置Listener回調;而Pull,控制權在於應用,即應用需要主動的調用拉消息方法從Broker獲取消息,這里面存在一個消費位置記錄的問題(如果不記錄,會導致消息重復消費))
- 單一隊列百萬消息的堆積能力 (RocketMQ提供億級消息的堆積能力,這不是重點,重點是堆積了億級的消息后,依然保持寫入低延遲)
- 支持多種消息協議,如 JMS、MQTT 等
- 分布式高可用的部署架構,滿足至少一次消息傳遞語義 (RocketMQ原生就是支持分布式的,而ActiveMQ原生存在單點性)
- 提供 docker 鏡像用於隔離測試和雲集群部署
- 提供配置、指標和監控等功能豐富的 Dashboard
在Metaq1.x/2.x的版本中,分布式協調采用的是Zookeeper,而RocketMQ自己實現了一個NameServer,更加輕量級,性能更好!
- 組(Group)有Producer/Consumer Group。 ActiveMQ中並沒有Group這個概念,而在RocketMQ中理解Group的機制很重要。通過Group機制,讓RocketMQ天然的支持消息負載均衡!比如某個Topic有9條消息,其中一個Consumer Group有3個實例(3個進程 OR 3台機器),那么每個實例將均攤3條消息!(注意RocketMQ只有一種模式,即發布訂閱模式。)
- 消息失敗重試機制、高效的訂閱者水平擴展能力、強大的API、事務機制等等
專業術語
Producer
消息生產者,生產者的作用就是將消息發送到 MQ,生產者本身既可以產生消息,如讀取文本信息等。也可以對外提供接口,由外部應用來調用接口,再由生產者將收到的消息發送到 MQ。
Producer Group
生產者組,簡單來說就是多個發送同一類消息的生產者稱之為一個生產者組。在這里可以不用關心,只要知道有這么一個概念即可。
Consumer
消息消費者,簡單來說,消費 MQ 上的消息的應用程序就是消費者,至於消息是否進行邏輯處理,還是直接存儲到數據庫等取決於業務需要。
Consumer Group
消費者組,和生產者類似,消費同一類消息的多個 consumer 實例組成一個消費者組。
Topic
Topic 是一種消息的邏輯分類,比如說你有訂單類的消息,也有庫存類的消息,那么就需要進行分類,一個是訂單 Topic 存放訂單相關的消息,一個是庫存 Topic 存儲庫存相關的消息。
Message
Message 是消息的載體。一個 Message 必須指定 topic,相當於寄信的地址。Message 還有一個可選的 tag 設置,以便消費端可以基於 tag 進行過濾消息。也可以添加額外的鍵值對,例如你需要一個業務 key 來查找 broker 上的消息,方便在開發過程中診斷問題。
Tag
標簽可以被認為是對 Topic 進一步細化。一般在相同業務模塊中通過引入標簽來標記不同用途的消息。
Broker
Broker 是 RocketMQ 系統的主要角色,其實就是前面一直說的 MQ。Broker 接收來自生產者的消息,儲存以及為消費者拉取消息的請求做好准備。
Name Server
Name Server 為 producer 和 consumer 提供路由信息。
RocketMQ概述 (轉載copy聲明: https://blog.csdn.net/synsdeng/article/details/77966125 僅供學習,非商業用途 synsdeng)
RocketMQ是一個分布式消息中間件,並支持事務消息、順序消息、批量消息、定時消息、消息回溯等。它里面有幾個區別於標准消息中件間的概念,如Group、Topic、Queue等。系統組成則由Producer、Consumer、Broker、NameServer等。
RocketMQ以Topic來管理不同應用的消息,對於生產者(producer)而言,發送消息時需要指定消息的Topic,對於消費者(consumer)而言,在啟動后需要訂閱相應的Topic,然后可以消費相應的消息。Topic是邏輯上的概念,在物理實現上,一個Topic由多個Queue組成,采用多個Queue的好處是可以將Broker存儲分布式化,提高系統性能。
NameServer的功能,在RocketMQ的前身是使用ZooKeeper。NameServer用於管理所有Broker節點信息,接收Broker的注冊/注銷請求,此外還記錄了Topic與Broker、Queue的對應關系,Broker主備信息。BrokerId為0代表是MasterBroker,否則BrokerId大於0的表示為SlaveBroker,Master和Slave組成一個Broker,具有相同的brokerName。Broker在啟動的時候會去NameServer進行注冊,會維護Broker的存活狀態,Broker每次發送心跳過來的時候都會把Topic信息帶上。NamesrvStartUp為啟動類、NamesrvController為控制類、RouteInfoManager存放了Topic隊列信息以及地址列表等一系列重要數據結構並提供了對應的數據變更接口、DefaultRequestProcessor負責處理所broker發過來的所有網絡消息。各NameServer之間是相互獨立且沒有通信的,通過給Broker的namesrvAddr配置多個NameServer地址,同時向多個NameServer注冊信息來實現NameServer集群。因為NameServer讀寫壓力比較小,所以穩定性較高。相應的生產者/消費者中的namesrvAddr也是配置多個。
Broker,每個Broker都會和NameServer建立一個長連接保持心跳。一個Topic分布在多個Broker上,一個Broker可以配置多個Topic。由於消息分布在各個Broker上,一旦某個Broker宕機,則該Broker上的消息讀寫都會受到影響。所以需要HA機制,RocketMQ的實現方式是master/slave,salve定時從master同步數據,如果master宕機,則slave提供消費服務,但是不能寫入消息。一旦某個broker master宕機,生產者和消費者多久才能發現?受限於rocketmq的網絡連接機制,默認情況下,最多需要30秒,但這個時間可由應用設定參數來縮短時間。這個時間段內,發往該broker的消息都是失敗的,而且該broker的消息無法消費,因為此時消費者不知道該broker已經掛掉。消費者得到master宕機通知后,轉向slave消費,但是slave不能保證master的消息100%都同步過來了,因此會有少量的消息丟失。但是消息最終不會丟的,一旦master恢復,未同步過去的消息會被消費掉。
SendMessageProcessor處理所有發往broker的消息,由BrokerController調用DefaultMessageStore來保存消息(processRequest -> sendMessage/sendBatchMessage -> getMessageStore().putMessage)。消息體由CommitLog記錄,首先會判斷是否為延遲消息,如果是則會改寫Topic,並保存好真實的Topic信息,然后寫入對應的MappedFile(MappedByteBuffer)。如果是異步刷盤,異步同步Slave則消息到這里就算是記錄完了,直接返回producer成功。異步刷盤時,只有機器宕機,才會產生消息丟失,broker掛掉可能會發生,但是機器宕機崩潰是很少發生的,除非突然斷電。如果是同步刷盤,消息寫入物理文件才會返回成功,刷盤本質其實就是調用MappedByteBuffer.force。HA是由master/slave實現,這個也分同步還是異步。然后還有后台線程異步的把CommitLog文件同步到ConsumeQueue中,ConsumeQueue是CommitLog的索引,它記錄了消息在CommitLog中的位置。Producer對應CommitLog,發送的消息寫入CommitLog,Consumer對應ConsumeQueue,消費對應的ConsumeQueue隊列。
無論CommitLog,還是ConsumeQueue,都有一個對應的MappedFileQueue,也就是對應的內存映射文件的鏈表,對外提供一個邏輯上的文件。MapedFileQueue包含了很多MapedFile(AllocateMappedFileService負責創建MappedFile),以及MapedFile的真實大小,MapedFile包含了具體的文件信息,包括文件路徑、文件名、文件起始偏移、寫位移、讀位移,刷盤位移等等信息,同時使用了虛擬內存映射來提高IO效率(MappedByteBuffer)。MapedFile的文件名就是消息在此文件的中初始偏移量(文件的起始偏移量),MapedFile鏈表邏輯上是連續的,就是靠這個機制實現。一個PageSize默認為4k,對應Linux的PageCache緩存大小,一個MapedFile默認最大為1G(所以一個消息最大也是1G,在MessageStoreConfig中配置),異步刷盤線程默認1s觸發一次,但是要滿4頁(16k)才會刷盤,或10s做一次強制刷盤(FlushRealTimeService異步CommitLog刷盤,GroupCommitService同步CommitLog刷盤)。讀寫時根據offset定位到鏈表中,對應的MappedFile進行讀寫。通過MappedFile,就很好的解決了大文件隨機讀的性能問題。MappedFile繼承自ReferenceResource,它里面實現了一個引用計數,獲取和釋放都要增減這個計數,當引用計數為0的時候就會回調cleanup方法。MappedFile的cleanup實現就是通過反射調用cleaner().clean()以釋放映射內存,cleaner方法是在DirectByteBuffer里,MappedByteBuffer實現類就為DirectByteBuffer,但DirectByteBuffer是package的,外面並不能訪問到。
一台Broker上所有消息(不管是什么Topic)都是記錄在一個CommitLog上,CommitLog里面記錄了每條消息的消費情況,是否被消費,由誰消費(queueid),該消息是否持久化等信息,每條消息的長度是不一樣的。同步Slave是由一個單獨的線程順序的同步CommitLog文件,所謂的等待同步Slave成功后才返回,實質是等待同步線程下標到了指定下標而已,所以Master和Slave的CommitLog文件內容及順序都是一致的。CommitLog中存儲的消息格式已經指定好該消息對應的topic及存到consumeQueue中對應的topic的那個隊列(queueid),究竟寫入那個consumequeue的那個queueid,這是由客戶端投遞消息的時候指定的,客戶端做的負載均衡,選擇不同queueid投遞。一般來說客戶端是輪詢queue投遞消息,但如果要保證消息順序,原理就是客戶端把相關消息投遞到同一個queueid,這樣消費者消費的時候就是順序讀取了。只要消息到了CommitLog,發送的消息也就不會丟,有后台線程異步的同步到ConsumeQueue,再由Consumer進行消費。ConsumeQueue是消息的邏輯隊列,相當於字典的目錄,用來指定消息在物理文件CommitLog上的位置。也就是說CommitLog只有一個(順序寫),但ConsumeQueue確有多個(隨機讀),ConsumeQueue與Topic對應。
CommitLog只有一個,寫入消息體的時候為保證消息順序寫入是會加鎖的,加鎖有兩種方式,一種是ReentrantLock,一種是自旋compareAndSet,加鎖的范圍只限定在寫入MappedFile中,刷盤及同步並不在加鎖范圍。
消息寫入CommitLog之后,會有單獨的線程任務(ReputMessageService)每隔1毫秒讀取CommitLog文件,把新的消息信息分別調用CommitLogDispatcherBuildConsumeQueue及CommitLogDispatcherBuildIndex的dispatch方法加入到ConsumeQueue及IndexService中去。IndexService以Message Key構建了索引,可以通過Key來過濾查詢消息。ConsumeQueuek中的消息都是定長的(20字節),消息數量也是固定的(也就是物理文件是固定大小),物理文件名字和CommitLog一樣都是開始偏移量。一個ConsumeQueue只對應一個Topic,包含了消息在CommitLog的開始位置、大小以及tagsCode,只所以使用tag的HashCode就是為了保持長度固定,為保證准確在后面使用tag過濾的時候還會再進行一次字符串比較。消費者消費的時候是先從指定的ConsumeQueue中拉取消息ID以及進行一次簡單的Tag過濾(如果需要的話),然后再一次的讀取CommitLog文件獲取真正的消息體。
如果消息是事務消息且狀態是PREPARED或ROLLBACK的,則不會同步到ConsumeQueue中,見CommitLogDispatcherBuildConsumeQueue.dispatch,只有當消息不是事務消息或事務狀態為COMMIT時才會同步到ConsumeQueue。事務消息是基於二階段提交:一階段,向Broker發送一條PREPARED,記錄在CommitLog中並返回消息偏移位置,本地事務需要提供一個RocketMQ的回調(LocalTransactionExecuter),用於回調執行本地事務。二階段,處理完本地事務后,返回本地事務狀態,根據狀態(COMMIT或ROLLBACK)去設置消息,然后添加到CommitLog中,最后同步到ConsumeQueue進行消費(事務消息記錄了兩次CommitLog一次ConsumeQueue)。
如果消息是延遲消息,則消息會先投遞到SCHEDULE_TOPIC_XXXX中(消息內容也一樣是記錄在CommitLog),這個topic有若干隊列, 每一個隊列對應了一個延遲level(延遲時間並不是精准的),會有一個任務(ScheduleMessageService)去輪詢這些隊列,等時間到了則把消息重新寫入到原來的Topic,然后同步到ConsumeQueue中(延遲消息投遞了兩次,即兩次CommitLog兩次ConsumeQueue)。
PullMessageProcessor處理消費者的請求,由BrokerController調用DefaultMessageStore.getMessage來從指定的Topic下的QueueId隊列的QueueOffset下標開始拉取一批消息。首先根據Topic及QueueId定位到ConsumeQueue,然后根據QueueOffset獲取到MappedFile並且返回指定位置開始的內存映射對象SelectMappedBufferResult。然后開始從指定位置遍歷ConsumeQueue,經過濾Tags后,從CommitLog指定位置獲取消息體,如果有過濾表達式則過濾,通過后把消息加入到結果列表。如果消息體過大,Master剩余物理內存不夠,或者開啟Slave讀取消息,則會設置讓客戶端從Slave拉取消息。如果需要在發送消費消息前進行什么處理,可以注冊ConsumeMessageHook,默認沒有。最后向客戶端寫入消息內容,寫入消息內容有兩種方式:一種是把消息內容讀取出來返回,還有一種是使用Netty的FileRegion領拷貝機制直接把內容從堆外內存中返回,默認為第一種讀取到堆內存返回,這里是不是因為節省的大量小消息的復制還不如堆外內存創建的開銷。RocketMQ拉取消息是長輪詢,如果沒有查詢到消息,條件滿足的話會掛起請求。
部分內容轉自:
作者:馮先生的筆記
鏈接:https://www.jianshu.com/p/824066d70da8
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權並注明出處。