修改rocketmq官方代碼測試:
package com.alibaba.middleware.race.rocketmq; import java.util.Scanner; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendCallback; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.remoting.exception.RemotingException; /** * Producer,模擬rocket mq使用中可能出現的問題,學習如何排查q問題 */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException { DefaultMQProducer producer = new DefaultMQProducer("procedure_group_name"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); final String topics = "TOPIC-IT-WORKER-TEST"; for (int i = 0; i < 1000; i++) { @SuppressWarnings("resource") Scanner reader=new Scanner(System.in); int key = reader.nextInt(); final String message = " order-message - " + i + " key: " + key; byte[] body = message.getBytes(); Message msgToBroker = new Message(topics, "tag-push", String.valueOf(key), body); producer.send(msgToBroker, new SendCallback() { public void onSuccess(SendResult sendResult) { System.out.println(message); } public void onException(Throwable throwable) { throwable.printStackTrace(); } }); } } }
package com.alibaba.middleware.race.rocketmq; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List; import java.util.Scanner; public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_name"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.subscribe("TOPIC-IT-WORKER-TEST", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { Scanner reader=new Scanner(System.in); reader.hasNext(); byte[] body = msg.getBody(); if (body.length == 2 && body[0] == 0 && body[1] == 0) { System.out.println("Got the end signal"); continue; } String paymentMessage = new String(body); System.out.println(paymentMessage + " key: " + msg.getKeys() + " tag: " + msg.getTags()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.alibaba.race</groupId> <artifactId>preliminary.demo</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>2.5.3</version> <configuration> <appendAssemblyId>false</appendAssemblyId> <descriptors> <descriptor>src/main/resources/assembly.xml</descriptor> </descriptors> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>install</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> <!-- com.alibaba.middleware.race.jstorm-2.1.1版本默認的日志框架是logback,為了避免日志沖突,排除掉log4j--> <dependencies> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.2.6</version> </dependency> </dependencies> </project>
增加selector選擇器,根據key選擇進入的Broker隊列 producer.send(msgToBroker, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msgToBroker, Object arg) { // 根據key來選擇隊列 Integer id = (Integer) arg; int index = id % mqs.size(); for (MessageQueue mq : mqs) { System.out.println("current queue: " + mq.getQueueId()); } System.out.println("select id: " + index); return mqs.get(index); } }, key, new SendCallback() { public void onSuccess(SendResult sendResult) { System.out.println(message); } public void onException(Throwable throwable) { throwable.printStackTrace(); } });
編譯后啟動服務端和客戶端 進入target目錄 啟動生產者生產數據:java -Drocketmq.namesrv.addr=127.0.0.1:9876 -cp preliminary.demo-1.0-SNAPSHOT.jar com.alibaba.middleware.race.rocketmq.Producer 啟動消費者消費數據:java -Drocketmq.namesrv.addr=127.0.0.1:9876 -cp preliminary.demo-1.0-SNAPSHOT.jar com.alibaba.middleware.race.rocketmq.Consumer
啟動&參數修改
mqnamesrv 啟動NameServer jps - NamesrvStartup mqbroker -n localhost:9876 啟動broker jps - BrokerStartup 默認端口10911 mqadmin updateBrokerConfig -c DefaultCluster -n 127.0.0.1:9876 -k listenPort -v 10911 更新broker參數配置
查看當前系統狀態
mqadmin clusterList -n 127.0.0.1:9876
查看當前所有topicList/創建
mqadmin topicList -n 127.0.0.1:9876
mqadmin updateTopic -n 127.0.0.1:9876 -c DefaultCluster -t topic名稱
查看broker狀態
mqadmin brokerStatus -n 127.0.0.1:9876 -b 127.0.0.1:10911
查看某個topic的狀態
mqadmin topicStatus -n 127.0.0.1:9876 -t TOPIC-IT-WORKER-TEST
當前可見,producer只發送了一條消息,Max offset為1,最后收到消息的時間是last updated,由於配置四個broker都是本機,只有第一個收到了當前第一條消息
第二張圖為發了四條消息之后的狀態,看起來可能就是輪詢的,因為當我增加4條key為1的msg之后,仍然是四個節點每個兩條
查看連接的procedure/consumer
mqadmin producerConnection -n 127.0.0.1:9876 -g procedure_group_name -t TOPIC-IT-WORKER-TEST
mqadmin consumerConnection -n 127.0.0.1:9876 -g consumer_group_name
查看某個key對應的msg
mqadmin queryMsgByKey -n 127.0.0.1:9876 -t TOPIC-IT-WORKER-TEST-1 -k 1
因為之前發送了5條key為1的數據,所以這里可以看到是5條,每條都有一個MESSAGE ID
根據ID查看對應的MSG
mqadmin queryMsgById -g consumer_group_name -i AC1F78B700002A9F00000000000A3208 -n 127.0.0.1:9876
根據位置偏移查詢上面的那條數據
mqadmin queryMsgByOffset -n 127.0.0.1:9876 -o 1 -t TOPIC-IT-WORKER-TEST-1 -i 1 -b izm5e210z0uiwyavdbmpxaz
查看消費詳情
mqadmin consumerProgress -n 127.0.0.1:9876 -g consumer_group_name
這里消費了一條,一共八條,差7條沒有消費
重置消費端offset
mqadmin resetOffsetByTime -n 127.0.0.1:9876 -g consumer_group_name -t TOPIC-IT-WORKER-TEST-1 -f true -s 1536820000
打印broker中某個隊列里的消息
mqadmin printMsgByQueue -a izm5e210z0uiwyavdbmpxaz -t T1 -n 127.0.0.1:9876 -i 1 -p true -d true
這里可以看出,storeSize最后多了一條原因就是最后的body里12是兩位,廢話。。。
直接打印消息
mqadmin printMsg -t TOPIC-IT-WORKER-TEST-1 -n 127.0.0.1:9876
The most commonly used mqadmin commands are: updateTopic Update or create topic deleteTopic Delete topic from broker and NameServer. updateSubGroup Update or create subscription group deleteSubGroup Delete subscription group from broker. updateBrokerConfig Update broker's config updateTopicPerm Update topic perm topicRoute Examine topic route info topicStatus Examine topic Status info topicClusterList get cluster info for topic brokerStatus Fetch broker runtime status data queryMsgById Query Message by Id queryMsgByKey Query Message by Key queryMsgByUniqueKey Query Message by Unique key queryMsgByOffset Query Message by offset queryMsgByUniqueKey Query Message by Unique key printMsg Print Message Detail printMsgByQueue Print Message Detail sendMsgStatus send msg to broker. brokerConsumeStats Fetch broker consume stats data producerConnection Query producer's socket connection and client version consumerConnection Query consumer's socket connection, client version and subscription consumerProgress Query consumers's progress, speed consumerStatus Query consumer's internal data structure cloneGroupOffset clone offset from other group. clusterList List all of clusters topicList Fetch all topic list from name server updateKvConfig Create or update KV config. deleteKvConfig Delete KV config. wipeWritePerm Wipe write perm of broker in all name server resetOffsetByTime Reset consumer offset by timestamp(without client restart). updateOrderConf Create or update or delete order conf cleanExpiredCQ Clean expired ConsumeQueue on broker. cleanUnusedTopic Clean unused topic on broker. startMonitoring Start Monitoring statsAll Topic and Consumer tps stats allocateMQ Allocate MQ checkMsgSendRT check message send response time clusterRT List All clusters Message Send RT getNamesrvConfig Get configs of name server. updateNamesrvConfig Update configs of name server. getBrokerConfig Get broker config by cluster or special broker! queryCq Query cq command.