詳解RocketMQ中的consumer


上述就是MQ中有關Consumer的類圖,下面來介紹一下每個類

 

 1.MQAdmin:底層類,上篇博客已經提過,就不再此重提

 2.MQConsumer:Consumer公共的接口,常用的方法如下

 

 如果消費失敗的話,消息將會返回到broker中,並且延遲一會消費的時間

   void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)  throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

 

 3.MQPushConsumer:Consumer的一種,應用通常向Consumer對象注冊一個Listener接口,一旦收到消息,Consumer對象立刻回調Listener接口方法

 

4.MQPullConsumer:Consumer的一種,應用通常主動調用Consumer的拉消息方法從Broker拉消息,主動權由應用控制

 

 在上圖中出現了兩類的消費者分別是PushConsumer和PullConsumer,下面來看一下

 

 PushConsumer:通過注冊監聽的方式來消費信息

 

[java]  view plain  copy
 
 print?
  1. <span style="font-family:Comic Sans MS;font-size:18px;">/**      
  2.  * @FileName: Consumer.java    
  3.  * @Package:com.test    
  4.  * @Description: TODO   
  5.  * @author: LUCKY     
  6.  * @date:2015年12月28日 下午2:43:23    
  7.  * @version V1.0      
  8.  */  
  9. package com.test;  
  10.   
  11. import java.util.List;  
  12.   
  13. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
  14. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
  15. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
  16. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
  17. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
  18. import com.alibaba.rocketmq.common.message.Message;  
  19. import com.alibaba.rocketmq.common.message.MessageExt;  
  20.   
  21. /** 
  22.  * @ClassName: Consumer 
  23.  * @Description: 模擬消費者 
  24.  * @author: LUCKY 
  25.  * @date:2015年12月28日 下午2:43:23 
  26.  */  
  27. public class ConsumerTest {  
  28.   
  29.     public static void main(String[] args) {  
  30.         DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("broker-a");  
  31.         consumer.setNamesrvAddr("100.66.154.81:9876");  
  32.         try {  
  33.               
  34.             // 訂閱PushTopic下Tag為push的消息,都訂閱消息  
  35.             consumer.subscribe("PushTopic", "push");  
  36.               
  37.             // 程序第一次啟動從消息隊列頭獲取數據  
  38.             consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  39.             //可以修改每次消費消息的數量,默認設置是每次消費一條  
  40.             // consumer.setConsumeMessageBatchMaxSize(10);  
  41.   
  42.             //注冊消費的監聽  
  43.             consumer.registerMessageListener(new MessageListenerConcurrently() {  
  44.                //在此監聽中消費信息,並返回消費的狀態信息  
  45.                 public ConsumeConcurrentlyStatus consumeMessage(  
  46.                         List<MessageExt> msgs,  
  47.                         ConsumeConcurrentlyContext context) {  
  48.                       
  49.                     // msgs中只收集同一個topic,同一個tag,並且key相同的message  
  50.                     // 會把不同的消息分別放置到不同的隊列中  
  51.                     for(Message msg:msgs){  
  52.               
  53.                         System.out.println(new String(msg.getBody()));  
  54.                     }     
  55.                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
  56.                 }  
  57.             });  
  58.   
  59.             consumer.start();  
  60.             Thread.sleep(5000);  
  61.             //5秒后掛載消費端消費  
  62.             consumer.suspend();  
  63.               
  64.         } catch (Exception e) {  
  65.             e.printStackTrace();  
  66.         }  
  67.     }  
  68. }  
  69. </span>  

 

 

 PullConsumer:通過拉去的方式來消費消息

 

[java]  view plain  copy
 
 print?
  1. <span style="font-family:Comic Sans MS;font-size:18px;">/**      
  2.  * @FileName: Consumer.java    
  3.  * @Package:com.test    
  4.  * @Description: TODO   
  5.  * @author: LUCKY     
  6.  * @date:2015年12月28日 下午2:43:23    
  7.  * @version V1.0      
  8.  */  
  9. package com.test;  
  10.   
  11. import java.util.Set;  
  12.   
  13. import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;  
  14. import com.alibaba.rocketmq.client.consumer.MessageQueueListener;  
  15. import com.alibaba.rocketmq.common.message.MessageQueue;  
  16.   
  17. /** 
  18.  * @ClassName: Consumer 
  19.  * @Description: 模擬消費者 
  20.  * @author: LUCKY 
  21.  * @date:2015年12月28日 下午2:43:23 
  22.  */  
  23. public class ConsumerPullTest {  
  24.   
  25.     public static void main(String[] args) {  
  26.         DefaultMQPullConsumer consumer=new DefaultMQPullConsumer();  
  27.         consumer.setNamesrvAddr("100.66.154.81:9876");  
  28.        consumer.setConsumerGroup("broker");  
  29.         try {  
  30.             consumer.start();  
  31.         Set<MessageQueue> messageQueues=  consumer.fetchSubscribeMessageQueues("PushTopic");        
  32.   
  33.         for(MessageQueue messageQueue:messageQueues){  
  34.           
  35.             System.out.println(messageQueue.getTopic());  
  36.         }  
  37.           
  38.           
  39.         //消息隊列的監聽  
  40.         consumer.registerMessageQueueListener("", new MessageQueueListener() {  
  41.               
  42.             @Override  
  43.             //消息隊列有改變,就會觸發  
  44.             public void messageQueueChanged(String topic, Set<MessageQueue> mqAll,  
  45.                     Set<MessageQueue> mqDivided) {  
  46.                 // TODO Auto-generated method stub  
  47.                   
  48.             }  
  49.         });  
  50.               
  51.       
  52.         } catch (Exception e) {  
  53.             e.printStackTrace();  
  54.         }  
  55.     }  
  56. }  
  57. </span>  


一般在應用中都會采用push的方法來自動的消費信息


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM