RocketMQ-quickstart(批量消費問題)


基本概念:

  Producer:消息生產者,負責生產消息,一般由業務系統負責生產消息。

  Consumer:消息消費者,負責消費消息,一般是后台系統負責異步消費。

  Push Consumer:Consumer的一種,應用通常向Consumer對象注冊一個Listener接口,一旦收到消息,Consumer對象立刻回調Linsener接口方法

  Pull Consumer:Consumer的一種,應用通常主動調用Consumer的拉消息方法從Broker拉消息,主動權由應用控制

  Consumer Group:一類Consumer的集合名稱,這類Consumer通常消費一類消息,且消費邏輯一致。

  Broker:消息中轉角色,負責存儲消息,轉發消息,一般也稱為Server,在JMS規范中成為Provider

  Topic: 一個Topic有四個Queue

代碼示例:

 生產者:

package org.hope.lee.producer;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendCallback;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

public class Producer {
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("push_consumer");
//        producer.setNamesrvAddr("192.168.31.176:9876");
        producer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
        try {
            // 設置實例名稱
            producer.setInstanceName("quick_start_producer");
            // 設置重試次數
            producer.setRetryTimesWhenSendFailed(3);
            // 開啟生產者
            producer.start();
            // 創建一條消息
            Message msg = new Message("PushTopic_tt1", "TagB", "OrderID0034", "uniform_just_for_test".getBytes());
            //目前發現3.2.6版本沒有這個方法,3.5.3版本有這個方法,並且必須要設置為false否則會報錯
//            producer.setVipChannelEnabled(false);
            SendResult send = producer.send(msg);
            System.out.println("id:--->" + send.getMsgId() + ",result:--->" + send.getSendStatus());
            //發送,並觸發回調函數
            /*producer.send(msg, new SendCallback(){
                //成功回調函數
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult.getSendStatus());
                    System.out.println("成功");
                }
                //異常回調函數
                @Override
                public void onException(Throwable e) {
                    System.out.println("失敗了" +  e.getMessage());
                }
            });*/
            
            //獲取某個主題的消息隊列  
            /*List<MessageQueue> messageQueues = producer  
                    .fetchPublishMessageQueues("PushTopic");  
            System.out.println(messageQueues.size());  */
            
            
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } 
        producer.shutdown();
    }
}

 消費者:

package org.hope.lee.consumer;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListener;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.sun.org.apache.xpath.internal.SourceTree;

public class Consumer {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("push_consumer");
        consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
        //批量消費,每次拉取10條
        consumer.setConsumeMessageBatchMaxSize(10);
        try {
//            consumer.setInstanceName("quick_start_consumer");
            //3.2.6這個版本沒有這個方法,3.5.3版本要設置這個方法為false,否則取不到topic
//            consumer.setVipChannelEnabled(false);

            //程序第一次啟動從消息隊列頭取數據
            //如果非第一次啟動,那么按照上次消費的位置繼續消費
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            //訂閱PushTopic下Tag為push的消息
            consumer.subscribe("PushTopic_tt1", "*");
//            consumer.subscribe("PushTopic_tt1", "TagA || Tag B || Tage C");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for(MessageExt msg : msgs) {
                        System.out.println("-------->" + msg.getKeys());
                        System.out.println("-------->" + msg.getMsgId());
                        System.out.println("-------->" + msg.getQueueId());
                        System.out.println("-------->" + msg.getQueueOffset());
                        System.out.println("-------->" + msg.getBody().toString());
                        System.out.println("-------->" + msg.toString());
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.println("Consumer Started."); 
//            consumer.suspend();
            
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        
        
    }
}

 注意:

  在Consumer端,我們用的是DefaultMQPushConsumer這個類,

  所以我們可以設置批量消費,

  但是,List<MessageExt> msgs這里還是只消費一條,所以這段代碼的for循環會產生誤導,直接寫成MessageExt msg = msgs.get()就可以

   那不是說consumer.setConsumeMessageBatchMaxSize(10);不就是沒用了么?其實是這樣的,我們正常的流程一般都是先啟動Consumer端,然后再啟動Producer端。Consumer端都是一條一條的消費的。但是有時候會出現先啟動Producer端,然后再啟動Consumer端這種情況,那這個時候就是會批量消費了,這個參數就會有作用了。

 

 

 https://gitee.com/huayicompany/RocketMQ-learn/tree/master/rocketmq-api

問題:

[1] producer生產消息報 “No route info of this topic” 異常。

解決方案:

阿里的github issues : https://github.com/alibaba/RocketMQ/issues/44

網上參考,把rocketmq的配置文件broker-a.properties中的autoCreateTopicEnable值改成true


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM