金霖 2019-07-01 15:43:03
RocketMq簡介
基本概念
Produce
消息生產者,負責產生消息,一般由業務系統負責產生消息。
Consumer
消息消費者,負責消費消息,一般是后台系統負責異步消費。
Push Consumer
Consumer的一種,應用通常向Consumer對象注冊一個Listener接口,一旦收到消息,Consumer對象立刻回調Listener接口方法。
Pull Consumer
Consumer的一種,應用通常主動調用Consumer的拉消息方法從Broker拉消息,主動權由應用控制。
ProducerGroup
一類Producer的集合名稱,這類Producer通常發送一類消息,且發送邏輯一致。
Consumer Group
一類Consumer的集合名稱,這類Consumer通常消費一類消息,且消費邏輯一致。
Broker
消息中轉角色,負責存儲消息,轉發消息,一般也稱為Server
NameService
負責路由,管理。
廣播消費
一條消息被多個Consumer消費,即使這些Consumer屬於同一個Consumer Group,消息也會被Consumer Group中的每個Consumer都消費一次,廣播消費中的Consumer Group概念可以認為在消息划分方面無意義。
集群消費
一個Consumer Group中的Consumer實例平均分攤消費消息。例如某個Topic有9條消息,其中一個Consumer Group有3個實例(可能是3個進程,或者3台機器),那么每個實例只消費其中的3條消息。
消費方式如圖:
部署架構設計如圖:
-
Name Server是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。
-
Broker部署相對復雜,Broker分為Master與Slave,一個Master可以對應多個Slave,但是一個Slave只能對應一個Master,Master與Slave的對應關系通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。每個Broker與Name Server集群中的所有節點建立長連接,定時注冊Topic信息到所有Name Server。
-
Producer與Name Server集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,並向提供Topic服務的Master建立長連接,且定時向Master發送心跳。Producer完全無狀態,可集群部署。
-
Consumer與Name Server集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,並向提供Topic服務的Master、Slave建立長連接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,訂閱規則由Broker配置決定。
Producer、Consumer、topic關系如圖:
圖中紅字出現的原因:
在不同的 JVM 中啟動了多個 Consumer,並且給相同的 Consumer ID 配置了不同的 Topic,
或者是相同的 Topic 但 Tag 不同,最終導致訂閱關系不一致,消息不符合預期
生產問題總結
- 不同jvm同一topic下,tag的訂閱不一致,導致消息狀態不一致,產生消費堵塞;
- 同一個consumer分組下面的consumer的topic訂閱不一致,導致消息狀態不一致,產生消費堵塞;
- 某一個consumer下線之后,下線狀態沒有同步整個集群;
- 某個broker下沒有消費者注冊;
- 網絡不穩定或是消費者consumer重啟,消息重復消費
生產經驗總結
生產中出現的問題最多的就是:消息狀態不一致,導致消息擠壓,比如前面提到的a、b兩個問題。解決方案:“在不同的 JVM 中啟動了多個 Consumer,並且給相同的 Consumer ID 配置了不同的 Topic,或者是相同的 Topic 但 Tag 不同,最終導致訂閱關系不一致,消息不符合預期”;
問題3、4其實從解決方案上來說都是屬於同一類問題。解決方案:查看相應消息對應的consumer分組下面的topic上是否有注冊上應該注冊的消費者。
如圖:
如果有問題只能選擇性的重啟某個或是全部broken或者所有的NameService。
問題5的解決方案:業務上做去重處理,可以根據業務上的唯一訂單號,也可以根據rocketMq的messageId來做限制
安全與高可用
刷盤策略
RocketMQ的所有消息都是持久化的,先寫入系統PAGECACHE,然后刷盤,可以保證內存與磁盤都有一份數據,訪問時,直接從內存讀取。可分類為同步刷盤、異步刷盤,如圖:
同步雙寫/異步復制
異步復制的實現思路非常簡單,Slave啟動一個線程,不斷從Master拉取Commit Log中的數據,然后在異步build出Consume Queue數據結構。
同步雙寫主備都寫成功的時候才向應用返回成功。整個實現過程基本同Mysql主從同步類似。
NameService
nameservice主要是起到路由功能,注冊和發現服務。NameService是無狀態的,可以部署多套,可保證高可用
Broker部署結構
###單個Master
這種方式風險較大,一旦Broker重啟或者宕機時,會導致整個服務不可用,不建議線上環境使用
多Master模式
一個集群無Slave,全是Master,例如2個Master或者3個Master。
優點:配置簡單,單個Master宕機或重啟維護對應用無影響,在磁盤配置為RAID10時,即使機器宕機不可恢復情況下,由於RAID10磁盤非常可靠,消息也不會丟(異步刷盤丟失少量消息,同步刷盤一條不丟),性能最高。
缺點:單台機器宕機期間,這台機器上未被消費的消息在機器恢復之前不可訂閱,消息實時性會受到受到影響。
多Master多Slave模式,異步復制
每個Master配置一個Slave,有多對Master-Slave,HA采用異步復制方式,主備有短暫消息延遲,毫秒級。
優點:即使磁盤損壞,消息丟失的非常少,且消息實時性不會受影響,因為Master宕機后,消費者仍然可以從Slave消費,此過程對應用透明。不需要人工干預。性能同多Master模式幾乎一樣。
缺點:Master宕機,磁盤損壞情況,會丟失少量消息。
多Master多Slave模式,同步雙寫
每個Master配置一個Slave,有多對Master-Slave,HA采用同步雙寫方式,主備都寫成功,向應用返回成功。
優點:數據與服務都無單點,Master宕機情況下,消息無延遲,服務可用性與數據可用性都非常高
缺點:性能比異步復制模式略低,大約低10%左右,發送單個消息的RT會略高。目前主宕機后,備機不能自動切換為主機,后續會支持自動切換功能。
Broker重啟對客戶端的影響
Broker重啟可能會導致正在發往這台機器的的消息發送失敗,RocketMQ提供了一種優雅關閉Broker的方法,通過執行以下命令會清除Broker的寫權限,過40s后,所有客戶端都會更新Broker路由信息,此時再關閉Broker就不會發生發送消息失敗的情況,因為所有消息都發往了其他Broker。
sh mqadmin wipeWritePerm-b brokerName -n namesrvAddr
運維步驟及常見問題:
線上環境架構
一般生產環境采用多Master多Slave模式,異步復制方式,當前線上Master部署部署在一台機,Slave部署在另一台機,如圖:
關鍵位置配置 –- broker-a.properties
brokerClusterName=rocketmq-sxzy #所屬集群名稱 brokerName=broker-a #broker名字,注意此處不同的配置文件填寫的不一樣 #nameServer地址,分號分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876;rocketmq-nameserver3:9876;rocketmq-nameserver4:9876 brokerId=0 #0表示Master,>0 表示 Slave deleteWhen=04 #刪除文件時間點,默認凌晨 4點 fileReservedTime=120 #文件保留時間,默認 48 小時 defaultTopicQueueNums=4 #在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數 autoCreateTopicEnable=true #是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉 autoCreateSubscriptionGroup=true #是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉 #Broker 對外服務的監聽端口 listenPort=10911 mapedFileSizeConsumeQueue=300000 #ConsumeQueue每個文件默認存30W條,根據業務情況調整 brokerRole=ASYNC_MASTER # 異步復制Master flushDiskType=ASYNC_FLUSH #異步刷盤
重啟腳本
#!/bin/sh #kill 進程 echo "正在關閉 mq-server" /data/cloud2/RocketMq/bin/mqshutdown namesrv sleep 1 ps -ef |grep mqnamesrv |grep -v grep |awk '{print $2}' |xargs kill -9 echo "正在關閉 mq-broker"= /data/cloud2/RocketMq/bin/mqshutdown broker sleep 1 ps -ef |grep broker|grep -v grep |awk '{print $2}' |xargs kill -9 #啟動進程 echo "正在啟動mq-server" nohup sh /usr/cloud2/RocketMq/bin/mqnamesrv >/usr/cloud2/RocketMq/logs/mqnamesrv.log 2>&1 & sleep 1 echo "正在啟動broker-a" nohup sh /usr/cloud2/RocketMq/bin/mqbroker -c /usr/cloud2/RocketMq/conf/2m-2s-sync/broker-a.properties > /usr/cloud2/RocketMq/logs/broker-a.log 2>&1 & sleep 1 echo "正在啟動broker-b" nohup sh /usr/cloud2/RocketMq/bin/mqbroker -c /usr/cloud2/RocketMq/conf/2m-2s-sync/broker-b.properties > /usr/cloud2/RocketMq/logs/broker-b.log 2>&1 & sleep 1 jps echo "mq服務啟動完畢,請檢查NamesrvStartup和BrokerStartup是否存在“
MQ控制台
RocketMQDemo
-
使用maven編譯成war包,或從網上下載別人打好的包
-
修改config.properties
rocketmq.namesrv.addr=mqnameserver1:9876;mqnameserver2:9876
-
啟動tomcat,瀏覽器里輸入 http://ip:端口/rocketmq-console 訪問
rocketmq-console-ng
-
使用maven編譯成war包,或從網上下載別人打好的包(網上大多jar)
-
修改application.properties
rocketmq.config.namesrvAddr=mqnameserver1:9876;mqnameserver2:9876
-
啟動tomcat,瀏覽器里輸入 http://ip:端口/rocketmq-console-ng 訪問
或使用jar啟動java -jar rocketmq-console-ng-1.0.0.jar --server.port=12581 --rocketmq.config.namesrvAddr=mqnameserver1:9876;mqnameserver2:9876
創建生產者
創建消費者
異常問題處理:
-
進程不存在::使用ps –ef |grep mq 或使用jps命令查看 Mqname 和
brokerName組進程是否存在 -
磁盤空間滿:如果磁盤滿,根據下面路徑刪除最不重要的文件
#存儲路徑
storePathRootDir=/data/cloud2/RocketMq/data/store_a
#commitLog 存儲路徑
storePathCommitLog=/data/cloud2/RocketMq/data/store_a/commitlog
#消費隊列存儲路徑存儲路徑
storePathConsumeQueue=/data/cloud2/RocketMq/data/store_a/consumequeue
#消息索引存儲路徑
storePathIndex=/data/cloud2/RocketMq/data/store_a/index
#checkpoint 文件存儲路徑
storeCheckpoint=/data/cloud2/RocketMq/data/store_a/checkpoint
#abort 文件存儲路徑
abortFile=/data/cloud2/RocketMq/data/store_a/abort -
消息不消費:通過控制台檢查訂閱關系是否一致,和查看消費者終端信息
-
生產者故障:檢查程序topic配置,實在檢查不出問題直接執行上面重啟腳本
現存未解決問題
各種高可用方案沒有具體驗證過。
轉載於:https://blog.csdn.net/qq_25868207/article/details/94393554