1.介紹
rocketmq由consumer,producer,broker三大塊組成,如何對這三類服務進行運維操作呢?這里介紹rocketmq自帶的命令行工具:mqadmin
2.位置
項目:https://github.com/apache/rocketmq.git
${PWD}/rocketmq/distribution/bin/mqadmin
3.腳本
3.1 mqadmin
#這里ROCKETMQ_HOME對應的是rocketmq的主目錄
if [ -z "$ROCKETMQ_HOME" ] ; then ## resolve links - $0 may be a link to maven's home PRG="$0" # need this for relative symlinks while [ -h "$PRG" ] ; do ls=`ls -ld "$PRG"` link=`expr "$ls" : '.*-> \(.*\)$'` if expr "$link" : '/.*' > /dev/null; then PRG="$link" else PRG="`dirname "$PRG"`/$link" fi done saveddir=`pwd` ROCKETMQ_HOME=`dirname "$PRG"`/.. # make it fully qualified ROCKETMQ_HOME=`cd "$ROCKETMQ_HOME" && pwd` cd "$saveddir" fi export ROCKETMQ_HOME #這里實際使用的另外一個腳本tools.sh sh ${ROCKETMQ_HOME}/bin/tools.sh org.apache.rocketmq.tools.command.MQAdminStartup $@
3.2 tools.sh
主要作用就是設置JVM參數,然后使用java 啟動對應classs
#=========================================================================================== # Java Environment Setting #=========================================================================================== error_exit () { echo "ERROR: $1 !!" exit 1 } [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java [ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!" export JAVA_HOME export JAVA="$JAVA_HOME/bin/java" export BASE_DIR=$(dirname $0)/.. export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH} #=========================================================================================== # JVM Configuration #=========================================================================================== JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=128m" JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext" JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
#這里是實際調用 $JAVA ${JAVA_OPT} $@
3.3 MQAdminStartup.java
MQAdminStartup.java是個入口類,里面注冊了如下的各類命令,如需詳細了解細節可以直接進入${PWD}/rocketmq/tools/src/main/java/org/apache/rocketmq/tools/command目錄下查看:
import org.apache.rocketmq.tools.command.broker.BrokerConsumeStatsSubCommad; import org.apache.rocketmq.tools.command.broker.BrokerStatusSubCommand; import org.apache.rocketmq.tools.command.broker.CleanExpiredCQSubCommand; import org.apache.rocketmq.tools.command.broker.CleanUnusedTopicCommand; import org.apache.rocketmq.tools.command.broker.GetBrokerConfigCommand; import org.apache.rocketmq.tools.command.broker.SendMsgStatusCommand; import org.apache.rocketmq.tools.command.broker.UpdateBrokerConfigSubCommand; import org.apache.rocketmq.tools.command.cluster.CLusterSendMsgRTCommand; import org.apache.rocketmq.tools.command.cluster.ClusterListSubCommand; import org.apache.rocketmq.tools.command.connection.ConsumerConnectionSubCommand; import org.apache.rocketmq.tools.command.connection.ProducerConnectionSubCommand; import org.apache.rocketmq.tools.command.consumer.ConsumerProgressSubCommand; import org.apache.rocketmq.tools.command.consumer.ConsumerStatusSubCommand; import org.apache.rocketmq.tools.command.consumer.DeleteSubscriptionGroupCommand; import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand; import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand; import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand; import org.apache.rocketmq.tools.command.message.ConsumeMessageCommand; import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand; import org.apache.rocketmq.tools.command.message.PrintMessageSubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgByIdSubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgByKeySubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand; import org.apache.rocketmq.tools.command.message.SendMessageCommand; import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand; import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand; import org.apache.rocketmq.tools.command.namesrv.UpdateKvConfigCommand; import org.apache.rocketmq.tools.command.namesrv.UpdateNamesrvConfigCommand; import org.apache.rocketmq.tools.command.namesrv.WipeWritePermSubCommand; import org.apache.rocketmq.tools.command.offset.CloneGroupOffsetCommand; import org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand; import org.apache.rocketmq.tools.command.queue.QueryConsumeQueueCommand; import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand; import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand; import org.apache.rocketmq.tools.command.topic.DeleteTopicSubCommand; import org.apache.rocketmq.tools.command.topic.TopicClusterSubCommand; import org.apache.rocketmq.tools.command.topic.TopicListSubCommand; import org.apache.rocketmq.tools.command.topic.TopicRouteSubCommand; import org.apache.rocketmq.tools.command.topic.TopicStatusSubCommand; import org.apache.rocketmq.tools.command.topic.UpdateOrderConfCommand; import org.apache.rocketmq.tools.command.topic.UpdateTopicPermSubCommand; import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand;
3.4 命令詳解
整體來說,命令分為如下幾大類:
1. 集群相關:
- clusterList:查看集群列表
- clusterRT:測試集群的響應耗時
2. broker相關
- updateBrokerConfig :更新broker的配置
- brokerStatus :獲取broker的運行時狀態數據
- wipeWritePerm:設置某broker為只讀
- getBrokerConfig:獲取broker的配置信息
3.topic相關:
- updateTopic:創建或更新topic
- deleteTopic :從nameserver和broker中刪除topic信息
- updateTopic:更新topic的perm信息
- topicRoute:獲取topic的路由信息
- topicStatus: 獲取topic的當前狀態信息
- topicClusterList:獲取topic對應的集群信息
- topicList:查看集群中的topic列表
- statsAll: 查看所有topic已經對應的consumer的消費進度
- cleanUnusedTopic:清理集群中無用的topic
4.message相關:
- queryMsgById: 通過message的id查詢消息
- queryMsgByKey:通過message的key查詢消息
- queryMsgByUniqueKey:通過message的UniqueKey查詢消息
- queryMsgByOffset:通過偏移查詢message 、
- printMsg:打印某條message的詳情
- printMsgByQueue:打印某個queue(隊列)里的消息的詳情
- sendMsgStatus:向broker發送消息
- sendMessage:發送一條消息
- producerConnection:查詢producer的信息(socket連接,客戶端版本)
5. 消費相關:
- consumerConnection :查詢consumer的信息(socker連接,客戶端版本,消費組)
- consumerProgress :查詢consumer的消費進度,tps
- consumerStatus :查詢consumer的的內部數
- brokerConsumeStats:查看所有topic對應的消費數據(broker的offset,consumer的offset,是否有diff)
- cleanExpiredCQ:清理集群中無用的消費組cloneGroupOffset clone offset from other group.
- cloneGroupOffset :克隆指定topic下某個消費組的消費進度到指定消費組
- resetOffsetByTime:設置consumer的offset到某個時間點
- checkMsgSendRT:測試消費發生的響應耗時
- consumeMessage:消費消息
- queryCq:查詢consumer指定隊列和索引位置的消費信息
- updateSubGroup:創建或者更新消費組(訂閱組)
- deleteSubGroup:從broker中刪除消費組(訂閱組)
6. nameserver相關:
- updateKvConfig:更新nameserver的kv配置
- deleteKvConfig:刪除nameserver的kv配置
- getNamesrvConfig:獲取nameserver的配置
- updateNamesrvConfig:更新nameserver的配置
7. acl相關:
- updateAclConfig:更新acl配置
- deleteAccessConfig:刪除acl配置的賬戶
- clusterAclConfigVersion:查看集群的acl配置版本
- updateGlobalWhiteAddr:更新acl里面的白名單
- getAccessConfigSubCommand:查看acl的配置信息
8. 其他:
- updateOrderConf Create or update or delete order conf
- startMonitoring Start Monitoring
- allocateMQ Allocate MQ
博主:測試生財(一個不為996而996的測開碼農)
座右銘:專注測試開發與自動化運維,努力讀書思考寫作,為內卷的人生奠定財務自由。
內容范疇:技術提升,職場雜談,事業發展,閱讀寫作,投資理財,健康人生。
csdn:https://blog.csdn.net/ccgshigao
博客園:https://www.cnblogs.com/qa-freeroad/
51cto:https://blog.51cto.com/14900374
微信公眾號:測試生財(定期分享獨家內容和資源)