1.環境參考
benchmark環境搭建:參考單機快速搭建單broker環境
被壓測環境:rocketmq的dledger集群
2.源碼位置
https://github.com/apache/rocketmq/tree/master/example/src/main/java/org/apache/rocketmq/example/benchmark
3.工具清單
consumer.sh:消息消費的benchmark工具
producer.sh: 消息生產benchmark工具(同步非批處理模式)
3.1 producer.sh
3.1.1 幫助
sh producer.sh -h
usage: benchmarkProducer [-h] [-k <arg>] [-n <arg>] [-s <arg>] [-t <arg>] [-w <arg>] -h,--help Print help -k,--keyEnable <arg> Message Key Enable, Default: false //是否使用message key,true則使用timestamp作為message的key -n,--namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876 //指定nameserver地址 -s,--messageSize <arg> Message Size, Default: 128 //指定消息大小,默認128字節 -t,--topic <arg> Topic name, Default: BenchmarkTest //指定topic,默認使用BenchmarkTest,如果指定其他記得先創建對應的topic -w,--threadCount <arg> Thread count, Default: 64 //開啟的發送生產消息的線程數
3.1.2 源碼重要片段
//默認生產組為:benchmark_producer final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer");
//如果keyEnable為True,則會以時間戳作為message的key if (keyEnable) { msg.setKeys(String.valueOf(beginTimestamp / 1000)); }
//設置producer用於發送消息的線程池大小,-w的值 final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);
3.1.3 例子
指定nameserver進行生產消息壓測
sh producer.sh -n xxx.xxx.xxx.xxx:9876
SendTPS:生產消息的TPS
Max RT:最大響應時間(毫秒)
Average RT:平均響應時間(毫秒)
Send Failed:發送失敗的總請求數
Response Failed:返回失敗的總響應數
這里剛開始有發生失敗的原因是由於producer剛啟動,短期內對broker造成了壓力。在實際使用producer的時候,應該對發送失敗的情況進行重新消息重發。
可以看到控制台里Produce Message TPS為3000多,其中slave是從master同步消息的TPS(備份master的消息數據),master才是實際接收的生產消息TPS。
3.2 consumer.sh
3.2.1 幫助
sh consumer.sh -h
usage: benchmarkConsumer [-e <arg>] [-f <arg>] [-g <arg>] [-h] [-n <arg>] [-p <arg>] [-r <arg>] [-t <arg>] -e,--expression <arg> filter expression content file path.ie: ./test/expr //配合filter參數使用,過濾的條件表達式 -f,--filterType <arg> TAG, SQL92 //過濾方式 -g,--group <arg> Consumer group name, Default: benchmark_consumer //指定消費組,默認為benchmark_consumer -h,--help Print help -n,--namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876 //指定nameserver地址 -p,--group prefix enable <arg> Consumer group name, Default: false //是否給消費組添加后綴,默認會給指定的消費組后添加后綴,默認應該是true(提示有問題) -r,--fail rate <arg> consumer fail rate, default 0 //指定消費失敗率,只要沒有超過消費失敗率,消費失敗都會重試 -t,--topic <arg> Topic name, Default: BenchmarkTest //指定topic,默認使用BenchmarkTest,如果指定其他記得先創建對應的topic
3.2.2 源碼重要片段
#根據指定的消費group生成消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
#指定nameserver地址 if (commandLine.hasOption('n')) { String ns = commandLine.getOptionValue('n'); consumer.setNamesrvAddr(ns);
}
#如果沒有指定isSuffixEnable,即-p指定的數值,則會給消費組加上后綴
if (Boolean.parseBoolean(isSuffixEnable)) { group = groupPrefix + "_" + (System.currentTimeMillis() % 100);
}
#指定topic的消息過濾器,只消費符合條件的消息
#SQL92語法
#TAG語言
if (filterType == null || expression == null) { consumer.subscribe(topic, "*"); } else { if (ExpressionType.TAG.equals(filterType)) { String expr = MixAll.file2String(expression); System.out.printf("Expression: %s%n", expr); consumer.subscribe(topic, MessageSelector.byTag(expr)); } else if (ExpressionType.SQL92.equals(filterType)) { String expr = MixAll.file2String(expression); System.out.printf("Expression: %s%n", expr); consumer.subscribe(topic, MessageSelector.bySql(expr)); } else { throw new IllegalArgumentException("Not support filter type! " + filterType); }
}
#如果當前消費比例小於failRate,會稍后進行重試消費,否則直接跳過
if (ThreadLocalRandom.current().nextDouble() < failRate) { statsBenchmarkConsumer.getFailCount().incrementAndGet(); return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} else { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
3.2.3 例子
#從BenchmarkTest消費消息,這里會自動給消費組:test2加上一個后綴
sh consumer.sh -t BenchmarkTest -n nameserver:9876 -g test2
TPS: 消費的TPS
FAIL:消費失敗的總數
AVG(B2C):broker到Consumer的平均響應時間(毫秒)
AVG(S2C):nameserver到Consumer的平均響應時間(毫秒)
MAX(B2C): broker到Consumer的最大響應時間(毫秒)
MAX(S2C): nameserver到Consumer的最大響應時間(毫秒)
可以看到控制台里Consumer Message TPS為6w多,遠大於producer的tps,且消費只從Master請求消息。
博主:測試生財(一個不為996而996的測開碼農)
座右銘:專注測試開發與自動化運維,努力讀書思考寫作,為內卷的人生奠定財務自由。
內容范疇:技術提升,職場雜談,事業發展,閱讀寫作,投資理財,健康人生。
csdn:https://blog.csdn.net/ccgshigao
博客園:https://www.cnblogs.com/qa-freeroad/
51cto:https://blog.51cto.com/14900374
微信公眾號:測試生財(定期分享獨家內容和資源)