RocketMQ生產者和消費者


RocketMQ生產者和消費者

  注:生產者在生產數據時,指定數據的key,然后消費者進行數據消費時,獲取到key,與redis中保存的key做判斷

  如果不相同代表之前沒有人進行消費,處理消費,保存到redis當中

  當有第二個消費者時,如果拿到的消息與redis中相同代表之前已已經有人消費

  就進行數據簽收,防止后續消費者同樣拿到重復消費數據

 

  注:消費者的消費邏輯失敗時,可以通過設置返回狀態達到消息重試的結果。

  消息重試只針對集群消費方式生效;廣播方式不提供失敗重試特性,即消費失敗后,失敗消息不再重試,繼續消費新的消息。

  每次重試后,消息ID都不一致,所以不能使用消息ID判斷冪等。

 

生產者

 

 
private static Map<String,Object> map=new HashMap<>();
    public static void main(String[] args) throws Exception {
        //創建消費者
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("mckz-group");
        //設置NameServer地址
        consumer.setNamesrvAddr("192.168.3.100:9876;192.168.3.200:9876");
        //設置實例名稱
        consumer.setInstanceName("mckz");
        //訂閱Topic
        consumer.subscribe("mckz_topic","TagA");
        //監聽消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                //獲取消息
                for(MessageExt ext:msgs){
                    //判斷redis中有沒有當前消息key
                    if(map.containsKey(ext.getKeys())){
                        System.out.println("已經消費.......");
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    //RocketMQ由於是集群環境,所以產生的消息ID可能會重復
                    System.out.println(ext.getMsgId()+"----------"+new String(ext.getBody()));
                    //將當前Key保存到redis當中
                    map.put(ext.getKeys(),ext);
                }
          /*如果出現異常可進行try進行捕獲*/
/*try {
                    int a=5/0;
                }catch (Exception e){
                    //接受消息狀態 
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }*/

                /*try {
                    Thread.sleep(60000);
                }catch (Exception e){
                    e.printStackTrace();
                }*/

                //接受消息狀態 
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //啟動消費者
        consumer.start();
    }
 

 

 

消費者

 
public static void main(String[] args) throws Exception {
        //創建一個生產者
        DefaultMQProducer producer=new DefaultMQProducer("mckz_group");
        //設置NameServer地址
        producer.setNamesrvAddr("192.168.3.100:9876;192.168.3.200:9876");
        //設置生產者實例名稱
        producer.setInstanceName("mckz");
        //啟動生產者
        producer.start();
        //發送消息
        for (int i = 1; i <=10 ; i++) {
            Thread.sleep(1000); //模擬網絡延遲
            //創建消息  topic代表主題名稱     tags代表小分類     body代表消息體
            Message message=new Message("mckz_topic","TagA",("wdksoft-"+i).getBytes());
            //消息的唯一標識
            message.setKeys("訂單編號"+i);
            //發送消息
            SendResult send = producer.send(message);
            System.out.println(send.toString());
        }
    }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM