1、為什么用mq
優勢
主要有3個:
應用解耦(降低微服務之間的關聯)、
異步提速(微服務拿到mq消息后同時工作)、
削峰填谷(可以消息堆積)
劣勢
系統可用性降低(MQ一旦宕機整個系統不可用)
復雜度提高(需要解決系統消息一致性、重復消費...)
一致性問題(不同系統拿到mq中的消息后,部分系統處理失敗怎么辦)
2、rocketmq集群工作流程
由上圖可以看出,rocketMQ集群=消息服務器集群+命名服務器集群,其中消息服務器集群=生產者集群+broker集群+消費者集群。
命名服務器集群(nameserver cluster)
● 命名服務器集群是管理生產者、broker、消費者的紐帶,哪個生產者/broker/消費者可用都是通過命名服務器得知其信息,所以生產者/broker/消費者都需要定時發送心跳給命名服務器
● 命名服務器與生產者的關系:命名服務器記錄有許多broker的ip地址,每個生產者發送消息到broker前都需要先去命名服務器獲取某個broker的ip,然后再發送消息到broker
● 命名服務器和消息者的關系:命名服務器記錄有許多broker的ip地址,消費者想監聽broker中的消息,需要先去命名服務器獲取某個broker的ip,然后再監聽broker中的消息
生產者集群(producer cluster)
● 每個生產者部署在不同的IP上形成了集群
● 生產者的消息=topic+tag,topic用來區分消息類型,一種topic類型的消息可以分布在多個不同的broker中,同類型的消息就用tag區分,如我們系統里的佣金寶的topic是"topic-yjb",然后佣金寶下面可以划分多個tag
消費者集群(consumer cluster)
● 每個消費者部署在不同的IP上形成了集群
● 消費者獲取某個broker中的消息理論上有兩種方法:
○ pull拉取模式:消費者開啟線程定時訪問broker,如有消息存在則拉取,缺點是太消耗消費者的資源了,不管有沒有消息都會去訪問broker
○ push推送模式:消費者起一個監聽器監聽broker(與broker建立一個長鏈接),若broker中有消息,則broker會自動推送消息給消費者,一般用這種。其中push模式的底層也是通過消費者主動拉取的方式來實現的,只不過它的名字叫push而已,意思是Broker盡可能實時的推送消息給消費者,和pull模式相比,push模式都幫我們封裝了底層,而pull模式就要自己寫代碼去手動拉取消息,所以pull模式更像拉取,而封裝好的push更像是推送。
3、消息類型
同步/異步/單向消息
同步:發送消息是按順序發送
異步:發送消息是異步的,生產者發送完消息就干其他事情,消費者稍后會在生產者的回調函數中返回消費結果【new SendCallback(){} 】,及時性差
單項消息:生產者只管發出去,並不接收返回值
批量消息
特點:
● 同一批消息的topic應該相同;
● 消息內容大小=(topic+body+其他key/value屬性+日志固定20字節)<4M
● 不是延時消息
tag過濾消息
消費者指定特定的tag,則只接收該tag的消息
sql過濾消息
消費者支持類似於sql查詢語法那樣的消息過濾
//生產者
String msg="hello,小明同學";
Message message=new Message("topic",msg.getBytes("UTF-8"));
message.putUserProperty("name","xiaoming");
message.putUserProperty("age","27");
SendResult sendResult1 =defaultMQProducer.send(message);
//消費者用sql語法過濾出age>26歲的消息,即只接受age>26歲的消息
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1");
defaultMQPushConsumer.subscribe("topic", MessageSelector.bySql("age>26"));
4、消息的特殊處理
順序消息
場景:在某些業務系統中,一些業務流程處理的順序必須是按順序的,比如客戶下單:創建訂單 -> 付款 -> 推送消息 -> 訂單完成,在並發環境下,不可能只有一個客戶下單,當多個客戶下單時,他們的這四種消息有可能是混亂的。
解決方案:rocketmq默認每個topic在broker中都會有四個隊列存放該類數據,隊列是FIFO性質的,我們可以利用隊列去按序存放這些消息以達到按序消費的目的。
生產者主要代碼:
DefaultMQProducer defaultMQProducer=new DefaultMQProducer("group1");
defaultMQProducer.setNamesrvAddr("127.0.0.1:9876");
try {
defaultMQProducer.start();
List<Order> list=new ArrayList<>();
//模擬業務流程亂序提交:單個訂單消息有序,多個訂單間消息無序
Order order01=new Order(0,"創建訂單");
Order order11=new Order(1,"創建訂單");
Order order02=new Order(0,"付款");
Order order03=new Order(0,"推送");
Order order21=new Order(2,"創建訂單");
Order order12=new Order(1,"付款");
Order order04=new Order(0,"完成");
Order order13=new Order(1,"推送");
Order order22=new Order(2,"付款");
Order order14=new Order(1,"完成");
Order order23=new Order(2,"推送");
Order order24=new Order(2,"完成");
list.addAll(new ArrayList<Order>(Arrays.asList(order01,order11,order02,order03,order21,order12,order13,order22,order23,order04,order14,order24)));
for(Order order:list){
Message message=new Message("topic-order",order.toString().getBytes());
/*
*每個topic默認創建4個隊列,defaultMQProducer可以通過MessageQueueSelector的select方法設置
*當前Message發送到"topic-order"的哪個隊列:通過訂單的唯一屬性值,如orderId,對topic中的queue隊列數取模,
*這樣同一個訂單的不同消息就會被按序放進同一個queue中
*/
SendResult sendResult=defaultMQProducer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> queueList, Message msg, Object o) {
System.out.println("隊列數:"+queueList.size());
//獲取隊列下標
int size=order.getOrderId()%queueList.size();
//計算該message放在"topic-order"哪個隊列中
MessageQueue mq=queueList.get(size);
return mq;
}
},null);
System.out.println(sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
消費者主要代碼:
//使用MessageListenerConcurrently則多個線程服務一個隊列,而MessageListenerOrderly是一個線程服務一個隊列(topic默認四個隊列就是四個線程)
defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
System.out.println("線程:"+Thread.currentThread().getName()+",隊列:"+consumeOrderlyContext.getMessageQueue().getQueueId()+",該隊列消息數量:"+list.size());
for(MessageExt messageExt:list){
System.out.println(new String(messageExt.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
執行結果:
可以看出一個線程服務一個隊列,將同類業務的消息都推送到同一個隊列中,是可以實現消息的順序發送的.
事務消息
1. 為什么要用事務消息?
還是以用戶下單為例,用戶在producer中創建訂單(但未提交事務到mysql),然后把下單消息發送給broker(即MQ服務器),MQ服務器再把該消息發給所有訂閱了該類topic的消費者,可能出現如下情況:
(1)producer成功進行了數據庫操作(即提交事務到mysql),且MQ服務器接收消息成功,然后被消費者消費 -->皆大歡喜
(2)producer成功進行了數據庫操作(即提交事務到mysql),但發到MQ服務器失敗,進而消費者不能消費該類消息 -->不正常
(3)producer進行數據庫操作的時候發生了意外導致數據庫操作失敗(即提交事務到mysql),但發到MQ服務器成功,進而消費者會去消費該類消息 -->不正常
上面第2、3種情況都是不正常的,解決辦法就是引入事務消息,事務消息的過程如下:
第一階段producer先發送一條"half"型消息到MQ服務器,MQ服務器收到后隨即返回一個發送成功標識 ->
producer進行數據庫操作執行事務,執行成功則發送二次確認(Commit或Rollback)消息給服務器 ->
MQ服務器收到Commit則將第一階段的"half"型消息標記為可投遞,消費者若訂閱了該topic則能收到該消息;MQ服務器收到Rollback則刪除第一階段的消息,消費者將接收不到該消息,就當什么事也沒發生過 ->
MQ服務器會有一個事務補償機制:若服務器很久都沒有收到producer返回的二次確認commit/rollback,則會主動去調用producer的接口進行回查,然后producer再去數據庫中查看事務是否執行成功 ,如成功/失敗,則發送commit/rollback給MQ服務器,然后后面的操作同上一步
2. 事務消息和正常消息的區別
(1)事務消息有三種狀態:commit狀態、回滾狀態、中間狀態(producer發送了half型消息但未發送commit給到服務器,即未對);commit狀態的消息等價於正常消息(可以被消費者感知),但后兩種狀態的消息對於消費者是不可見的
(2)事務消息僅與生產者有關,僅當事務消息處於commit狀態時與正常消息一樣可以被消費者感知
3. 代碼實現事務消息
生產者主要代碼:
//事務消息使用的生產者是TransactionMQProducer
TransactionMQProducer transactionMQProducer=new TransactionMQProducer("group1");
transactionMQProducer.setNamesrvAddr("127.0.0.1:9876");
try {
//添加事務監聽
transactionMQProducer.setTransactionListener(new TransactionListener(){
//事務消息過程中包括正常事務(數據庫操作)、事務補償,正常事務在該方法執行
@Override
@Transactional
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
try{
//模擬數據庫操作事務正常提交:insert delete...
Long orderId= Long.valueOf(new String(message.getBody()));
insert(orderId);
System.out.println("本地數據庫事務提交成功!");
//本地執行完數據庫操作后(正常事務),事務消息有三種狀態:COMMIT_MESSAGE/ROLLBACK_MESSAGE/UNKNOW
return LocalTransactionState.COMMIT_MESSAGE;
}catch(Exception ex){
//模擬數據庫操作事務提交失敗:insert delete...
System.out.println("本地數據庫事務提交失敗,事務消息rollback:"+ex);
//本地執行完數據庫操作后(正常事務),事務消息有三種狀態:COMMIT_MESSAGE/ROLLBACK_MESSAGE/UNKNOW
return LocalTransactionState.ROLLBACK_MESSAGE;
}
//業務成功后,也可以直接返回UNKNOW,避免極端的情況下數據庫並沒有保存成功
//return LocalTransactionState.UNKNOW;
}
//事務補償在該方法執行:若executeLocalTransaction返回UNKNOW或長時間沒有返回消息給服務器
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("執行事務補償...");
Long orderId= Long.valueOf(new String(messageExt.getBody()));
if(null!=selectOne(orderId)){
return LocalTransactionState.COMMIT_MESSAGE;
}else{
//若事務不成功可以返回UNKNOW而不是ROLLBACK_MESSAGE,因為服務器默認回查15次后都是UNKNOW,則會自動回滾haLf型消息
return LocalTransactionState.UNKNOW;
}
}
});
transactionMQProducer.start();
Order order=new Order(0,"創建訂單");
Message message=new Message("topic-transaction",String.valueOf(order.getOrderId()).getBytes());
SendResult sendResult=transactionMQProducer.sendMessageInTransaction(message,null);
System.out.println("sendResult:"+sendResult);
由事務消息的整個流程可知,先執行executeLocalTransaction,再打印發送結果:
如果executeLocalTransaction返回UNKNOW,則服務器不斷嘗試事務補償:
消費者主要代碼:
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1");
defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
defaultMQPushConsumer.subscribe("topic-transaction", "*");
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach(msg->{
System.out.println("收到消息:"+new String(msg.getBody()));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
defaultMQPushConsumer.start();
執行結果如下: