rocketmq-順序消息


參考:

https://blog.csdn.net/zhaoming19870124/article/details/90900808

https://blog.csdn.net/hosaos/article/details/90675978

https://www.cnblogs.com/hzmark/p/orderly_message.html

 

 

 

 

 

RocketMQ局部順序消息實現原理

順序消息:是指消息的消費順序與消息的產生順序相同;順序消息分為全局順序消息和局部順序消息,全局順序消息是指:在某個topic下的所有消息都要保證消費順序與產生順序相同;部分順序消息是指:只要保證每一組消息被順序消費即可。在RocketMQ中,若要實現全局順序消息,首先把topic的讀寫隊列設置為一,然后把生產者producer和消費者consumer都設置成單線程即可。但這樣一來,就需要犧牲高並發和高吞吐量了。一般情況下,根據業務的需要,我們只需要實現局部順序消息即可。

在高並發情況下,RocketMQ實現局部順序消息是通過消息的生產者和消息的消費者協同完成的。發送端需要做的事情:把同一個小組內的消息發送到指定的隊列Message Queue中;消費端需要做的事情:僅用一個線程處理這個隊列中的消息。

默認情況下,消息的生產端實現負載均衡的做法是:輪流向各個消息隊列Message Queue中發送消息。消息的消費端實現負載均衡的做法是:把消息隊列的總數簡單的除以消費者的個數,每個消費者負責一些消息隊列(注意:消費者的數量不要超過消息隊列的個數,否則多余的消費者接收不到消息)。在我們人為不干涉的情況下,把一條消息投遞到哪個隊列以及被哪個消費者下的線程消費都是未知的。

為了實現局部順序消息的消費,發送端通過使用MessageQueueSelector類來控制把消息發往哪個消息隊列Message Queue中,其代碼如下:

SendResult result = null;
try {
    result = producer.send(message, new MessageQueueSelector() {
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            Integer queueNumber = (Integer)arg;
            return mqs.get(queueNumber);
        }
    }, 2);
} catch (MQClientException e) {
    e.printStackTrace();
} catch (RemotingException e) {
    e.printStackTrace();
} catch (MQBrokerException e) {
    e.printStackTrace();
} catch (InterruptedException e) {
    e.printStackTrace();
}
System.out.println(result);

在我們初始化消費者時,需要指定監聽器的類型:

MessageListenerOrderly:在消息需要按局部順序消費時使用;

MessageListenerConcurrently:在消息不需要按局部順序消費時使用。

在MessageListenerOrderly的實現中,為每個Consumer Queue加個鎖,消費每個消息前,需要先獲得這個消息所在的Consumer Queue所對應的的鎖,這樣就可以保證在同一時間、同一個Consumer Queue的消息不被並發消費,但不同的Consumer Queue的消息可以並發處理。

為了實現局部順序消息的消費,消息的消費端需要指定監聽器類型為:MessageListenerOrderly,代碼如下:

this.consumer.setMessageListener(new MessageListenerOrderly() {
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
        try {
            //處理業務邏輯
            return ConsumeOrderlyStatus.SUCCESS;
        } catch (Exception e) {
            e.printStackTrace();
            //當消費消息的過程中,若是出現了異常,則稍后再重新消費
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }
});

 

 

 

 

 

 

RocketMQ-順序消息Demo及實現原理分析

場景分析

順序消費是指消息的產生順序和消費順序相同

假設有個下單場景,每個階段需要發郵件通知用戶訂單狀態變化。用戶付款完成時系統給用戶發送訂單已付款郵件,訂單已發貨時給用戶發送訂單已發貨郵件,訂單完成時給用戶發送訂單已完成郵件。

發送郵件的操作為了不阻塞訂單主流程,可以通過mq消息來解耦,下游郵件服務器收到mq消息后發送具體郵件,已付款郵件、已發貨郵件、訂單已完成郵件這三個消息,下游的郵件服務器需要順序消費這3個消息並且順序發送郵件才有意義。否則就會出現已發貨郵件先發出,已付款郵件后發出的情況。

但是mq消費者往往是集群部署,一個消費組內存在多個消費者,同一個消費者內部,也可能存在多個消費線程並行消費,如何在消費者集群環境中,如何保證郵件mq消息發送與消費的順序性呢?

順序消費又分兩種,全局順序消費和局部順序消費

全局順序消費

什么是全局順序消費?所有發到mq的消息都被順序消費,類似數據庫中的binlog,需要嚴格保證全局操作的順序性

那么RocketMQ中如何做才能保證全局順序消費呢?

這就需要設置topic下讀寫隊列數量為1

為什么要設置讀寫隊列數量為1呢?
假設讀寫隊列有多個,消息就會存儲在多個隊列中,消費者負載時可能會分配到多個消費隊列同時進行消費,多隊列並發消費時,無法保證消息消費順序性

那么全局順序消費有必要么?
A、B都下了單,B用戶訂單的郵件先發送,A的后發送,不行么?其實,大多數場景下,mq下只需要保證局部消息順序即可,即A的付款消息先於A的發貨消息即可,A的消息和B的消息可以打亂,這樣系統的吞吐量會更好,將隊列數量置為1,極大的降低了系統的吞吐量,不符合mq的設計初衷

舉個例子來說明局部順序消費。假設訂單A的消息為A1,A2,A3,發送順序也如此。訂單B的消息為B1,B2,B3,A訂單消息先發送,B訂單消息后發送

消費順序如下
A1,A2,A3,B1,B2,B3是全局順序消息,嚴重降低了系統的並發度
A1,B1,A2,A3,B2,B3是局部順序消息,可以被接受
A2,B1,A1,B2,A3,B3不可接收,因為A2出現在了A1的前面

局部順序消費

那么在RocketMQ里局部順序消息又是如何怎么實現的呢?

要保證消息的順序消費,有三個關鍵點

  1. 消息順序發送
  2. 消息順序存儲
  3. 消息順序消費

第一點,消息順序發送,多線程發送的消息無法保證有序性,因此,需要業務方在發送時,針對同一個業務編號(如同一筆訂單)的消息需要保證在一個線程內順序發送,在上一個消息發送成功后,在進行下一個消息的發送。對應到mq中,消息發送方法就得使用同步發送,異步發送無法保證順序性

第二點,消息順序存儲,mq的topic下會存在多個queue,要保證消息的順序存儲,同一個業務編號的消息需要被發送到一個queue中。對應到mq中,需要使用MessageQueueSelector來選擇要發送的queue,即對業務編號進行hash,然后根據隊列數量對hash值取余,將消息發送到一個queue中

第三點,消息順序消費,要保證消息順序消費,同一個queue就只能被一個消費者所消費,因此對broker中消費隊列加鎖是無法避免的。同一時刻,一個消費隊列只能被一個消費者消費,消費者內部,也只能有一個消費線程來消費該隊列。即,同一時刻,一個消費隊列只能被一個消費者中的一個線程消費

上面第一、第二點中提到,要保證消息順序發送和消息順序存儲需要使用mq的同步發送和MessageQueueSelector來保證,具體Demo會有體現

至於第三點中的加鎖操作會結合源碼來具體分析

Demo

producer中模擬了兩個線程,並發順序發送100個消息的情況,發送的消息中,key為消息發送編號i,消息body為orderId,大家注意下MessageQueueSelector的使用

consumer的demo有兩個,第一個為正常集群消費的consumer,另外一個是順序消費的consumer,從結果中觀察消息消費順序

理想情況下消息順序消費的結果應該是,同一個orderId下的消息的編號i值應該順序遞增,但是不同orderId之間的消費可以並行,即局部有序即可

Producer Demo

public class Producer {
    public static void main(String[] args)  {
        try {
            MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            ((DefaultMQProducer) producer).setNamesrvAddr("111.231.110.149:9876");
            producer.start();
			
			//順序發送100條編號為0到99的,orderId為1 的消息
            new Thread(() -> {
                Integer orderId = 1;
                sendMessage(producer, orderId);
            }).start();
			//順序發送100條編號為0到99的,orderId為2 的消息
            new Thread(() -> {
                Integer orderId = 2;
                sendMessage(producer, orderId);
            }).start();
			//sleep 30秒讓消息都發送成功再關閉
            Thread.sleep(1000*30);

            producer.shutdown();
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void sendMessage(MQProducer producer, Integer orderId) {
        for (int i = 0; i < 100; i++) {
            try {
                Message msg =
                        new Message("TopicTestjjj", "TagA", i + "",
                                (orderId + "").getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);
                System.out.println("message send,orderId:"+orderId);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}


Normal Consumer Demo

模擬了一個消費者中多線程並行消費消息的情況,使用的消費監聽器為MessageListenerConcurrently

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

        consumer.setNamesrvAddr("111.231.110.149:9876");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTestjjj", "*");
        //單個消費者中多線程並行消費
        consumer.setConsumeThreadMin(3);
        consumer.setConsumeThreadMin(6);

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
//                    System.out.println("收到消息," + new String(msg.getBody()));
                    System.out.println("queueId:"+msg.getQueueId()+",orderId:"+new String(msg.getBody())+",i:"+msg.getKeys());
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}


看下結果輸出,如圖,同一個orderId下,編號為10的消息先於編號為9的消息被消費,不是正確的順序消費,即普通的並行消息消費,無法保證消息消費的順序性

在這里插入圖片描述

Order Consumer Demo

順序消費的消費者例子如下,使用的監聽器是MessageListenerOrderly

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("111.231.110.149:9876");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTestjjj", "TagA");

        //消費者並行消費
        consumer.setConsumeThreadMin(3);
        consumer.setConsumeThreadMin(6);

        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
//                context.setAutoCommit(false);
                for (MessageExt msg : msgs) {
                    System.out.println("queueId:"+msg.getQueueId()+",orderId:"+new String(msg.getBody())+",i:"+msg.getKeys());
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

}


結果如下,同一個orderId下,消息順序消費,不同orderId並行消費,符合預期
在這里插入圖片描述

源碼分析

在源碼分析之前,先來思考下幾個問題

前面已經提到實現消息順序消費的關鍵點有三個,其中前兩點已經明確了解決思路

第一點,消息順序順序發送,可以由業務方在單線程使用同步發送消息的方式來保證
第二點,消息順序存儲,可以由業務方將同一個業務編號的消息發送到一個隊列中來實現

還剩下第三點,消息順序消費,實現消息順序消費的關鍵點又是什么呢?

舉個例子,假設業務方針對某個訂單發送了N個順序消息,這N個消息都發送到了mq服務端的一個隊列中,假設消費者集群中有3個消費者,每個消費者中又是開了N個線程多線程消費

第一種情形,假設3個消費者同時拉取一個隊列的消息進行消費,結果會怎么樣?N個消息可能會分配在3個消費者中進行消費,多機並行的情況下,消費能力的不同,無法保證這N個消息被順序消費,所以得保證一個消費隊列同一個時刻只能被一個消費者消費

假設又已經保證了一個隊列同一個時刻只能被一個消費者消費,那就能保證順序消費了?同一個消費者多線程進行消費,同樣會使得的N個消費被分配到N個線程中,一樣無法保證消息順序消費,所以還得保證一個隊列同一個時刻只能被一個消費者中一個線程消費

下面順序消息的源碼分析中就針對這兩點來進行分析,即

  1. 如何保證一個隊列只被一個消費者消費
  2. 如何保證一個消費者中只有一個線程能進行消費

鎖定MessageQueue

先看第一個問題,如何保證一個隊列只被一個消費者消費。

消費隊列存在於broker端,如果想保證一個隊列被一個消費者消費,那么消費者在進行消息拉取消費時就必須想mq服務器申請隊列鎖,消費者申請隊列鎖的代碼存在於RebalanceService消息隊列負載的實現代碼中

先明確一點,同一個消費組中的消費者共同承擔topic下所有消費者隊列的消費,因此每個消費者需要定時重新負載並分配其對應的消費隊列,具體為消費者分配消費隊列的代碼實現在RebalanceImpl#rebalanceByTopic中,本文不多講

客戶端實現

消費者重新負載,並且分配完消費隊列后,需要向mq服務器發起消息拉取請求,代碼實現在RebalanceImpl#updateProcessQueueTableInRebalance中,針對順序消息的消息拉取,mq做了如下判斷

在這里插入圖片描述
核心思想就是,消費客戶端先向broker端發起對messageQueue的加鎖請求,只有加鎖成功時才創建pullRequest進行消息拉取,下面看下lock加鎖請求方法
在這里插入圖片描述
代碼實現邏輯比較清晰,就是調用lockBatchMQ方法發送了一個加鎖請求,那么broker端收到加鎖請求后的處理邏輯又是怎么樣?

broker端實現

broker端收到加鎖請求的處理邏輯在RebalanceLockManager#tryLockBatch方法中,RebalanceLockManager中關鍵屬性如下

//默認鎖過期時間 60秒
    private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
        "rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
 //重入鎖
    private final Lock lock = new ReentrantLock();
 //key為消費者組名稱,value是一個key為MessageQueue,value為LockEntry的map
    private final ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
        new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);


LockEntry對象中關鍵屬性如下

//消費者id
private String clientId;
//最后加鎖時間
private volatile long lastUpdateTimestamp = System.currentTimeMillis();


isLocked方法如下
public boolean isLocked(final String clientId) {
            boolean eq = this.clientId.equals(clientId);
            return eq && !this.isExpired();
        }

        public boolean isExpired() {
            boolean expired =
                (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;

            return expired;
        }


對messageQueue進行加鎖的關鍵邏輯如下:

如果messageQueue對應的lockEntry為空,標志隊列未加鎖,返回加鎖成功
在這里插入圖片描述
如果lockEntry對應clientId為自己並且沒過期,標志同一個客戶端重復加鎖,返回加鎖成功(可重入)
在這里插入圖片描述

如果鎖已經過期,返回加鎖成功
在這里插入圖片描述

總而言之,broker端通過對ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable的維護來達到messageQueue加鎖的目的,使得同一時刻,一個messageQueue只能被一個消費者消費

synchronized申請線程獨占鎖

假設消費者對messageQueue的加鎖已經成功,那么就進入到了第二個步驟,創建pullRequest進行消息拉取,消息拉取部分的代碼實現在PullMessageService中,消息拉取完后,需要提交到ConsumeMessageService中進行消費,順序消費的實現為ConsumeMessageOrderlyService,提交消息進行消費的方法為ConsumeMessageOrderlyService#submitConsumeRequest,具體實現如下
在這里插入圖片描述
可以看到,構建了一個ConsumeRequest對象,並提交給了ThreadPoolExecutor來並行消費,看下順序消費的ConsumeRequest的run方法實現

在這里插入圖片描述
里面先從messageQueueLock中獲取了messageQueue對應的一個鎖對象,看下messageQueueLock的實現

在這里插入圖片描述
其中維護了一個ConcurrentMap<MessageQueue, Object> mqLockTable,使得一個messageQueue對應一個鎖對象object

獲取到鎖對象后,使用synchronized嘗試申請線程級獨占鎖

  1. 如果加鎖成功,同一時刻只有一個線程進行消息消費
  2. 如果加鎖失敗,會延遲100ms重新嘗試向broker端申請鎖定messageQueue,鎖定成功后重新提交消費請求

至此,第三個關鍵點的解決思路也清晰了,基本上就兩個步驟

  1. 創建消息拉取任務時,消息客戶端向broker端申請鎖定MessageQueue,使得一個MessageQueue同一個時刻只能被一個消費客戶端消費
  2. 消息消費時,多線程針對同一個消息隊列的消費先嘗試使用synchronized申請獨占鎖,加鎖成功才能進行消費,使得一個MessageQueue同一個時刻只能被一個消費客戶端中一個線程消費

順序消息重試機制

在使用順序消息時,一定要注意其異常情況的出現,對於順序消息,當消費者消費消息失敗后,消息隊列 RocketMQ 版會自動不斷地進行消息重試(每次間隔時間為 1 秒),重試最大值是Integer.MAX_VALUE.這時,應用會出現消息消費被阻塞的情況。因此,建議您使用順序消息時,務必保證應用能夠及時監控並處理消費失敗的情況,避免阻塞現象的發生

重要的事再強調一次:在使用順序消息時,一定要注意其異常情況的出現!

 

 

 

 

 

 

 

 

聊一聊順序消息(RocketMQ順序消息的實現機制)

當我們說順序時,我們在說什么?

日常思維中,順序大部分情況會和時間關聯起來,即時間的先后表示事件的順序關系。

比如事件A發生在下午3點一刻,而事件B發生在下午4點,那么我們認為事件A發生在事件B之前,他們的順序關系為先A后B。

上面的例子之所以成立是因為他們有相同的參考系,即他們的時間是對應的同一個物理時鍾的時間。如果A發生的時間是北京時間,而B依賴的時間是東京時間,那么先A后B的順序關系還成立嗎?

如果沒有一個絕對的時間參考,那么A和B之間還有順序嗎,或者說怎么斷定A和B的順序?

顯而易見的,如果A、B兩個事件之間如果是有因果關系的,那么A一定發生在B之前(前因后果,有因才有果)。相反,在沒有一個絕對的時間的參考的情況下,若A、B之間沒有因果關系,那么A、B之間就沒有順序關系。

那么,我們在說順序時,其實說的是:

  • 有絕對時間參考的情況下,事件的發生時間的關系;

  • 和沒有時間參考下的,一種由因果關系推斷出來的happening before的關系;

在分布式環境中討論順序

當把順序放到分布式環境(多線程、多進程都可以認為是一個分布式的環境)中去討論時:

  • 同一線程上的事件順序是確定的,可以認為他們有相同的時間作為參考

  • 不同線程間的順序只能通過因果關系去推斷

 

(點表示事件,波浪線箭頭表示事件間的消息)

上圖中,進程P中的事件順序為p1->p2->p3->p4(時間推斷)。而因為p1給進程Q的q2發了消息,那么p1一定在q2之前(因果推斷)。但是無法確定p1和q1之間的順序關系。

推薦閱讀《Time, Clocks, and the Ordering of Events in a Distributed System》,會透徹的分析分布式系統中的順序問題。

消息中間件中的順序消息

什么是順序消息

有了上述的基礎之后,我們回到本篇文章的主題中,聊一聊消息中間件中的順序消息。

順序消息(FIFO 消息)是 MQ 提供的一種嚴格按照順序進行發布和消費的消息類型。順序消息由兩個部分組成:順序發布和順序消費。

順序消息包含兩種類型:

分區順序:一個Partition內所有的消息按照先進先出的順序進行發布和消費

全局順序:一個Topic內所有的消息按照先進先出的順序進行發布和消費

這是阿里雲上對順序消息的定義,把順序消息拆分成了順序發布和順序消費。那么多線程中發送消息算不算順序發布?

如上一部分介紹的,多線程中若沒有因果關系則沒有順序。那么用戶在多線程中去發消息就意味着用戶不關心那些在不同線程中被發送的消息的順序。即多線程發送的消息,不同線程間的消息不是順序發布的,同一線程的消息是順序發布的。這是需要用戶自己去保障的。

而對於順序消費,則需要保證哪些來自同一個發送線程的消息在消費時是按照相同的順序被處理的(為什么不說他們應該在一個線程中被消費呢?)。

全局順序其實是分區順序的一個特例,即使Topic只有一個分區(以下不在討論全局順序,因為全局順序將面臨性能的問題,而且絕大多數場景都不需要全局順序)。

如何保證順序

在MQ的模型中,順序需要由3個階段去保障:

  1. 消息被發送時保持順序

  2. 消息被存儲時保持和發送的順序一致

  3. 消息被消費時保持和存儲的順序一致

發送時保持順序意味着對於有順序要求的消息,用戶應該在同一個線程中采用同步的方式發送。存儲保持和發送的順序一致則要求在同一線程中被發送出來的消息A和B,存儲時在空間上A一定在B之前。而消費保持和存儲一致則要求消息A、B到達Consumer之后必須按照先A后B的順序被處理。

如下圖所示:

對於兩個訂單的消息的原始數據:a1、b1、b2、a2、a3、b3(絕對時間下發生的順序):

  • 在發送時,a訂單的消息需要保持a1、a2、a3的順序,b訂單的消息也相同,但是a、b訂單之間的消息沒有順序關系,這意味着a、b訂單的消息可以在不同的線程中被發送出去

  • 在存儲時,需要分別保證a、b訂單的消息的順序,但是a、b訂單之間的消息的順序可以不保證

    • a1、b1、b2、a2、a3、b3是可以接受的

    • a1、a2、b1、b2、a3、b3也是可以接受的

    • a1、a3、b1、b2、a2、b3是不能接受的

  • 消費時保證順序的簡單方式就是“什么都不做”,不對收到的消息的順序進行調整,即只要一個分區的消息只由一個線程處理即可;當然,如果a、b在一個分區中,在收到消息后也可以將他們拆分到不同線程中處理,不過要權衡一下收益

開源RocketMQ中順序的實現

上圖是RocketMQ順序消息原理的介紹,將不同訂單的消息路由到不同的分區中。文檔只是給出了Producer順序的處理,Consumer消費時通過一個分區只能有一個線程消費的方式來保證消息順序,具體實現如下。

Producer端

Producer端確保消息順序唯一要做的事情就是將消息路由到特定的分區,在RocketMQ中,通過MessageQueueSelector來實現分區的選擇。

  • List<MessageQueue> mqs:消息要發送的Topic下所有的分區

  • Message msg:消息對象

  • 額外的參數:用戶可以傳遞自己的參數

比如如下實現就可以保證相同的訂單的消息被路由到相同的分區:

long orderId = ((Order) object).getOrderId; return mqs.get(orderId % mqs.size());

Consumer端

RocketMQ消費端有兩種類型:MQPullConsumer和MQPushConsumer。

MQPullConsumer由用戶控制線程,主動從服務端獲取消息,每次獲取到的是一個MessageQueue中的消息。PullResult中的List msgFoundList自然和存儲順序一致,用戶需要再拿到這批消息后自己保證消費的順序。

對於PushConsumer,由用戶注冊MessageListener來消費消息,在客戶端中需要保證調用MessageListener時消息的順序性。RocketMQ中的實現如下:

  1. PullMessageService單線程的從Broker獲取消息

  2. PullMessageService將消息添加到ProcessQueue中(ProcessMessage是一個消息的緩存),之后提交一個消費任務到ConsumeMessageOrderService

  3. ConsumeMessageOrderService多線程執行,每個線程在消費消息時需要拿到MessageQueue的鎖

  4. 拿到鎖之后從ProcessQueue中獲取消息

保證消費順序的核心思想是:

  • 獲取到消息后添加到ProcessQueue中,單線程執行,所以ProcessQueue中的消息是順序的

  • 提交的消費任務時提交的是“對某個MQ進行一次消費”,這次消費請求是從ProcessQueue中獲取消息消費,所以也是順序的(無論哪個線程獲取到鎖,都是按照ProcessQueue中消息的順序進行消費)

順序和異常的關系

順序消息需要Producer和Consumer都保證順序。Producer需要保證消息被路由到正確的分區,消息需要保證每個分區的數據只有一個線程消息,那么就會有一些缺陷

  • 發送順序消息無法利用集群的Failover特性,因為不能更換MessageQueue進行重試

  • 因為發送的路由策略導致的熱點問題,可能某一些MessageQueue的數據量特別大

  • 消費的並行讀依賴於分區數量

  • 消費失敗時無法跳過

不能更換MessageQueue重試就需要MessageQueue有自己的副本,通過Raft、Paxos之類的算法保證有可用的副本,或者通過其他高可用的存儲設備來存儲MessageQueue。

熱點問題好像沒有什么好的解決辦法,只能通過拆分MessageQueue和優化路由方法來盡量均衡的將消息分配到不同的MessageQueue。

消費並行度理論上不會有太大問題,因為MessageQueue的數量可以調整。

消費失敗的無法跳過是不可避免的,因為跳過可能導致后續的數據處理都是錯誤的。不過可以提供一些策略,由用戶根據錯誤類型來決定是否跳過,並且提供重試隊列之類的功能,在跳過之后用戶可以在“其他”地方重新消費到這條消息。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM