執行main方法即可啟動(如果是spring項目,一般在構造方法調用啟動方法接口,記得把類注入到容器即可)
(啟動后 當消息有推送時會自動除發consumeMessage消費事件)
消費者名稱broker-a可隨意命名,但是要固定,不然會重新消費該主題所有消息
package com.chzfsd.controller; import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; public class TestMq { public static void main(String[] args) { rocketMQConsumer(); } public static void rocketMQConsumer() { try { System.out.println("rocketMQConsumer 開始------"); // 消費目標 // 聲明一個消費者consumer,需要傳入一個組 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broker-a"); // 設置集群的NameServer地址,多個地址之間以分號分隔 consumer.setNamesrvAddr("127.0.0.1:9876"); // 設置consumer的消費策略 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 集群模式消費,廣播消費不會重試 consumer.setMessageModel(MessageModel.CLUSTERING); // 設置最大重試次數,默認是16次 consumer.setMaxReconsumeTimes(5); // 設置consumer所訂閱的Topic和Tag,*代表全部的Tag consumer.subscribe("GD_runmode_syncRunmodeRecloseInfo", "*"); // Listener,主要進行消息的邏輯處理,監聽topic,如果有消息就會立即去消費 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // 獲取第一條消息,進行處理 try { if (msgs != null && msgs.size() > 0) { MessageExt messageExt = msgs.get(0); String msgBody = new String(messageExt.getBody(), "utf-8"); System.out.println(" 接收消息整體為:" + msgBody); } } catch (Exception e) { System.out.println("消息消費失敗,請嘗試重試!!!"); e.printStackTrace(); // 嘗試重新消費,直接第三次如果還不成功就放棄消費,進行消息消費失敗補償操作 if (msgs.get(0).getReconsumeTimes() == 3) { System.out.println("消息記錄日志:" + msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } else { // 重試狀態碼,重試機制可配置 // System.out.println("消息消費失敗,嘗試重試!!!"); System.out.println("消息消費失敗,請嘗試重試!!!"); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } System.out.println("消息消費成功!!!"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 調用start()方法啟動consumer consumer.start(); System.out.println("消費者啟動成功。。。"); System.out.println("rocketMQConsumer 結束------"); } catch (Exception e) { e.printStackTrace(); System.out.println("消息消費操作失敗--" + e.getMessage()); } } }