一.導入依賴
<!-- RocketMQ client used by the Producer and Consumer examples below. -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.1.0-incubating</version>
</dependency>
<!-- Logging backend. -->
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
    <version>1.1.1</version>
</dependency>
<!-- Redis client: the example code imports redis.clients.jedis.Jedis, which the
     original dependency list did not provide. NOTE(review): version chosen here,
     align it with the rest of the project. -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>
二:生產者
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import redis.clients.jedis.Jedis; public class Producer { public static void main(String[] args) throws MQClientException { //創建生產者並指定組 DefaultMQProducer producer = new DefaultMQProducer("my-group"); //指定服務地址 producer.setNamesrvAddr("192.168.118.3:9876;192.168.118.4:9876"); //創建生產者實例 producer.setInstanceName("producer"); //啟動生成者 producer.start(); try { for (int i = 0; i < 10; i++) { Thread.sleep(1000); // 每秒發送一次MQ Message msg = new Message("my-topic", // topic 主題名稱 "TagA", // tag 臨時值 ("message-"+i).getBytes()// body 內容 ); SendResult sendResult = producer.send(msg); //打印消息的完整信息 System.out.println(sendResult.toString()); } } catch (Exception e) { e.printStackTrace(); } //發送完所有信息停掉生產者 producer.shutdown(); } }
三.消費者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-group"); consumer.setNamesrvAddr("192.168.118.3:9876"); consumer.setInstanceName("consumer"); consumer.subscribe("my-topic", "TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(msg.getMsgId()+"---"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
四:解決消息重複消費
在客戶端網絡延遲或者報錯的情況下導致消息無法成功簽收,其他的消費者能繼續監聽到這個消息,導致重複消費的情況
我們可以給每一條消息一個獨一無二的標識,當作消息的keys,接收到消息之後,查看redis中有沒有這個key,如果有就代表重複消費了,反之正常執行業務邏輯,先將keys放到redis中,防止其他消費者進行重複消費
,在所有操作執行完之後將keys從redis中刪除
消費者代碼
package com.yjc.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import redis.clients.jedis.Jedis;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Demo consumer that uses a Redis key per message as an "in progress" marker so
 * that the same message is not processed by two consumers at the same time.
 *
 * <p>Flow per message: atomically claim the key; if the claim fails the message is
 * reported as a duplicate and redelivery is requested; otherwise the message is
 * processed and the key is removed afterwards, whether processing succeeded or not.
 *
 * <p>NOTE(review): because the key is removed after processing, a redelivery that
 * arrives after a successful run is processed again. Keeping the key (with an
 * expiry) after success would be needed for full idempotence.
 */
public class Consumer {

    /**
     * Safety expiry for a claimed key, so a consumer that dies mid-processing does not
     * block the message forever. Must be longer than the slowest expected processing.
     */
    private static final int CLAIM_TTL_SECONDS = 300;

    /**
     * Connects to Redis, configures the consumer and starts consuming.
     *
     * @param args unused
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws MQClientException {
        final Jedis jedis = new Jedis("192.168.118.3", 6379);
        jedis.auth("admin");
        jedis.select(0);

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-group");
        // Both name servers need an explicit port; the second one was missing ":9876".
        consumer.setNamesrvAddr("192.168.118.3:9876;192.168.118.4:9876");
        consumer.setInstanceName("consumer");
        consumer.subscribe("my-topic3", "TagA");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("監聽到消息");
                    String key = dedupKey(msg);
                    String body = new String(msg.getBody(), StandardCharsets.UTF_8);

                    // Atomic claim: replaces the racy exists-then-set sequence.
                    if (!tryClaim(jedis, key, body)) {
                        System.out.println("重復消費!");
                        // Someone else holds the key: do not acknowledge, ask for redelivery.
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    System.out.println("放到緩存中");

                    try {
                        // Simulated failure kept from the demo; remove for real processing.
                        int a = 5 / 0;
                        System.out.println(msg.getMsgId() + "---" + body);
                    } catch (RuntimeException e) {
                        System.err.println("Processing failed for key " + key + ": " + e);
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    } finally {
                        // Always release the key; otherwise a failed message would be
                        // treated as a duplicate on every retry and never be consumed.
                        release(jedis, key);
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }

    /**
     * Returns the Redis key for a message: its business keys, or the message id when
     * the producer set no keys (a null key cannot be sent to Redis).
     * NOTE(review): assumes the message id is the same on redelivery; confirm.
     */
    private static String dedupKey(MessageExt msg) {
        String keys = msg.getKeys();
        return (keys == null || keys.isEmpty()) ? msg.getMsgId() : keys;
    }

    /**
     * Tries to claim {@code key} with SETNX and gives a new claim an expiry.
     * Access is synchronized because the single Jedis connection is shared by the
     * concurrent listener threads and a Jedis instance is not thread-safe; a
     * JedisPool is the usual alternative for higher throughput.
     *
     * @return {@code true} if this call created the key, {@code false} if it already existed
     */
    private static boolean tryClaim(Jedis jedis, String key, String value) {
        synchronized (jedis) {
            if (jedis.setnx(key, value) != 1) {
                return false;
            }
            jedis.expire(key, CLAIM_TTL_SECONDS);
            return true;
        }
    }

    /** Removes the claim for {@code key}; synchronized for the same reason as {@link #tryClaim}. */
    private static void release(Jedis jedis, String key) {
        synchronized (jedis) {
            jedis.del(key);
        }
    }
}