RocketMQ生產者和消費者案例


/**
* 生產者
*/

public class Provider {
public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException {
//創建一個生產者
DefaultMQProducer producer=new DefaultMQProducer("rmq-group");
//設置NameServer地址
producer.setNamesrvAddr("192.168.7.11:9876;192.168.7.22:9876");
//設置生產者實例名稱
producer.setInstanceName("provider");
//啟動生產者
producer.start();
//發送消息
for (int i = 1; i <=10 ; i++) {
Thread.sleep(1000); //模擬網絡延遲
//創建消息 topic代表主題名稱 tags代表小分類 body代表消息體
Message message=new Message("weksoft_topic","TagA",("wdksoft-"+i).getBytes());
//發送消息
SendResult send = producer.send(message);
System.out.println(send.toString());
}
}
}

 




/**
* 消費者:監聽消費
*/

public class Consumer {
public static void main(String[] args) throws MQClientException {
//創建消費者
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group");
//設置NameServer地址
consumer.setNamesrvAddr("192.168.7.11:9876;192.168.7.22:9876");
//設置實例名稱
consumer.setInstanceName("consumer");
//訂閱Topic
consumer.subscribe("weksoft_topic","TagA");
//監聽消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//獲取消息
for(MessageExt ext:msgs){
//RocketMQ由於是集群環境,所以產生的消息ID可能會重復
System.out.println(ext.getMsgId()+"----------"+new String(ext.getBody()));
}
//接受消息狀態 1.消費成功 2.消費失敗 隊列還有
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//啟動消費者
consumer.start();
}
}

 



2.RocketMQ重試機制
消費者重試:
報異常 int result = 5 / 0;


網絡延遲

try {
Thread.sleep(600000);
} catch (InterruptedException e) {
e.printStackTrace();
}

 


3.RocketMQ解決冪等性問題
//網絡延遲,可能造成重復消費,如何解決重復消費(冪等性)問題
/**
* activeMQ:根據MessageID判斷,獲取當前的MessageID,判斷和上一次是否一致,如果一致代表重復消費
* 如果不一致則進行消費
*
*
* RocketMQ:不能使用MessageID判斷,因為在集群環境中,可能出現消息ID相同情況
* 消息Key,唯一,手動設置
*/

/* try {
Thread.sleep(600000);
} catch (InterruptedException e) {
e.printStackTrace();
}
*/

 


3.1 生產者在生產數據時,指定數據的key,然后消費者進行數據消費時,獲取到key,與redis中保存的key做判斷,如果不相同
代表之前沒有人進行消費,處理消費,保存到redis當中,當有第二個消費者時,如果拿到的消息與redis中相同代表之前已
已經有人消費。就進行數據簽收,防止后續消費者同樣拿到重復消費數據

3.2 生產者生產message指定key

public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException {
//創建一個生產者
DefaultMQProducer producer=new DefaultMQProducer("rmq-group");
//設置NameServer地址
producer.setNamesrvAddr("192.168.7.11:9876;192.168.7.22:9876");
//設置生產者實例名稱
producer.setInstanceName("provider");
//啟動生產者
producer.start();
//發送消息
for (int i = 1; i <=1 ; i++) {
Thread.sleep(1000); //模擬網絡延遲
//創建消息 topic代表主題名稱 tags代表小分類 body代表消息體
Message message=new Message("weksoft_topic_10","TagA",("wdksoft10-"+i).getBytes());
//消息的唯一標識
message.setKeys("訂單編號"+i);
//發送消息
SendResult send = producer.send(message);
System.out.println(send.toString());
}
}

 


3.3 消費者

public static void main(String[] args) throws MQClientException {
//創建消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
//設置NameServer地址
consumer.setNamesrvAddr("192.168.7.11:9876;192.168.7.22:9876");
//設置實例名稱
consumer.setInstanceName("consumer");
//訂閱Topic
consumer.subscribe("weksoft_topic_10", "TagA");
//監聽消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//獲取消息
for (MessageExt ext : msgs) {
//判斷redis中有沒有當前消息key
if(map.containsKey(ext.getKeys())){
System.out.println("已經消費.......");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//RocketMQ由於是集群環境,所以產生的消息ID可能會重復
System.out.println(ext.getMsgId() + "----------" + new String(ext.getBody()));
//將當前Key保存到redis當中
map.put(ext.getKeys(),ext);
}

try{
int result=5/0;
}catch (Exception e){
//人工補償
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

//接受消息狀態 1.消費成功 2.消費失敗 隊列還有
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//啟動消費者
consumer.start();
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM