上篇博客講解了服務器集群部署RocketMQ 博客地址:RocketMQ(2)---Docker部署RocketMQ集群
這篇在上篇搭建好的基礎上,將SpringBoot整合RocketMQ實現生產消費。
GitHub地址: https://github.com/yudiandemingzi/spring-boot-study
一、搭建步驟
先說下技術大致架構
SpringBoot2.1.6 + Maven3.5.4 + rocketmq4.3.0 + JDK1.8 +Lombok(插件)
1、添加rocketmq包
<!--注意: 這里的版本,要和部署在服務器上的版本號一致--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version> </dependency>
2、JmsConfig(配置類)
連接RocketMQ服務器配置類,這里為了方便直接寫成常量。
/** * @Description: 安裝實際開發這里的信息 都是應該寫在配置里,來讀取,這里為了方便所以寫成常量 */ public class JmsConfig { /** * Name Server 地址,因為是集群部署 所以有多個用 分號 隔開 */ public static final String NAME_SERVER = "127.12.15.6:9876;127.12.15.6:9877"; /** * 主題名稱 主題一般是服務器設置好 而不能在代碼里去新建topic( 如果沒有創建好,生產者往該主題發送消息 會報找不到topic錯誤) */ public static final String TOPIC = "topic_family"; }
3、Producer (生產者)
@Slf4j @Component public class Producer { private String producerGroup = "test_producer"; private DefaultMQProducer producer; public Producer(){ //示例生產者 producer = new DefaultMQProducer(producerGroup); //不開啟vip通道 開通口端口會減2 producer.setVipChannelEnabled(false); //綁定name server producer.setNamesrvAddr(JmsConfig.NAME_SERVER); start(); } /** * 對象在使用之前必須要調用一次,只能初始化一次 */ public void start(){ try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } public DefaultMQProducer getProducer(){ return this.producer; } /** * 一般在應用上下文,使用上下文監聽器,進行關閉 */ public void shutdown(){ this.producer.shutdown(); } }
4、Consumer (消費者)
@Slf4j @Component public class Consumer { /** * 消費者實體對象 */ private DefaultMQPushConsumer consumer; /** * 消費者組 */ public static final String CONSUMER_GROUP = "test_consumer"; /** * 通過構造函數 實例化對象 */ public Consumer() throws MQClientException { consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); //消費模式:一個新的訂閱組第一次啟動從隊列的最后位置開始消費 后續再啟動接着上次消費的進度開始消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //訂閱主題和 標簽( * 代表所有標簽)下信息 consumer.subscribe(JmsConfig.TOPIC, "*"); // //注冊消費的監聽 並在此監聽中消費信息,並返回消費的狀態信息 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { // msgs中只收集同一個topic,同一個tag,並且key相同的message // 會把不同的消息分別放置到不同的隊列中 try { for (Message msg : msgs) { //消費者獲取消息 這里只輸出 不做后面邏輯處理 String body = new String(msg.getBody(), "utf-8"); log.info("Consumer-獲取消息-主題topic為={}, 消費消息為={}", msg.getTopic(), body); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.out.println("消費者 啟動成功======="); } }
大致就是這邊簡單,下面就是測試。
二、測試
先寫個測試接口進行測試。
1、Controller
@Slf4j @RestController public class Controller { @Autowired private Producer producer; private List<String> mesList; /** * 初始化消息 */ public Controller() { mesList = new ArrayList<>(); mesList.add("小小"); mesList.add("爸爸"); mesList.add("媽媽"); mesList.add("爺爺"); mesList.add("奶奶"); } @RequestMapping("/text/rocketmq") public Object callback() throws Exception { //總共發送五次消息 for (String s : mesList) { //創建生產信息 Message message = new Message(JmsConfig.TOPIC, "testtag", ("小小一家人的稱謂:" + s).getBytes()); //發送 SendResult sendResult = producer.getProducer().send(message); log.info("輸出生產者信息={}",sendResult); } return "成功"; } }
2、測試結果

很明顯生產發送消息已經成功,二消費者也成功接收了消息!
另外我們再來看下RocketMQ控制台是否也有消費記錄

很明顯在控制台這邊也會有消費記錄!
總結這邊只是簡單的整合,后面會通過RocketMQ實現分布式事務,可以用於線上實際環境中,到時候會深入講解下源碼。
