rocketmq消息及流程


1、為什么用mq
優勢
主要有3個:
應用解耦(降低微服務之間的關聯)、
異步提速(微服務拿到mq消息后同時工作)、
削峰填谷(可以消息堆積)

劣勢
系統可用性降低(MQ一旦宕機整個系統不可用)
復雜度提高(需要解決系統消息一致性、重復消費...)
一致性問題(不同系統拿到mq中的消息后,部分系統處理失敗怎么辦)

2、rocketmq集群工作流程
image

由上圖可以看出,rocketMQ集群=消息服務器集群+命名服務器集群,其中消息服務器集群=生產者集群+broker集群+消費者集群。

命名服務器集群(nameserver cluster)
● 命名服務器集群是管理生產者、broker、消費者的紐帶,哪個生產者/broker/消費者可用都是通過命名服務器得知其信息,所以生產者/broker/消費者都需要定時發送心跳給命名服務器
● 命名服務器與生產者的關系:命名服務器記錄有許多broker的ip地址,每個生產者發送消息到broker前都需要先去命名服務器獲取某個broker的ip,然后再發送消息到broker
● 命名服務器和消息者的關系:命名服務器記錄有許多broker的ip地址,消費者想監聽broker中的消息,需要先去命名服務器獲取某個broker的ip,然后再監聽broker中的消息

生產者集群(producer cluster)
● 每個生產者部署在不同的IP上形成了集群
● 生產者的消息=topic+tag,topic用來區分消息類型,一種topic類型的消息可以分布在多個不同的broker中,同類型的消息就用tag區分,如我們系統里的佣金寶的topic是"topic-yjb",然后佣金寶下面可以划分多個tag

消費者集群(consumer cluster)
● 每個消費者部署在不同的IP上形成了集群
● 消費者獲取某個broker中的消息理論上有兩種方法:
○ pull拉取模式:消費者開啟線程定時訪問broker,如有消息存在則拉取,缺點是太消耗消費者的資源了,不管有沒有消息都會去訪問broker
○ push推送模式:消費者起一個監聽器監聽broker(與broker建立一個長鏈接),若broker中有消息,則broker會自動推送消息給消費者,一般用這種。其中push模式的底層也是通過消費者主動拉取的方式來實現的,只不過它的名字叫push而已,意思是Broker盡可能實時的推送消息給消費者,和pull模式相比,push模式都幫我們封裝了底層,而pull模式就要自己寫代碼去手動拉取消息,所以pull模式更像拉取,而封裝好的push更像是推送。

3、消息類型
同步/異步/單向消息
同步:發送消息是按順序發送
異步:發送消息是異步的,生產者發送完消息就干其他事情,消費者稍后會在生產者的回調函數中返回消費結果【new SendCallback(){} 】,及時性差
單項消息:生產者只管發出去,並不接收返回值

批量消息
特點:
● 同一批消息的topic應該相同;
● 消息內容大小=(topic+body+其他key/value屬性+日志固定20字節)<4M
● 不是延時消息

tag過濾消息
消費者指定特定的tag,則只接收該tag的消息

sql過濾消息
消費者支持類似於sql查詢語法那樣的消息過濾

//生產者
String msg="hello,小明同學";
Message message=new Message("topic",msg.getBytes("UTF-8"));
message.putUserProperty("name","xiaoming");
message.putUserProperty("age","27");
SendResult sendResult1 =defaultMQProducer.send(message);

//消費者用sql語法過濾出age>26歲的消息,即只接受age>26歲的消息
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1");
defaultMQPushConsumer.subscribe("topic", MessageSelector.bySql("age>26"));

4、消息的特殊處理
順序消息
場景:在某些業務系統中,一些業務流程處理的順序必須是按順序的,比如客戶下單:創建訂單 -> 付款 -> 推送消息 -> 訂單完成,在並發環境下,不可能只有一個客戶下單,當多個客戶下單時,他們的這四種消息有可能是混亂的。
解決方案:rocketmq默認每個topic在broker中都會有四個隊列存放該類數據,隊列是FIFO性質的,我們可以利用隊列去按序存放這些消息以達到按序消費的目的。
image

生產者主要代碼:

DefaultMQProducer defaultMQProducer=new DefaultMQProducer("group1");
defaultMQProducer.setNamesrvAddr("127.0.0.1:9876");
try {
    defaultMQProducer.start();
    List<Order> list=new ArrayList<>();
    //模擬業務流程亂序提交:單個訂單消息有序,多個訂單間消息無序
    Order order01=new Order(0,"創建訂單");
    Order order11=new Order(1,"創建訂單");
    Order order02=new Order(0,"付款");

    Order order03=new Order(0,"推送");
    Order order21=new Order(2,"創建訂單");
    Order order12=new Order(1,"付款");

    Order order04=new Order(0,"完成");
    Order order13=new Order(1,"推送");
    Order order22=new Order(2,"付款");

    Order order14=new Order(1,"完成");
    Order order23=new Order(2,"推送");
    Order order24=new Order(2,"完成");

list.addAll(new ArrayList<Order>(Arrays.asList(order01,order11,order02,order03,order21,order12,order13,order22,order23,order04,order14,order24)));
for(Order order:list){
    Message message=new Message("topic-order",order.toString().getBytes());
   /*
    *每個topic默認創建4個隊列,defaultMQProducer可以通過MessageQueueSelector的select方法設置
    *當前Message發送到"topic-order"的哪個隊列:通過訂單的唯一屬性值,如orderId,對topic中的queue隊列數取模,
    *這樣同一個訂單的不同消息就會被按序放進同一個queue中
    */
   SendResult sendResult=defaultMQProducer.send(message, new MessageQueueSelector() {

            @Override
            public MessageQueue select(List<MessageQueue> queueList, Message msg, Object o) {
                System.out.println("隊列數:"+queueList.size());
                //獲取隊列下標
                int size=order.getOrderId()%queueList.size();
                //計算該message放在"topic-order"哪個隊列中
                MessageQueue mq=queueList.get(size);
                return mq;
            }
        },null);
        System.out.println(sendResult);
    }

} catch (Exception e) {
    e.printStackTrace();
}

消費者主要代碼:

//使用MessageListenerConcurrently則多個線程服務一個隊列,而MessageListenerOrderly是一個線程服務一個隊列(topic默認四個隊列就是四個線程)
defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
        System.out.println("線程:"+Thread.currentThread().getName()+",隊列:"+consumeOrderlyContext.getMessageQueue().getQueueId()+",該隊列消息數量:"+list.size());
        for(MessageExt messageExt:list){
            System.out.println(new String(messageExt.getBody()));

        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

執行結果:
image

可以看出一個線程服務一個隊列,將同類業務的消息都推送到同一個隊列中,是可以實現消息的順序發送的.

事務消息
1. 為什么要用事務消息?
還是以用戶下單為例,用戶在producer中創建訂單(但未提交事務到mysql),然后把下單消息發送給broker(即MQ服務器),MQ服務器再把該消息發給所有訂閱了該類topic的消費者,可能出現如下情況:

(1)producer成功進行了數據庫操作(即提交事務到mysql),且MQ服務器接收消息成功,然后被消費者消費 -->皆大歡喜

(2)producer成功進行了數據庫操作(即提交事務到mysql),但發到MQ服務器失敗,進而消費者不能消費該類消息 -->不正常

(3)producer進行數據庫操作的時候發生了意外導致數據庫操作失敗(即提交事務到mysql),但發到MQ服務器成功,進而消費者會去消費該類消息 -->不正常

上面第2、3種情況都是不正常的,解決辦法就是引入事務消息,事務消息的過程如下:

image

第一階段producer先發送一條"half"型消息到MQ服務器,MQ服務器收到后隨即返回一個發送成功標識 ->

producer進行數據庫操作執行事務,執行成功則發送二次確認(Commit或Rollback)消息給服務器 ->

MQ服務器收到Commit則將第一階段的"half"型消息標記為可投遞,消費者若訂閱了該topic則能收到該消息;MQ服務器收到Rollback則刪除第一階段的消息,消費者將接收不到該消息,就當什么事也沒發生過 ->

MQ服務器會有一個事務補償機制:若服務器很久都沒有收到producer返回的二次確認commit/rollback,則會主動去調用producer的接口進行回查,然后producer再去數據庫中查看事務是否執行成功 ,如成功/失敗,則發送commit/rollback給MQ服務器,然后后面的操作同上一步

2. 事務消息和正常消息的區別
(1)事務消息有三種狀態:commit狀態、回滾狀態、中間狀態(producer發送了half型消息但未發送commit給到服務器,即未對);commit狀態的消息等價於正常消息(可以被消費者感知),但后兩種狀態的消息對於消費者是不可見的
(2)事務消息僅與生產者有關,僅當事務消息處於commit狀態時與正常消息一樣可以被消費者感知

3. 代碼實現事務消息
生產者主要代碼:

//事務消息使用的生產者是TransactionMQProducer
TransactionMQProducer transactionMQProducer=new TransactionMQProducer("group1");
transactionMQProducer.setNamesrvAddr("127.0.0.1:9876");
try {
    //添加事務監聽
    transactionMQProducer.setTransactionListener(new TransactionListener(){

        //事務消息過程中包括正常事務(數據庫操作)、事務補償,正常事務在該方法執行
        @Override
        @Transactional
        public LocalTransactionState executeLocalTransaction(Message message, Object o) {
            try{
                
                //模擬數據庫操作事務正常提交:insert delete...
                Long orderId= Long.valueOf(new String(message.getBody()));
                insert(orderId);
                System.out.println("本地數據庫事務提交成功!");
                //本地執行完數據庫操作后(正常事務),事務消息有三種狀態:COMMIT_MESSAGE/ROLLBACK_MESSAGE/UNKNOW
                return LocalTransactionState.COMMIT_MESSAGE;
                
            }catch(Exception ex){
                
                //模擬數據庫操作事務提交失敗:insert delete...

                System.out.println("本地數據庫事務提交失敗,事務消息rollback:"+ex);
                //本地執行完數據庫操作后(正常事務),事務消息有三種狀態:COMMIT_MESSAGE/ROLLBACK_MESSAGE/UNKNOW
                return LocalTransactionState.ROLLBACK_MESSAGE;
                
            }

            //業務成功后,也可以直接返回UNKNOW,避免極端的情況下數據庫並沒有保存成功
            //return LocalTransactionState.UNKNOW;
            
        }

        //事務補償在該方法執行:若executeLocalTransaction返回UNKNOW或長時間沒有返回消息給服務器
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
            
            System.out.println("執行事務補償...");
            Long orderId= Long.valueOf(new String(messageExt.getBody()));
            if(null!=selectOne(orderId)){
                return LocalTransactionState.COMMIT_MESSAGE;
            }else{
                //若事務不成功可以返回UNKNOW而不是ROLLBACK_MESSAGE,因為服務器默認回查15次后都是UNKNOW,則會自動回滾haLf型消息
                return LocalTransactionState.UNKNOW;
            }
            
        }
    });
    
    transactionMQProducer.start();
    Order order=new Order(0,"創建訂單");
    Message message=new Message("topic-transaction",String.valueOf(order.getOrderId()).getBytes());
    SendResult sendResult=transactionMQProducer.sendMessageInTransaction(message,null);
    System.out.println("sendResult:"+sendResult);

由事務消息的整個流程可知,先執行executeLocalTransaction,再打印發送結果:
image

如果executeLocalTransaction返回UNKNOW,則服務器不斷嘗試事務補償:
image

消費者主要代碼:

DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1");
defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
defaultMQPushConsumer.subscribe("topic-transaction", "*");
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        list.forEach(msg->{
            System.out.println("收到消息:"+new String(msg.getBody()));
        });
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
defaultMQPushConsumer.start();

執行結果如下:
image


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM