一、部署
- 目錄結構構建:
- docker-compose.yml
version: '3.5' services: rmqnamesrv: image: foxiswho/rocketmq:server container_name: rmqnamesrv ports: - 9876:9876 volumes: - ./data/logs:/opt/logs - ./data/store:/opt/store networks: rmq: aliases: - rmqnamesrv rmqbroker: image: foxiswho/rocketmq:broker container_name: rmqbroker ports: - 10909:10909 - 10911:10911 volumes: - ./data/logs:/opt/logs - ./data/store:/opt/store - ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf environment: NAMESRV_ADDR: "rmqnamesrv:9876" JAVA_OPTS: " -Duser.home=/opt" JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m" command: mqbroker -c /etc/rocketmq/broker.conf depends_on: - rmqnamesrv networks: rmq: aliases: - rmqbroker rmqconsole: image: styletang/rocketmq-console-ng container_name: rmqconsole ports: - 10801:8080 environment: JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" depends_on: - rmqnamesrv networks: rmq: aliases: - rmqconsole networks: rmq: name: rmq driver: bridge
- data/brokerconf/broker.conf
version: '3.5' services: rmqnamesrv: image: foxiswho/rocketmq:server container_name: rmqnamesrv ports: - 9876:9876 volumes: - ./data/logs:/opt/logs - ./data/store:/opt/store networks: rmq: aliases: - rmqnamesrv rmqbroker: image: foxiswho/rocketmq:broker container_name: rmqbroker ports: - 10909:10909 - 10911:10911 volumes: - ./data/logs:/opt/logs - ./data/store:/opt/store - ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf environment: NAMESRV_ADDR: "rmqnamesrv:9876" JAVA_OPTS: " -Duser.home=/opt" JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m" command: mqbroker -c /etc/rocketmq/broker.conf depends_on: - rmqnamesrv networks: rmq: aliases: - rmqbroker rmqconsole: image: styletang/rocketmq-console-ng container_name: rmqconsole ports: - 10801:8080 environment: JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" depends_on: - rmqnamesrv networks: rmq: aliases: - rmqconsole networks: rmq: name: rmq driver: bridge [root@CENTOS7 rocketmq]# cat data/brokerconf/broker.conf # 是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉 autoCreateSubscriptionGroup=true # Broker 對外服務的監聽端口 listenPort=10911 # 刪除文件時間點,默認凌晨4點 deleteWhen=04 # 文件保留時間,默認48小時 fileReservedTime=120 # commitLog 每個文件的大小默認1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue 每個文件默認存 30W 條,根據業務情況調整 mapedFileSizeConsumeQueue=300000 # destroyMapedFileIntervalForcibly=120000 # redeleteHangedFileInterval=120000 # 檢測物理文件磁盤空間 diskMaxUsedSpaceRatio=88 # 存儲路徑 # storePathRootDir=/home/hbc/rocketmq-all-4.1.0-incubating/store # commitLog 存儲路徑 # storePathCommitLog=/home/hbc/rocketmq-all-4.1.0-incubating/store/commitlog # 消費隊列存儲 # storePathConsumeQueue=/home/hbc/rocketmq-all-4.1.0-incubating/store/consumequeue # 消息索引存儲路徑 # storePathIndex=/home/hbc/rocketmq-all-4.1.0-incubating/store/index # checkpoint 文件存儲路徑 # storeCheckpoint=/home/hbc/rocketmq-all-4.1.0-incubating/store/checkpoint # abort 文件存儲路徑 # abortFile=/home/hbc/rocketmq-all-4.1.0-incubating/store/abort # 限制的消息大小 maxMessageSize=65536 # flushCommitLogLeastPages=4 # flushConsumeQueueLeastPages=2 # flushCommitLogThoroughInterval=10000 # flushConsumeQueueThoroughInterval=60000 # Broker 的角色 # - ASYNC_MASTER 異步復制Master # - SYNC_MASTER 同步雙寫Master # - SLAVE brokerRole=ASYNC_MASTER # 刷盤方式 # - ASYNC_FLUSH 異步刷盤 # - SYNC_FLUSH 同步刷盤 flushDiskType=ASYNC_FLUSH # 發消息線程池數量 # sendMessageThreadPoolNums=128 # 拉消息線程池數量 # pullMessageThreadPoolNums=128 brokerIP1=xx:xx:xx:xx #部署機內網ip
- rocketmq目錄啟動:docker-compose up -d
- 訪問: http://your_ip:10801
二、客戶端使用
- 依賴
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.2</version> </dependency>
- 生產者
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; public class Producer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("agent-server"); // Specify name server addresses. producer.setNamesrvAddr("your_ip:9876"); //Launch the instance. producer.start(); for (int i = 0; i < 1; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } }
- 消費者
import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { // Instantiate with specified consumer group name. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("agent-server"); // Specify name server addresses. consumer.setNamesrvAddr("your_ip:9876"); // Subscribe one more topics to consume. *表示訂閱所有tags consumer.subscribe("TopicTest", "*"); // Register callback to execute on arrival of messages fetched from brokers. consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //Launch the consumer instance. consumer.start(); System.out.printf("Consumer Started.%n"); } }
三、問題記錄
- 發消息一直彈出出現closeChannel: close the connection to remote address[] result: true:
原因: 沒配置brokerIP1和brokerIP2時,broker會根據當前網卡選擇一個IP監聽 解決:打開data/brokerconf/broker.conf 指定當前broker監聽的IP brokerIP1=192.168.29.18
四、 常用命令
1 創建/修改Topic: updateTopic 2 刪除:deleteTopic 3 創建/修改訂閱組:updateSubGroup 4 刪除訂閱組:deleteSubGroup 5 更新Broker配置:updateBrokerConfig 6 更新Topic讀寫權限:updateTopicPerm 7 查詢topic路由信息:TopicRoute 8 查看Topic列表信息:TopicList 9 查看Topic統計信息:TopicStatus 10 根據時間查詢消息:printMsg 11 根據消息id查詢消息:queryMsgById 12 查看集群消息:clusterList
sh bin/mqadmin TopicStatus -n localhost:9876 -t handsomecui.local
五、基本原理
- NameServer:整個集群的注冊中心和配置中心,管理集群的元數據。包括 Topic 信息和路由信息、Producer 和 Consumer 的客戶端注冊信息、Broker 的注冊信息。
- Broker:負責接收消息的生產和消費請求,並進行消息的持久化和消息的讀取。
- Producer:負責生產消息。
- Consumer:負責消費消息。
在實際生產和消費消息的過程中,NameServer 為生產者和消費者提供 Meta 數據,以確定消息該發往哪個 Broker 或者該從哪個 Broker 拉取消息。
有了 Meta 數據后,生產者和消費者就可以直接和 Broker 交互了。這種點對點的交互方式最大限度降低了消息傳遞的中間環節,縮短了鏈路耗時。
1. Producer:生產者,負責消息的生產和發送。與 NameServer 集群的一個節點建立長連接,定期從 NameServer 獲取 其訂閱的 Topic 的路由信息,然后向 Topic 所在的 broker 發送消息。 2. Consumer:消費者,負責消息的拉取和消費。Consumer 與 NameServer 集群的某個節點建立長連接,然后從 NameServer 上獲取可以消費的 Topic 中某個 MessageQueue 所在 Broker 的路由信息,
然后與其建立長連接,從而不斷的拉取消息進行消費(同一個ConsumerGroup下的所有Consumer消費的內容合起來才是所訂閱的Topic內容的整體,從而可以達到負載均衡的目的)。 3. NameServer:整個消息隊列的狀態服務器,集群的各個組件通過它來了解全局的信息,各個角色的機器會定期向 NameServer 上報自己的狀態,如果超時不上報,NameServer 會認為某個機器出故障不可用,
其他組件會把這個機器從可用列表中移除。NamerServer 可以部署多個,相互之間獨立,其他角色同時向多個機器上報狀態信息,從而達到熱備份的目的。 4. Broker是RocketMQ的核心,它負責接收來自Producer發過來的消息、處理Consumer的消費消息的請求、消息的持久化存儲、消息的HA機制以及消息在服務端的過濾。
六、網絡模型
- RocketMQ 使用 Netty 框架實現高性能的網絡傳輸。
- RocketMQ 的 Broker 端基於 Netty 實現了主從 Reactor 模型。架構如下:
-
具體流程: 1. eventLoopGroupBoss 作為 acceptor 負責接收客戶端的連接請求 2. eventLoopGroupSelector 負責 NIO 的讀寫操作 3. NettyServerHandler 讀取 IO 數據,並對消息頭進行解析 4. disatch 過程根據注冊的消息 code 和 processsor 把不同的事件分發給不同的線程。由 processTable 維護(類型為 HashMap)
七、實現原理
- 消息的生產
RocketMQ 支持三種消息發送方式:同步發送、異步發送和 One-Way 發送。One-Way 發送時客戶端無法確定服務端消息是否投遞成功,因此是不可靠的發送方式。 * 同步發送:注冊 ResponseFuture 到 responseTable,發送 Request 請求,並同步等待 Response 返回。 * 異步發送:注冊 ResponseFuture 到 responseTable,發送 Request 請求,不需要同步等待 Response 返回,當 Response 返回后會調用注冊的
Callback 方法,從而異步獲取發送的結果。 * One-Way:發送 Request 請求,不需要等待 Response 返回,不需要觸發 Callback 方法回調。1. 客戶端 API 調 DefaultMQProducer 的 send 方法進行消息的發送。 2. makeSureStateOk 檢查客戶端的發送服務是否 ok。RocketMQ 客戶端維護了一個單例的 MQClientInstance,
可通過 start 和 shutdown 來管理相關的網絡服務。 3. tryToFindTopicPublishInfo 用來獲取 Topic 的 Meta 信息,主要是可選的 MessageQueue 列表。 4. selectOneMessageQueue 根據當前的故障容錯機制,路由到一個特定的 MessageQueue。 5. sendKernelImpl 的核心方法是調用 NettyRemotingClient 的 sendMessage 方法,該方法中會根據用戶選擇的發送策略進行區別處理,
時序圖中只體現了同步發送的方式。 6. invokeSync 通過調用 Netty 的 channel.writeAndFlush 把消息的字節流發送到 TCP 的 Socket 緩沖區,至此客戶端消息完成發送。 -
消息的接收
1. Broker 通過 Netty 接收 RequestCode 為 SEND_MESSAGE 的請求,並把該請求交給 SendMessageProcessor 進行處理。 2. SendMessageProcessor 先解析出 SEND_MESSAGE 報文中的消息頭信息(Topic、queueId、producerGroup 等),並調用存儲層進行處理。 3. putMessage 中判斷當前是否滿足寫入條件:Broker 狀態為 running;Broker 為 master 節點;磁盤狀態可寫(磁盤滿則無法寫入);
Topic 長度未超限;消息屬性長度未超限;pageCache 未處於繁忙狀態(pageCachebusy 的依據是 putMessage 寫入 mmap 的耗時,
如果耗時超過 1s,說明由於缺頁導致頁加載慢,此時認定 pageCache 繁忙,拒絕寫入)。 4. 從 MappedFileQueue 中選擇已經預熱過的 MappedFile。 5. AppendMessageCallback 中執行消息的操作 doAppend,直接對 mmap 后的文件的 bytbuffer 進行寫入操作。
八、優化
- 自旋鎖減少上下文切換
RocketMQ 的 CommitLog 為了避免並發寫入,使用一個 PutMessageLock。PutMessageLock 有 2 個實現版本:PutMessageReentrantLock 和 PutMessageSpinLock。 PutMessageReentrantLock 是基於 java 的同步等待喚醒機制;PutMessageSpinLock 使用 Java 的 CAS 原語,通過自旋設值實現上鎖和解鎖。RocketMQ 默認使用
PutMessageSpinLock 以提高高並發寫入時候的上鎖解鎖效率,並減少線程上下文切換次數。 - MappedFile 預熱和零拷貝機制
RocketMQ 消息寫入對延時敏感,為了避免在寫入消息時,CommitLog 文件尚未打開或者文件尚未加載到內存引起的 load 的開銷,RocketMQ 實現了文件預熱機制。 Linux 系統在寫數據時候不會直接把數據寫到磁盤上,而是寫到磁盤對應的 PageCache 中,並把該頁標記為臟頁。當臟頁累計到一定程度或者一定時間后再把數據 flush 到磁盤(當然在此期間如果系統掉電,
會導致臟頁數據丟失)。RocketMQ 實現文件預熱的關鍵代碼如下: public void warmMappedFile(FlushDiskType type, int pages) { ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); int flush = 0; long time = System.currentTimeMillis(); for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) { byteBuffer.put(i, (byte) 0); // force flush when flush disk type is sync if (type == FlushDiskType.SYNC_FLUSH) { if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) { flush = i; mappedByteBuffer.force(); } } ... } // force flush when prepare load finished if (type == FlushDiskType.SYNC_FLUSH) { log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}", this.getFileName(), System.currentTimeMillis() - beginTime); mappedByteBuffer.force(); } this.mlock(); } 代碼分析 1. 對文件進行 mmap 映射。 2. 對整個文件每隔一個 PAGE_SIZE 寫入一個字節,如果是同步刷盤,每寫入一個字節進行一次強制的刷盤。 3. 調用 libc 的 mlock 函數,對文件所在的內存區域進行鎖定。(系統調用 mlock 家族允許程序在物理內存上鎖住它的部分或全部地址空間。
這將阻止 Linux 將這個內存頁調度到交換空間(swap space),即使該程序已有一段時間沒有訪問這段空間)。 - 同步和異步刷盤
RocketMQ 提供了同步刷盤和異步刷盤兩種機制。默認使用異步刷盤機制。 當 CommitLog 在 putMessage() 中收到 MappedFile 成功追加消息到內存的結果后,便會調用 handleDiskFlush() 方法進行刷盤,將消息存儲到文件中。
handleDiskFlush() 便會根據兩種刷盤策略,調用不同的刷盤服務。 抽象類 FlushCommitLogService 負責進行刷盤操作,該抽象類有 3 中實現: * GroupCommitService:同步刷盤 * FlushRealTimeService:異步刷盤 * CommitRealTimeService:異步刷盤並且開啟 TransientStorePool 每個實現類都是一個 ServiceThread 實現類。ServiceThread 可以看做是一個封裝了基礎功能的后台線程服務。有完整的生命周期管理,支持 start、shutdown、weakup、waitForRunning。 同步刷盤流程 1. 所有的 flush 操作都由 GroupCommitService 線程進行處理 2. 當前接收消息的線程封裝一個 GroupCommitRequest,並提交給 GroupCommitService 線程,然后當前線程進入一個 CountDownLatch 的等待 3. 一旦有新任務進來 GroupCommitService 被立即喚醒,並調用 MappedFile.flush 進行刷盤。底層是調用 mappedByteBuffer.force () 4. flush 完成后喚醒等待中的接收消息線程。從而完成同步刷盤流程 異步刷盤流程 1. RocketMQ 每隔 200ms 進行一次 flush 操作(把數據持久化到磁盤) 2. 當有新的消息寫入時候會主動喚醒 flush 線程進行刷盤 3. 當前接收消息線程無須等待 flush 的結果。 - 關閉偏向鎖
在 RocketMQ 的性能測試中,發現存在大量的 RevokeBias 停頓,偏向鎖主要是消除無競爭情況下的同步原語以提高性能,但考慮到 RocketMQ 中該場景比較少,便通過 - XX:-UseBiasedLocking 關閉了偏向鎖特性。 在沒有實際競爭的情況下,還能夠針對部分場景繼續優化。如果不僅僅沒有實際競爭,自始至終,使用鎖的線程都只有一個,那么,維護輕量級鎖都是浪費的。偏向鎖的目標是,減少無競爭且只有一個線程使用鎖的情況下,
使用輕量級鎖產生的性能消耗。輕量級鎖每次申請、釋放鎖都至少需要一次 CAS,但偏向鎖只有初始化時需要一次 CAS。 偏向鎖的使用場景有局限性,只適用於單個線程使用鎖的場景,如果有其他線程競爭,則偏向鎖會膨脹為輕量級鎖。當出現大量 RevokeBias 引起的小停頓時,說明偏向鎖意義不大,
此時通過 - XX:-UseBiasedLocking 進行優化,因此 RocketMQ 的 JVM 參數中會默認加上 - XX:-UseBiasedLocking。
九、存儲
- 消息存儲結構
* ConsumeQueue 與 CommitLog 不同,采用定長存儲結構,如下圖所示。為了實現定長存儲,ConsumeQueue 存儲了消息 Tag 的 Hash Code,在進行 Broker 端消息過濾時,
通過比較 Consumer 訂閱 Tag 的 HashCode 和存儲條目中的 Tag Hash Code 是否一致來決定是否消費消息。 * ReputMessageService 持續地讀取 CommitLog 文件並生成 ConsumeQueue。
2. 順序消費與並行消費
串行消費和並行消費最大的區別在於消費隊列中消息的順序性。順序消費保證了同一個 Queue 中的消費時的順序性。RocketMQ 的順序性依賴於分區鎖的實現。消息消費有推拉兩種模式,我們這里只考慮推這種模式
- 並行消費
1. 並行消費的實現類為 ConsumeMessageConcurrentlyService。 2. PullMessageService 內置一個 scheduledExecutorService 線程池,主要負責處理 PullRequest 請求,從 Broker 端拉取最新的消息返回給客戶端。拉取到的消息會放入 MessageQueue 對應的 ProcessQueue。 3. ConsumeMessageConcurrentlyService 把收到的消息封裝成一個 ConsumeRequest,投遞給內置的 consumeExecutor 獨立線程池進行消費。 4. ConsumeRequest 調用 MessageListener.consumeMessage 執行用戶定義的消費邏輯,返回消費狀態。 5. 如果消費狀態為 SUCCESS。則刪除 ProcessQueue 中的消息,並提交 offset。 6. 如果消費狀態為 RECONSUME。則把消息發送到延時隊列進行重試,並對當前失敗的消息進行延遲處理。
- 串行消費
1. 串行消費的實現類為 ConsumeMessageOrderlyService。 2. PullMessageService 內置一個 scheduledExecutorService 線程池,主要負責處理 PullRequest 請求,從 Broker 端拉取最新的消息返回給客戶端。拉取到的消息會放入 MessageQueue 對應的 ProcessQueue。 3. ConsumeMessageOrderlyService 把收到的消息封裝成一個 ConsumeRequest,投遞給內置的 consumeExecutor 獨立線程池進行消費。 4. 消費時首先獲取 MessageQueue 對應的 objectLock,保證當前進程內只有一個線程在處理對應的的 MessageQueue, 從 ProcessQueue 的 msgTreeMap 中按 offset 從低到高的順序取消息,從而保證了消息的順序性。 5. ConsumeRequest 調用 MessageListener.consumeMessage 執行用戶定義的消費邏輯,返回消費狀態。 6. 如果消費狀態為 SUCCESS。則刪除 ProcessQueue 中的消息,並提交 offset。 7. 如果消費狀態為 SUSPEND。判斷是否達到最大重試次數,如果達到最大重試次數,就把消息投遞到死信隊列,繼續下一條消費;否則消息重試次數 + 1,在延時一段時間后繼續重試。可見,
串行消費如果某條消息一直無法消費成功會造成阻塞,嚴重時會引起消息堆積和關聯業務異常。 - Broker 端的 PullMessage 長連接實現
消息隊列中的消息是由業務觸發而產生的,如果使用周期性的輪詢,不能保證每次都取到消息,且輪詢的頻率過快或者過慢都會對消息的延時有嚴重的影響。
因此 RockMQ 在 Broker 端使用長連接的方式處理 PullMessage 請求。具體實現流程如下: 1. PullRequest 請求中有個參數 brokerSuspendMaxTimeMillis,默認值為 15s,控制請求 hold 的時長。 2. PullMessageProcessor 接收到 Request 后,解析參數,校驗 Topic 的 Meta 信息和消費者的訂閱關系。對於符合要求的請求,從存儲中拉取消息。 3. 如果拉取消息的結果為 PULL_NOT_FOUND,表示當前 MessageQueue 沒有最新消息。 4. 此時會封裝一個 PullRequest 對象,並投遞給 PullRequestHoldService 內部線程的 pullRequestTable 中。 5. PullRequestHoldService 線程會周期性輪詢 pullRequestTable,如果有新的消息或者 hold 時間超時 polling time,就會封裝 Response 請求發給客戶端。 6. 另外 DefaultMessageStore 中定義了 messageArrivingListener,當產生新的 ConsumeQueue 記錄時候,會觸發 messageArrivingListener 回調,立即給客戶端返回最新的消息。 長連接機制使得 RocketMQ 的網絡利用率非常高效,並且最大限度地降低了消息拉取時的等待開銷。實現了毫秒級的消息投遞。