RocketMQ 問題排查命令


修改rocketmq官方代碼測試:

package com.alibaba.middleware.race.rocketmq;
import java.util.Scanner;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendCallback;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
/**
 * Producer,模擬rocket mq使用中可能出現的問題,學習如何排查q問題
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException {
        DefaultMQProducer producer = new DefaultMQProducer("procedure_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        final String topics = "TOPIC-IT-WORKER-TEST";
        for (int i = 0; i < 1000; i++) {
            @SuppressWarnings("resource")
            Scanner reader=new Scanner(System.in);
            int key = reader.nextInt();
            final String message = " order-message - " + i + " key: " + key;
            byte[] body = message.getBytes();
            Message msgToBroker = new Message(topics, "tag-push", String.valueOf(key), body);
            producer.send(msgToBroker, new SendCallback() {
                public void onSuccess(SendResult sendResult) {
                    System.out.println(message);
                }
                public void onException(Throwable throwable) {
                    throwable.printStackTrace();
                }
            });
        }
    }
}
package com.alibaba.middleware.race.rocketmq;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.Scanner;

/**
 * Demo push consumer for topic {@code TOPIC-IT-WORKER-TEST} (all tags), consuming
 * from the first offset. Each message is printed and then followed by a 3 second
 * sleep, which makes consumption deliberately slow.
 */
public class Consumer {
    /**
     * Configures, registers the listener on, and starts the push consumer.
     *
     * @param args unused
     * @throws InterruptedException declared by the original signature
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_name");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("TOPIC-IT-WORKER-TEST", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    // Manual gate: hasNext() blocks until something is available on stdin.
                    // The token is not consumed here and a fresh Scanner is created per
                    // message on purpose - a shared Scanner would stop blocking after the
                    // first token. NOTE(review): relies on Scanner read-ahead behaviour.
                    Scanner reader = new Scanner(System.in);
                    reader.hasNext();
                    byte[] body = msg.getBody();
                    // A two-byte all-zero body is treated as an end marker and skipped.
                    if (body.length == 2 && body[0] == 0 && body[1] == 0) {
                        System.out.println("Got the end signal");
                        continue;
                    }
                    // Explicit charset so decoding does not depend on the platform default.
                    String paymentMessage = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println(paymentMessage + " key: " + msg.getKeys() + " tag: " + msg.getTags());
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Restore the interrupt flag so the owning thread pool can observe it.
                        Thread.currentThread().interrupt();
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the demo: compiles for Java 1.7 and packages an assembly during the install phase. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.alibaba.race</groupId>
    <artifactId>preliminary.demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <!-- Source and target level are both Java 1.7. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <!-- Runs the assembly "single" goal in the install phase, using both a custom
                 descriptor file and the built-in jar-with-dependencies descriptor.
                 NOTE(review): src/main/resources/assembly.xml is not shown here; confirm it exists. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.3</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptors>
                        <descriptor>src/main/resources/assembly.xml</descriptor>
                    </descriptors>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>install</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

<!--    Original note: com.alibaba.middleware.race.jstorm 2.1.1 uses logback as its default logging
        framework, so log4j is excluded to avoid logging conflicts.
        NOTE(review): no jstorm dependency and no log4j exclusion is declared below; this note
        appears to be left over from another pom - confirm before relying on it. -->
    <dependencies>
        <dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>3.2.6</version>
        </dependency>
    </dependencies>
</project>
增加selector選擇器,根據key選擇進入的Broker隊列
producer.send(msgToBroker, new MessageQueueSelector() {
                /**
                 * Picks the target queue from the key passed as {@code arg}:
                 * queue index = key modulo the number of queues.
                 */
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // Choose the queue based on the key.
                    Integer id = (Integer) arg;
                    int queueCount = mqs.size();
                    int index = id % queueCount;
                    // Java's % keeps the sign of the dividend; shift negative remainders
                    // into [0, queueCount) so a negative key cannot index out of bounds.
                    if (index < 0) {
                        index += queueCount;
                    }
                    for (MessageQueue mq : mqs) {
                        System.out.println("current queue: " + mq.getQueueId());
                    }
                    System.out.println("select id: " + index);
                    return mqs.get(index);
                }
            }, key, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(message);
                }

                @Override
                public void onException(Throwable throwable) {
                    throwable.printStackTrace();
                }
            });

 

編譯后啟動服務端和客戶端
進入target目錄
啟動生產者生產數據:java -Drocketmq.namesrv.addr=127.0.0.1:9876 -cp preliminary.demo-1.0-SNAPSHOT.jar com.alibaba.middleware.race.rocketmq.Producer
啟動消費者消費數據:java -Drocketmq.namesrv.addr=127.0.0.1:9876 -cp preliminary.demo-1.0-SNAPSHOT.jar com.alibaba.middleware.race.rocketmq.Consumer

 

啟動&參數修改

mqnamesrv   啟動NameServer  jps - NamesrvStartup
mqbroker -n localhost:9876 啟動broker jps - BrokerStartup 默認端口10911
mqadmin updateBrokerConfig -c DefaultCluster -n 127.0.0.1:9876 -k listenPort -v 10911 更新broker參數配置

查看當前系統狀態

mqadmin clusterList -n 127.0.0.1:9876

查看當前所有topicList/創建

mqadmin topicList -n 127.0.0.1:9876
mqadmin updateTopic -n 127.0.0.1:9876 -c DefaultCluster -t topic名稱

查看broker狀態

mqadmin brokerStatus -n 127.0.0.1:9876 -b 127.0.0.1:10911

 查看某個topic的狀態

mqadmin topicStatus -n 127.0.0.1:9876 -t TOPIC-IT-WORKER-TEST
當前可見,producer只發送了一條消息,Max offset為1,最后收到消息的時間是last updated,由於配置四個broker都是本機,只有第一個收到了當前第一條消息
第二張圖為發了四條消息之后的狀態,看起來可能就是輪詢的,因為當我增加4條key為1的msg之后,仍然是四個節點每個兩條

 

查看連接的producer/consumer

mqadmin producerConnection -n 127.0.0.1:9876 -g procedure_group_name -t TOPIC-IT-WORKER-TEST
mqadmin consumerConnection -n 127.0.0.1:9876 -g consumer_group_name

查看某個key對應的msg

mqadmin queryMsgByKey -n 127.0.0.1:9876 -t TOPIC-IT-WORKER-TEST-1 -k 1
因為之前發送了5條key為1的數據,所以這里可以看到是5條,每條都有一個MESSAGE ID

根據ID查看對應的MSG

mqadmin queryMsgById  -g consumer_group_name -i AC1F78B700002A9F00000000000A3208  -n 127.0.0.1:9876

根據位置偏移查詢上面的那條數據

 mqadmin queryMsgByOffset -n  127.0.0.1:9876 -o 1 -t TOPIC-IT-WORKER-TEST-1 -i 1 -b izm5e210z0uiwyavdbmpxaz

查看消費詳情

mqadmin consumerProgress -n 127.0.0.1:9876 -g consumer_group_name
這里消費了一條,一共八條,差7條沒有消費

重置消費端offset

mqadmin resetOffsetByTime  -n 127.0.0.1:9876 -g consumer_group_name -t TOPIC-IT-WORKER-TEST-1 -f true -s 1536820000

打印broker中某個隊列里的消息

mqadmin printMsgByQueue -a izm5e210z0uiwyavdbmpxaz -t T1 -n 127.0.0.1:9876 -i 1 -p true -d true
這里可以看出,最后一條消息的storeSize比其他消息多了1,原因是它的body里的12是兩位數字。

直接打印消息

mqadmin printMsg -t TOPIC-IT-WORKER-TEST-1 -n 127.0.0.1:9876

The most commonly used mqadmin commands are:
   updateTopic          Update or create topic
   deleteTopic          Delete topic from broker and NameServer.
   updateSubGroup       Update or create subscription group
   deleteSubGroup       Delete subscription group from broker.
   updateBrokerConfig   Update broker's config
   updateTopicPerm      Update topic perm
   topicRoute           Examine topic route info
   topicStatus          Examine topic Status info
   topicClusterList     get cluster info for topic
   brokerStatus         Fetch broker runtime status data
   queryMsgById         Query Message by Id
   queryMsgByKey        Query Message by Key
   queryMsgByUniqueKey  Query Message by Unique key
   queryMsgByOffset     Query Message by offset
   queryMsgByUniqueKey  Query Message by Unique key
   printMsg             Print Message Detail
   printMsgByQueue      Print Message Detail
   sendMsgStatus        send msg to broker.
   brokerConsumeStats   Fetch broker consume stats data
   producerConnection   Query producer's socket connection and client version
   consumerConnection   Query consumer's socket connection, client version and subscription
   consumerProgress     Query consumers's progress, speed
   consumerStatus       Query consumer's internal data structure
   cloneGroupOffset     clone offset from other group.
   clusterList          List all of clusters
   topicList            Fetch all topic list from name server
   updateKvConfig       Create or update KV config.
   deleteKvConfig       Delete KV config.
   wipeWritePerm        Wipe write perm of broker in all name server
   resetOffsetByTime    Reset consumer offset by timestamp(without client restart).
   updateOrderConf      Create or update or delete order conf
   cleanExpiredCQ       Clean expired ConsumeQueue on broker.
   cleanUnusedTopic     Clean unused topic on broker.
   startMonitoring      Start Monitoring
   statsAll             Topic and Consumer tps stats
   allocateMQ           Allocate MQ
   checkMsgSendRT       check message send response time
   clusterRT            List All clusters Message Send RT
   getNamesrvConfig     Get configs of name server.
   updateNamesrvConfig  Update configs of name server.
   getBrokerConfig      Get broker config by cluster or special broker!
   queryCq              Query cq command.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM