前言
首先提出我們的監控訴求,出現如下情況時,希望能夠及時接收到系統告警通知:
- RocketMQ 服務宕機
- RocketMQ 消費者下線
- RocketMQ 消息出現長時間或者大量堆積
本文將通過修改 rocketmq-console源碼的方式,增加RocketMQ 消費者下線 和RocketMQ 消息出現長時間或者大量堆積監控能力。
一. RocketMQ 服務宕機監控告警
這一級別的監控,本質上而言是監控Linux上啟動的Rocket MQ Java進程的運行情況。細分的話,需要監控以下兩個維度:
- Linux Java 進程的CPU 使用率,內存使用量;
- Java 進程本身的JVM的服務質量,GC,並發數,內存分布等
一般的公司在運維方面會有專門的監控組件,如zabbix會做統一處理。
例如這里用簡單的shell腳本+釘釘組裝的最簡單的監控告警方式:
監控的方式有很多,比如簡單點的,我們可以寫一個shell腳本,監控執行rocketmqJava進程的存活狀態,如果rocketmq crash了,發送告警:
#!/bin/bash ## monitor.sh while true do echo "開始監控rocket broker 進程..." PID=$(ps -ef | grep java | grep org.apache.rocketmq.broker.BrokerStartup | awk '{printf $2}'); if [ -z $PID ];then curl 'https://oapi.dingtalk.com/robot/send?access_token=xxxxxx' -H 'Content-Type: application/json' \ -d ' {"msgtype": "text", "text": { "content": "【172.xxx.xxx.xxx】rocketmq broker 進程不存在,可能宕機,請盡快排查!" } }' fi sleep 10 done
效果:
2. RocketMQ的集群組件組成
2.1、rocketmq-console web控制台介紹
官方提供了一個WEB項目,可以查看rocketmq數據和執行一些操作。incubator-rocketmq-externals,這個項目中有一個子模塊叫“rocketmq-console”,這個便是管理控制台項目。Rocket-console做為rocketmq社區維護的產品需要從GitHub上下載,下載地址:https://github.com/apache/rocketmq-externals
各個功能的介紹:
- OPS運維:NameServer的地址的管理
- Dashboard駕駛艙:展示Broker和Topic的柱狀圖和折線圖。
- Cluster集群:Brokder的集群部署情況及每個Broker的詳情。
- Topic主題:對Topic的新增、修改、刪除。對consumer管理、消息位點重置等。
- Consumer消費者:consumer的一些狀態的管理。
- Producer生產者:producer的信息(ip、port、版本等)查看。
- Message消息:根據Topic、Message Key、Message ID三項對Message分組查詢。一般第一項根據Topic查詢比較多。因為據說根據key去查詢有坑。建議id和Topic,id因為唯一最簡單。
- MessageTrace消息軌跡:
一個完整的RocketMQ集群,一般組成關系如下圖所示:
除了核心組成部分:Name Server 和 Broker Cluster 之外,RocketMQ還提供了 mqadmin工具,該工具的具體實現代碼在RocketMQ tools模塊(rocketmq-tools-xxxx.jar)中。但是mqadmin命令行在交互上不夠友好,**rocketmq-console**作為一個社區項目,底層基於mqadmin 核心庫,用Spring Boot+Angularjs實現了一個RocketMQ Web管理端,開發運維人員可以輕松地使用此管理端完成日常運維操作。
3. mqadmin–提供一套命令行工具,做RocketMQ的日常管理維護
3.1、mqadmin 工具在哪兒?
mqadmin本質上是一個Java命令行工具,也就是說執行mqadmin的過程也是執行Java的過程,**mqadmin**的位置和runbroker和mqnamesrv並列:
3.2、mqadmin能做什么?
執行./mqadmin,會在命令行輸出其指令列表:
[root@localhost bin]# ./mqadmin The most commonly used mqadmin commands are: updateTopic Update or create topic deleteTopic Delete topic from broker and NameServer. updateSubGroup Update or create subscription group deleteSubGroup Delete subscription group from broker. updateBrokerConfig Update broker's config updateTopicPerm Update topic perm topicRoute Examine topic route info topicStatus Examine topic Status info topicClusterList get cluster info for topic brokerStatus Fetch broker runtime status data queryMsgById Query Message by Id queryMsgByKey Query Message by Key queryMsgByUniqueKey Query Message by Unique key queryMsgByOffset Query Message by offset printMsg Print Message Detail printMsgByQueue Print Message Detail sendMsgStatus send msg to broker. brokerConsumeStats Fetch broker consume stats data producerConnection Query producer's socket connection and client version consumerConnection Query consumer's socket connection, client version and subscription consumerProgress Query consumers's progress, speed consumerStatus Query consumer's internal data structure cloneGroupOffset clone offset from other group. clusterList List all of clusters topicList Fetch all topic list from name server updateKvConfig Create or update KV config. deleteKvConfig Delete KV config. wipeWritePerm Wipe write perm of broker in all name server resetOffsetByTime Reset consumer offset by timestamp(without client restart). updateOrderConf Create or update or delete order conf cleanExpiredCQ Clean expired ConsumeQueue on broker. cleanUnusedTopic Clean unused topic on broker. startMonitoring Start Monitoring statsAll Topic and Consumer tps stats allocateMQ Allocate MQ checkMsgSendRT check message send response time clusterRT List All clusters Message Send RT getNamesrvConfig Get configs of name server. updateNamesrvConfig Update configs of name server. getBrokerConfig Get broker config by cluster or special broker! queryCq Query cq command. sendMessage Send a message consumeMessage Consume message updateAclConfig Update acl config yaml file in broker deleteAccessConfig Delete Acl Config Account in broker clusterAclConfigVersion List all of acl config version information in cluster updateGlobalWhiteAddr Update global white address for acl Config File in broker See 'mqadmin help <command>' for more information on a specific command. [root@localhost bin]#
例如:
通過命令行查詢消息堆壓:
[root@localhost rocketmq-4.5.2]# sh bin/mqadmin consumerProgress -n 10.200.110.46:9876 RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. #Group #Count #Version #Type #Model #TPS #Diff Total please_rename_unique_group_name_ 0 OFFLINE 0 0 configurationfile-encryption-dem 0 OFFLINE 0 0 groupnamedef 1 V4_5_2 PUSH CLUSTERING 0 0 TOOLS_CONSUMER 0 OFFLINE 0 0 [root@localhost rocketmq-4.5.2]# ls
具體每個指令的作用不是本文的重點,后續會開新的文章介紹~
4、使用 rocketmq-console添加MQ監控告警
我們可以利用rocketmq-console做如下的監控:
- RocketMQ 消費者下線
- RocketMQ 消息出現長時間或者大量堆積
4.1 rocketmq-console的監控告警功能
作為mqadmin的GUI封裝,rocketmq-console基本上具備了mqadmin的功能外,也提供了一些額外的功能,如dashboard面板統計。但是,作為開源源碼部分,rocketmq-console將MQ監控功能做了隱藏,我們需要手動放開。如下是使用rocket-console的監控原理:
當此項功能被放開后,在Consumer菜單下,為每一個consumer-group 的operation 會增加MONITOR CONFIG 選項,如下圖所示:
指標名稱 說明 備注
minCount 當前消費分組的機器數量最小閾值,低於此值將會告警
minCount 當前消費分組允許的最大消息堆積量,高於辭職將會告警
4.2 如何開啟rocketmq-console的監控告警功能
開源的rocketmq-console將此功能隱藏了,可以通過下載源碼,並修改源碼的方式支持。
4.2.1 下載源碼
從github中獲取源碼,rocketmq-externals
地址:https://github.com/apache/rocketmq-externals
4.2.2 導入項目
項目導入后,如下圖所示,rocketmq-console即為控制台代碼
4.2.3 放開console控制台的監控參數配置
默認的rocketmq-console將此功能注釋掉了,修改文件:
~/rocketmq-console/src/resources/static/view/pages/consumer.html,將如下圖所示的代碼放開即可。
4.2.4 開啟定時任務監控,掃描實時數據,做閾值判斷,告警提示
默認情況下,rocketmq-console只定義了定時任務入口,具體的策略沒有任何處理,我們需要根據自己的需求加入自身的告警方式,比如:郵箱,釘釘,短信,微信等等。
其預留的定時任務實現類為:
org.apache.rocketmq.console.task.MonitorTask
定時任務的掃描頻率可根據自身系統要求考量設置。
@Component public class MonitorTask { private Logger logger = LoggerFactory.getLogger(MonitorTask.class); @Resource private MonitorService monitorService; @Resource private ConsumerService consumerService; // @Scheduled(cron = "* * * * * ?") // 定時任務的掃描頻率可根據自身系統要求考量設置 public void scanProblemConsumeGroup() { for (Map.Entry<String, ConsumerMonitorConfig> configEntry : monitorService.queryConsumerMonitorConfig().entrySet()) { GroupConsumeInfo consumeInfo = consumerService.queryGroup(configEntry.getKey()); if (consumeInfo.getCount() < configEntry.getValue().getMinCount() || consumeInfo.getDiffTotal() > configEntry.getValue().getMaxDiffTotal()) { logger.info("op=look consumeInfo {}", JsonUtil.obj2String(consumeInfo)); // notify the alert system //根據自身的要求加如通知方式 } } } }
4.2.5 build修改后的代碼,生成可運行的jar包,然后部署運行
4.2.6 樣例
經筆者改造后的console的控制台可以顯示出 ‘MONITOR CONFIG’ 配置項:
釘釘告警樣例:
5. 總結
rocketmq-console 作為開發運維人員監控MQ的便捷入口,可根據自身要求改造rocketmq-console,rocketmq-console服務本身可以調用所有mqadmin的所有能力,項目本身基於angularjs +spring boot,作為java 開發人員來說拓展成本也比較低。不過前期需要對rocketmq的一些概念和各種衡量標准要有明確的認知。
本文沒有對’mqadmin’的具體指令和設計原理展開,將另開文章解釋,有興趣的可關注下,敬請期待~
另外作者已開通微信訂閱號,精品文章同步更新,歡迎關注~
————————————————
版權聲明:本文為CSDN博主「亦山」的原創文章,遵循CC 4.0 by-sa版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/luanlouis/article/details/88078657