RocketMQ生產者和消費者
注:生產者在生產數據時,指定數據的key,然后消費者進行數據消費時,獲取到key,與redis中保存的key做判斷
如果不相同代表之前沒有人進行消費,處理消費,保存到redis當中
當有第二個消費者時,如果拿到的消息與redis中相同代表之前已已經有人消費
就進行數據簽收,防止后續消費者同樣拿到重復消費數據
注:消費者的消費邏輯失敗時,可以通過設置返回狀態達到消息重試的結果。
消息重試只針對集群消費方式生效;廣播方式不提供失敗重試特性,即消費失敗后,失敗消息不再重試,繼續消費新的消息。
每次重試后,消息ID都不一致,所以不能使用消息ID判斷冪等。
生產者
private static Map<String,Object> map=new HashMap<>();
public static void main(String[] args) throws Exception {
//創建消費者
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("mckz-group");
//設置NameServer地址
consumer.setNamesrvAddr("192.168.3.100:9876;192.168.3.200:9876");
//設置實例名稱
consumer.setInstanceName("mckz");
//訂閱Topic
consumer.subscribe("mckz_topic","TagA");
//監聽消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//獲取消息
for(MessageExt ext:msgs){
//判斷redis中有沒有當前消息key
if(map.containsKey(ext.getKeys())){
System.out.println("已經消費.......");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//RocketMQ由於是集群環境,所以產生的消息ID可能會重復
System.out.println(ext.getMsgId()+"----------"+new String(ext.getBody()));
//將當前Key保存到redis當中
map.put(ext.getKeys(),ext);
}
/*如果出現異常可進行try進行捕獲*/
/*try {
int a=5/0;
}catch (Exception e){
//接受消息狀態
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}*/
/*try {
Thread.sleep(60000);
}catch (Exception e){
e.printStackTrace();
}*/
//接受消息狀態
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//啟動消費者
consumer.start();
}
消費者
public static void main(String[] args) throws Exception {
//創建一個生產者
DefaultMQProducer producer=new DefaultMQProducer("mckz_group");
//設置NameServer地址
producer.setNamesrvAddr("192.168.3.100:9876;192.168.3.200:9876");
//設置生產者實例名稱
producer.setInstanceName("mckz");
//啟動生產者
producer.start();
//發送消息
for (int i = 1; i <=10 ; i++) {
Thread.sleep(1000); //模擬網絡延遲
//創建消息 topic代表主題名稱 tags代表小分類 body代表消息體
Message message=new Message("mckz_topic","TagA",("wdksoft-"+i).getBytes());
//消息的唯一標識
message.setKeys("訂單編號"+i);
//發送消息
SendResult send = producer.send(message);
System.out.println(send.toString());
}
}
