/**
* 生產者
*/
public class Provider { public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException { //創建一個生產者 DefaultMQProducer producer=new DefaultMQProducer("rmq-group"); //設置NameServer地址 producer.setNamesrvAddr("192.168.7.11:9876;192.168.7.22:9876"); //設置生產者實例名稱 producer.setInstanceName("provider"); //啟動生產者 producer.start(); //發送消息 for (int i = 1; i <=10 ; i++) { Thread.sleep(1000); //模擬網絡延遲 //創建消息 topic代表主題名稱 tags代表小分類 body代表消息體 Message message=new Message("weksoft_topic","TagA",("wdksoft-"+i).getBytes()); //發送消息 SendResult send = producer.send(message); System.out.println(send.toString()); } } }
/**
* 消費者:監聽消費
*/
public class Consumer { public static void main(String[] args) throws MQClientException { //創建消費者 DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group"); //設置NameServer地址 consumer.setNamesrvAddr("192.168.7.11:9876;192.168.7.22:9876"); //設置實例名稱 consumer.setInstanceName("consumer"); //訂閱Topic consumer.subscribe("weksoft_topic","TagA"); //監聽消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { //獲取消息 for(MessageExt ext:msgs){ //RocketMQ由於是集群環境,所以產生的消息ID可能會重復 System.out.println(ext.getMsgId()+"----------"+new String(ext.getBody())); } //接受消息狀態 1.消費成功 2.消費失敗 隊列還有 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //啟動消費者 consumer.start(); } }
2.RocketMQ重試機制
消費者重試:
報異常 int result = 5 / 0;
網絡延遲
try { Thread.sleep(600000); } catch (InterruptedException e) { e.printStackTrace(); }
3.RocketMQ解決冪等性問題
//網絡延遲,可能造成重復消費,如何解決重復消費(冪等性)問題
/**
* activeMQ:根據MessageID判斷,獲取當前的MessageID,判斷和上一次是否一致,如果一致代表重復消費
* 如果不一致則進行消費
*
*
* RocketMQ:不能使用MessageID判斷,因為在集群環境中,可能出現消息ID相同情況
* 消息Key,唯一,手動設置
*/
/* try { Thread.sleep(600000); } catch (InterruptedException e) { e.printStackTrace(); } */
3.1 生產者在生產數據時,指定數據的key,然后消費者進行數據消費時,獲取到key,與redis中保存的key做判斷,如果不相同
代表之前沒有人進行消費,處理消費,保存到redis當中,當有第二個消費者時,如果拿到的消息與redis中相同代表之前已
已經有人消費。就進行數據簽收,防止后續消費者同樣拿到重復消費數據
3.2 生產者生產message指定key
public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException { //創建一個生產者 DefaultMQProducer producer=new DefaultMQProducer("rmq-group"); //設置NameServer地址 producer.setNamesrvAddr("192.168.7.11:9876;192.168.7.22:9876"); //設置生產者實例名稱 producer.setInstanceName("provider"); //啟動生產者 producer.start(); //發送消息 for (int i = 1; i <=1 ; i++) { Thread.sleep(1000); //模擬網絡延遲 //創建消息 topic代表主題名稱 tags代表小分類 body代表消息體 Message message=new Message("weksoft_topic_10","TagA",("wdksoft10-"+i).getBytes()); //消息的唯一標識 message.setKeys("訂單編號"+i); //發送消息 SendResult send = producer.send(message); System.out.println(send.toString()); } }
3.3 消費者
public static void main(String[] args) throws MQClientException { //創建消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group"); //設置NameServer地址 consumer.setNamesrvAddr("192.168.7.11:9876;192.168.7.22:9876"); //設置實例名稱 consumer.setInstanceName("consumer"); //訂閱Topic consumer.subscribe("weksoft_topic_10", "TagA"); //監聽消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { //獲取消息 for (MessageExt ext : msgs) { //判斷redis中有沒有當前消息key if(map.containsKey(ext.getKeys())){ System.out.println("已經消費......."); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } //RocketMQ由於是集群環境,所以產生的消息ID可能會重復 System.out.println(ext.getMsgId() + "----------" + new String(ext.getBody())); //將當前Key保存到redis當中 map.put(ext.getKeys(),ext); } try{ int result=5/0; }catch (Exception e){ //人工補償 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } //接受消息狀態 1.消費成功 2.消費失敗 隊列還有 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //啟動消費者 consumer.start(); }
