SpringBoot整合RocketMQ
上篇博客講解了服務器集群部署RocketMQ 博客地址:RocketMQ(2)---Docker部署RocketMQ集群
這篇在上篇搭建好的基礎上,將SpringBoot整合RocketMQ實現生產消費。
GitHub地址
: https://github.com/yudiandemingzi/spring-boot-study
一、搭建步驟
先說下技術大致架構
SpringBoot2.1.6 + Maven3.5.4 + rocketmq4.3.0 + JDK1.8 +Lombok(插件)
1、添加rocketmq包
<!--注意: 這里的版本,要和部署在服務器上的版本號一致-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
2、JmsConfig(配置類)
連接RocketMQ服務器配置類,這里為了方便直接寫成常量。
/**
* @Description: 安裝實際開發這里的信息 都是應該寫在配置里,來讀取,這里為了方便所以寫成常量
*/
public class JmsConfig {
/**
* Name Server 地址,因為是集群部署 所以有多個用 分號 隔開
*/
public static final String NAME_SERVER = "127.12.15.6:9876;127.12.15.6:9877";
/**
* 主題名稱 主題一般是服務器設置好 而不能在代碼里去新建topic( 如果沒有創建好,生產者往該主題發送消息 會報找不到topic錯誤)
*/
public static final String TOPIC = "topic_family";
}
3、Producer (生產者)
@Slf4j
@Component
public class Producer {
private String producerGroup = "test_producer";
private DefaultMQProducer producer;
public Producer(){
//示例生產者
producer = new DefaultMQProducer(producerGroup);
//不開啟vip通道 開通口端口會減2
producer.setVipChannelEnabled(false);
//綁定name server
producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
start();
}
/**
* 對象在使用之前必須要調用一次,只能初始化一次
*/
public void start(){
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
public DefaultMQProducer getProducer(){
return this.producer;
}
/**
* 一般在應用上下文,使用上下文監聽器,進行關閉
*/
public void shutdown(){
this.producer.shutdown();
}
}
4、Consumer (消費者)
@Slf4j
@Component
public class Consumer {
/**
* 消費者實體對象
*/
private DefaultMQPushConsumer consumer;
/**
* 消費者組
*/
public static final String CONSUMER_GROUP = "test_consumer";
/**
* 通過構造函數 實例化對象
*/
public Consumer() throws MQClientException {
consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
//消費模式:一個新的訂閱組第一次啟動從隊列的最后位置開始消費 后續再啟動接着上次消費的進度開始消費
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//訂閱主題和 標簽( * 代表所有標簽)下信息
consumer.subscribe(JmsConfig.TOPIC, "*");
// //注冊消費的監聽 並在此監聽中消費信息,並返回消費的狀態信息
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// msgs中只收集同一個topic,同一個tag,並且key相同的message
// 會把不同的消息分別放置到不同的隊列中
try {
for (Message msg : msgs) {
//消費者獲取消息 這里只輸出 不做后面邏輯處理
String body = new String(msg.getBody(), "utf-8");
log.info("Consumer-獲取消息-主題topic為={}, 消費消息為={}", msg.getTopic(), body);
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println("消費者 啟動成功=======");
}
}
大致就是這邊簡單,下面就是測試。
二、測試
先寫個測試接口進行測試。
1、Controller
@Slf4j
@RestController
public class Controller {
@Autowired
private Producer producer;
private List<String> mesList;
/**
* 初始化消息
*/
public Controller() {
mesList = new ArrayList<>();
mesList.add("小小");
mesList.add("爸爸");
mesList.add("媽媽");
mesList.add("爺爺");
mesList.add("奶奶");
}
@RequestMapping("/text/rocketmq")
public Object callback() throws Exception {
//總共發送五次消息
for (String s : mesList) {
//創建生產信息
Message message = new Message(JmsConfig.TOPIC, "testtag", ("小小一家人的稱謂:" + s).getBytes());
//發送
SendResult sendResult = producer.getProducer().send(message);
log.info("輸出生產者信息={}",sendResult);
}
return "成功";
}
}
2、測試結果
很明顯生產發送消息已經成功,二消費者也成功接收了消息!
另外我們再來看下RocketMQ控制台是否也有消費記錄
很明顯在控制台這邊也會有消費記錄!
總結
這邊只是簡單的整合,后面會通過RocketMQ實現分布式事務,可以用於線上實際環境中,到時候會深入講解下源碼。
只要自己變優秀了,其他的事情才會跟着好起來(中將10)