基本概念:
Producer:消息生產者,負責生產消息,一般由業務系統負責生產消息。
Consumer:消息消費者,負責消費消息,一般是后台系統負責異步消費。
Push Consumer:Consumer的一種,應用通常向Consumer對象注冊一個Listener接口,一旦收到消息,Consumer對象立刻回調Linsener接口方法
Pull Consumer:Consumer的一種,應用通常主動調用Consumer的拉消息方法從Broker拉消息,主動權由應用控制
Consumer Group:一類Consumer的集合名稱,這類Consumer通常消費一類消息,且消費邏輯一致。
Broker:消息中轉角色,負責存儲消息,轉發消息,一般也稱為Server,在JMS規范中成為Provider
Topic: 一個Topic有四個Queue
代碼示例:
生產者:
package org.hope.lee.producer; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendCallback; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.remoting.exception.RemotingException; public class Producer { public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer("push_consumer"); // producer.setNamesrvAddr("192.168.31.176:9876"); producer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876"); try { // 設置實例名稱 producer.setInstanceName("quick_start_producer"); // 設置重試次數 producer.setRetryTimesWhenSendFailed(3); // 開啟生產者 producer.start(); // 創建一條消息 Message msg = new Message("PushTopic_tt1", "TagB", "OrderID0034", "uniform_just_for_test".getBytes()); //目前發現3.2.6版本沒有這個方法,3.5.3版本有這個方法,並且必須要設置為false否則會報錯 // producer.setVipChannelEnabled(false); SendResult send = producer.send(msg); System.out.println("id:--->" + send.getMsgId() + ",result:--->" + send.getSendStatus()); //發送,並觸發回調函數 /*producer.send(msg, new SendCallback(){ //成功回調函數 @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult.getSendStatus()); System.out.println("成功"); } //異常回調函數 @Override public void onException(Throwable e) { System.out.println("失敗了" + e.getMessage()); } });*/ //獲取某個主題的消息隊列 /*List<MessageQueue> messageQueues = producer .fetchPublishMessageQueues("PushTopic"); System.out.println(messageQueues.size()); */ } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } producer.shutdown(); } }
消費者:
package org.hope.lee.consumer; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListener; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; import com.sun.org.apache.xpath.internal.SourceTree; public class Consumer { public static void main(String[] args) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("push_consumer"); consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876"); //批量消費,每次拉取10條 consumer.setConsumeMessageBatchMaxSize(10); try { // consumer.setInstanceName("quick_start_consumer"); //3.2.6這個版本沒有這個方法,3.5.3版本要設置這個方法為false,否則取不到topic // consumer.setVipChannelEnabled(false); //程序第一次啟動從消息隊列頭取數據 //如果非第一次啟動,那么按照上次消費的位置繼續消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //訂閱PushTopic下Tag為push的消息 consumer.subscribe("PushTopic_tt1", "*"); // consumer.subscribe("PushTopic_tt1", "TagA || Tag B || Tage C"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for(MessageExt msg : msgs) { System.out.println("-------->" + msg.getKeys()); System.out.println("-------->" + msg.getMsgId()); System.out.println("-------->" + msg.getQueueId()); System.out.println("-------->" + msg.getQueueOffset()); System.out.println("-------->" + msg.getBody().toString()); System.out.println("-------->" + msg.toString()); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); // consumer.suspend(); } catch (MQClientException e) { e.printStackTrace(); } } }
注意:
在Consumer端,我們用的是DefaultMQPushConsumer這個類,
所以我們可以設置批量消費,
但是,List<MessageExt> msgs這里還是只消費一條,所以這段代碼的for循環會產生誤導,直接寫成MessageExt msg = msgs.get()就可以
那不是說consumer.setConsumeMessageBatchMaxSize(10);不就是沒用了么?其實是這樣的,我們正常的流程一般都是先啟動Consumer端,然后再啟動Producer端。Consumer端都是一條一條的消費的。但是有時候會出現先啟動Producer端,然后再啟動Consumer端這種情況,那這個時候就是會批量消費了,這個參數就會有作用了。
https://gitee.com/huayicompany/RocketMQ-learn/tree/master/rocketmq-api
問題:
[1] producer生產消息報 “No route info of this topic” 異常。
解決方案:
阿里的github issues : https://github.com/alibaba/RocketMQ/issues/44
網上參考,把rocketmq的配置文件broker-a.properties中的autoCreateTopicEnable值改成true