rocketMQ消費者java消費代碼


執行main方法即可啟動(如果是spring項目,一般在構造方法調用啟動方法接口,記得把類注入到容器即可)

(啟動后 當消息有推送時會自動除發consumeMessage消費事件)

消費者名稱broker-a可隨意命名,但是要固定,不然會重新消費該主題所有消息
package com.chzfsd.controller;

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;


public class TestMq {

    public static void main(String[] args) {
        rocketMQConsumer();
    }

    public static void rocketMQConsumer() {
        try {
            System.out.println("rocketMQConsumer  開始------");
            // 消費目標
            // 聲明一個消費者consumer,需要傳入一個組
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broker-a");
            // 設置集群的NameServer地址,多個地址之間以分號分隔
            consumer.setNamesrvAddr("127.0.0.1:9876");
            // 設置consumer的消費策略
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // 集群模式消費,廣播消費不會重試
            consumer.setMessageModel(MessageModel.CLUSTERING);
            // 設置最大重試次數,默認是16次
            consumer.setMaxReconsumeTimes(5);
            // 設置consumer所訂閱的Topic和Tag,*代表全部的Tag
            consumer.subscribe("GD_runmode_syncRunmodeRecloseInfo", "*");
            // Listener,主要進行消息的邏輯處理,監聽topic,如果有消息就會立即去消費
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    // 獲取第一條消息,進行處理
                    try {
                        if (msgs != null && msgs.size() > 0) {
                              MessageExt messageExt = msgs.get(0);
                              String msgBody = new String(messageExt.getBody(), "utf-8");
                              System.out.println(" 接收消息整體為:" + msgBody);
                        }
                    } catch (Exception e) {
                        System.out.println("消息消費失敗,請嘗試重試!!!");
                        e.printStackTrace();
                        // 嘗試重新消費,直接第三次如果還不成功就放棄消費,進行消息消費失敗補償操作
                        if (msgs.get(0).getReconsumeTimes() == 3) {
                            System.out.println("消息記錄日志:" + msgs);
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        } else {
                            // 重試狀態碼,重試機制可配置
                            // System.out.println("消息消費失敗,嘗試重試!!!");
                            System.out.println("消息消費失敗,請嘗試重試!!!");
                            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                        }
                    }
                    System.out.println("消息消費成功!!!");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 調用start()方法啟動consumer
            consumer.start();
            System.out.println("消費者啟動成功。。。");
            System.out.println("rocketMQConsumer 結束------");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("消息消費操作失敗--" + e.getMessage());
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM