RocketMQ 順序消費只消費一次 坑


rocketMq實現順序消費的原理

produce在發送消息的時候,把消息發到同一個隊列(queue)中,消費者注冊消息監聽器為MessageListenerOrderly,這樣就可以保證消費端只有一個線程去消費消息

注意:是把把消息發到同一個隊列(queue),不是同一個topic,默認情況下一個topic包括4個queue

單個節點(Producer端1個、Consumer端1個)

1、Producer.java 

 

package order;  
  
import java.util.List;  
  
import com.alibaba.rocketmq.client.exception.MQBrokerException;  
import com.alibaba.rocketmq.client.exception.MQClientException;  
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;  
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;  
import com.alibaba.rocketmq.client.producer.SendResult;  
import com.alibaba.rocketmq.common.message.Message;  
import com.alibaba.rocketmq.common.message.MessageQueue;  
import com.alibaba.rocketmq.remoting.exception.RemotingException;  
  
/** 
 * Producer,發送順序消息 
 */  
public class Producer {  
    public static void main(String[] args) {  
        try {  
            DefaultMQProducer producer = new DefaultMQProducer("order_Producer");  
            producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
  
            producer.start();  
  
            // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",  
            // "TagE" };  
  
            for (int i = 1; i <= 5; i++) {  
  
                Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());  
  
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
                        Integer id = (Integer) arg;  
                        int index = id % mqs.size();  
                        return mqs.get(index);  
                    }  
                }, 0);  
  
                System.out.println(sendResult);  
            }  
  
            producer.shutdown();  
        } catch (MQClientException e) {  
            e.printStackTrace();  
        } catch (RemotingException e) {  
            e.printStackTrace();  
        } catch (MQBrokerException e) {  
            e.printStackTrace();  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}

2、Consumer.java   (有問題)

 

package order;   
import java.util.List;  
import java.util.concurrent.TimeUnit;  
import java.util.concurrent.atomic.AtomicLong;   
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;  
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;  
import com.alibaba.rocketmq.client.exception.MQClientException;  
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
import com.alibaba.rocketmq.common.message.MessageExt;  
  
/** 
 * 順序消息消費,帶事務方式(應用可控制Offset什么時候提交) 
 */  
public class Consumer1 {  
  
    public static void main(String[] args) throws MQClientException {  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");  
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
  
        /** 
         * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br> 
         * 如果非第一次啟動,那么按照上次消費的位置繼續消費 
         */  
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  
        consumer.subscribe("TopicOrderTest", "*");  
  
 consumer.registerMessageListener(new MessageListenerOrderly() {  
            AtomicLong consumeTimes = new AtomicLong(0);  
  
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {  
                // 設置自動提交  
                context.setAutoCommit(true);  
                for (MessageExt msg : msgs) {  
                    System.out.println(msg + ",內容:" + new String(msg.getBody()));  
                }  
  
                try {  
                    TimeUnit.SECONDS.sleep(5L);  
                } catch (InterruptedException e) {  
  
                    e.printStackTrace();  
                }  
                ;  
  
                return ConsumeOrderlyStatus.SUCCESS; } });  
  
        consumer.start();  
  
        System.out.println("Consumer1 Started.");  
    }  
  
}

 

 

這個地方有一個大坑,注冊監聽類的時候,不能使用匿名內部類。不然的話,只會消費一次,然后消費者就 掛了……掛了……掛了…… 

監聽類要單獨寫!!!

 

正確消費者寫法:

自定義監聽類:

MyMessageListener

public class MyMessageListener implements  MessageListenerOrderly {

    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        // 設置自動提交
        context.setAutoCommit(true);
        for (MessageExt msg : msgs) {
            System.out.println(msg + ",內容:" + new String(msg.getBody()));
        }

        try {
            TimeUnit.SECONDS.sleep(5L);
        } catch (InterruptedException e) {

            e.printStackTrace();
        }

        return ConsumeOrderlyStatus.SUCCESS;
    }
}

Consumer.java   

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");
        consumer.setNamesrvAddr("101.200.33.225:9876");

        /**
         * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>
         * 如果非第一次啟動,那么按照上次消費的位置繼續消費
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicOrderTest", "*");

 MyMessageListener myMessageListener = new MyMessageListener();
consumer.registerMessageListener(myMessageListener);
consumer.start(); System.out.println(
"Consumer1 Started."); } }

 

參考:https://www.cnblogs.com/antain/p/rocketmq.html

           http://www.cnblogs.com/520playboy/p/6750023.html

           http://dbaplus.cn/news-21-1123-1.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM