RocketMQ系列:rocketmq的benchmark工具


1.環境參考

benchmark環境搭建:參考單機快速搭建單broker環境

被壓測環境:rocketmq的dledger集群

2.源碼位置

https://github.com/apache/rocketmq/tree/master/example/src/main/java/org/apache/rocketmq/example/benchmark

3.工具清單

consumer.sh:消息消費的benchmark工具
producer.sh: 消息生產benchmark工具(同步非批處理模式)

3.1 producer.sh

3.1.1 幫助

sh producer.sh -h

usage: benchmarkProducer [-h] [-k <arg>] [-n <arg>] [-s <arg>] [-t <arg>] [-w <arg>]
-h,--help Print help
-k,--keyEnable <arg> Message Key Enable, Default: false   //是否使用message key,true則使用timestamp作為message的key
-n,--namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876  //指定nameserver地址
-s,--messageSize <arg> Message Size, Default: 128        //指定消息大小,默認128字節
-t,--topic <arg> Topic name, Default: BenchmarkTest      //指定topic,默認使用BenchmarkTest,如果指定其他記得先創建對應的topic
-w,--threadCount <arg> Thread count, Default: 64         //開啟的發送生產消息的線程數

3.1.2 源碼重要片段

//默認生產組為:benchmark_producer

final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer");
//如果keyEnable為True,則會以時間戳作為message的key
if (keyEnable) {
    msg.setKeys(String.valueOf(beginTimestamp / 1000));
}
//設置producer用於發送消息的線程池大小,-w的值
final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);

3.1.3 例子

指定nameserver進行生產消息壓測

sh producer.sh -n xxx.xxx.xxx.xxx:9876

SendTPS:生產消息的TPS

Max RT:最大響應時間(毫秒)

Average RT:平均響應時間(毫秒)

Send Failed:發送失敗的總請求數

Response Failed:返回失敗的總響應數

這里剛開始有發生失敗的原因是由於producer剛啟動,短期內對broker造成了壓力。在實際使用producer的時候,應該對發送失敗的情況進行重新消息重發。

 可以看到控制台里Produce Message TPS為3000多,其中slave是從master同步消息的TPS(備份master的消息數據),master才是實際接收的生產消息TPS。

3.2 consumer.sh

3.2.1 幫助

sh consumer.sh -h

usage: benchmarkConsumer [-e <arg>] [-f <arg>] [-g <arg>] [-h] [-n <arg>] [-p <arg>] [-r <arg>] [-t <arg>]
-e,--expression <arg> filter expression content file path.ie: ./test/expr               //配合filter參數使用,過濾的條件表達式
-f,--filterType <arg> TAG, SQL92                                                        //過濾方式
-g,--group <arg> Consumer group name, Default: benchmark_consumer                       //指定消費組,默認為benchmark_consumer
-h,--help Print help
-n,--namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876  //指定nameserver地址
-p,--group prefix enable <arg> Consumer group name, Default: false                      //是否給消費組添加后綴,默認會給指定的消費組后添加后綴,默認應該是true(提示有問題)
-r,--fail rate <arg> consumer fail rate, default 0                                      //指定消費失敗率,只要沒有超過消費失敗率,消費失敗都會重試
-t,--topic <arg> Topic name, Default: BenchmarkTest                                     //指定topic,默認使用BenchmarkTest,如果指定其他記得先創建對應的topic 

3.2.2 源碼重要片段

#根據指定的消費group生成消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); 
#指定nameserver地址
if (commandLine.hasOption('n')) {
            String ns = commandLine.getOptionValue('n');
            consumer.setNamesrvAddr(ns);
}
#如果沒有指定isSuffixEnable,即-p指定的數值,則會給消費組加上后綴
if (Boolean.parseBoolean(isSuffixEnable)) {
            group = groupPrefix + "_" + (System.currentTimeMillis() % 100);
}
#指定topic的消息過濾器,只消費符合條件的消息
#SQL92語法
#TAG語言
if
(filterType == null || expression == null) { consumer.subscribe(topic, "*"); } else { if (ExpressionType.TAG.equals(filterType)) { String expr = MixAll.file2String(expression); System.out.printf("Expression: %s%n", expr); consumer.subscribe(topic, MessageSelector.byTag(expr)); } else if (ExpressionType.SQL92.equals(filterType)) { String expr = MixAll.file2String(expression); System.out.printf("Expression: %s%n", expr); consumer.subscribe(topic, MessageSelector.bySql(expr)); } else { throw new IllegalArgumentException("Not support filter type! " + filterType); }
}
#如果當前消費比例小於failRate,會稍后進行重試消費,否則直接跳過
if (ThreadLocalRandom.current().nextDouble() < failRate) { statsBenchmarkConsumer.getFailCount().incrementAndGet(); return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
else { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

3.2.3 例子

#從BenchmarkTest消費消息,這里會自動給消費組:test2加上一個后綴

sh consumer.sh -t BenchmarkTest -n nameserver:9876 -g test2

 TPS: 消費的TPS

FAIL:消費失敗的總數

AVG(B2C):broker到Consumer的平均響應時間(毫秒)

AVG(S2C):nameserver到Consumer的平均響應時間(毫秒)

MAX(B2C): broker到Consumer的最大響應時間(毫秒)

MAX(S2C): nameserver到Consumer的最大響應時間(毫秒)

 可以看到控制台里Consumer Message TPS為6w多,遠大於producer的tps,且消費只從Master請求消息。

博主:測試生財(一個不為996而996的測開碼農)

座右銘:專注測試開發與自動化運維,努力讀書思考寫作,為內卷的人生奠定財務自由。

內容范疇:技術提升,職場雜談,事業發展,閱讀寫作,投資理財,健康人生。

csdn:https://blog.csdn.net/ccgshigao

博客園:https://www.cnblogs.com/qa-freeroad/

51cto:https://blog.51cto.com/14900374

微信公眾號:測試生財(定期分享獨家內容和資源)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM