1. 添加依賴
pom.xml如下:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.1</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-srvutil</artifactId> <version>4.3.1</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.3</version> </dependency> <dependency> <groupId>org.javassist</groupId> <artifactId>javassist</artifactId> <version>3.23.1-GA</version> </dependency> <dependency> <groupId>io.openmessaging</groupId> <artifactId>openmessaging-api</artifactId> <version>0.3.0-alpha</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-openmessaging</artifactId> <version>4.3.1</version> </dependency>
2. Producer 的開發步驟
1. 實例化Producer Group,如下:
DefaultMQProducer producer = new DefaultMQProducer("my-producer-group");
2. 設置namesrvAddr,集群環境多個nameserver用;分割,如下:
producer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876");
3. 調用start()方法啟動:
producer.start();
4. 發送消息
for (int i = 0; i < 10; i++) { //構建實例,第一個參數為topic,第二個參數為tabs,第三個參數為消息體 Message message = new Message("MyQuickStartTopic","tabA",("Hello World:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult result = producer.send(message); System.out.println(result); }
5. 關閉生產者(根據自己需求確定是夠需要關閉)
producer.shutdown();
完整示例如下:
package com.wangx.rocketmq.quickstart; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; /** * 創建一個消費者 */ public class Producer { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException { //1. 實例化一個producer group DefaultMQProducer producer = new DefaultMQProducer("my-producer-group"); //2. 設置namesrvAddr,集群環境多個nameserver用;分割 producer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876"); //3. 啟動 producer.start(); // 4. 發送消息 for (int i = 0; i < 10; i++) { //構建實例,第一個參數為topic,第二個參數為tabs,第三個參數為消息體 Message message = new Message("MyQuickStartTopic","tabA",("Hello World:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult result = producer.send(message); System.out.println(result); } //關閉生產者 producer.shutdown(); } }
使用方式可以說非常簡單了。
3. Consumer開發步驟
1. 實例化Consumer Group,如下:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group");
2. 設置namesrvAddr,集群環境多個nameserver用;分割,如下:
producer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876");
3. 設置從什么位置開始都
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
4. 訂閱topic.
consumer.subscribe("MyQuickStartTopic", "*");
5. 注冊消息監聽器
consumer.registerMessageListener();
6. 重寫MessageListenerConcurrently接口的consumeMessage()方法
完整代碼如下:
package com.wangx.rocketmq.quickstart; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * 創建一個消費者 */ public class Consumer { public static void main(String[] args) throws MQClientException { //實例化一個consumer組 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group"); //設置setNamesrvAddr,同生產者 consumer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876"); //設置消息讀取方式,這里設置的是隊尾開始讀取 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //設置訂閱主題,第二個參數為過濾tabs的條件,可以寫為tabA|tabB過濾Tab,*表示接受所有 consumer.subscribe("MyQuickStartTopic", "*"); //注冊消息監聽 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { //的到MessageExt MessageExt messageExt = list.get(0); String topic = messageExt.getTopic(); String message = new String(messageExt.getBody(),"UTF-8"); int queueId = messageExt.getQueueId(); System.out.println("收到來自topic:" + topic + ", queueId:" + queueId + "的消息:" + message); } catch (Exception e) { //失敗,請求稍后重發 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } //成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
consumeMessage返回一個枚舉的兩種狀態,成功表示接受成功,否則返回稍后重發的狀態。這里注意,啟動的時候需要consumer先啟動,因為它需要在生產者之前先訂閱,否則將會收不到生產在consumer生產的消息,造成消息丟失。
啟動consumer,在啟動producer
producer控制台
consumer控制台:
rocketmq-console信息:
可以看到,我們前面部署的集群環境也是能夠實現消息的負載均衡的,會使兩個broker上都創建topic,且都能夠接收生產者生產的消息。
進入topic,可以看到新增了兩個我們自定義的topic
可能會出現的問題:
RemotingTooMuchRequestException: sendDefaultImpl call timeout
在客戶端運行Producer時,可能會出現如上異常,這是因為從 Windows 上開發連接 虛擬機中的 nameServer 時要經過 Linux 系統的防火牆,而防火牆一般都會有超時的機制,在網絡連接長時間不傳輸數據時,會關閉這個 TCP 的會話,關閉后再讀寫,就有可能導致這個異常。
解決辦法就是關閉防火牆,ubuntu下命令如下:
contOS下命令如下:
systemctl stop firewalld.service #停止firewall
systemctl disable firewalld.service #禁止firewall開機啟動
firewall-cmd --state #查看默認防火牆狀態(關閉后顯示notrunning,開啟后顯示running)